Skip to content

Commit e035d74

Browse files
author
dapeng
committed
添加重启策略的指定参数
1 parent 887b94f commit e035d74

File tree

2 files changed

+16
-5
lines changed

2 files changed

+16
-5
lines changed

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public class ConfigConstrant {
6363
public static final String CHECKPOINTS_DIRECTORY_KEY = "state.checkpoints.dir";
6464
public static final String STATE_BACKEND_INCREMENTAL_KEY = "state.backend.incremental";
6565

66+
public static final String RESTOREENABLE = "restore.enable";
67+
6668

6769
// restart plocy
6870
public static final int failureRate = 3;

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,15 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
101101
}
102102
});
103103

104-
streamEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
105-
ConfigConstrant.failureRate,
106-
Time.of(ConfigConstrant.failureInterval, TimeUnit.MINUTES),
107-
Time.of(ConfigConstrant.delayInterval, TimeUnit.SECONDS)
108-
));
104+
if(isRestore(confProperties).get()){
105+
streamEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
106+
ConfigConstrant.failureRate,
107+
Time.of(ConfigConstrant.failureInterval, TimeUnit.MINUTES),
108+
Time.of(ConfigConstrant.delayInterval, TimeUnit.SECONDS)
109+
));
110+
} else {
111+
streamEnv.setRestartStrategy(RestartStrategies.noRestart());
112+
}
109113

110114
// checkpoint config
111115
Optional<Boolean> checkpointingEnabled = isCheckpointingEnabled(confProperties);
@@ -163,6 +167,11 @@ public static Optional<Long> getAutoWatermarkInterval(Properties properties) {
163167
return StringUtils.isNotBlank(autoWatermarkInterval) ? Optional.of(Long.valueOf(autoWatermarkInterval)) : Optional.empty();
164168
}
165169

170+
public static Optional<Boolean> isRestore(Properties properties){
171+
String restoreEnable = properties.getProperty(ConfigConstrant.RESTOREENABLE, "true");
172+
return Optional.of(Boolean.valueOf(restoreEnable));
173+
}
174+
166175
/**
167176
* #ProcessingTime(默认), IngestionTime, EventTime
168177
* @param properties

0 commit comments

Comments
 (0)