Skip to content

Commit 8ff6205

Browse files
committed
opt stream cofig manager
1 parent 80b2d0d commit 8ff6205

File tree

2 files changed

+24
-67
lines changed

2 files changed

+24
-67
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ public static void main(String[] args) throws Exception {
118118
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
119119
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
120120
StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
121-
StreamTableEnvironment tableEnv = getStreamTableEnv(env);
122-
StreamQueryConfig streamQueryConfig = getStreamQueryConfig(tableEnv, confProperties);
121+
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
122+
StreamQueryConfig streamQueryConfig = StreamEnvConfigManager.getStreamQueryConfig(tableEnv, confProperties);
123123

124124
List<URL> jarURList = Lists.newArrayList();
125125
SqlTree sqlTree = SqlParser.parseSql(sql);
@@ -304,14 +304,4 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confPropert
304304
StreamEnvConfigManager.streamExecutionEnvironmentConfig(env, confProperties);
305305
return env;
306306
}
307-
308-
private static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironment env) {
309-
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
310-
return tableEnv;
311-
}
312-
313-
private static StreamQueryConfig getStreamQueryConfig(StreamTableEnvironment tableEnv, Properties confProperties) {
314-
Optional<StreamQueryConfig> streamQueryConfig = StreamEnvConfigManager.streamTableEnvironmentStateTTLConfig(tableEnv, confProperties);
315-
return streamQueryConfig.isPresent() ? streamQueryConfig.get() : tableEnv.queryConfig();
316-
}
317307
}

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 22 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@
5959
* @author maqi
6060
*/
6161
public final class StreamEnvConfigManager {
62-
private static Optional<Long> autoWatermarkInterval;
63-
6462
private StreamEnvConfigManager() {
6563
throw new AssertionError("Singleton class.");
6664
}
@@ -93,30 +91,15 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
9391
((Configuration) exeConfig.getGlobalJobParameters()).addAll(globalJobParameters);
9492
}
9593

96-
Optional<Integer> envParallelism = getEnvParallelism(confProperties);
97-
if (envParallelism.isPresent()) {
98-
streamEnv.setParallelism(envParallelism.get());
99-
}
100-
101-
Optional<Integer> maxParallelism = getMaxEnvParallelism(confProperties);
102-
if (maxParallelism.isPresent()) {
103-
streamEnv.setMaxParallelism(maxParallelism.get());
104-
}
105-
106-
Optional<Long> bufferTimeoutMillis = getBufferTimeoutMillis(confProperties);
107-
if (bufferTimeoutMillis.isPresent()) {
108-
streamEnv.setBufferTimeout(bufferTimeoutMillis.get());
109-
}
110-
111-
Optional<TimeCharacteristic> streamTimeCharacteristic = getStreamTimeCharacteristic(confProperties);
112-
if (streamTimeCharacteristic.isPresent()) {
113-
streamEnv.setStreamTimeCharacteristic(streamTimeCharacteristic.get());
114-
}
115-
116-
Optional<Long> autoWatermarkInterval = getAutoWatermarkInterval(confProperties);
117-
if (autoWatermarkInterval.isPresent() && streamEnv.getStreamTimeCharacteristic().equals(TimeCharacteristic.EventTime)) {
118-
streamEnv.getConfig().setAutoWatermarkInterval(autoWatermarkInterval.get());
119-
}
94+
getEnvParallelism(confProperties).ifPresent(streamEnv::setParallelism);
95+
getMaxEnvParallelism(confProperties).ifPresent(streamEnv::setMaxParallelism);
96+
getBufferTimeoutMillis(confProperties).ifPresent(streamEnv::setBufferTimeout);
97+
getStreamTimeCharacteristic(confProperties).ifPresent(streamEnv::setStreamTimeCharacteristic);
98+
getAutoWatermarkInterval(confProperties).ifPresent(op -> {
99+
if (streamEnv.getStreamTimeCharacteristic().equals(TimeCharacteristic.EventTime)) {
100+
streamEnv.getConfig().setAutoWatermarkInterval(op);
101+
}
102+
});
120103

121104
streamEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
122105
ConfigConstrant.failureRate,
@@ -127,35 +110,19 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
127110
// checkpoint config
128111
Optional<Boolean> checkpointingEnabled = isCheckpointingEnabled(confProperties);
129112
if (checkpointingEnabled.get()) {
130-
Optional<Long> checkpointInterval = getCheckpointInterval(confProperties);
131-
streamEnv.enableCheckpointing(checkpointInterval.get());
132-
133-
Optional<CheckpointingMode> checkpointingMode = getCheckpointingMode(confProperties);
134-
if (checkpointingMode.isPresent()) {
135-
streamEnv.getCheckpointConfig().setCheckpointingMode(checkpointingMode.get());
136-
}
137-
Optional<Long> checkpointTimeout = getCheckpointTimeout(confProperties);
138-
if (checkpointTimeout.isPresent()) {
139-
streamEnv.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout.get());
140-
}
141-
142-
Optional<Integer> maxConcurrentCheckpoints = getMaxConcurrentCheckpoints(confProperties);
143-
if (maxConcurrentCheckpoints.isPresent()) {
144-
streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(maxConcurrentCheckpoints.get());
145-
}
146-
147-
Optional<CheckpointConfig.ExternalizedCheckpointCleanup> checkpointCleanup = getCheckpointCleanup(confProperties);
148-
if (checkpointCleanup.isPresent()) {
149-
streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(checkpointCleanup.get());
150-
}
151-
152-
Optional<StateBackend> stateBackend = getStateBackend(confProperties);
153-
if (stateBackend.isPresent()) {
154-
streamEnv.setStateBackend(stateBackend.get());
155-
}
113+
getCheckpointInterval(confProperties).ifPresent(streamEnv::enableCheckpointing);
114+
getCheckpointingMode(confProperties).ifPresent(streamEnv.getCheckpointConfig()::setCheckpointingMode);
115+
getCheckpointTimeout(confProperties).ifPresent(streamEnv.getCheckpointConfig()::setCheckpointTimeout);
116+
getMaxConcurrentCheckpoints(confProperties).ifPresent(streamEnv.getCheckpointConfig()::setMaxConcurrentCheckpoints);
117+
getCheckpointCleanup(confProperties).ifPresent(streamEnv.getCheckpointConfig()::enableExternalizedCheckpoints);
118+
getStateBackend(confProperties).ifPresent(streamEnv::setStateBackend);
156119
}
157120
}
158121

122+
public static StreamQueryConfig getStreamQueryConfig(StreamTableEnvironment tableEnv, Properties confProperties) {
123+
return StreamEnvConfigManager.streamTableEnvironmentStateTTLConfig(tableEnv, confProperties).orElseGet(tableEnv::queryConfig);
124+
}
125+
159126
/**
160127
* 设置TableEnvironment状态超时时间
161128
* @param tableEnv
@@ -222,9 +189,9 @@ public static Optional<Boolean> isCheckpointingEnabled(Properties properties) {
222189

223190
public static Optional<Long> getCheckpointInterval(Properties properties) {
224191
// 两个参数主要用来做上层兼容
225-
Long sql_interval = Long.valueOf(properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_INTERVAL_KEY, "0"));
226-
Long flink_interval = Long.valueOf(properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY, "0"));
227-
long checkpointInterval = Math.max(sql_interval, flink_interval);
192+
Long sqlInterval = Long.valueOf(properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_INTERVAL_KEY, "0"));
193+
Long flinkInterval = Long.valueOf(properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY, "0"));
194+
long checkpointInterval = Math.max(sqlInterval, flinkInterval);
228195
return Optional.of(checkpointInterval);
229196
}
230197

0 commit comments

Comments
 (0)