diff --git a/e2e/tests/k8s_api.py b/e2e/tests/k8s_api.py
index 1f42ad4bc..08a5465bb 100644
--- a/e2e/tests/k8s_api.py
+++ b/e2e/tests/k8s_api.py
@@ -92,10 +92,13 @@ def get_operator_log(self):
             namespace='default'
         )
 
-    def pg_get_status(self, name="acid-minimal-cluster", namespace="default"):
+    def pg_get(self, name="acid-minimal-cluster", namespace="default"):
         pg = self.api.custom_objects_api.get_namespaced_custom_object(
             "acid.zalan.do", "v1", namespace, "postgresqls", name)
-        return pg.get("status", {}).get("PostgresClusterStatus", None)
+        return pg
+
+    def pg_get_status(self, name="acid-minimal-cluster", namespace="default"):
+        return self.pg_get(name, namespace).get("status", {})
 
     def wait_for_pod_start(self, pod_labels, namespace='default'):
         pod_phase = 'No pod running'
diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py
index f473b5cc4..cb105eb28 100644
--- a/e2e/tests/test_e2e.py
+++ b/e2e/tests/test_e2e.py
@@ -71,6 +71,19 @@ def eventuallyTrue(self, f, m, retries=60, interval=2):
                     raise
                 time.sleep(interval)
 
+    def eventuallyTrueFunc(self, f, xf, m, retries=60, interval=2):
+        while True:
+            try:
+                y = f()
+                # evaluate the predicate once against the fetched object
+                self.assertTrue(xf(y), m)
+                return True
+            except AssertionError:
+                retries = retries - 1
+                if not retries > 0:
+                    raise
+                time.sleep(interval)
+
     @classmethod
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def setUpClass(cls):
@@ -559,7 +572,7 @@ def compare_config():
         pg_patch_config["spec"]["patroni"]["slots"][slot_to_change]["database"] = "bar"
 
         del pg_patch_config["spec"]["patroni"]["slots"][slot_to_remove]
-        
+
         k8s.api.custom_objects_api.patch_namespaced_custom_object(
             "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_delete_slot_patch)
 
@@ -576,7 +589,7 @@ def compare_config():
 
         self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", get_slot_query%("database", slot_to_change))[0], "bar",
             "The replication slot cannot be updated", 10, 5)
-        
+
         # make sure slot from Patroni didn't get deleted
         self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", patroni_slot))), 1,
             "The replication slot from Patroni gets deleted", 10, 5)
@@ -1670,6 +1683,13 @@ def test_overwrite_pooler_deployment(self):
         self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(name=pooler_name), 2,
                              "Operator did not succeed in overwriting labels")
 
+        # status observedGeneration should match metadata.generation
+        self.eventuallyTrueFunc(
+            lambda: k8s.pg_get(),
+            lambda pg: pg.get("metadata", {}).get("generation", 0) == pg.get("status", {}).get("observedGeneration", -1),
+            "Expected generation and status.observedGeneration to match",
+        )
+
         k8s.api.custom_objects_api.patch_namespaced_custom_object(
             'acid.zalan.do', 'v1', 'default',
             'postgresqls', 'acid-minimal-cluster',
@@ -1683,6 +1703,13 @@ def test_overwrite_pooler_deployment(self):
         self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler="+pooler_name),
                              0, "Pooler pods not scaled down")
 
+        # status observedGeneration should match metadata.generation
+        self.eventuallyTrueFunc(
+            lambda: k8s.pg_get(),
+            lambda pg: pg.get("metadata", {}).get("generation", 0) == pg.get("status", {}).get("observedGeneration", -1),
+            "Expected generation and status.observedGeneration to match",
+        )
+
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_owner_references(self):
         '''
@@ -2022,7 +2049,7 @@ def test_rolling_update_label_timeout(self):
 
         # pod_label_wait_timeout should have been exceeded hence the rolling update is continued on next sync
         # check if the cluster state is "SyncFailed"
-        self.eventuallyEqual(lambda: k8s.pg_get_status(), "SyncFailed", "Expected SYNC event to fail")
+        self.eventuallyEqual(lambda: k8s.pg_get_status().get("PostgresClusterStatus"), "SyncFailed", "Expected SYNC event to fail")
 
         # wait for next sync, replica should be running normally by now and be ready for switchover
         k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label)
@@ -2037,7 +2064,13 @@ def test_rolling_update_label_timeout(self):
 
         # status should again be "SyncFailed" but turn into "Running" on the next sync
         time.sleep(30)
-        self.eventuallyEqual(lambda: k8s.pg_get_status(), "Running", "Expected running cluster after two syncs")
+        self.eventuallyEqual(lambda: k8s.pg_get_status().get("PostgresClusterStatus"), "Running", "Expected running cluster after two syncs")
+        # status observedGeneration should match metadata.generation
+        self.eventuallyTrueFunc(
+            lambda: k8s.pg_get(),
+            lambda pg: pg.get("metadata", {}).get("generation", 0) == pg.get("status", {}).get("observedGeneration", -1),
+            "Expected generation and status.observedGeneration to match",
+        )
 
         # revert config changes
         patch_resync_config = {
diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml
index 44e6a1def..ed269d86d 100644
--- a/manifests/postgresql.crd.yaml
+++ b/manifests/postgresql.crd.yaml
@@ -4189,6 +4189,9 @@ spec:
             properties:
               PostgresClusterStatus:
                 type: string
+              observedGeneration:
+                format: int64
+                type: integer
             required:
             - PostgresClusterStatus
           type: object
diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go
index 2676aa07e..cb489c939 100644
--- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go
+++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go
@@ -301,6 +301,7 @@ type UserFlags []string
 // PostgresStatus contains status of the PostgreSQL cluster (running, creation failed etc.)
 type PostgresStatus struct {
 	PostgresClusterStatus string `json:"PostgresClusterStatus"`
+	ObservedGeneration    int64  `json:"observedGeneration,omitempty"`
 }
 
 // ConnectionPooler Options for connection pooler
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 65977310e..45a469a72 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -275,6 +275,7 @@ func (c *Cluster) Create() (err error) {
 		currentStatus := c.Status.DeepCopy()
 		pg := c.Postgresql.DeepCopy()
 		pg.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning
+		pg.Status.ObservedGeneration = pgCreateStatus.Generation
 
 		if err != nil {
 			c.logger.Warningf("cluster created failed: %v", err)
@@ -998,6 +999,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	defer func() {
 		currentStatus := newSpec.Status.DeepCopy()
 		newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning
+		newSpec.Status.ObservedGeneration = newSpec.Generation
 
 		if updateFailed {
 			newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdateFailed
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 7ed12f5eb..6c43eba69 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -44,6 +44,9 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
 	c.setSpec(newSpec)
 
 	defer func() {
+		// update observedGeneration to reflect that the latest spec
+		// was processed
+		newSpec.Status.ObservedGeneration = newSpec.Generation
 		if err != nil {
 			c.logger.Warningf("error while syncing cluster state: %v", err)
 			newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusSyncFailed
diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go
index 2a8b0b90a..251352f55 100644
--- a/pkg/cluster/util_test.go
+++ b/pkg/cluster/util_test.go
@@ -13,6 +13,7 @@ import (
 
 	"github.com/golang/mock/gomock"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"github.com/zalando/postgres-operator/mocks"
 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
 	fakeacidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake"
@@ -589,9 +590,16 @@ func TestInheritedAnnotations(t *testing.T) {
 
 	// + new PVC
 	cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &CreatePVCs(namespace, clusterName, filterLabels, 4, "1Gi").Items[3], metav1.CreateOptions{})
 
+	newSpec, err = cluster.KubeClient.Postgresqls(namespace).Update(context.TODO(), newSpec, metav1.UpdateOptions{})
+	assert.NoError(t, err)
+
+	generation := newSpec.Generation
+
 	err = cluster.Update(cluster.Postgresql.DeepCopy(), newSpec)
 	assert.NoError(t, err)
 
+	require.Equal(t, generation, cluster.Postgresql.Status.ObservedGeneration)
+
 	err = checkResourcesInheritedAnnotations(cluster, result)
 	assert.NoError(t, err)
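
Reviewer note, not part of the patch: with status.observedGeneration exposed on the
postgresql resource, a client can tell whether the operator has already processed
the latest edit of the spec by comparing it against metadata.generation, the same
check the e2e assertions above perform. A minimal sketch using the kubernetes
Python client; the helper name and retry defaults are illustrative, not part of
this PR:

    import time
    from kubernetes import client, config

    def wait_for_observed_generation(name="acid-minimal-cluster",
                                     namespace="default",
                                     retries=60, interval=2):
        # Poll the postgresql custom object until status.observedGeneration
        # catches up with metadata.generation, i.e. the operator has synced
        # the most recent version of the spec.
        config.load_kube_config()
        api = client.CustomObjectsApi()
        for _ in range(retries):
            pg = api.get_namespaced_custom_object(
                "acid.zalan.do", "v1", namespace, "postgresqls", name)
            if pg["metadata"]["generation"] == pg.get("status", {}).get("observedGeneration"):
                return pg
            time.sleep(interval)
        raise TimeoutError("operator did not observe the latest generation")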