From 06c21cbc7add73af2e795cb896932ac798499d2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 25 Aug 2025 16:23:59 +0200 Subject: [PATCH 1/4] Override config with env vars MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../operator/config/FlinkConfigManager.java | 57 ++++++++++++++++++- helm/flink-kubernetes-operator/values.yaml | 2 +- 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index f31fc7ba4b..165f03721d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -22,6 +22,7 @@ import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.FallbackKey; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; @@ -50,6 +51,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -62,6 +64,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -448,15 +451,63 @@ private static Configuration loadGlobalConfiguration() { @VisibleForTesting protected static Configuration loadGlobalConfiguration(Optional confOverrideDir) { + Configuration res; if (confOverrideDir.isPresent()) { Configuration configOverrides = GlobalConfiguration.loadConfiguration(confOverrideDir.get()); LOG.debug( "Loading default configuration with overrides from {}", confOverrideDir.get()); - return GlobalConfiguration.loadConfiguration(configOverrides); + res = GlobalConfiguration.loadConfiguration(configOverrides); + } else { + LOG.debug("Loading default configuration"); + res = GlobalConfiguration.loadConfiguration(); } - LOG.debug("Loading default configuration"); - return GlobalConfiguration.loadConfiguration(); + overriderConfigurationsFromEnvVariables(res, System::getenv); + return res; + } + + @VisibleForTesting + static void overriderConfigurationsFromEnvVariables( + Configuration res, Supplier> envVariables) { + var envVars = envVariables.get(); + var options = getConfigOptions(KubernetesOperatorConfigOptions.class); + options.forEach( + o -> { + o.fallbackKeys() + .forEach( + k -> { + var fallbackKey = ((FallbackKey) k).getKey(); + String key = keyToEnvVarName(fallbackKey); + String val = envVars.get(key); + if (val != null) { + res.setString(fallbackKey, val); + } + }); + var val = envVars.get(keyToEnvVarName(o.key())); + if (val != null) { + res.setString(o.key(), val); + } + }); + } + + @SuppressWarnings("rawtypes") + private static List getConfigOptions( + Class clazz) { + return Arrays.stream(clazz.getDeclaredFields()) + .filter(f -> f.getType().equals(ConfigOption.class)) + .map( + f -> { + try { + return (ConfigOption) f.get(null); + } catch (IllegalAccessException e) { + throw new IllegalStateException(e); + } + }) + .toList(); + } + + static String keyToEnvVarName(String key) { + return key.replaceAll("\\.", "_").toUpperCase(); } private static void applyDefault( diff --git a/helm/flink-kubernetes-operator/values.yaml b/helm/flink-kubernetes-operator/values.yaml index 00fb5db2d7..b0ee399f02 100644 --- a/helm/flink-kubernetes-operator/values.yaml +++ b/helm/flink-kubernetes-operator/values.yaml @@ -147,7 +147,7 @@ operatorSecurityContext: {} webhookSecurityContext: {} webhook: - create: true + create: false # validator: # create: true # mutator: From 0135943b409e518695cfdeffc28f3e299c3b9feb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 25 Aug 2025 18:30:12 +0200 Subject: [PATCH 2/4] generalized solution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../operator/config/FlinkConfigManager.java | 46 +++++-------------- .../config/FlinkConfigManagerTest.java | 15 ++++++ 2 files changed, 27 insertions(+), 34 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index 165f03721d..08054edd97 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -22,7 +22,6 @@ import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.FallbackKey; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; @@ -51,7 +50,6 @@ import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -80,6 +78,8 @@ /** Configuration manager for the Flink operator. */ public class FlinkConfigManager { + public static final String ENV_VAR_PREFIX = "FLINK_CONF_"; + private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class); private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -470,44 +470,22 @@ protected static Configuration loadGlobalConfiguration(Optional confOver static void overriderConfigurationsFromEnvVariables( Configuration res, Supplier> envVariables) { var envVars = envVariables.get(); - var options = getConfigOptions(KubernetesOperatorConfigOptions.class); - options.forEach( - o -> { - o.fallbackKeys() - .forEach( - k -> { - var fallbackKey = ((FallbackKey) k).getKey(); - String key = keyToEnvVarName(fallbackKey); - String val = envVars.get(key); - if (val != null) { - res.setString(fallbackKey, val); - } - }); - var val = envVars.get(keyToEnvVarName(o.key())); - if (val != null) { - res.setString(o.key(), val); + envVars.forEach( + (k, v) -> { + if (k.startsWith(ENV_VAR_PREFIX)) { + res.setString(envVarToKey(k), envVarValueToValue(v)); } }); } - @SuppressWarnings("rawtypes") - private static List getConfigOptions( - Class clazz) { - return Arrays.stream(clazz.getDeclaredFields()) - .filter(f -> f.getType().equals(ConfigOption.class)) - .map( - f -> { - try { - return (ConfigOption) f.get(null); - } catch (IllegalAccessException e) { - throw new IllegalStateException(e); - } - }) - .toList(); + @VisibleForTesting + static String envVarValueToValue(String v) { + return v.replace("_", " "); } - static String keyToEnvVarName(String key) { - return key.replaceAll("\\.", "_").toUpperCase(); + @VisibleForTesting + static String envVarToKey(String key) { + return key.replace(ENV_VAR_PREFIX, "").replace("__", "-").replace("_", ".").toLowerCase(); } private static void applyDefault( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java index 39414af44c..f4a9f25242 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java @@ -53,6 +53,7 @@ import java.util.regex.Matcher; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_WATCHED_NAMESPACES; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -428,4 +429,18 @@ public void testConcurrentDefaultConfig() throws InterruptedException { assertTrue(completed2.get()); assertTrue(completed3.get()); } + + @Test + void envVarToFlinkConfig() { + assertThat( + FlinkConfigManager.envVarToKey( + "FLINK_CONF_RESTART__STRATEGY_FAILURE__RATE_DELAY")) + .isEqualTo("restart-strategy.failure-rate.delay"); + } + + @Test + void envVarValueToFlinkConfigValue() { + assertThat(FlinkConfigManager.envVarValueToValue("1")).isEqualTo("1"); + assertThat(FlinkConfigManager.envVarValueToValue("1_m")).isEqualTo("1 m"); + } } From d6470fd52245e93a5b8396eab6c624099c25c3e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 25 Aug 2025 18:33:47 +0200 Subject: [PATCH 3/4] =?UTF-8?q?revert=20Signed-off-by:=20Attila=20M=C3=A9s?= =?UTF-8?q?z=C3=A1ros=20?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- helm/flink-kubernetes-operator/values.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helm/flink-kubernetes-operator/values.yaml b/helm/flink-kubernetes-operator/values.yaml index b0ee399f02..00fb5db2d7 100644 --- a/helm/flink-kubernetes-operator/values.yaml +++ b/helm/flink-kubernetes-operator/values.yaml @@ -147,7 +147,7 @@ operatorSecurityContext: {} webhookSecurityContext: {} webhook: - create: false + create: true # validator: # create: true # mutator: From 2ac5fae18ec3c3010f706f7a495da6456e38716e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 26 Aug 2025 12:30:37 +0200 Subject: [PATCH 4/4] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../kubernetes/operator/config/FlinkConfigManager.java | 7 +------ .../kubernetes/operator/config/FlinkConfigManagerTest.java | 6 ------ 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index 08054edd97..32b96d729a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -473,16 +473,11 @@ static void overriderConfigurationsFromEnvVariables( envVars.forEach( (k, v) -> { if (k.startsWith(ENV_VAR_PREFIX)) { - res.setString(envVarToKey(k), envVarValueToValue(v)); + res.setString(envVarToKey(k), v); } }); } - @VisibleForTesting - static String envVarValueToValue(String v) { - return v.replace("_", " "); - } - @VisibleForTesting static String envVarToKey(String key) { return key.replace(ENV_VAR_PREFIX, "").replace("__", "-").replace("_", ".").toLowerCase(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java index f4a9f25242..1f597cd700 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java @@ -437,10 +437,4 @@ void envVarToFlinkConfig() { "FLINK_CONF_RESTART__STRATEGY_FAILURE__RATE_DELAY")) .isEqualTo("restart-strategy.failure-rate.delay"); } - - @Test - void envVarValueToFlinkConfigValue() { - assertThat(FlinkConfigManager.envVarValueToValue("1")).isEqualTo("1"); - assertThat(FlinkConfigManager.envVarValueToValue("1_m")).isEqualTo("1 m"); - } }