From ea50d5bac547bbab28fbf085f180d3439f280091 Mon Sep 17 00:00:00 2001 From: shreddedbacon Date: Mon, 26 Feb 2024 19:44:27 +1100 Subject: [PATCH 1/2] feat: support cancelling restores --- internal/helpers/helper_types.go | 32 +++++ internal/messenger/consumer.go | 46 ++++++- internal/messenger/tasks_restore.go | 165 +++++++++++++++++++------- test/e2e/e2e_test.go | 30 +++++ test/e2e/testdata/cancel-restore.json | 17 +++ 5 files changed, 247 insertions(+), 43 deletions(-) create mode 100644 test/e2e/testdata/cancel-restore.json diff --git a/internal/helpers/helper_types.go b/internal/helpers/helper_types.go index 7718518b..0699210a 100644 --- a/internal/helpers/helper_types.go +++ b/internal/helpers/helper_types.go @@ -1,5 +1,13 @@ package helpers +import ( + "context" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + // LagoonEnvironmentVariable is used to define Lagoon environment variables. type LagoonEnvironmentVariable struct { Name string `json:"name"` @@ -15,3 +23,27 @@ type LagoonAPIConfiguration struct { SSHHost string SSHPort string } + +func K8UPVersions(ctx context.Context, c client.Client) (bool, bool, error) { + k8upv1alpha1Exists := false + k8upv1Exists := false + crdv1alpha1 := &apiextensionsv1.CustomResourceDefinition{} + if err := c.Get(context.TODO(), types.NamespacedName{Name: "restores.backup.appuio.ch"}, crdv1alpha1); err != nil { + if err := IgnoreNotFound(err); err != nil { + return k8upv1alpha1Exists, k8upv1Exists, err + } + } + if crdv1alpha1.ObjectMeta.Name == "restores.backup.appuio.ch" { + k8upv1alpha1Exists = true + } + crdv1 := &apiextensionsv1.CustomResourceDefinition{} + if err := c.Get(context.TODO(), types.NamespacedName{Name: "restores.k8up.io"}, crdv1); err != nil { + if err := IgnoreNotFound(err); err != nil { + return k8upv1alpha1Exists, k8upv1Exists, err + } + } + if crdv1.ObjectMeta.Name == "restores.k8up.io" { + k8upv1Exists = true + } + return k8upv1alpha1Exists, k8upv1Exists, nil +} diff --git a/internal/messenger/consumer.go b/internal/messenger/consumer.go index aa1b4d71..2c1ab8f1 100644 --- a/internal/messenger/consumer.go +++ b/internal/messenger/consumer.go @@ -320,6 +320,17 @@ func (m *Messenger) Consumer(targetName string) { } } case "deploytarget:restic:backup:restore", "kubernetes:restic:backup:restore": + v1alpha1, v1, err := helpers.K8UPVersions(ctx, m.Client) + if err != nil { + //@TODO: send msg back to lagoon and update task to failed? + message.Ack(false) // ack to remove from queue + return + } + if !v1alpha1 && !v1 { + // k8up not installed + message.Ack(false) // ack to remove from queue + return + } opLog.Info( fmt.Sprintf( "Received backup restoration for project %s, environment %s", @@ -327,7 +338,7 @@ func (m *Messenger) Consumer(targetName string) { jobSpec.Environment.Name, ), ) - err := m.ResticRestore(m.genNamespace(jobSpec), jobSpec) + err = m.ResticRestore(ctx, m.genNamespace(jobSpec), jobSpec, v1alpha1, v1, false) if err != nil { opLog.Error(err, fmt.Sprintf( @@ -340,6 +351,39 @@ func (m *Messenger) Consumer(targetName string) { _ = message.Ack(false) // ack to remove from queue return } + case "deploytarget:restic:cancel:restore": + v1alpha1, v1, err := helpers.K8UPVersions(ctx, m.Client) + if err != nil { + //@TODO: send msg back to lagoon and update task to failed? + message.Ack(false) // ack to remove from queue + return + } + if !v1alpha1 && !v1 { + // k8up not installed + message.Ack(false) // ack to remove from queue + return + } + // if this is a request to cancel a restore attempt + opLog.Info( + fmt.Sprintf( + "Received restore cancellation for project %s, environment %s", + jobSpec.Project.Name, + jobSpec.Environment.Name, + ), + ) + err = m.ResticRestore(ctx, m.genNamespace(jobSpec), jobSpec, v1alpha1, v1, true) + if err != nil { + opLog.Error(err, + fmt.Sprintf( + "Cancel restore for project %s, environment %s failed", + jobSpec.Project.Name, + jobSpec.Environment.Name, + ), + ) + //@TODO: send msg back to lagoon and update task to failed? + message.Ack(false) // ack to remove from queue + return + } case "deploytarget:route:migrate", "kubernetes:route:migrate", "openshift:route:migrate": opLog.Info( fmt.Sprintf( diff --git a/internal/messenger/tasks_restore.go b/internal/messenger/tasks_restore.go index 28dd284d..0c1b90e0 100644 --- a/internal/messenger/tasks_restore.go +++ b/internal/messenger/tasks_restore.go @@ -6,18 +6,24 @@ import ( "fmt" "github.com/go-logr/logr" + "github.com/uselagoon/machinery/api/schema" lagoonv1beta2 "github.com/uselagoon/remote-controller/api/lagoon/v1beta2" "github.com/uselagoon/remote-controller/internal/helpers" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" k8upv1 "github.com/k8up-io/k8up/v2/api/v1" k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1" - apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - "k8s.io/apimachinery/pkg/types" ) +type cancelRestore struct { + RestoreName string `json:"restoreName"` + BackupID string `json:"backupId"` +} + // ResticRestore handles creating the restic restore jobs. -func (m *Messenger) ResticRestore(namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error { +func (m *Messenger) ResticRestore(ctx context.Context, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec, v1alpha1, v1, cancel bool) error { opLog := ctrl.Log.WithName("handlers").WithName("LagoonTasks") vers, err := checkRestoreVersionFromCore(jobSpec.Misc.MiscResource) if err != nil { @@ -30,51 +36,42 @@ func (m *Messenger) ResticRestore(namespace string, jobSpec *lagoonv1beta2.Lagoo // just log the error then return return nil } - // check if k8up crds exist in the cluster - k8upv1alpha1Exists := false - k8upv1Exists := false - crdv1alpha1 := &apiextensionsv1.CustomResourceDefinition{} - if err = m.Client.Get(context.TODO(), types.NamespacedName{Name: "restores.backup.appuio.ch"}, crdv1alpha1); err != nil { - if err := helpers.IgnoreNotFound(err); err != nil { - return err - } - } - if crdv1alpha1.Name == "restores.backup.appuio.ch" { - k8upv1alpha1Exists = true - } - crdv1 := &apiextensionsv1.CustomResourceDefinition{} - if err = m.Client.Get(context.TODO(), types.NamespacedName{Name: "restores.k8up.io"}, crdv1); err != nil { - if err := helpers.IgnoreNotFound(err); err != nil { - return err - } - } - if crdv1.Name == "restores.k8up.io" { - k8upv1Exists = true - } + + handlev1alpha1 := false + handlev1 := false // check the version, if there is no version in the payload, assume it is k8up v2 if m.SupportK8upV2 { if vers == "backup.appuio.ch/v1alpha1" { - if k8upv1alpha1Exists { - return m.createv1alpha1Restore(opLog, namespace, jobSpec) + if v1alpha1 { + handlev1alpha1 = true } } else { - if k8upv1Exists { - if err := m.createv1Restore(opLog, namespace, jobSpec); err != nil { - return err - } + if v1 { + handlev1 = true } else { - if k8upv1alpha1Exists { - if err := m.createv1alpha1Restore(opLog, namespace, jobSpec); err != nil { - return err - } + if v1alpha1 { + handlev1alpha1 = true } } } } else { - if k8upv1alpha1Exists { - if err := m.createv1alpha1Restore(opLog, namespace, jobSpec); err != nil { - return err - } + if v1alpha1 { + handlev1alpha1 = true + } + } + + if handlev1alpha1 { + if cancel { + return m.cancelv1alpha1Restore(ctx, opLog, namespace, jobSpec) + } else { + return m.createv1alpha1Restore(ctx, opLog, namespace, jobSpec) + } + } + if handlev1 { + if cancel { + return m.cancelv1Restore(ctx, opLog, namespace, jobSpec) + } else { + return m.createv1Restore(ctx, opLog, namespace, jobSpec) } } return nil @@ -96,7 +93,7 @@ func checkRestoreVersionFromCore(resource []byte) (string, error) { } // createv1alpha1Restore will create a restore task using the restores.backup.appuio.ch v1alpha1 api (k8up v1) -func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error { +func (m *Messenger) createv1alpha1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error { restorev1alpha1 := &k8upv1alpha1.Restore{} if err := json.Unmarshal(jobSpec.Misc.MiscResource, restorev1alpha1); err != nil { opLog.Error(err, @@ -108,7 +105,7 @@ func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, j return err } restorev1alpha1.SetNamespace(namespace) - if err := m.Client.Create(context.Background(), restorev1alpha1); err != nil { + if err := m.Client.Create(ctx, restorev1alpha1); err != nil { opLog.Error(err, fmt.Sprintf( "Unable to create restore %s with k8up v1alpha1 api.", @@ -121,7 +118,7 @@ func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, j } // createv1Restore will create a restore task using the restores.k8up.io v1 api (k8up v2) -func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error { +func (m *Messenger) createv1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error { restorev1 := &k8upv1.Restore{} if err := json.Unmarshal(jobSpec.Misc.MiscResource, restorev1); err != nil { opLog.Error(err, @@ -133,7 +130,7 @@ func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec return err } restorev1.SetNamespace(namespace) - if err := m.Client.Create(context.Background(), restorev1); err != nil { + if err := m.Client.Create(ctx, restorev1); err != nil { opLog.Error(err, fmt.Sprintf( "Unable to create restore %s with k8up v1 api.", @@ -144,3 +141,87 @@ func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec } return nil } + +// cancelv1alpha1Restore will attempt to cancel a restore task using the restores.backup.appuio.ch v1alpha1 api (k8up v1) +func (m *Messenger) cancelv1alpha1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error { + restorev1alpha1 := &k8upv1alpha1.Restore{} + cr := &cancelRestore{} + if err := json.Unmarshal(jobSpec.Misc.MiscResource, &cr); err != nil { + return err + } + if err := m.Client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cr.RestoreName}, restorev1alpha1); helpers.IgnoreNotFound(err) != nil { + opLog.Error(err, + fmt.Sprintf( + "Unable to get restore %s with k8up v1alpha1 api.", + cr.RestoreName, + ), + ) + return err + } + if restorev1alpha1.Name != "" { + if err := m.Client.Delete(ctx, restorev1alpha1); err != nil { + opLog.Error(err, + fmt.Sprintf( + "Unable to delete restore %s with k8up v1alpha1 api.", + cr.RestoreName, + ), + ) + return err + } + } + // if no matching restore found, or the restore is deleted, send the cancellation message back to core + m.pubRestoreCancel(opLog, namespace, cr.RestoreName, jobSpec) + return nil +} + +// cancelv1Restore will attempt to cancel a restore task using the restores.k8up.io v1 api (k8up v2) +func (m *Messenger) cancelv1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error { + restorev1 := &k8upv1.Restore{} + cr := &cancelRestore{} + if err := json.Unmarshal(jobSpec.Misc.MiscResource, &cr); err != nil { + return err + } + if err := m.Client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cr.RestoreName}, restorev1); helpers.IgnoreNotFound(err) != nil { + opLog.Error(err, + fmt.Sprintf( + "Unable to get restore %s with k8up v1 api.", + cr.RestoreName, + ), + ) + return err + } + if restorev1.Name != "" { + if err := m.Client.Delete(ctx, restorev1); err != nil { + opLog.Error(err, + fmt.Sprintf( + "Unable to delete restore %s with k8up v1alpha1 api.", + cr.RestoreName, + ), + ) + return err + } + } + // if no matching restore found, or the restore is deleted, send the cancellation message back to core + m.pubRestoreCancel(opLog, namespace, cr.RestoreName, jobSpec) + return nil +} + +func (m *Messenger) pubRestoreCancel(opLog logr.Logger, namespace, restorename string, jobSpec *lagoonv1beta2.LagoonTaskSpec) { + msg := schema.LagoonMessage{ + Type: "restore:cancel", + Namespace: namespace, + Meta: &schema.LagoonLogMeta{ + Environment: jobSpec.Environment.Name, + Project: jobSpec.Project.Name, + JobName: restorename, + }, + } + msgBytes, err := json.Marshal(msg) + if err != nil { + opLog.Error(err, "Unable to encode message as JSON") + } + // publish the cancellation result back to lagoon + if err := m.Publish("lagoon-tasks:controller", msgBytes); err != nil { + opLog.Error(err, "Unable to publish message.") + } +} diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index a1645e77..99e08654 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -441,6 +441,36 @@ var _ = Describe("controller", Ordered, func() { Expect(strings.TrimSpace(string(result))).To(Equal(string(testResult))) } + By("validating that restore cancellations are working") + By("creating a restore cancellation task via rabbitmq") + cmd = exec.Command( + "curl", + "-s", + "-u", + "guest:guest", + "-H", + "'Accept: application/json'", + "-H", + "'Content-Type:application/json'", + "-X", + "POST", + "-d", + "@test/e2e/testdata/cancel-restore.json", + "http://172.17.0.1:15672/api/exchanges/%2f/lagoon-tasks/publish", + ) + _, err = utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + + time.Sleep(10 * time.Second) + + By("validating that the restore is deleted") + cmd = exec.Command("kubectl", "get", + "restores.k8up.io", "restore-bf072a0-uqxqo4", + "-n", "nginx-example-main", + ) + _, err = utils.Run(cmd) + ExpectWithOffset(1, err).To(HaveOccurred()) + By("validating that the harbor robot credentials get rotated successfully") cmd = exec.Command(utils.Kubectl(), "get", "pods", "-l", "control-plane=controller-manager", diff --git a/test/e2e/testdata/cancel-restore.json b/test/e2e/testdata/cancel-restore.json new file mode 100644 index 00000000..73edf713 --- /dev/null +++ b/test/e2e/testdata/cancel-restore.json @@ -0,0 +1,17 @@ +{"properties":{"delivery_mode":2},"routing_key":"ci-local-controller-kubernetes:misc", + "payload":"{ + \"misc\":{ + \"miscResource\":\"eyJyZXN0b3JlTmFtZSI6InJlc3RvcmUtYmYwNzJhMC11cXhxbzQiLCJiYWNrdXBJZCI6ImJmMDcyYTA5ZTE3NzI2ZGE1NGFkYzc5OTM2ZWM4NzQ1NTIxOTkzNTk5ZDQxMjExZGZjOTQ2NmRmZDViYzMyYTUifQ==\" + }, + \"key\":\"deploytarget:restic:cancel:restore\", + \"environment\":{ + \"name\":\"main\", + \"openshiftProjectName\":\"nginx-example-main\" + }, + \"project\":{ + \"name\":\"nginx-example\" + }, + \"advancedTask\":{} + }", +"payload_encoding":"string" +} \ No newline at end of file From 8e38346f9bca1ed612450bba0d17a57207589c1a Mon Sep 17 00:00:00 2001 From: shreddedbacon Date: Thu, 20 Nov 2025 10:15:17 +1100 Subject: [PATCH 2/2] fix: linting issues --- internal/helpers/helper_types.go | 4 ++-- internal/messenger/consumer.go | 16 ++++++++-------- internal/messenger/tasks_restore.go | 6 ++---- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/internal/helpers/helper_types.go b/internal/helpers/helper_types.go index 0699210a..b2f19470 100644 --- a/internal/helpers/helper_types.go +++ b/internal/helpers/helper_types.go @@ -33,7 +33,7 @@ func K8UPVersions(ctx context.Context, c client.Client) (bool, bool, error) { return k8upv1alpha1Exists, k8upv1Exists, err } } - if crdv1alpha1.ObjectMeta.Name == "restores.backup.appuio.ch" { + if crdv1alpha1.Name == "restores.backup.appuio.ch" { k8upv1alpha1Exists = true } crdv1 := &apiextensionsv1.CustomResourceDefinition{} @@ -42,7 +42,7 @@ func K8UPVersions(ctx context.Context, c client.Client) (bool, bool, error) { return k8upv1alpha1Exists, k8upv1Exists, err } } - if crdv1.ObjectMeta.Name == "restores.k8up.io" { + if crdv1.Name == "restores.k8up.io" { k8upv1Exists = true } return k8upv1alpha1Exists, k8upv1Exists, nil diff --git a/internal/messenger/consumer.go b/internal/messenger/consumer.go index 2c1ab8f1..65553243 100644 --- a/internal/messenger/consumer.go +++ b/internal/messenger/consumer.go @@ -322,13 +322,13 @@ func (m *Messenger) Consumer(targetName string) { case "deploytarget:restic:backup:restore", "kubernetes:restic:backup:restore": v1alpha1, v1, err := helpers.K8UPVersions(ctx, m.Client) if err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue + // @TODO: send msg back to lagoon and update task to failed? + _ = message.Ack(false) // ack to remove from queue return } if !v1alpha1 && !v1 { // k8up not installed - message.Ack(false) // ack to remove from queue + _ = message.Ack(false) // ack to remove from queue return } opLog.Info( @@ -354,13 +354,13 @@ func (m *Messenger) Consumer(targetName string) { case "deploytarget:restic:cancel:restore": v1alpha1, v1, err := helpers.K8UPVersions(ctx, m.Client) if err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue + // @TODO: send msg back to lagoon and update task to failed? + _ = message.Ack(false) // ack to remove from queue return } if !v1alpha1 && !v1 { // k8up not installed - message.Ack(false) // ack to remove from queue + _ = message.Ack(false) // ack to remove from queue return } // if this is a request to cancel a restore attempt @@ -380,8 +380,8 @@ func (m *Messenger) Consumer(targetName string) { jobSpec.Environment.Name, ), ) - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue + // @TODO: send msg back to lagoon and update task to failed? + _ = message.Ack(false) // ack to remove from queue return } case "deploytarget:route:migrate", "kubernetes:route:migrate", "openshift:route:migrate": diff --git a/internal/messenger/tasks_restore.go b/internal/messenger/tasks_restore.go index 0c1b90e0..7c66d939 100644 --- a/internal/messenger/tasks_restore.go +++ b/internal/messenger/tasks_restore.go @@ -48,10 +48,8 @@ func (m *Messenger) ResticRestore(ctx context.Context, namespace string, jobSpec } else { if v1 { handlev1 = true - } else { - if v1alpha1 { - handlev1alpha1 = true - } + } else if v1alpha1 { + handlev1alpha1 = true } } } else {