From afdf40b87402c0c7d603eec8a9f8b453c6e42a02 Mon Sep 17 00:00:00 2001 From: Lajith Date: Wed, 12 Mar 2025 14:49:27 +0530 Subject: [PATCH 01/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../api/status/FlinkDeploymentStatus.java | 176 ++++++++++++++++++ .../operator/api/utils/ConditionUtils.java | 51 +++++ .../FlinkDeploymentControllerTest.java | 46 +++++ .../flinkdeployments.flink.apache.org-v1.yml | 19 ++ 4 files changed, 292 insertions(+) create mode 100644 flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java index 136d3415f3..c618736a8c 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java @@ -18,9 +18,12 @@ package org.apache.flink.kubernetes.operator.api.status; import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.api.utils.ConditionUtils; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import io.fabric8.kubernetes.api.model.Condition; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -28,7 +31,9 @@ import lombok.ToString; import lombok.experimental.SuperBuilder; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** Last observed status of the Flink deployment. */ @@ -55,4 +60,175 @@ public class FlinkDeploymentStatus extends CommonStatus { /** Information about the TaskManagers for the scale subresource. */ private TaskManagerInfo taskManager; + + /** Condition of the CR . */ + private List conditions = new ArrayList<>(); + + private String phase; + + public List getConditions() { + if (reconciliationStatus != null + && reconciliationStatus.deserializeLastReconciledSpec() != null + && reconciliationStatus.deserializeLastReconciledSpec().getJob() == null) { + switch (jobManagerDeploymentStatus) { + case READY: + updateCondition( + conditions, + ConditionUtils.runningTrue( + "JobManager is running and ready to receive REST API call", + "JobManager is running and ready to receive REST API call")); + break; + case MISSING: + updateCondition( + conditions, + ConditionUtils.runningFalse( + "JobManager deployment not found ", + "JobManager deployment not found ")); + break; + case DEPLOYING: + updateCondition( + conditions, + ConditionUtils.runningFalse( + "JobManager process is starting up", + "JobManager process is starting up")); + break; + case DEPLOYED_NOT_READY: + updateCondition( + conditions, + ConditionUtils.runningFalse( + "JobManager is running but not ready yet to receive REST API calls", + "JobManager is running but not ready yet to receive REST API calls")); + break; + case ERROR: + updateCondition( + conditions, + ConditionUtils.runningFalse( + "Deployment in terminal error, requires spec change for reconciliation to continue", + "JobManager deployment failed")); + } + } else if (getJobStatus() != null && getJobStatus().getState() != null) { + switch (getJobStatus().getState()) { + case RECONCILING: + updateCondition( + conditions, + ConditionUtils.runningFalse( + JobStatus.RECONCILING.name(), "Job is currently reconciling")); + break; + case CREATED: + updateCondition( + conditions, + ConditionUtils.runningFalse( + JobStatus.CREATED.name(), "Job is created")); + break; + case RUNNING: + updateCondition( + conditions, + ConditionUtils.runningTrue(JobStatus.RUNNING.name(), "Job is running")); + break; + case FAILING: + updateCondition( + conditions, + ConditionUtils.runningFalse( + JobStatus.FAILING.name(), "Job has failed")); + break; + case RESTARTING: + updateCondition( + conditions, + ConditionUtils.runningFalse( + JobStatus.RESTARTING.name(), + "The job is currently undergoing a restarting")); + break; + case FAILED: + updateCondition( + conditions, + ConditionUtils.runningFalse( + JobStatus.FAILED.name(), + "The job has failed with a non-recoverable task failure")); + break; + case FINISHED: + updateCondition( + conditions, + ConditionUtils.runningFalse( + JobStatus.FINISHED.name(), + "Job's tasks have successfully finished")); + break; + + case CANCELED: + updateCondition( + conditions, + ConditionUtils.runningFalse( + JobStatus.CANCELED.name(), "Job has been cancelled")); + break; + case SUSPENDED: + updateCondition( + conditions, + ConditionUtils.runningFalse( + JobStatus.SUSPENDED.name(), "The job has been suspended")); + break; + } + } + return conditions; + } + + public String getPhase() { + if (reconciliationStatus != null + && reconciliationStatus.deserializeLastReconciledSpec() != null + && reconciliationStatus.deserializeLastReconciledSpec().getJob() == null) { + switch (jobManagerDeploymentStatus) { + case READY: + phase = "Running"; + break; + case MISSING: + case ERROR: + case DEPLOYING: + phase = "Pending"; + break; + } + } else if (getJobStatus() != null && getJobStatus().getState() != null) { + switch (getJobStatus().getState()) { + case RECONCILING: + phase = "Pending"; + break; + case CREATED: + phase = JobStatus.CREATED.name(); + break; + case RUNNING: + phase = JobStatus.RUNNING.name(); + break; + case FAILING: + phase = JobStatus.FAILING.name(); + break; + case RESTARTING: + phase = JobStatus.RESTARTING.name(); + break; + case FAILED: + phase = JobStatus.FAILED.name(); + break; + case FINISHED: + phase = JobStatus.FINISHED.name(); + break; + case CANCELED: + phase = JobStatus.CANCELED.name(); + break; + case SUSPENDED: + phase = JobStatus.SUSPENDED.name(); + break; + } + } + return phase; + } + + private static void updateCondition(List conditions, Condition condition) { + if (conditions.isEmpty()) { + conditions.add(condition); + return; + } + // If new condition is same as last condition, ignore + Condition existingCondition = conditions.get(conditions.size() - 1); + if (existingCondition.getType().equals(condition.getType()) + && existingCondition.getMessage().equals(condition.getMessage())) { + return; + } + conditions.add(condition); + } } diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java new file mode 100644 index 0000000000..0a235690cb --- /dev/null +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.utils; + +import io.fabric8.kubernetes.api.model.Condition; +import io.fabric8.kubernetes.api.model.ConditionBuilder; + +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * Creates a condition object with the specified parameters. + * + * @return A condition object with the type, message, status, reason and timestamp. + */ +public class ConditionUtils { + public static Condition runningTrue(final String message, final String reason) { + return crCondition("Running", "True", message, reason); + } + + public static Condition runningFalse(final String message, final String reason) { + return crCondition("Running", "False", message, reason); + } + + private static Condition crCondition( + final String type, final String status, final String message, final String reason) { + return new ConditionBuilder() + .withType(type) + .withStatus(status) + .withMessage(message) + .withReason(reason) + .withLastTransitionTime( + new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").format(new Date())) + .build(); + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 0354a1256e..108daa7d1b 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -69,6 +69,7 @@ import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED; import static org.apache.flink.kubernetes.operator.utils.EventRecorder.Reason.ValidationError; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -130,6 +131,16 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception assertEquals( org.apache.flink.api.common.JobStatus.RUNNING, appCluster.getStatus().getJobStatus().getState()); + + // Validate status conditions + assertEquals( + org.apache.flink.api.common.JobStatus.RUNNING.name(), + appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .extracting("message") + .contains(org.apache.flink.api.common.JobStatus.RUNNING.name()); + assertEquals(7, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); @@ -1167,6 +1178,14 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E org.apache.flink.api.common.JobStatus.RECONCILING, appCluster.getStatus().getJobStatus().getState()); assertEquals(4, testController.getInternalStatusUpdateCount()); + + // Validate status conditions + assertEquals("Pending", appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .extracting("message") + .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -1190,6 +1209,14 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E org.apache.flink.api.common.JobStatus.RECONCILING, appCluster.getStatus().getJobStatus().getState()); assertEquals(5, testController.getInternalStatusUpdateCount()); + + // Validate status conditions + assertEquals("Pending", appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .extracting("message") + .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -1203,6 +1230,16 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E assertEquals( org.apache.flink.api.common.JobStatus.RUNNING, appCluster.getStatus().getJobStatus().getState()); + + // Validate status conditions + assertEquals( + org.apache.flink.api.common.JobStatus.RUNNING.name(), + appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .extracting("message") + .contains(org.apache.flink.api.common.JobStatus.RUNNING.name()); + assertEquals(6, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( @@ -1225,6 +1262,15 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), updateControl.getScheduleDelay()); + // Validate status conditions + assertEquals( + org.apache.flink.api.common.JobStatus.RUNNING.name(), + appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .extracting("message") + .contains(org.apache.flink.api.common.JobStatus.RUNNING.name()); + // Validate job status JobStatus jobStatus = appCluster.getStatus().getJobStatus(); JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1; diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml index f498ae2431..3b0e143b6f 100644 --- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml @@ -10229,6 +10229,23 @@ spec: additionalProperties: type: string type: object + conditions: + items: + properties: + lastTransitionTime: + type: string + message: + type: string + observedGeneration: + type: integer + reason: + type: string + status: + type: string + type: + type: string + type: object + type: array error: type: string jobManagerDeploymentStatus: @@ -10391,6 +10408,8 @@ spec: type: string observedGeneration: type: integer + phase: + type: string reconciliationStatus: properties: lastReconciledSpec: From 1d84164b11d4a2d1e47a7289caacf1c0941b5aea Mon Sep 17 00:00:00 2001 From: Lajith Date: Wed, 12 Mar 2025 15:11:40 +0530 Subject: [PATCH 02/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../controller/FlinkDeploymentControllerTest.java | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 108daa7d1b..bceb79c798 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -131,16 +131,6 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception assertEquals( org.apache.flink.api.common.JobStatus.RUNNING, appCluster.getStatus().getJobStatus().getState()); - - // Validate status conditions - assertEquals( - org.apache.flink.api.common.JobStatus.RUNNING.name(), - appCluster.getStatus().getPhase()); - assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) - .extracting("message") - .contains(org.apache.flink.api.common.JobStatus.RUNNING.name()); - assertEquals(7, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); @@ -1182,7 +1172,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E // Validate status conditions assertEquals("Pending", appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) + assertThat(appCluster.getStatus().getConditions()).hasSize(1) .extracting("message") .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); @@ -1213,7 +1203,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E // Validate status conditions assertEquals("Pending", appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) + assertThat(appCluster.getStatus().getConditions()).hasSize(1) .extracting("message") .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); From 727bb4bb0d64b3b28094eb2e71f2ca267574e036 Mon Sep 17 00:00:00 2001 From: Lajith Date: Sat, 15 Mar 2025 22:11:30 +0530 Subject: [PATCH 03/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../operator/controller/FlinkDeploymentControllerTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index bceb79c798..d486369b55 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -1172,7 +1172,8 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E // Validate status conditions assertEquals("Pending", appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()).hasSize(1) + assertThat(appCluster.getStatus().getConditions()) + .hasSize(1) .extracting("message") .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); @@ -1203,7 +1204,8 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E // Validate status conditions assertEquals("Pending", appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()).hasSize(1) + assertThat(appCluster.getStatus().getConditions()) + .hasSize(1) .extracting("message") .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); From c66fd1f3f9d6910b43a8abcdbdab6111cee9a9b4 Mon Sep 17 00:00:00 2001 From: Lajith Date: Thu, 20 Mar 2025 00:24:15 +0530 Subject: [PATCH 04/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../api/status/FlinkDeploymentStatus.java | 4 +- .../FlinkDeploymentControllerTest.java | 104 ++++++++++++++++++ 2 files changed, 106 insertions(+), 2 deletions(-) diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java index c618736a8c..a1fd0a98a7 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java @@ -82,8 +82,8 @@ public List getConditions() { updateCondition( conditions, ConditionUtils.runningFalse( - "JobManager deployment not found ", - "JobManager deployment not found ")); + "JobManager deployment not found", + "JobManager deployment not found")); break; case DEPLOYING: updateCondition( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index d486369b55..adbb57252f 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -281,6 +281,14 @@ public void verifyFailedDeployment() throws Exception { // Validate status assertNotNull(appCluster.getStatus().getError()); + // Validate status conditions + assertEquals("Pending", appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .hasSize(1) + .extracting("message") + .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + // next cycle should not create another event updateControl = testController.reconcile( @@ -365,6 +373,14 @@ public void verifyInProgressDeploymentWithError(String reason) throws Exception org.apache.flink.api.common.JobStatus.RECONCILING, appCluster.getStatus().getJobStatus().getState()); + // Validate status conditions + assertEquals("Pending", appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .hasSize(1) + .extracting("message") + .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + // Validate status status assertNotNull(appCluster.getStatus().getError()); @@ -449,6 +465,15 @@ public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) thro assertEquals( "savepoint_1", appCluster.getStatus().getJobStatus().getUpgradeSavepointPath()); + // Validate status conditions + assertEquals( + org.apache.flink.api.common.JobStatus.FINISHED.name(), + appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .extracting("message") + .contains(org.apache.flink.api.common.JobStatus.FINISHED.name()); + // Resume from last savepoint appCluster.getSpec().getJob().setState(JobState.RUNNING); testController.reconcile(appCluster, context); @@ -658,6 +683,9 @@ public void verifyReconcileWithBadConfig() throws Exception { JobManagerDeploymentStatus.DEPLOYING, appCluster.getStatus().getJobManagerDeploymentStatus()); + // Validate status conditions + assertEquals("Pending", appCluster.getStatus().getPhase()); + // Check when the bad config is applied, observe() will change the cluster state correctly appCluster.getSpec().getJobManager().setReplicas(-1); // Next reconcile will set error msg and observe with previous validated config @@ -672,6 +700,9 @@ public void verifyReconcileWithBadConfig() throws Exception { JobManagerDeploymentStatus.DEPLOYED_NOT_READY, appCluster.getStatus().getJobManagerDeploymentStatus()); + // Validate status conditions + assertEquals("Pending", appCluster.getStatus().getPhase()); + // Make sure we do validation before getting effective config in reconcile(). appCluster.getSpec().getJobManager().setReplicas(1); appCluster.getSpec().getJob().setParallelism(0); @@ -684,6 +715,10 @@ public void verifyReconcileWithBadConfig() throws Exception { assertEquals( JobManagerDeploymentStatus.READY, appCluster.getStatus().getJobManagerDeploymentStatus()); + + assertEquals( + org.apache.flink.api.common.JobStatus.RUNNING.name(), + appCluster.getStatus().getPhase()); } @Test @@ -698,6 +733,13 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception JobManagerDeploymentStatus.DEPLOYING, appCluster.getStatus().getJobManagerDeploymentStatus()); + // Validate status conditions + assertEquals("Pending", appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .extracting("message") + .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + updateControl = testController.reconcile(appCluster, context); JobStatus jobStatus = appCluster.getStatus().getJobStatus(); assertFalse(updateControl.isUpdateStatus()); @@ -707,6 +749,13 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception // jobStatus has not been set at this time assertEquals(org.apache.flink.api.common.JobStatus.RECONCILING, jobStatus.getState()); + // Validate status conditions + assertEquals("Pending", appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .extracting("message") + .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + // Switches operator mode to SESSION appCluster.getSpec().setJob(null); // Validation fails and JobObserver should still be used @@ -728,6 +777,17 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception assertEquals(expectedJobStatus.getJobId().toHexString(), jobStatus.getJobId()); assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName()); assertEquals(expectedJobStatus.getJobState(), jobStatus.getState()); + + // Validate status conditions + assertEquals( + org.apache.flink.api.common.JobStatus.RUNNING.name(), + appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .extracting("message") + .contains( + org.apache.flink.api.common.JobStatus.RECONCILING.name(), + org.apache.flink.api.common.JobStatus.RUNNING.name()); } @Test @@ -742,12 +802,31 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except JobManagerDeploymentStatus.DEPLOYING, appCluster.getStatus().getJobManagerDeploymentStatus()); + // Validate status conditions + assertEquals("Pending", appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .hasSize(2) + .extracting("message") + .contains("JobManager deployment not found", "JobManager process is starting up"); + updateControl = testController.reconcile(appCluster, context); JobStatus jobStatus = appCluster.getStatus().getJobStatus(); assertFalse(updateControl.isUpdateStatus()); assertEquals( JobManagerDeploymentStatus.DEPLOYED_NOT_READY, appCluster.getStatus().getJobManagerDeploymentStatus()); + + // Validate status conditions + assertEquals("Pending", appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .hasSize(3) + .extracting("message") + .contains( + "JobManager deployment not found", + "JobManager process is starting up", + "JobManager is running but not ready yet to receive REST API calls"); // jobStatus has not been set at this time assertNull(jobStatus.getState()); @@ -765,6 +844,18 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except .getError() .contains("Cannot switch from session to job cluster")); assertNull(ReconciliationUtils.getDeployedSpec(appCluster).getJob()); + + // Validate status conditions + assertEquals("Running", appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .hasSize(4) + .extracting("message") + .contains( + "JobManager deployment not found", + "JobManager process is starting up", + "JobManager is running but not ready yet to receive REST API calls", + "JobManager is running and ready to receive REST API call"); } private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Exception { @@ -910,6 +1001,15 @@ private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Excep assertEquals( JobManagerDeploymentStatus.READY, appCluster.getStatus().getJobManagerDeploymentStatus()); + + // Validate status conditions + assertEquals( + org.apache.flink.api.common.JobStatus.RUNNING.name(), + appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .extracting("message") + .contains(org.apache.flink.api.common.JobStatus.RUNNING.name()); } @Test @@ -1156,6 +1256,10 @@ private void verifyReconcileInitialSuspendedDeployment(FlinkDeployment appCluste assertNull(appCluster.getStatus().getError()); assertNull(reconciliationStatus.deserializeLastReconciledSpec()); assertNull(reconciliationStatus.getLastStableSpec()); + + // Validate status conditions + assertNull(appCluster.getStatus().getPhase()); + assertThat(appCluster.getStatus().getConditions()).isEmpty(); } private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws Exception { From c623f33a571089ab5f1a014b147a04637b885ddb Mon Sep 17 00:00:00 2001 From: Lajith Date: Wed, 26 Mar 2025 17:09:58 +0530 Subject: [PATCH 05/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../api/status/FlinkDeploymentStatus.java | 18 +++++++++------ .../operator/api/utils/ConditionUtils.java | 22 ++++++++++++++----- 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java index a1fd0a98a7..be211819a1 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java @@ -70,43 +70,45 @@ public List getConditions() { if (reconciliationStatus != null && reconciliationStatus.deserializeLastReconciledSpec() != null && reconciliationStatus.deserializeLastReconciledSpec().getJob() == null) { + // Populate conditions for SessionMode deployment switch (jobManagerDeploymentStatus) { case READY: updateCondition( conditions, ConditionUtils.runningTrue( - "JobManager is running and ready to receive REST API call", - "JobManager is running and ready to receive REST API call")); + JobManagerDeploymentStatus.READY.name(), + "JobManager is running and ready to receive REST API calls")); break; case MISSING: updateCondition( conditions, ConditionUtils.runningFalse( - "JobManager deployment not found", + JobManagerDeploymentStatus.MISSING.name(), "JobManager deployment not found")); break; case DEPLOYING: updateCondition( conditions, ConditionUtils.runningFalse( - "JobManager process is starting up", + JobManagerDeploymentStatus.DEPLOYING.name(), "JobManager process is starting up")); break; case DEPLOYED_NOT_READY: updateCondition( conditions, ConditionUtils.runningFalse( - "JobManager is running but not ready yet to receive REST API calls", + JobManagerDeploymentStatus.DEPLOYED_NOT_READY.name(), "JobManager is running but not ready yet to receive REST API calls")); break; case ERROR: updateCondition( conditions, ConditionUtils.runningFalse( - "Deployment in terminal error, requires spec change for reconciliation to continue", + JobManagerDeploymentStatus.ERROR.name(), "JobManager deployment failed")); } } else if (getJobStatus() != null && getJobStatus().getState() != null) { + // Populate conditions for ApplicationMode deployment switch (getJobStatus().getState()) { case RECONCILING: updateCondition( @@ -136,7 +138,7 @@ public List getConditions() { conditions, ConditionUtils.runningFalse( JobStatus.RESTARTING.name(), - "The job is currently undergoing a restarting")); + "The job is currently restarting")); break; case FAILED: updateCondition( @@ -174,6 +176,7 @@ public String getPhase() { if (reconciliationStatus != null && reconciliationStatus.deserializeLastReconciledSpec() != null && reconciliationStatus.deserializeLastReconciledSpec().getJob() == null) { + // populate phase for SessionMode deployment switch (jobManagerDeploymentStatus) { case READY: phase = "Running"; @@ -185,6 +188,7 @@ public String getPhase() { break; } } else if (getJobStatus() != null && getJobStatus().getState() != null) { + // populate phase for ApplicationMode deployment switch (getJobStatus().getState()) { case RECONCILING: phase = "Pending"; diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java index 0a235690cb..44b49673ac 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java @@ -23,16 +23,28 @@ import java.text.SimpleDateFormat; import java.util.Date; -/** - * Creates a condition object with the specified parameters. - * - * @return A condition object with the type, message, status, reason and timestamp. - */ +/** Creates a condition object with the type, status, message and reason. */ public class ConditionUtils { + /** + * Create a condition with type "Running", status "True", the given message and reason, and + * current timestamp. + * + * @param message Message for the condition. + * @param reason Reason for the condition. + * @return A condition object. + */ public static Condition runningTrue(final String message, final String reason) { return crCondition("Running", "True", message, reason); } + /** + * Create a condition with type "Running", status "False", the given message and reason, and + * current timestamp. + * + * @param message Message for the condition. + * @param reason Reason for the condition. + * @return A condition object. + */ public static Condition runningFalse(final String message, final String reason) { return crCondition("Running", "False", message, reason); } From 196c49440a9298c85ccc9fb3d5978789590f0d2f Mon Sep 17 00:00:00 2001 From: Lajith Date: Fri, 28 Mar 2025 11:55:39 +0530 Subject: [PATCH 06/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../api/status/FlinkDeploymentStatus.java | 77 +++++----- .../operator/api/utils/ConditionUtils.java | 142 ++++++++++++++---- .../FlinkDeploymentControllerTest.java | 37 ++--- 3 files changed, 167 insertions(+), 89 deletions(-) diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java index be211819a1..fe40c39916 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java @@ -75,37 +75,37 @@ public List getConditions() { case READY: updateCondition( conditions, - ConditionUtils.runningTrue( - JobManagerDeploymentStatus.READY.name(), - "JobManager is running and ready to receive REST API calls")); + ConditionUtils.crCondition( + ConditionUtils.SESSION_MODE_CONDITION.get( + JobManagerDeploymentStatus.READY.name()))); break; case MISSING: updateCondition( conditions, - ConditionUtils.runningFalse( - JobManagerDeploymentStatus.MISSING.name(), - "JobManager deployment not found")); + ConditionUtils.crCondition( + ConditionUtils.SESSION_MODE_CONDITION.get( + JobManagerDeploymentStatus.MISSING.name()))); break; case DEPLOYING: updateCondition( conditions, - ConditionUtils.runningFalse( - JobManagerDeploymentStatus.DEPLOYING.name(), - "JobManager process is starting up")); + ConditionUtils.crCondition( + ConditionUtils.SESSION_MODE_CONDITION.get( + JobManagerDeploymentStatus.DEPLOYING.name()))); break; case DEPLOYED_NOT_READY: updateCondition( conditions, - ConditionUtils.runningFalse( - JobManagerDeploymentStatus.DEPLOYED_NOT_READY.name(), - "JobManager is running but not ready yet to receive REST API calls")); + ConditionUtils.crCondition( + ConditionUtils.SESSION_MODE_CONDITION.get( + JobManagerDeploymentStatus.DEPLOYED_NOT_READY.name()))); break; case ERROR: updateCondition( conditions, - ConditionUtils.runningFalse( - JobManagerDeploymentStatus.ERROR.name(), - "JobManager deployment failed")); + ConditionUtils.crCondition( + ConditionUtils.SESSION_MODE_CONDITION.get( + JobManagerDeploymentStatus.ERROR.name()))); } } else if (getJobStatus() != null && getJobStatus().getState() != null) { // Populate conditions for ApplicationMode deployment @@ -113,59 +113,66 @@ public List getConditions() { case RECONCILING: updateCondition( conditions, - ConditionUtils.runningFalse( - JobStatus.RECONCILING.name(), "Job is currently reconciling")); + ConditionUtils.crCondition( + ConditionUtils.APPLICATION_MODE_CONDITION.get( + JobStatus.RECONCILING.name()))); break; case CREATED: updateCondition( conditions, - ConditionUtils.runningFalse( - JobStatus.CREATED.name(), "Job is created")); + ConditionUtils.crCondition( + ConditionUtils.APPLICATION_MODE_CONDITION.get( + JobStatus.CREATED.name()))); break; case RUNNING: updateCondition( conditions, - ConditionUtils.runningTrue(JobStatus.RUNNING.name(), "Job is running")); + ConditionUtils.crCondition( + ConditionUtils.APPLICATION_MODE_CONDITION.get( + JobStatus.RUNNING.name()))); break; case FAILING: updateCondition( conditions, - ConditionUtils.runningFalse( - JobStatus.FAILING.name(), "Job has failed")); + ConditionUtils.crCondition( + ConditionUtils.APPLICATION_MODE_CONDITION.get( + JobStatus.FAILING.name()))); break; case RESTARTING: updateCondition( conditions, - ConditionUtils.runningFalse( - JobStatus.RESTARTING.name(), - "The job is currently restarting")); + ConditionUtils.crCondition( + ConditionUtils.APPLICATION_MODE_CONDITION.get( + JobStatus.RESTARTING.name()))); break; case FAILED: updateCondition( conditions, - ConditionUtils.runningFalse( - JobStatus.FAILED.name(), - "The job has failed with a non-recoverable task failure")); + ConditionUtils.crCondition( + ConditionUtils.APPLICATION_MODE_CONDITION.get( + JobStatus.FAILED.name()))); break; case FINISHED: updateCondition( conditions, - ConditionUtils.runningFalse( - JobStatus.FINISHED.name(), - "Job's tasks have successfully finished")); + ConditionUtils.crCondition( + ConditionUtils.APPLICATION_MODE_CONDITION.get( + JobStatus.FINISHED.name()))); break; case CANCELED: updateCondition( conditions, - ConditionUtils.runningFalse( - JobStatus.CANCELED.name(), "Job has been cancelled")); + ConditionUtils.crCondition( + ConditionUtils.APPLICATION_MODE_CONDITION.get( + JobStatus.CANCELED.name()))); break; case SUSPENDED: updateCondition( conditions, - ConditionUtils.runningFalse( - JobStatus.SUSPENDED.name(), "The job has been suspended")); + ConditionUtils.crCondition( + ConditionUtils.APPLICATION_MODE_CONDITION.get( + JobStatus.SUSPENDED.name()))); break; } } diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java index 44b49673ac..f4f08bcbfc 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java @@ -17,47 +17,127 @@ package org.apache.flink.kubernetes.operator.api.utils; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; + import io.fabric8.kubernetes.api.model.Condition; import io.fabric8.kubernetes.api.model.ConditionBuilder; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.Map; /** Creates a condition object with the type, status, message and reason. */ public class ConditionUtils { - /** - * Create a condition with type "Running", status "True", the given message and reason, and - * current timestamp. - * - * @param message Message for the condition. - * @param reason Reason for the condition. - * @return A condition object. - */ - public static Condition runningTrue(final String message, final String reason) { - return crCondition("Running", "True", message, reason); - } - - /** - * Create a condition with type "Running", status "False", the given message and reason, and - * current timestamp. - * - * @param message Message for the condition. - * @param reason Reason for the condition. - * @return A condition object. - */ - public static Condition runningFalse(final String message, final String reason) { - return crCondition("Running", "False", message, reason); - } - - private static Condition crCondition( - final String type, final String status, final String message, final String reason) { - return new ConditionBuilder() - .withType(type) - .withStatus(status) - .withMessage(message) - .withReason(reason) + public static Condition crCondition(Condition condition) { + return new ConditionBuilder(condition) .withLastTransitionTime( new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").format(new Date())) .build(); } + + public static final Map SESSION_MODE_CONDITION = + Map.of( + JobManagerDeploymentStatus.READY.name(), + new ConditionBuilder() + .withType("Running") + .withStatus("True") + .withMessage("Ready") + .withReason("JobManager is running and ready to receive REST API calls") + .build(), + JobManagerDeploymentStatus.MISSING.name(), + new ConditionBuilder() + .withType("Running") + .withStatus("False") + .withMessage("Missing") + .withReason("JobManager deployment not found") + .build(), + JobManagerDeploymentStatus.DEPLOYING.name(), + new ConditionBuilder() + .withType("Running") + .withStatus("False") + .withMessage("Deploying") + .withReason("JobManager process is starting up") + .build(), + JobManagerDeploymentStatus.DEPLOYED_NOT_READY.name(), + new ConditionBuilder() + .withType("Running") + .withStatus("False") + .withMessage("DeployedNotReady") + .withReason( + "JobManager is running but not ready yet to receive REST API calls") + .build(), + JobManagerDeploymentStatus.ERROR.name(), + new ConditionBuilder() + .withType("Running") + .withStatus("False") + .withMessage("Error") + .withReason("JobManager deployment failed") + .build()); + + public static final Map APPLICATION_MODE_CONDITION = + Map.of( + JobStatus.RECONCILING.name(), + new ConditionBuilder() + .withType("Running") + .withStatus("False") + .withMessage("Reconciling") + .withReason("Job is currently reconciling") + .build(), + JobStatus.CREATED.name(), + new ConditionBuilder() + .withType("Running") + .withStatus("False") + .withMessage("JobCreated") + .withReason("Job is created") + .build(), + JobStatus.RUNNING.name(), + new ConditionBuilder() + .withType("Running") + .withStatus("True") + .withMessage("JobRunning") + .withReason("Job is running") + .build(), + JobStatus.FAILING.name(), + new ConditionBuilder() + .withType("Running") + .withStatus("False") + .withMessage("JobFailing") + .withReason("Job has failed") + .build(), + JobStatus.RESTARTING.name(), + new ConditionBuilder() + .withType("Running") + .withStatus("False") + .withMessage("JobRestarting") + .withReason("The job is currently restarting") + .build(), + JobStatus.FAILED.name(), + new ConditionBuilder() + .withType("Running") + .withStatus("False") + .withMessage("JobFailed") + .withReason("The job has failed with a non-recoverable task failure") + .build(), + JobStatus.FINISHED.name(), + new ConditionBuilder() + .withType("Running") + .withStatus("False") + .withMessage("JobFinished") + .withReason("Job's tasks have successfully finished") + .build(), + JobStatus.CANCELED.name(), + new ConditionBuilder() + .withType("Running") + .withStatus("False") + .withMessage("JobCancelled") + .withReason("Job has been cancelled") + .build(), + JobStatus.SUSPENDED.name(), + new ConditionBuilder() + .withType("Running") + .withStatus("False") + .withMessage("JobSuspended") + .withReason("The job has been suspended") + .build()); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index adbb57252f..586423def1 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -287,7 +287,7 @@ public void verifyFailedDeployment() throws Exception { assertThat(appCluster.getStatus().getConditions()) .hasSize(1) .extracting("message") - .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + .contains("Reconciling"); // next cycle should not create another event updateControl = @@ -379,7 +379,7 @@ public void verifyInProgressDeploymentWithError(String reason) throws Exception assertThat(appCluster.getStatus().getConditions()) .hasSize(1) .extracting("message") - .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + .contains("Reconciling"); // Validate status status assertNotNull(appCluster.getStatus().getError()); @@ -472,7 +472,7 @@ public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) thro assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) .extracting("message") - .contains(org.apache.flink.api.common.JobStatus.FINISHED.name()); + .contains("JobFinished"); // Resume from last savepoint appCluster.getSpec().getJob().setState(JobState.RUNNING); @@ -738,7 +738,7 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) .extracting("message") - .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + .contains("Reconciling"); updateControl = testController.reconcile(appCluster, context); JobStatus jobStatus = appCluster.getStatus().getJobStatus(); @@ -754,7 +754,7 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) .extracting("message") - .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + .contains("Reconciling"); // Switches operator mode to SESSION appCluster.getSpec().setJob(null); @@ -785,9 +785,7 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) .extracting("message") - .contains( - org.apache.flink.api.common.JobStatus.RECONCILING.name(), - org.apache.flink.api.common.JobStatus.RUNNING.name()); + .contains("Reconciling", "JobRunning"); } @Test @@ -808,7 +806,7 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except assertThat(appCluster.getStatus().getConditions()) .hasSize(2) .extracting("message") - .contains("JobManager deployment not found", "JobManager process is starting up"); + .contains("Missing", "Deploying"); updateControl = testController.reconcile(appCluster, context); JobStatus jobStatus = appCluster.getStatus().getJobStatus(); @@ -823,10 +821,7 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except assertThat(appCluster.getStatus().getConditions()) .hasSize(3) .extracting("message") - .contains( - "JobManager deployment not found", - "JobManager process is starting up", - "JobManager is running but not ready yet to receive REST API calls"); + .contains("Missing", "Deploying", "DeployedNotReady"); // jobStatus has not been set at this time assertNull(jobStatus.getState()); @@ -851,11 +846,7 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except assertThat(appCluster.getStatus().getConditions()) .hasSize(4) .extracting("message") - .contains( - "JobManager deployment not found", - "JobManager process is starting up", - "JobManager is running but not ready yet to receive REST API calls", - "JobManager is running and ready to receive REST API call"); + .contains("Missing", "Deploying", "DeployedNotReady", "Ready"); } private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Exception { @@ -1009,7 +1000,7 @@ private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Excep assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) .extracting("message") - .contains(org.apache.flink.api.common.JobStatus.RUNNING.name()); + .contains("JobRunning"); } @Test @@ -1279,7 +1270,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E assertThat(appCluster.getStatus().getConditions()) .hasSize(1) .extracting("message") - .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + .contains("Reconciling"); assertFalse(updateControl.isUpdateStatus()); assertEquals( @@ -1311,7 +1302,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E assertThat(appCluster.getStatus().getConditions()) .hasSize(1) .extracting("message") - .contains(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + .contains("Reconciling"); assertFalse(updateControl.isUpdateStatus()); assertEquals( @@ -1334,7 +1325,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) .extracting("message") - .contains(org.apache.flink.api.common.JobStatus.RUNNING.name()); + .contains("JobRunning"); assertEquals(6, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); @@ -1365,7 +1356,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) .extracting("message") - .contains(org.apache.flink.api.common.JobStatus.RUNNING.name()); + .contains("JobRunning"); // Validate job status JobStatus jobStatus = appCluster.getStatus().getJobStatus(); From 32294f4ecd914a5362347704fc04a3e48ef8f9fb Mon Sep 17 00:00:00 2001 From: Lajith Date: Fri, 28 Mar 2025 12:09:09 +0530 Subject: [PATCH 07/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../kubernetes/operator/api/status/FlinkDeploymentStatus.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java index fe40c39916..b05720c788 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java @@ -189,10 +189,12 @@ public String getPhase() { phase = "Running"; break; case MISSING: - case ERROR: case DEPLOYING: phase = "Pending"; break; + case ERROR: + phase = "Failed"; + break; } } else if (getJobStatus() != null && getJobStatus().getState() != null) { // populate phase for ApplicationMode deployment From 7b6d6a13308195452c61fe9d509d56163c3cc5e8 Mon Sep 17 00:00:00 2001 From: Lajith Date: Mon, 12 May 2025 10:30:32 +0530 Subject: [PATCH 08/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../api/status/FlinkDeploymentStatus.java | 89 ++++++------------- .../operator/api/utils/ConditionUtils.java | 87 +++++++++--------- .../FlinkDeploymentControllerTest.java | 77 +++++----------- .../flinkdeployments.flink.apache.org-v1.yml | 2 - 4 files changed, 91 insertions(+), 164 deletions(-) diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java index b05720c788..4d02a0b79b 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java @@ -35,6 +35,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.kubernetes.operator.api.utils.ConditionUtils.CONDITION_TYPE_RUNNING; /** Last observed status of the Flink deployment. */ @Experimental @@ -64,8 +67,6 @@ public class FlinkDeploymentStatus extends CommonStatus { /** Condition of the CR . */ private List conditions = new ArrayList<>(); - private String phase; - public List getConditions() { if (reconciliationStatus != null && reconciliationStatus.deserializeLastReconciledSpec() != null @@ -179,69 +180,29 @@ public List getConditions() { return conditions; } - public String getPhase() { - if (reconciliationStatus != null - && reconciliationStatus.deserializeLastReconciledSpec() != null - && reconciliationStatus.deserializeLastReconciledSpec().getJob() == null) { - // populate phase for SessionMode deployment - switch (jobManagerDeploymentStatus) { - case READY: - phase = "Running"; - break; - case MISSING: - case DEPLOYING: - phase = "Pending"; - break; - case ERROR: - phase = "Failed"; - break; - } - } else if (getJobStatus() != null && getJobStatus().getState() != null) { - // populate phase for ApplicationMode deployment - switch (getJobStatus().getState()) { - case RECONCILING: - phase = "Pending"; - break; - case CREATED: - phase = JobStatus.CREATED.name(); - break; - case RUNNING: - phase = JobStatus.RUNNING.name(); - break; - case FAILING: - phase = JobStatus.FAILING.name(); - break; - case RESTARTING: - phase = JobStatus.RESTARTING.name(); - break; - case FAILED: - phase = JobStatus.FAILED.name(); - break; - case FINISHED: - phase = JobStatus.FINISHED.name(); - break; - case CANCELED: - phase = JobStatus.CANCELED.name(); - break; - case SUSPENDED: - phase = JobStatus.SUSPENDED.name(); - break; + private static void updateCondition(List conditions, Condition newCondition) { + if (newCondition.getType().equals(CONDITION_TYPE_RUNNING)) { + Optional existingCondition = + conditions.stream() + .filter( + c -> + c.getType().equals(CONDITION_TYPE_RUNNING) + && c.getReason() + .equals(newCondition.getReason()) + && c.getMessage() + .equals(newCondition.getMessage())) + .findFirst(); + // Until there is a condition change which reflects the latest state, no need to add condition to list. + if (existingCondition.isPresent()) { + return; } + // Remove existing Condition with type running and then add a new condition that reflects the current state. + conditions.removeIf( + c -> + c.getType().equals(CONDITION_TYPE_RUNNING) + && !c.getMessage().equals(newCondition.getMessage()) + && !c.getReason().equals(newCondition.getReason())); } - return phase; - } - - private static void updateCondition(List conditions, Condition condition) { - if (conditions.isEmpty()) { - conditions.add(condition); - return; - } - // If new condition is same as last condition, ignore - Condition existingCondition = conditions.get(conditions.size() - 1); - if (existingCondition.getType().equals(condition.getType()) - && existingCondition.getMessage().equals(condition.getMessage())) { - return; - } - conditions.add(condition); + conditions.add(newCondition); } } diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java index f4f08bcbfc..9131e1330e 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java @@ -29,6 +29,8 @@ /** Creates a condition object with the type, status, message and reason. */ public class ConditionUtils { + public static final String CONDITION_TYPE_RUNNING = "Running"; + public static Condition crCondition(Condition condition) { return new ConditionBuilder(condition) .withLastTransitionTime( @@ -40,104 +42,105 @@ public static Condition crCondition(Condition condition) { Map.of( JobManagerDeploymentStatus.READY.name(), new ConditionBuilder() - .withType("Running") + .withType(CONDITION_TYPE_RUNNING) .withStatus("True") - .withMessage("Ready") - .withReason("JobManager is running and ready to receive REST API calls") + .withReason("Ready") + .withMessage( + "JobManager is running and ready to receive REST API calls") .build(), JobManagerDeploymentStatus.MISSING.name(), new ConditionBuilder() - .withType("Running") + .withType(CONDITION_TYPE_RUNNING) .withStatus("False") - .withMessage("Missing") - .withReason("JobManager deployment not found") + .withReason("Missing") + .withMessage("JobManager deployment not found") .build(), JobManagerDeploymentStatus.DEPLOYING.name(), new ConditionBuilder() - .withType("Running") + .withType(CONDITION_TYPE_RUNNING) .withStatus("False") - .withMessage("Deploying") - .withReason("JobManager process is starting up") + .withReason("Deploying") + .withMessage("JobManager process is starting up") .build(), JobManagerDeploymentStatus.DEPLOYED_NOT_READY.name(), new ConditionBuilder() - .withType("Running") + .withType(CONDITION_TYPE_RUNNING) .withStatus("False") - .withMessage("DeployedNotReady") - .withReason( + .withReason("DeployedNotReady") + .withMessage( "JobManager is running but not ready yet to receive REST API calls") .build(), JobManagerDeploymentStatus.ERROR.name(), new ConditionBuilder() - .withType("Running") + .withType(CONDITION_TYPE_RUNNING) .withStatus("False") - .withMessage("Error") - .withReason("JobManager deployment failed") + .withReason("Error") + .withMessage("JobManager deployment failed") .build()); public static final Map APPLICATION_MODE_CONDITION = Map.of( JobStatus.RECONCILING.name(), new ConditionBuilder() - .withType("Running") + .withType(CONDITION_TYPE_RUNNING) .withStatus("False") - .withMessage("Reconciling") - .withReason("Job is currently reconciling") + .withReason("Reconciling") + .withMessage("Job is currently reconciling") .build(), JobStatus.CREATED.name(), new ConditionBuilder() - .withType("Running") + .withType(CONDITION_TYPE_RUNNING) .withStatus("False") - .withMessage("JobCreated") - .withReason("Job is created") + .withReason("JobCreated") + .withMessage("Job is created") .build(), JobStatus.RUNNING.name(), new ConditionBuilder() - .withType("Running") + .withType(CONDITION_TYPE_RUNNING) .withStatus("True") - .withMessage("JobRunning") - .withReason("Job is running") + .withReason("JobRunning") + .withMessage("Job is running") .build(), JobStatus.FAILING.name(), new ConditionBuilder() - .withType("Running") + .withType(CONDITION_TYPE_RUNNING) .withStatus("False") - .withMessage("JobFailing") - .withReason("Job has failed") + .withReason("JobFailing") + .withMessage("Job has failed") .build(), JobStatus.RESTARTING.name(), new ConditionBuilder() - .withType("Running") + .withType(CONDITION_TYPE_RUNNING) .withStatus("False") - .withMessage("JobRestarting") - .withReason("The job is currently restarting") + .withReason("JobRestarting") + .withMessage("The job is currently restarting") .build(), JobStatus.FAILED.name(), new ConditionBuilder() - .withType("Running") + .withType(CONDITION_TYPE_RUNNING) .withStatus("False") - .withMessage("JobFailed") - .withReason("The job has failed with a non-recoverable task failure") + .withReason("JobFailed") + .withMessage("The job has failed with a non-recoverable task failure") .build(), JobStatus.FINISHED.name(), new ConditionBuilder() - .withType("Running") + .withType(CONDITION_TYPE_RUNNING) .withStatus("False") - .withMessage("JobFinished") - .withReason("Job's tasks have successfully finished") + .withReason("JobFinished") + .withMessage("Job's tasks have successfully finished") .build(), JobStatus.CANCELED.name(), new ConditionBuilder() - .withType("Running") + .withType(CONDITION_TYPE_RUNNING) .withStatus("False") - .withMessage("JobCancelled") - .withReason("Job has been cancelled") + .withReason("JobCancelled") + .withMessage("Job has been cancelled") .build(), JobStatus.SUSPENDED.name(), new ConditionBuilder() - .withType("Running") + .withType(CONDITION_TYPE_RUNNING) .withStatus("False") - .withMessage("JobSuspended") - .withReason("The job has been suspended") + .withReason("JobSuspended") + .withMessage("The job has been suspended") .build()); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 586423def1..c12904bf94 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -282,11 +282,10 @@ public void verifyFailedDeployment() throws Exception { assertNotNull(appCluster.getStatus().getError()); // Validate status conditions - assertEquals("Pending", appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) .hasSize(1) - .extracting("message") + .extracting("reason") .contains("Reconciling"); // next cycle should not create another event @@ -374,11 +373,10 @@ public void verifyInProgressDeploymentWithError(String reason) throws Exception appCluster.getStatus().getJobStatus().getState()); // Validate status conditions - assertEquals("Pending", appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) .hasSize(1) - .extracting("message") + .extracting("reason") .contains("Reconciling"); // Validate status status @@ -466,12 +464,9 @@ public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) thro "savepoint_1", appCluster.getStatus().getJobStatus().getUpgradeSavepointPath()); // Validate status conditions - assertEquals( - org.apache.flink.api.common.JobStatus.FINISHED.name(), - appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) - .extracting("message") + .extracting("reason") .contains("JobFinished"); // Resume from last savepoint @@ -683,9 +678,6 @@ public void verifyReconcileWithBadConfig() throws Exception { JobManagerDeploymentStatus.DEPLOYING, appCluster.getStatus().getJobManagerDeploymentStatus()); - // Validate status conditions - assertEquals("Pending", appCluster.getStatus().getPhase()); - // Check when the bad config is applied, observe() will change the cluster state correctly appCluster.getSpec().getJobManager().setReplicas(-1); // Next reconcile will set error msg and observe with previous validated config @@ -700,9 +692,6 @@ public void verifyReconcileWithBadConfig() throws Exception { JobManagerDeploymentStatus.DEPLOYED_NOT_READY, appCluster.getStatus().getJobManagerDeploymentStatus()); - // Validate status conditions - assertEquals("Pending", appCluster.getStatus().getPhase()); - // Make sure we do validation before getting effective config in reconcile(). appCluster.getSpec().getJobManager().setReplicas(1); appCluster.getSpec().getJob().setParallelism(0); @@ -715,10 +704,6 @@ public void verifyReconcileWithBadConfig() throws Exception { assertEquals( JobManagerDeploymentStatus.READY, appCluster.getStatus().getJobManagerDeploymentStatus()); - - assertEquals( - org.apache.flink.api.common.JobStatus.RUNNING.name(), - appCluster.getStatus().getPhase()); } @Test @@ -734,10 +719,9 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception appCluster.getStatus().getJobManagerDeploymentStatus()); // Validate status conditions - assertEquals("Pending", appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) - .extracting("message") + .extracting("reason") .contains("Reconciling"); updateControl = testController.reconcile(appCluster, context); @@ -750,10 +734,9 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception assertEquals(org.apache.flink.api.common.JobStatus.RECONCILING, jobStatus.getState()); // Validate status conditions - assertEquals("Pending", appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) - .extracting("message") + .extracting("reason") .contains("Reconciling"); // Switches operator mode to SESSION @@ -779,13 +762,10 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception assertEquals(expectedJobStatus.getJobState(), jobStatus.getState()); // Validate status conditions - assertEquals( - org.apache.flink.api.common.JobStatus.RUNNING.name(), - appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) - .extracting("message") - .contains("Reconciling", "JobRunning"); + .extracting("reason") + .contains("JobRunning"); } @Test @@ -801,12 +781,11 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except appCluster.getStatus().getJobManagerDeploymentStatus()); // Validate status conditions - assertEquals("Pending", appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) - .hasSize(2) - .extracting("message") - .contains("Missing", "Deploying"); + .hasSize(1) + .extracting("reason") + .contains("Deploying"); updateControl = testController.reconcile(appCluster, context); JobStatus jobStatus = appCluster.getStatus().getJobStatus(); @@ -816,12 +795,11 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except appCluster.getStatus().getJobManagerDeploymentStatus()); // Validate status conditions - assertEquals("Pending", appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) - .hasSize(3) - .extracting("message") - .contains("Missing", "Deploying", "DeployedNotReady"); + .hasSize(1) + .extracting("reason") + .contains("DeployedNotReady"); // jobStatus has not been set at this time assertNull(jobStatus.getState()); @@ -841,12 +819,11 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except assertNull(ReconciliationUtils.getDeployedSpec(appCluster).getJob()); // Validate status conditions - assertEquals("Running", appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) - .hasSize(4) - .extracting("message") - .contains("Missing", "Deploying", "DeployedNotReady", "Ready"); + .hasSize(1) + .extracting("reason") + .contains("Ready"); } private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Exception { @@ -994,12 +971,9 @@ private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Excep appCluster.getStatus().getJobManagerDeploymentStatus()); // Validate status conditions - assertEquals( - org.apache.flink.api.common.JobStatus.RUNNING.name(), - appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) - .extracting("message") + .extracting("reason") .contains("JobRunning"); } @@ -1249,7 +1223,6 @@ private void verifyReconcileInitialSuspendedDeployment(FlinkDeployment appCluste assertNull(reconciliationStatus.getLastStableSpec()); // Validate status conditions - assertNull(appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isEmpty(); } @@ -1265,11 +1238,10 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E assertEquals(4, testController.getInternalStatusUpdateCount()); // Validate status conditions - assertEquals("Pending", appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) .hasSize(1) - .extracting("message") + .extracting("reason") .contains("Reconciling"); assertFalse(updateControl.isUpdateStatus()); @@ -1297,11 +1269,10 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E assertEquals(5, testController.getInternalStatusUpdateCount()); // Validate status conditions - assertEquals("Pending", appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) .hasSize(1) - .extracting("message") + .extracting("reason") .contains("Reconciling"); assertFalse(updateControl.isUpdateStatus()); @@ -1319,12 +1290,9 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E appCluster.getStatus().getJobStatus().getState()); // Validate status conditions - assertEquals( - org.apache.flink.api.common.JobStatus.RUNNING.name(), - appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) - .extracting("message") + .extracting("reason") .contains("JobRunning"); assertEquals(6, testController.getInternalStatusUpdateCount()); @@ -1350,12 +1318,9 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E updateControl.getScheduleDelay()); // Validate status conditions - assertEquals( - org.apache.flink.api.common.JobStatus.RUNNING.name(), - appCluster.getStatus().getPhase()); assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) - .extracting("message") + .extracting("reason") .contains("JobRunning"); // Validate job status diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml index 3b0e143b6f..8230132547 100644 --- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml @@ -10408,8 +10408,6 @@ spec: type: string observedGeneration: type: integer - phase: - type: string reconciliationStatus: properties: lastReconciledSpec: From 4f6679bfdf7335c2dc05a1dcb5a4e16a006e24b3 Mon Sep 17 00:00:00 2001 From: Lajith Date: Mon, 12 May 2025 22:38:11 +0530 Subject: [PATCH 09/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../operator/api/status/FlinkDeploymentStatus.java | 10 +++++----- .../controller/FlinkDeploymentControllerTest.java | 5 ++++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java index 4d02a0b79b..560635b544 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java @@ -68,9 +68,7 @@ public class FlinkDeploymentStatus extends CommonStatus { private List conditions = new ArrayList<>(); public List getConditions() { - if (reconciliationStatus != null - && reconciliationStatus.deserializeLastReconciledSpec() != null - && reconciliationStatus.deserializeLastReconciledSpec().getJob() == null) { + if (getJobStatus() != null && getJobStatus().getState() == null) { // Populate conditions for SessionMode deployment switch (jobManagerDeploymentStatus) { case READY: @@ -192,11 +190,13 @@ private static void updateCondition(List conditions, Condition newCon && c.getMessage() .equals(newCondition.getMessage())) .findFirst(); - // Until there is a condition change which reflects the latest state, no need to add condition to list. + // Until there is a condition change which reflects the latest state, no need to add + // condition to list. if (existingCondition.isPresent()) { return; } - // Remove existing Condition with type running and then add a new condition that reflects the current state. + // Remove existing Condition with type running and then add a new condition that + // reflects the current state. conditions.removeIf( c -> c.getType().equals(CONDITION_TYPE_RUNNING) diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index c12904bf94..06911b4a5c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -1223,7 +1223,10 @@ private void verifyReconcileInitialSuspendedDeployment(FlinkDeployment appCluste assertNull(reconciliationStatus.getLastStableSpec()); // Validate status conditions - assertThat(appCluster.getStatus().getConditions()).isEmpty(); + assertThat(appCluster.getStatus().getConditions()) + .hasSize(1) + .extracting("message") + .contains("JobManager deployment not found"); } private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws Exception { From 93be98a3d616d497c9c7e49a4c46501813d6db83 Mon Sep 17 00:00:00 2001 From: Lajith Date: Wed, 14 May 2025 12:17:55 +0530 Subject: [PATCH 10/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../api/status/FlinkDeploymentStatus.java | 113 ++---------------- .../operator/api/utils/ConditionUtils.java | 2 +- .../FlinkDeploymentControllerTest.java | 2 +- 3 files changed, 12 insertions(+), 105 deletions(-) diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java index 560635b544..7301215199 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java @@ -18,7 +18,6 @@ package org.apache.flink.kubernetes.operator.api.status; import org.apache.flink.annotation.Experimental; -import org.apache.flink.api.common.JobStatus; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.api.utils.ConditionUtils; @@ -70,110 +69,18 @@ public class FlinkDeploymentStatus extends CommonStatus { public List getConditions() { if (getJobStatus() != null && getJobStatus().getState() == null) { // Populate conditions for SessionMode deployment - switch (jobManagerDeploymentStatus) { - case READY: - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.SESSION_MODE_CONDITION.get( - JobManagerDeploymentStatus.READY.name()))); - break; - case MISSING: - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.SESSION_MODE_CONDITION.get( - JobManagerDeploymentStatus.MISSING.name()))); - break; - case DEPLOYING: - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.SESSION_MODE_CONDITION.get( - JobManagerDeploymentStatus.DEPLOYING.name()))); - break; - case DEPLOYED_NOT_READY: - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.SESSION_MODE_CONDITION.get( - JobManagerDeploymentStatus.DEPLOYED_NOT_READY.name()))); - break; - case ERROR: - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.SESSION_MODE_CONDITION.get( - JobManagerDeploymentStatus.ERROR.name()))); - } + updateCondition( + conditions, + ConditionUtils.crCondition( + ConditionUtils.SESSION_MODE_CONDITION.get( + jobManagerDeploymentStatus.name()))); } else if (getJobStatus() != null && getJobStatus().getState() != null) { // Populate conditions for ApplicationMode deployment - switch (getJobStatus().getState()) { - case RECONCILING: - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.APPLICATION_MODE_CONDITION.get( - JobStatus.RECONCILING.name()))); - break; - case CREATED: - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.APPLICATION_MODE_CONDITION.get( - JobStatus.CREATED.name()))); - break; - case RUNNING: - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.APPLICATION_MODE_CONDITION.get( - JobStatus.RUNNING.name()))); - break; - case FAILING: - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.APPLICATION_MODE_CONDITION.get( - JobStatus.FAILING.name()))); - break; - case RESTARTING: - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.APPLICATION_MODE_CONDITION.get( - JobStatus.RESTARTING.name()))); - break; - case FAILED: - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.APPLICATION_MODE_CONDITION.get( - JobStatus.FAILED.name()))); - break; - case FINISHED: - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.APPLICATION_MODE_CONDITION.get( - JobStatus.FINISHED.name()))); - break; - - case CANCELED: - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.APPLICATION_MODE_CONDITION.get( - JobStatus.CANCELED.name()))); - break; - case SUSPENDED: - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.APPLICATION_MODE_CONDITION.get( - JobStatus.SUSPENDED.name()))); - break; - } + updateCondition( + conditions, + ConditionUtils.crCondition( + ConditionUtils.APPLICATION_MODE_CONDITION.get( + getJobStatus().getState().name()))); } return conditions; } diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java index 9131e1330e..a9ac19e24b 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java @@ -44,7 +44,7 @@ public static Condition crCondition(Condition condition) { new ConditionBuilder() .withType(CONDITION_TYPE_RUNNING) .withStatus("True") - .withReason("Ready") + .withReason("JobManagerReady") .withMessage( "JobManager is running and ready to receive REST API calls") .build(), diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 06911b4a5c..6bfcb44edf 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -823,7 +823,7 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except assertThat(appCluster.getStatus().getConditions()) .hasSize(1) .extracting("reason") - .contains("Ready"); + .contains("JobManagerReady"); } private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Exception { From 9faab16ec94df24bc5236300edf6a2abb41d9bb6 Mon Sep 17 00:00:00 2001 From: Lajith Date: Wed, 14 May 2025 13:43:46 +0530 Subject: [PATCH 11/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../api/status/FlinkDeploymentStatus.java | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java index 7301215199..12cf5fc9c7 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.operator.api.status; import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.api.utils.ConditionUtils; @@ -67,20 +68,22 @@ public class FlinkDeploymentStatus extends CommonStatus { private List conditions = new ArrayList<>(); public List getConditions() { - if (getJobStatus() != null && getJobStatus().getState() == null) { - // Populate conditions for SessionMode deployment - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.SESSION_MODE_CONDITION.get( - jobManagerDeploymentStatus.name()))); - } else if (getJobStatus() != null && getJobStatus().getState() != null) { - // Populate conditions for ApplicationMode deployment - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.APPLICATION_MODE_CONDITION.get( - getJobStatus().getState().name()))); + if (getJobStatus() != null) { + JobStatus jobStatus = getJobStatus().getState(); + if (jobStatus == null) { + // Populate conditions for SessionMode deployment + updateCondition( + conditions, + ConditionUtils.crCondition( + ConditionUtils.SESSION_MODE_CONDITION.get( + jobManagerDeploymentStatus.name()))); + } else if (jobStatus != null) { + // Populate conditions for ApplicationMode deployment + updateCondition( + conditions, + ConditionUtils.crCondition( + ConditionUtils.APPLICATION_MODE_CONDITION.get(jobStatus.name()))); + } } return conditions; } From b40dcf3fa74110d8e9a04b3024e93b79ae3c8fc5 Mon Sep 17 00:00:00 2001 From: Lajith Date: Fri, 16 May 2025 16:16:37 +0530 Subject: [PATCH 12/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../api/status/FlinkDeploymentStatus.java | 53 +----- .../JobManagerDeploymentConditionStatus.java | 35 ++++ .../operator/api/utils/ConditionUtils.java | 165 ++++++++---------- .../FlinkDeploymentControllerTest.java | 20 +-- 4 files changed, 119 insertions(+), 154 deletions(-) create mode 100644 flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentConditionStatus.java diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java index 12cf5fc9c7..9ba4456d56 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java @@ -18,7 +18,6 @@ package org.apache.flink.kubernetes.operator.api.status; import org.apache.flink.annotation.Experimental; -import org.apache.flink.api.common.JobStatus; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.api.utils.ConditionUtils; @@ -35,9 +34,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; - -import static org.apache.flink.kubernetes.operator.api.utils.ConditionUtils.CONDITION_TYPE_RUNNING; /** Last observed status of the Flink deployment. */ @Experimental @@ -68,51 +64,8 @@ public class FlinkDeploymentStatus extends CommonStatus { private List conditions = new ArrayList<>(); public List getConditions() { - if (getJobStatus() != null) { - JobStatus jobStatus = getJobStatus().getState(); - if (jobStatus == null) { - // Populate conditions for SessionMode deployment - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.SESSION_MODE_CONDITION.get( - jobManagerDeploymentStatus.name()))); - } else if (jobStatus != null) { - // Populate conditions for ApplicationMode deployment - updateCondition( - conditions, - ConditionUtils.crCondition( - ConditionUtils.APPLICATION_MODE_CONDITION.get(jobStatus.name()))); - } - } - return conditions; - } - - private static void updateCondition(List conditions, Condition newCondition) { - if (newCondition.getType().equals(CONDITION_TYPE_RUNNING)) { - Optional existingCondition = - conditions.stream() - .filter( - c -> - c.getType().equals(CONDITION_TYPE_RUNNING) - && c.getReason() - .equals(newCondition.getReason()) - && c.getMessage() - .equals(newCondition.getMessage())) - .findFirst(); - // Until there is a condition change which reflects the latest state, no need to add - // condition to list. - if (existingCondition.isPresent()) { - return; - } - // Remove existing Condition with type running and then add a new condition that - // reflects the current state. - conditions.removeIf( - c -> - c.getType().equals(CONDITION_TYPE_RUNNING) - && !c.getMessage().equals(newCondition.getMessage()) - && !c.getReason().equals(newCondition.getReason())); - } - conditions.add(newCondition); + Condition condition = ConditionUtils.getCondition(this); + ConditionUtils.updateLastTransitionTime(conditions, condition); + return condition == null ? List.of() : List.of(condition); } } diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentConditionStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentConditionStatus.java new file mode 100644 index 0000000000..3403838ba8 --- /dev/null +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentConditionStatus.java @@ -0,0 +1,35 @@ +package org.apache.flink.kubernetes.operator.api.status; + +/** Condition Status of the Flink JobManager Kubernetes deployment. */ +public enum JobManagerDeploymentConditionStatus { + READY("True", "JobManagerReady", "JobManager is running and ready to receive REST API calls"), + MISSING("False", "JobManagerDeploymentMissing", "JobManager deployment not found"), + DEPLOYING("False", "JobManagerIsDeploying", "JobManager process is starting up"), + DEPLOYED_NOT_READY( + "False", + "DeployedNotReady", + "JobManager is running but not ready yet to receive REST API calls"), + ERROR("False", "Error", "JobManager deployment failed"); + + private String status; + private String reason; + private String message; + + JobManagerDeploymentConditionStatus(String status, String reason, String message) { + this.status = status; + this.reason = reason; + this.message = message; + } + + public String getReason() { + return reason; + } + + public String getMessage() { + return message; + } + + public String getStatus() { + return status; + } +} diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java index a9ac19e24b..6d99564a85 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java @@ -18,6 +18,8 @@ package org.apache.flink.kubernetes.operator.api.utils; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; +import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentConditionStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import io.fabric8.kubernetes.api.model.Condition; @@ -25,122 +27,105 @@ import java.text.SimpleDateFormat; import java.util.Date; +import java.util.List; import java.util.Map; +import static org.apache.flink.api.common.JobStatus.RUNNING; + /** Creates a condition object with the type, status, message and reason. */ public class ConditionUtils { public static final String CONDITION_TYPE_RUNNING = "Running"; - - public static Condition crCondition(Condition condition) { - return new ConditionBuilder(condition) - .withLastTransitionTime( - new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").format(new Date())) - .build(); - } - - public static final Map SESSION_MODE_CONDITION = + private static final Map SESSION_MODE_CONDITION = Map.of( JobManagerDeploymentStatus.READY.name(), new ConditionBuilder() .withType(CONDITION_TYPE_RUNNING) - .withStatus("True") - .withReason("JobManagerReady") - .withMessage( - "JobManager is running and ready to receive REST API calls") + .withStatus(JobManagerDeploymentConditionStatus.READY.getStatus()) + .withReason(JobManagerDeploymentConditionStatus.READY.getReason()) + .withMessage(JobManagerDeploymentConditionStatus.READY.getMessage()) .build(), JobManagerDeploymentStatus.MISSING.name(), new ConditionBuilder() .withType(CONDITION_TYPE_RUNNING) - .withStatus("False") - .withReason("Missing") - .withMessage("JobManager deployment not found") + .withStatus(JobManagerDeploymentConditionStatus.MISSING.getStatus()) + .withReason(JobManagerDeploymentConditionStatus.MISSING.getReason()) + .withMessage(JobManagerDeploymentConditionStatus.MISSING.getMessage()) .build(), JobManagerDeploymentStatus.DEPLOYING.name(), new ConditionBuilder() .withType(CONDITION_TYPE_RUNNING) - .withStatus("False") - .withReason("Deploying") - .withMessage("JobManager process is starting up") + .withStatus(JobManagerDeploymentConditionStatus.DEPLOYING.getStatus()) + .withReason(JobManagerDeploymentConditionStatus.DEPLOYING.getReason()) + .withMessage(JobManagerDeploymentConditionStatus.DEPLOYING.getMessage()) .build(), JobManagerDeploymentStatus.DEPLOYED_NOT_READY.name(), new ConditionBuilder() .withType(CONDITION_TYPE_RUNNING) - .withStatus("False") - .withReason("DeployedNotReady") + .withStatus( + JobManagerDeploymentConditionStatus.DEPLOYED_NOT_READY + .getStatus()) + .withReason( + JobManagerDeploymentConditionStatus.DEPLOYED_NOT_READY + .getReason()) .withMessage( - "JobManager is running but not ready yet to receive REST API calls") + JobManagerDeploymentConditionStatus.DEPLOYED_NOT_READY + .getMessage()) .build(), JobManagerDeploymentStatus.ERROR.name(), new ConditionBuilder() .withType(CONDITION_TYPE_RUNNING) - .withStatus("False") - .withReason("Error") - .withMessage("JobManager deployment failed") + .withStatus(JobManagerDeploymentConditionStatus.ERROR.getStatus()) + .withReason(JobManagerDeploymentConditionStatus.ERROR.getReason()) + .withMessage(JobManagerDeploymentConditionStatus.ERROR.getMessage()) .build()); - public static final Map APPLICATION_MODE_CONDITION = - Map.of( - JobStatus.RECONCILING.name(), - new ConditionBuilder() - .withType(CONDITION_TYPE_RUNNING) - .withStatus("False") - .withReason("Reconciling") - .withMessage("Job is currently reconciling") - .build(), - JobStatus.CREATED.name(), - new ConditionBuilder() - .withType(CONDITION_TYPE_RUNNING) - .withStatus("False") - .withReason("JobCreated") - .withMessage("Job is created") - .build(), - JobStatus.RUNNING.name(), - new ConditionBuilder() - .withType(CONDITION_TYPE_RUNNING) - .withStatus("True") - .withReason("JobRunning") - .withMessage("Job is running") - .build(), - JobStatus.FAILING.name(), - new ConditionBuilder() - .withType(CONDITION_TYPE_RUNNING) - .withStatus("False") - .withReason("JobFailing") - .withMessage("Job has failed") - .build(), - JobStatus.RESTARTING.name(), - new ConditionBuilder() - .withType(CONDITION_TYPE_RUNNING) - .withStatus("False") - .withReason("JobRestarting") - .withMessage("The job is currently restarting") - .build(), - JobStatus.FAILED.name(), - new ConditionBuilder() - .withType(CONDITION_TYPE_RUNNING) - .withStatus("False") - .withReason("JobFailed") - .withMessage("The job has failed with a non-recoverable task failure") - .build(), - JobStatus.FINISHED.name(), - new ConditionBuilder() - .withType(CONDITION_TYPE_RUNNING) - .withStatus("False") - .withReason("JobFinished") - .withMessage("Job's tasks have successfully finished") - .build(), - JobStatus.CANCELED.name(), - new ConditionBuilder() - .withType(CONDITION_TYPE_RUNNING) - .withStatus("False") - .withReason("JobCancelled") - .withMessage("Job has been cancelled") - .build(), - JobStatus.SUSPENDED.name(), - new ConditionBuilder() - .withType(CONDITION_TYPE_RUNNING) - .withStatus("False") - .withReason("JobSuspended") - .withMessage("The job has been suspended") - .build()); + public static Condition getCondition(FlinkDeploymentStatus flinkDeploymentStatus) { + org.apache.flink.kubernetes.operator.api.status.JobStatus status = + flinkDeploymentStatus.getJobStatus(); + Condition conditionToAdd = null; + if (status != null) { + + JobStatus jobStatus = status.getState(); + + conditionToAdd = + jobStatus == null + ? SESSION_MODE_CONDITION.get( + flinkDeploymentStatus.getJobManagerDeploymentStatus().name()) + : getApplicationModeCondition(jobStatus); + } + + return conditionToAdd; + } + + public static void updateLastTransitionTime(List conditions, Condition condition) { + if (condition == null) { + return; + } + if (isLastTransactionTimeStampUpdateRequired(conditions, condition)) { + condition.setLastTransitionTime( + new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").format(new Date())); + } else { + condition.setLastTransitionTime(conditions.get(0).getLastTransitionTime()); + } + } + + private static Condition getApplicationModeCondition(JobStatus jobStatus) { + return new ConditionBuilder() + .withType(CONDITION_TYPE_RUNNING) + .withStatus(jobStatus == RUNNING ? "True" : "False") + .withReason(toCameCase(jobStatus.name())) + .withMessage("Job state " + jobStatus.name()) + .build(); + } + + private static String toCameCase(String reason) { + reason = reason.toLowerCase(); + return reason.substring(0, 1).toUpperCase() + reason.substring(1); + } + + private static boolean isLastTransactionTimeStampUpdateRequired( + List conditions, Condition newCondition) { + return conditions.isEmpty() + || !conditions.get(0).getStatus().equals(newCondition.getStatus()); + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 6bfcb44edf..aa39acb548 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -467,7 +467,7 @@ public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) thro assertThat(appCluster.getStatus().getConditions()).isNotNull(); assertThat(appCluster.getStatus().getConditions()) .extracting("reason") - .contains("JobFinished"); + .contains("Finished"); // Resume from last savepoint appCluster.getSpec().getJob().setState(JobState.RUNNING); @@ -763,9 +763,7 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception // Validate status conditions assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) - .extracting("reason") - .contains("JobRunning"); + assertThat(appCluster.getStatus().getConditions()).extracting("reason").contains("Running"); } @Test @@ -785,7 +783,7 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except assertThat(appCluster.getStatus().getConditions()) .hasSize(1) .extracting("reason") - .contains("Deploying"); + .contains("JobManagerIsDeploying"); updateControl = testController.reconcile(appCluster, context); JobStatus jobStatus = appCluster.getStatus().getJobStatus(); @@ -972,9 +970,7 @@ private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Excep // Validate status conditions assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) - .extracting("reason") - .contains("JobRunning"); + assertThat(appCluster.getStatus().getConditions()).extracting("reason").contains("Running"); } @Test @@ -1294,9 +1290,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E // Validate status conditions assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) - .extracting("reason") - .contains("JobRunning"); + assertThat(appCluster.getStatus().getConditions()).extracting("reason").contains("Running"); assertEquals(6, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); @@ -1322,9 +1316,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E // Validate status conditions assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) - .extracting("reason") - .contains("JobRunning"); + assertThat(appCluster.getStatus().getConditions()).extracting("reason").contains("Running"); // Validate job status JobStatus jobStatus = appCluster.getStatus().getJobStatus(); From 70030db23d265ec2603b8af1bff935ff060e4a33 Mon Sep 17 00:00:00 2001 From: Lajith Date: Fri, 16 May 2025 16:33:47 +0530 Subject: [PATCH 13/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../api/status/JobManagerDeploymentConditionStatus.java | 2 +- .../flink/kubernetes/operator/api/utils/ConditionUtils.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentConditionStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentConditionStatus.java index 3403838ba8..c2bb6bc713 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentConditionStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentConditionStatus.java @@ -8,7 +8,7 @@ public enum JobManagerDeploymentConditionStatus { DEPLOYED_NOT_READY( "False", "DeployedNotReady", - "JobManager is running but not ready yet to receive REST API calls"), + "JobManager is running but not yet ready to receive REST API calls"), ERROR("False", "Error", "JobManager deployment failed"); private String status; diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java index 6d99564a85..b3b06186ff 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java @@ -113,12 +113,12 @@ private static Condition getApplicationModeCondition(JobStatus jobStatus) { return new ConditionBuilder() .withType(CONDITION_TYPE_RUNNING) .withStatus(jobStatus == RUNNING ? "True" : "False") - .withReason(toCameCase(jobStatus.name())) + .withReason(toCamelCase(jobStatus.name())) .withMessage("Job state " + jobStatus.name()) .build(); } - private static String toCameCase(String reason) { + private static String toCamelCase(String reason) { reason = reason.toLowerCase(); return reason.substring(0, 1).toUpperCase() + reason.substring(1); } From 93110315904496e75e81d4d0ae0c324b4839168b Mon Sep 17 00:00:00 2001 From: Lajith Date: Sun, 18 May 2025 23:52:39 +0530 Subject: [PATCH 14/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../JobManagerDeploymentConditionStatus.java | 35 ---------- .../status/JobManagerDeploymentStatus.java | 28 ++++++-- .../operator/api/utils/ConditionUtils.java | 70 +++++-------------- 3 files changed, 41 insertions(+), 92 deletions(-) delete mode 100644 flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentConditionStatus.java diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentConditionStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentConditionStatus.java deleted file mode 100644 index c2bb6bc713..0000000000 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentConditionStatus.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.apache.flink.kubernetes.operator.api.status; - -/** Condition Status of the Flink JobManager Kubernetes deployment. */ -public enum JobManagerDeploymentConditionStatus { - READY("True", "JobManagerReady", "JobManager is running and ready to receive REST API calls"), - MISSING("False", "JobManagerDeploymentMissing", "JobManager deployment not found"), - DEPLOYING("False", "JobManagerIsDeploying", "JobManager process is starting up"), - DEPLOYED_NOT_READY( - "False", - "DeployedNotReady", - "JobManager is running but not yet ready to receive REST API calls"), - ERROR("False", "Error", "JobManager deployment failed"); - - private String status; - private String reason; - private String message; - - JobManagerDeploymentConditionStatus(String status, String reason, String message) { - this.status = status; - this.reason = reason; - this.message = message; - } - - public String getReason() { - return reason; - } - - public String getMessage() { - return message; - } - - public String getStatus() { - return status; - } -} diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java index 54a0181bc0..faecf29f85 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java @@ -21,18 +21,36 @@ public enum JobManagerDeploymentStatus { /** JobManager is running and ready to receive REST API calls. */ - READY, + READY("JobManagerReady", "JobManager is running and ready to receive REST API calls"), /** JobManager is running but not ready yet to receive REST API calls. */ - DEPLOYED_NOT_READY, + DEPLOYED_NOT_READY( + "DeployedNotReady", + "JobManager is running but not yet ready to receive REST API calls"), /** JobManager process is starting up. */ - DEPLOYING, + DEPLOYING("JobManagerIsDeploying", "JobManager process is starting up"), /** JobManager deployment not found, probably not started or killed by user. */ // TODO: currently a mix of SUSPENDED and ERROR, needs cleanup - MISSING, + MISSING("JobManagerDeploymentMissing", "JobManager deployment not found"), /** Deployment in terminal error, requires spec change for reconciliation to continue. */ - ERROR; + ERROR("Error", "JobManager deployment failed"); + + private String reason; + private String message; + + JobManagerDeploymentStatus(String reason, String message) { + this.reason = reason; + this.message = message; + } + + public String getReason() { + return reason; + } + + public String getMessage() { + return message; + } } diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java index b3b06186ff..11b5573650 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java @@ -19,7 +19,6 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; -import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentConditionStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import io.fabric8.kubernetes.api.model.Condition; @@ -28,56 +27,13 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; -import java.util.Map; import static org.apache.flink.api.common.JobStatus.RUNNING; +import static org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus.READY; /** Creates a condition object with the type, status, message and reason. */ public class ConditionUtils { public static final String CONDITION_TYPE_RUNNING = "Running"; - private static final Map SESSION_MODE_CONDITION = - Map.of( - JobManagerDeploymentStatus.READY.name(), - new ConditionBuilder() - .withType(CONDITION_TYPE_RUNNING) - .withStatus(JobManagerDeploymentConditionStatus.READY.getStatus()) - .withReason(JobManagerDeploymentConditionStatus.READY.getReason()) - .withMessage(JobManagerDeploymentConditionStatus.READY.getMessage()) - .build(), - JobManagerDeploymentStatus.MISSING.name(), - new ConditionBuilder() - .withType(CONDITION_TYPE_RUNNING) - .withStatus(JobManagerDeploymentConditionStatus.MISSING.getStatus()) - .withReason(JobManagerDeploymentConditionStatus.MISSING.getReason()) - .withMessage(JobManagerDeploymentConditionStatus.MISSING.getMessage()) - .build(), - JobManagerDeploymentStatus.DEPLOYING.name(), - new ConditionBuilder() - .withType(CONDITION_TYPE_RUNNING) - .withStatus(JobManagerDeploymentConditionStatus.DEPLOYING.getStatus()) - .withReason(JobManagerDeploymentConditionStatus.DEPLOYING.getReason()) - .withMessage(JobManagerDeploymentConditionStatus.DEPLOYING.getMessage()) - .build(), - JobManagerDeploymentStatus.DEPLOYED_NOT_READY.name(), - new ConditionBuilder() - .withType(CONDITION_TYPE_RUNNING) - .withStatus( - JobManagerDeploymentConditionStatus.DEPLOYED_NOT_READY - .getStatus()) - .withReason( - JobManagerDeploymentConditionStatus.DEPLOYED_NOT_READY - .getReason()) - .withMessage( - JobManagerDeploymentConditionStatus.DEPLOYED_NOT_READY - .getMessage()) - .build(), - JobManagerDeploymentStatus.ERROR.name(), - new ConditionBuilder() - .withType(CONDITION_TYPE_RUNNING) - .withStatus(JobManagerDeploymentConditionStatus.ERROR.getStatus()) - .withReason(JobManagerDeploymentConditionStatus.ERROR.getReason()) - .withMessage(JobManagerDeploymentConditionStatus.ERROR.getMessage()) - .build()); public static Condition getCondition(FlinkDeploymentStatus flinkDeploymentStatus) { org.apache.flink.kubernetes.operator.api.status.JobStatus status = @@ -89,8 +45,8 @@ public static Condition getCondition(FlinkDeploymentStatus flinkDeploymentStatus conditionToAdd = jobStatus == null - ? SESSION_MODE_CONDITION.get( - flinkDeploymentStatus.getJobManagerDeploymentStatus().name()) + ? getSessionModeCondition( + flinkDeploymentStatus.getJobManagerDeploymentStatus()) : getApplicationModeCondition(jobStatus); } @@ -101,11 +57,12 @@ public static void updateLastTransitionTime(List conditions, Conditio if (condition == null) { return; } - if (isLastTransactionTimeStampUpdateRequired(conditions, condition)) { + Condition existingCondition = conditions.isEmpty() ? null : conditions.get(0); + if (isLastTransactionTimeStampUpdateRequired(existingCondition, condition)) { condition.setLastTransitionTime( new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").format(new Date())); } else { - condition.setLastTransitionTime(conditions.get(0).getLastTransitionTime()); + condition.setLastTransitionTime(existingCondition.getLastTransitionTime()); } } @@ -118,14 +75,23 @@ private static Condition getApplicationModeCondition(JobStatus jobStatus) { .build(); } + private static Condition getSessionModeCondition(JobManagerDeploymentStatus jmStatus) { + return new ConditionBuilder() + .withType(CONDITION_TYPE_RUNNING) + .withStatus(jmStatus == READY ? "True" : "False") + .withReason(jmStatus.getReason()) + .withMessage(jmStatus.getMessage()) + .build(); + } + private static String toCamelCase(String reason) { reason = reason.toLowerCase(); return reason.substring(0, 1).toUpperCase() + reason.substring(1); } private static boolean isLastTransactionTimeStampUpdateRequired( - List conditions, Condition newCondition) { - return conditions.isEmpty() - || !conditions.get(0).getStatus().equals(newCondition.getStatus()); + Condition existingCondition, Condition newCondition) { + return existingCondition == null + || !existingCondition.getStatus().equals(newCondition.getStatus()); } } From 00f5541d0602a60959212986a66cb5cec18618e0 Mon Sep 17 00:00:00 2001 From: Lajith Date: Thu, 9 Oct 2025 07:10:33 +0530 Subject: [PATCH 15/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../flink/kubernetes/operator/api}/Mode.java | 5 +- .../api/status/FlinkDeploymentStatus.java | 7 -- .../operator/api/utils/ConditionUtils.java | 87 ++++++++++++++----- .../controller/FlinkDeploymentController.java | 3 + .../FlinkDeploymentObserverFactory.java | 2 +- .../deployment/ReconcilerFactory.java | 2 +- .../service/StandaloneFlinkService.java | 2 +- .../service/StandaloneFlinkServiceTest.java | 2 +- 8 files changed, 76 insertions(+), 34 deletions(-) rename {flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config => flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api}/Mode.java (91%) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java similarity index 91% rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java rename to flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java index adb37e7c27..5f5392d581 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java @@ -16,9 +16,8 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.config; +package org.apache.flink.kubernetes.operator.api; -import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; /** The mode of {@link FlinkDeployment}. */ @@ -45,7 +44,7 @@ public static Mode getMode(FlinkDeployment flinkApp) { : getMode(lastReconciledSpec); } - private static Mode getMode(FlinkDeploymentSpec spec) { + public static Mode getMode(FlinkDeploymentSpec spec) { return spec.getJob() != null ? APPLICATION : SESSION; } } diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java index 9ba4456d56..fd2780f37d 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java @@ -19,7 +19,6 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.api.utils.ConditionUtils; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import io.fabric8.kubernetes.api.model.Condition; @@ -62,10 +61,4 @@ public class FlinkDeploymentStatus extends CommonStatus { /** Condition of the CR . */ private List conditions = new ArrayList<>(); - - public List getConditions() { - Condition condition = ConditionUtils.getCondition(this); - ConditionUtils.updateLastTransitionTime(conditions, condition); - return condition == null ? List.of() : List.of(condition); - } } diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java index 11b5573650..6e1098a40d 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java @@ -18,6 +18,9 @@ package org.apache.flink.kubernetes.operator.api.utils; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.kubernetes.operator.api.Mode; +import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; @@ -35,35 +38,48 @@ public class ConditionUtils { public static final String CONDITION_TYPE_RUNNING = "Running"; - public static Condition getCondition(FlinkDeploymentStatus flinkDeploymentStatus) { - org.apache.flink.kubernetes.operator.api.status.JobStatus status = - flinkDeploymentStatus.getJobStatus(); + /** + * Creates a List of Condition object based on the provided FlinkDeploymentStatus. + * + * @param flinkDeploymentStatus the FlinkDeploymentStatus object containing job status + * information + * @return a list of Condition object representing the current status of the Flink deployment + */ + public static List createConditionFromStatus( + FlinkDeploymentStatus flinkDeploymentStatus) { + + FlinkDeploymentReconciliationStatus reconciliationStatus = + flinkDeploymentStatus.getReconciliationStatus(); Condition conditionToAdd = null; - if (status != null) { - JobStatus jobStatus = status.getState(); + if (reconciliationStatus != null) { + FlinkDeploymentSpec deploymentSpec = + reconciliationStatus.deserializeLastReconciledSpec(); - conditionToAdd = - jobStatus == null - ? getSessionModeCondition( - flinkDeploymentStatus.getJobManagerDeploymentStatus()) - : getApplicationModeCondition(jobStatus); + if (deploymentSpec != null) { + switch (Mode.getMode(deploymentSpec)) { + case APPLICATION: + conditionToAdd = + getApplicationModeCondition( + flinkDeploymentStatus.getJobStatus().getState()); + break; + case SESSION: + conditionToAdd = + getSessionModeCondition( + flinkDeploymentStatus.getJobManagerDeploymentStatus()); + } + updateLastTransitionTime(flinkDeploymentStatus.getConditions(), conditionToAdd); + } } - - return conditionToAdd; + return conditionToAdd == null ? List.of() : List.of(conditionToAdd); } - public static void updateLastTransitionTime(List conditions, Condition condition) { + private static void updateLastTransitionTime(List conditions, Condition condition) { if (condition == null) { return; } Condition existingCondition = conditions.isEmpty() ? null : conditions.get(0); - if (isLastTransactionTimeStampUpdateRequired(existingCondition, condition)) { - condition.setLastTransitionTime( - new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").format(new Date())); - } else { - condition.setLastTransitionTime(existingCondition.getLastTransitionTime()); - } + condition.setLastTransitionTime(getLastTransitionTimeStamp(existingCondition, condition)); } private static Condition getApplicationModeCondition(JobStatus jobStatus) { @@ -84,14 +100,45 @@ private static Condition getSessionModeCondition(JobManagerDeploymentStatus jmSt .build(); } + /** + * Reason in the condition object should be a CamelCase string, so need to convert JobStatus as + * all the keywords are one noun, so we only need to upper case the first letter. + * + * @return CamelCase reason as String + */ private static String toCamelCase(String reason) { reason = reason.toLowerCase(); return reason.substring(0, 1).toUpperCase() + reason.substring(1); } - private static boolean isLastTransactionTimeStampUpdateRequired( + private static boolean isLastTransitionTimeStampUpdateRequired( Condition existingCondition, Condition newCondition) { return existingCondition == null || !existingCondition.getStatus().equals(newCondition.getStatus()); } + + /** + * get the last transition time for the condition , returns the current time if there is no + * existing condition or if the condition status has changed, otherwise returns existing + * condition LastTransitionTime. + * + * @param existingCondition The current condition object, may be null. + * @param condition The new condition object to compare against the existing one. + * @return A string representing the last transition time in the format + * "yyyy-MM-dd'T'HH:mm:ss'Z'". Returns a new timestamp if the existing condition is null or + * the status has changed, otherwise returns the last transition time of the existing + * condition. + */ + private static String getLastTransitionTimeStamp( + Condition existingCondition, Condition condition) { + String lastTransitionTime; + if (existingCondition == null + || !existingCondition.getStatus().equals(condition.getStatus())) { + lastTransitionTime = + new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").format(new Date()); + } else { + lastTransitionTime = existingCondition.getLastTransitionTime(); + } + return lastTransitionTime; + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 32a3109d12..2fa6cae1ec 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.api.utils.ConditionUtils; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; @@ -165,6 +166,8 @@ public UpdateControl reconcile(FlinkDeployment flinkApp, Contex throw new ReconciliationException(e); } + flinkApp.getStatus() + .setConditions(ConditionUtils.createConditionFromStatus(flinkApp.getStatus())); LOG.debug("End of reconciliation"); statusRecorder.patchAndCacheStatus(flinkApp, ctx.getKubernetesClient()); return ReconciliationUtils.toUpdateControl( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java index c5e1124184..6facfb16ec 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java @@ -19,8 +19,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.Mode; import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode; -import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.observer.Observer; import org.apache.flink.kubernetes.operator.utils.EventRecorder; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java index 429caaa657..3a389a79e7 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java @@ -20,10 +20,10 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.autoscaler.JobAutoScaler; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.Mode; import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext; -import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.reconciler.Reconciler; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java index 7c483f7f42..ac3dab3382 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java @@ -27,10 +27,10 @@ import org.apache.flink.kubernetes.KubernetesClusterClientFactory; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.Mode; import org.apache.flink.kubernetes.operator.api.spec.JobSpec; import org.apache.flink.kubernetes.operator.artifact.ArtifactManager; import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; -import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient; import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java index fde2b12421..a871466bd0 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java @@ -24,11 +24,11 @@ import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.Mode; import org.apache.flink.kubernetes.operator.api.spec.JobSpec; import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode; import org.apache.flink.kubernetes.operator.artifact.ArtifactManager; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; -import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; import org.apache.flink.util.concurrent.Executors; From b748ec71c65246cc694884e4a72a0d7afc77968f Mon Sep 17 00:00:00 2001 From: Lajith Date: Thu, 9 Oct 2025 11:31:40 +0530 Subject: [PATCH 16/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../FlinkDeploymentControllerTest.java | 112 ++++-------------- 1 file changed, 22 insertions(+), 90 deletions(-) diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 7c94b87984..65cdfaf050 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -130,7 +130,7 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception assertEquals( org.apache.flink.api.common.JobStatus.RUNNING, appCluster.getStatus().getJobStatus().getState()); - assertEquals(7, testController.getInternalStatusUpdateCount()); + assertEquals(8, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isPatchStatus()); FlinkDeploymentReconciliationStatus reconciliationStatus = @@ -277,16 +277,11 @@ public void verifyFailedDeployment() throws Exception { validatingResponseProvider.assertValidated(); + validateConditionStatus(appCluster, "Reconciling"); + // Validate status assertNotNull(appCluster.getStatus().getError()); - // Validate status conditions - assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) - .hasSize(1) - .extracting("reason") - .contains("Reconciling"); - // next cycle should not create another event updateControl = testController.reconcile( @@ -366,13 +361,7 @@ public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) thro appCluster.getStatus().getJobManagerDeploymentStatus()); assertEquals( "savepoint_1", appCluster.getStatus().getJobStatus().getUpgradeSavepointPath()); - - // Validate status conditions - assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) - .extracting("reason") - .contains("Finished"); - + validateConditionStatus(appCluster, "Finished"); // Resume from last savepoint appCluster.getSpec().getJob().setState(JobState.RUNNING); testController.reconcile(appCluster, context); @@ -622,12 +611,6 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception JobManagerDeploymentStatus.DEPLOYING, appCluster.getStatus().getJobManagerDeploymentStatus()); - // Validate status conditions - assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) - .extracting("reason") - .contains("Reconciling"); - updateControl = testController.reconcile(appCluster, context); JobStatus jobStatus = appCluster.getStatus().getJobStatus(); assertFalse(updateControl.isPatchStatus()); @@ -637,12 +620,7 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception // jobStatus has not been set at this time assertEquals(org.apache.flink.api.common.JobStatus.RECONCILING, jobStatus.getState()); - // Validate status conditions - assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) - .extracting("reason") - .contains("Reconciling"); - + validateConditionStatus(appCluster, "Reconciling"); // Switches operator mode to SESSION appCluster.getSpec().setJob(null); // Validation fails and JobObserver should still be used @@ -664,10 +642,7 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception assertEquals(expectedJobStatus.getJobId().toHexString(), jobStatus.getJobId()); assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName()); assertEquals(expectedJobStatus.getJobState(), jobStatus.getState()); - - // Validate status conditions - assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()).extracting("reason").contains("Running"); + validateConditionStatus(appCluster, "Running"); } @Test @@ -681,13 +656,7 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except assertEquals( JobManagerDeploymentStatus.DEPLOYING, appCluster.getStatus().getJobManagerDeploymentStatus()); - - // Validate status conditions - assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) - .hasSize(1) - .extracting("reason") - .contains("JobManagerIsDeploying"); + validateConditionStatus(appCluster, "JobManagerIsDeploying"); updateControl = testController.reconcile(appCluster, context); JobStatus jobStatus = appCluster.getStatus().getJobStatus(); @@ -695,16 +664,11 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except assertEquals( JobManagerDeploymentStatus.DEPLOYED_NOT_READY, appCluster.getStatus().getJobManagerDeploymentStatus()); - - // Validate status conditions - assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) - .hasSize(1) - .extracting("reason") - .contains("DeployedNotReady"); // jobStatus has not been set at this time assertNull(jobStatus.getState()); + validateConditionStatus(appCluster, "DeployedNotReady"); + // Switches operator mode to APPLICATION appCluster.getSpec().setJob(TestUtils.buildSessionJob().getSpec().getJob()); // Validation fails and JobObserver should still be used @@ -720,12 +684,7 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except .contains("Cannot switch from session to job cluster")); assertNull(ReconciliationUtils.getDeployedSpec(appCluster).getJob()); - // Validate status conditions - assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) - .hasSize(1) - .extracting("reason") - .contains("JobManagerReady"); + validateConditionStatus(appCluster, "JobManagerReady"); } private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Exception { @@ -871,10 +830,6 @@ private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Excep assertEquals( JobManagerDeploymentStatus.READY, appCluster.getStatus().getJobManagerDeploymentStatus()); - - // Validate status conditions - assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()).extracting("reason").contains("Running"); } @Test @@ -1175,12 +1130,6 @@ private void verifyReconcileInitialSuspendedDeployment(FlinkDeployment appCluste assertNull(appCluster.getStatus().getError()); assertNull(reconciliationStatus.deserializeLastReconciledSpec()); assertNull(reconciliationStatus.getLastStableSpec()); - - // Validate status conditions - assertThat(appCluster.getStatus().getConditions()) - .hasSize(1) - .extracting("message") - .contains("JobManager deployment not found"); } private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws Exception { @@ -1193,15 +1142,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E org.apache.flink.api.common.JobStatus.RECONCILING, appCluster.getStatus().getJobStatus().getState()); assertEquals(4, testController.getInternalStatusUpdateCount()); - - // Validate status conditions - assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) - .hasSize(1) - .extracting("reason") - .contains("Reconciling"); - - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( configManager @@ -1224,15 +1165,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E org.apache.flink.api.common.JobStatus.RECONCILING, appCluster.getStatus().getJobStatus().getState()); assertEquals(5, testController.getInternalStatusUpdateCount()); - - // Validate status conditions - assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()) - .hasSize(1) - .extracting("reason") - .contains("Reconciling"); - - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getRestApiReadyDelay().toMillis()), @@ -1245,12 +1178,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E assertEquals( org.apache.flink.api.common.JobStatus.RUNNING, appCluster.getStatus().getJobStatus().getState()); - - // Validate status conditions - assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()).extracting("reason").contains("Running"); - - assertEquals(6, testController.getInternalStatusUpdateCount()); + assertEquals(7, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( @@ -1265,17 +1193,13 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E assertEquals( org.apache.flink.api.common.JobStatus.RUNNING, appCluster.getStatus().getJobStatus().getState()); - assertEquals(6, testController.getInternalStatusUpdateCount()); + assertEquals(7, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), updateControl.getScheduleDelay()); - // Validate status conditions - assertThat(appCluster.getStatus().getConditions()).isNotNull(); - assertThat(appCluster.getStatus().getConditions()).extracting("reason").contains("Running"); - // Validate job status JobStatus jobStatus = appCluster.getStatus().getJobStatus(); JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1; @@ -1339,4 +1263,12 @@ private String getIngressHost(HasMetadata ingress) { return ingressRuleV1beta1.getHost(); } } + + private void validateConditionStatus(FlinkDeployment appCluster, String reason) { + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .hasSize(1) + .extracting("reason") + .contains(reason); + } } From 642e79903e6d186544c7492ff5b5b7637c26cda8 Mon Sep 17 00:00:00 2001 From: Lajith Date: Thu, 13 Nov 2025 13:40:32 +0530 Subject: [PATCH 17/17] [FLINK-33634] Add Conditions to Flink CRD's Status field --- .../operator/api/utils/ConditionUtils.java | 28 +- .../operator/api/utils/BaseTestUtils.java | 43 +++ .../api/utils/ConditionUtilsTest.java | 314 ++++++++++++++++++ 3 files changed, 369 insertions(+), 16 deletions(-) create mode 100644 flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtilsTest.java diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java index 6e1098a40d..8428b70a52 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtils.java @@ -27,8 +27,7 @@ import io.fabric8.kubernetes.api.model.Condition; import io.fabric8.kubernetes.api.model.ConditionBuilder; -import java.text.SimpleDateFormat; -import java.util.Date; +import java.time.Instant; import java.util.List; import static org.apache.flink.api.common.JobStatus.RUNNING; @@ -78,7 +77,11 @@ private static void updateLastTransitionTime(List conditions, Conditi if (condition == null) { return; } - Condition existingCondition = conditions.isEmpty() ? null : conditions.get(0); + Condition existingCondition = + conditions.stream() + .filter(c -> c.getType().equals(condition.getType())) + .findFirst() + .orElse(null); condition.setLastTransitionTime(getLastTransitionTimeStamp(existingCondition, condition)); } @@ -87,7 +90,7 @@ private static Condition getApplicationModeCondition(JobStatus jobStatus) { .withType(CONDITION_TYPE_RUNNING) .withStatus(jobStatus == RUNNING ? "True" : "False") .withReason(toCamelCase(jobStatus.name())) - .withMessage("Job state " + jobStatus.name()) + .withMessage("Job status " + jobStatus.name()) .build(); } @@ -111,12 +114,6 @@ private static String toCamelCase(String reason) { return reason.substring(0, 1).toUpperCase() + reason.substring(1); } - private static boolean isLastTransitionTimeStampUpdateRequired( - Condition existingCondition, Condition newCondition) { - return existingCondition == null - || !existingCondition.getStatus().equals(newCondition.getStatus()); - } - /** * get the last transition time for the condition , returns the current time if there is no * existing condition or if the condition status has changed, otherwise returns existing @@ -124,18 +121,17 @@ private static boolean isLastTransitionTimeStampUpdateRequired( * * @param existingCondition The current condition object, may be null. * @param condition The new condition object to compare against the existing one. - * @return A string representing the last transition time in the format - * "yyyy-MM-dd'T'HH:mm:ss'Z'". Returns a new timestamp if the existing condition is null or - * the status has changed, otherwise returns the last transition time of the existing - * condition. + * @return A string representing the last transition time in ISO 8601 format with nanosecond + * precision (e.g., "2025-10-30T07:35:35.189752790Z"). Returns a new timestamp if the + * existing condition is null or the status has changed, otherwise returns the last + * transition time of the existing condition. */ private static String getLastTransitionTimeStamp( Condition existingCondition, Condition condition) { String lastTransitionTime; if (existingCondition == null || !existingCondition.getStatus().equals(condition.getStatus())) { - lastTransitionTime = - new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").format(new Date()); + lastTransitionTime = Instant.now().toString(); } else { lastTransitionTime = existingCondition.getLastTransitionTime(); } diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java index f3bd54f2f2..5a47bbd207 100644 --- a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java +++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.api.utils; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.TaskManagerOptions; @@ -39,8 +40,10 @@ import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.api.status.CheckpointType; +import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus; +import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; @@ -49,6 +52,7 @@ import io.fabric8.kubernetes.api.model.PodTemplateSpec; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -268,4 +272,43 @@ public static FlinkStateSnapshot buildFlinkStateSnapshotCheckpoint( return snapshot; } + + public static FlinkDeploymentStatus createApplicationModeStatus(JobStatus jobStatus) { + FlinkDeploymentStatus status = new FlinkDeploymentStatus(); + org.apache.flink.kubernetes.operator.api.status.JobStatus flinkJobStatus = + new org.apache.flink.kubernetes.operator.api.status.JobStatus(); + flinkJobStatus.setState(jobStatus); + status.setJobStatus(flinkJobStatus); + status.setConditions(new ArrayList<>()); + + FlinkDeploymentReconciliationStatus reconciliationStatus = + new FlinkDeploymentReconciliationStatus(); + + FlinkDeployment deployment = BaseTestUtils.buildApplicationCluster(); + String serializedSpec = SpecUtils.writeSpecWithMeta(deployment.getSpec(), deployment); + reconciliationStatus.setLastReconciledSpec(serializedSpec); + + status.setReconciliationStatus(reconciliationStatus); + status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); + + return status; + } + + public static FlinkDeploymentStatus createSessionModeStatus( + JobManagerDeploymentStatus jmStatus) { + FlinkDeploymentStatus status = new FlinkDeploymentStatus(); + status.setJobManagerDeploymentStatus(jmStatus); + status.setConditions(new ArrayList<>()); + + FlinkDeploymentReconciliationStatus reconciliationStatus = + new FlinkDeploymentReconciliationStatus(); + + FlinkDeployment deployment = BaseTestUtils.buildSessionCluster(); + String serializedSpec = SpecUtils.writeSpecWithMeta(deployment.getSpec(), deployment); + reconciliationStatus.setLastReconciledSpec(serializedSpec); + + status.setReconciliationStatus(reconciliationStatus); + + return status; + } } diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtilsTest.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtilsTest.java new file mode 100644 index 0000000000..d510d121d1 --- /dev/null +++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/ConditionUtilsTest.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.utils; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus; +import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; +import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; + +import io.fabric8.kubernetes.api.model.Condition; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Test for {@link ConditionUtils}. */ +class ConditionUtilsTest { + + /** + * Helper method to get a condition by type from a list of conditions. + * + * @param conditions the list of conditions + * @param type the condition type to find + * @return the condition with the specified type, or null if not found + */ + private Condition getConditionByType(List conditions, String type) { + return conditions.stream().filter(c -> type.equals(c.getType())).findFirst().orElse(null); + } + + @Test + void testCreateConditionFromStatusWithNullReconciliationStatus() { + FlinkDeploymentStatus status = new FlinkDeploymentStatus(); + status.setReconciliationStatus(null); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertTrue( + conditions.isEmpty(), + "Should return empty list when reconciliation status is null"); + } + + @Test + void testCreateConditionFromStatusWithNullDeploymentSpec() { + FlinkDeploymentStatus status = new FlinkDeploymentStatus(); + FlinkDeploymentReconciliationStatus reconciliationStatus = + new FlinkDeploymentReconciliationStatus(); + status.setReconciliationStatus(reconciliationStatus); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertTrue(conditions.isEmpty(), "Should return empty list when deployment spec is null"); + } + + @Test + void testApplicationModeConditionWithRunningJob() { + FlinkDeploymentStatus status = BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size(), "Should return one condition"); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals(ConditionUtils.CONDITION_TYPE_RUNNING, condition.getType()); + assertEquals("True", condition.getStatus()); + assertEquals("Running", condition.getReason()); + assertEquals("Job status RUNNING", condition.getMessage()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testApplicationModeConditionWithFailedJob() { + FlinkDeploymentStatus status = BaseTestUtils.createApplicationModeStatus(JobStatus.FAILED); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size()); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals(ConditionUtils.CONDITION_TYPE_RUNNING, condition.getType()); + assertEquals("False", condition.getStatus()); + assertEquals("Failed", condition.getReason()); + assertEquals("Job status FAILED", condition.getMessage()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testApplicationModeConditionWithCanceledJob() { + FlinkDeploymentStatus status = + BaseTestUtils.createApplicationModeStatus(JobStatus.CANCELED); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size()); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals("False", condition.getStatus()); + assertEquals("Canceled", condition.getReason()); + assertEquals("Job status CANCELED", condition.getMessage()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testApplicationModeConditionWithFinishedJob() { + FlinkDeploymentStatus status = + BaseTestUtils.createApplicationModeStatus(JobStatus.FINISHED); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size()); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals("False", condition.getStatus()); + assertEquals("Finished", condition.getReason()); + assertEquals("Job status FINISHED", condition.getMessage()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testApplicationModeConditionWithCreatedJob() { + FlinkDeploymentStatus status = BaseTestUtils.createApplicationModeStatus(JobStatus.CREATED); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size()); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals("False", condition.getStatus()); + assertEquals("Created", condition.getReason()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testSessionModeConditionWithReadyJobManager() { + FlinkDeploymentStatus status = + BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.READY); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size()); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals(ConditionUtils.CONDITION_TYPE_RUNNING, condition.getType()); + assertEquals("True", condition.getStatus()); + assertEquals(JobManagerDeploymentStatus.READY.getReason(), condition.getReason()); + assertEquals(JobManagerDeploymentStatus.READY.getMessage(), condition.getMessage()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testSessionModeConditionWithDeployingJobManager() { + FlinkDeploymentStatus status = + BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.DEPLOYING); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size()); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals("False", condition.getStatus()); + assertEquals(JobManagerDeploymentStatus.DEPLOYING.getReason(), condition.getReason()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testSessionModeConditionWithMissingJobManager() { + FlinkDeploymentStatus status = + BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.MISSING); + + List conditions = ConditionUtils.createConditionFromStatus(status); + + assertEquals(1, conditions.size()); + Condition condition = getConditionByType(conditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(condition); + assertEquals("False", condition.getStatus()); + assertNotNull(condition.getLastTransitionTime()); + } + + @Test + void testConditionTypeIsAlwaysRunning() { + // Test application mode + FlinkDeploymentStatus appStatus = + BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING); + List appConditions = ConditionUtils.createConditionFromStatus(appStatus); + Condition appCondition = + getConditionByType(appConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(appCondition); + assertEquals(ConditionUtils.CONDITION_TYPE_RUNNING, appCondition.getType()); + + // Test session mode + FlinkDeploymentStatus sessionStatus = + BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.READY); + List sessionConditions = ConditionUtils.createConditionFromStatus(sessionStatus); + Condition sessionCondition = + getConditionByType(sessionConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(sessionCondition); + assertEquals(ConditionUtils.CONDITION_TYPE_RUNNING, sessionCondition.getType()); + } + + @Test + void testGetLastTransitionTimeStamp_StatusUnchanged() throws InterruptedException { + // Test that timestamp is preserved when status doesn't change + FlinkDeploymentStatus status = BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING); + + // First call - creates initial condition with timestamp + List firstConditions = ConditionUtils.createConditionFromStatus(status); + Condition firstCondition = + getConditionByType(firstConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(firstCondition); + String firstTimestamp = firstCondition.getLastTransitionTime(); + + // Add the condition to status + status.getConditions().add(firstCondition); + + // Second call - status unchanged (still RUNNING) + List secondConditions = ConditionUtils.createConditionFromStatus(status); + Condition secondCondition = + getConditionByType(secondConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(secondCondition); + String secondTimestamp = secondCondition.getLastTransitionTime(); + + // Timestamp should be preserved since status didn't change + assertEquals(firstTimestamp, secondTimestamp); + } + + @Test + void testGetLastTransitionTimeStamp_StatusChanged() throws InterruptedException { + // Test that timestamp is updated when status changes + FlinkDeploymentStatus status = BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING); + + // First call - creates initial condition with RUNNING status + List firstConditions = ConditionUtils.createConditionFromStatus(status); + Condition firstCondition = + getConditionByType(firstConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(firstCondition); + String firstTimestamp = firstCondition.getLastTransitionTime(); + + // Add the condition to status + status.getConditions().add(firstCondition); + + // Change status to FAILED + status.getJobStatus().setState(JobStatus.FAILED); + + // Second call - status changed to FAILED + List secondConditions = ConditionUtils.createConditionFromStatus(status); + Condition secondCondition = + getConditionByType(secondConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(secondCondition); + String secondTimestamp = secondCondition.getLastTransitionTime(); + + // Timestamp should be different since status changed + assertTrue( + !firstTimestamp.equals(secondTimestamp), + "Timestamp should be updated when status changes"); + } + + @Test + void testGetLastTransitionTimeStamp_SessionMode() throws InterruptedException { + // Test timestamp behavior in session mode + FlinkDeploymentStatus status = + BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.READY); + + // First call + List firstConditions = ConditionUtils.createConditionFromStatus(status); + Condition firstCondition = + getConditionByType(firstConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(firstCondition); + String firstTimestamp = firstCondition.getLastTransitionTime(); + + // Add condition to status + status.getConditions().add(firstCondition); + + // Second call with same status + List secondConditions = ConditionUtils.createConditionFromStatus(status); + Condition secondCondition = + getConditionByType(secondConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(secondCondition); + String secondTimestamp = secondCondition.getLastTransitionTime(); + // Timestamp should be preserved + assertEquals(firstTimestamp, secondTimestamp); + + // Change to DEPLOYING + status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); + + // Third call with changed status + List thirdConditions = ConditionUtils.createConditionFromStatus(status); + Condition thirdCondition = + getConditionByType(thirdConditions, ConditionUtils.CONDITION_TYPE_RUNNING); + assertNotNull(thirdCondition); + String thirdTimestamp = thirdCondition.getLastTransitionTime(); + // Timestamp should be updated + assertTrue( + !secondTimestamp.equals(thirdTimestamp), + "Timestamp should be updated when JobManager status changes"); + } +}