Skip to content

Commit dd35dda

Browse files
committed
Merge branch 'feat_1.8_restartStrategy' into 'v1.8.0_dev'
添加重启策略的指定参数 See merge request !228
2 parents 1687594 + 23317ad commit dd35dda

File tree

3 files changed

+30
-7
lines changed

3 files changed

+30
-7
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
149149
* taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1)
150150
* savePointPath:任务恢复点的路径(默认无)
151151
* allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false)
152+
* restore.enable:是否失败重启(默认是true)
153+
* failure.interval:衡量失败率的时间段,取整数值,单位分钟(默认6)
154+
* delay.interval:连续两次重启尝试间的间隔,取整数值,单位是秒(默认10)
152155
* [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例
153156

154157

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,14 @@ public class ConfigConstrant {
6363
public static final String CHECKPOINTS_DIRECTORY_KEY = "state.checkpoints.dir";
6464
public static final String STATE_BACKEND_INCREMENTAL_KEY = "state.backend.incremental";
6565

66+
public static final String RESTOREENABLE = "restore.enable";
67+
6668

6769
// restart policy
6870
public static final int failureRate = 3;
6971

70-
public static final int failureInterval = 6; //min
72+
public static final String FAILUREINTERVAL = "failure.interval"; //min
7173

72-
public static final int delayInterval = 10; //sec
74+
public static final String DELAYINTERVAL= "delay.interval"; //sec
7375

7476
}

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,15 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
101101
}
102102
});
103103

104-
streamEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
105-
ConfigConstrant.failureRate,
106-
Time.of(ConfigConstrant.failureInterval, TimeUnit.MINUTES),
107-
Time.of(ConfigConstrant.delayInterval, TimeUnit.SECONDS)
108-
));
104+
if(isRestore(confProperties).get()){
105+
streamEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
106+
ConfigConstrant.failureRate,
107+
Time.of(getFailureInterval(confProperties).get(), TimeUnit.MINUTES),
108+
Time.of(getDelayInterval(confProperties).get(), TimeUnit.SECONDS)
109+
));
110+
} else {
111+
streamEnv.setRestartStrategy(RestartStrategies.noRestart());
112+
}
109113

110114
// checkpoint config
111115
Optional<Boolean> checkpointingEnabled = isCheckpointingEnabled(confProperties);
@@ -163,6 +167,20 @@ public static Optional<Long> getAutoWatermarkInterval(Properties properties) {
163167
return StringUtils.isNotBlank(autoWatermarkInterval) ? Optional.of(Long.valueOf(autoWatermarkInterval)) : Optional.empty();
164168
}
165169

170+
public static Optional<Boolean> isRestore(Properties properties){
171+
String restoreEnable = properties.getProperty(ConfigConstrant.RESTOREENABLE, "true");
172+
return Optional.of(Boolean.valueOf(restoreEnable));
173+
}
174+
175+
public static Optional<Integer> getDelayInterval(Properties properties){
176+
String delayInterval = properties.getProperty(ConfigConstrant.DELAYINTERVAL, "10");
177+
return Optional.of(Integer.valueOf(delayInterval));
178+
}
179+
public static Optional<Integer> getFailureInterval(Properties properties){
180+
String failureInterval = properties.getProperty(ConfigConstrant.FAILUREINTERVAL, "6");
181+
return Optional.of(Integer.valueOf(failureInterval));
182+
}
183+
166184
/**
167185
* #ProcessingTime(默认), IngestionTime, EventTime
168186
* @param properties

0 commit comments

Comments
 (0)