Skip to content

Commit 6dd208a

Browse files
author
toutian
committed
Merge branch '1.8_modify_params' into 'v1.8.0_dev'
调整savepointpath参数位置 See merge request !161
2 parents 93025d7 + d344537 commit 6dd208a

File tree

4 files changed

+17
-36
lines changed

4 files changed

+17
-36
lines changed

README.md

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
128128
* taskmanager.memory.mb: per_job模式下指定taskmanager的内存大小(单位MB, 默认值:768)
129129
* taskmanager.num: per_job模式下指定taskmanager的实例数(默认1)
130130
* taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1)
131+
* savePointPath:任务恢复点的路径(默认无)
132+
* allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false)
131133
* [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例
132134

133135

@@ -141,16 +143,6 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
141143
* 必选:否
142144
* 默认值:无
143145

144-
* **savePointPath**
145-
* 描述:任务恢复点的路径
146-
* 必选:否
147-
* 默认值:无
148-
149-
* **allowNonRestoredState**
150-
* 描述:指示保存点是否允许非还原状态的标志
151-
* 必选:否
152-
* 默认值:false
153-
154146
* **flinkJarPath**
155147
* 描述:per_job 模式提交需要指定本地的flink jar存放路径
156148
* 必选:否

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ public class ConfigConstrant {
4747

4848
public static final String SQL_MAX_ENV_PARALLELISM = "sql.max.env.parallelism";
4949

50-
public static final String MR_JOB_PARALLELISM = "mr.job.parallelism";
51-
50+
public static final String SAVE_POINT_PATH_KEY = "savePointPath";
51+
public static final String ALLOW_NON_RESTORED_STATE_KEY = "allowNonRestoredState";
52+
5253
public static final String SQL_BUFFER_TIMEOUT_MILLIS = "sql.buffer.timeout.millis";
5354

5455
public static final String FLINK_TIME_CHARACTERISTIC_KEY = "time.characteristic";

core/src/main/java/com/dtstack/flink/sql/option/Options.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,6 @@ public class Options {
5757
@OptionRequired(description = "sql ref prop,eg specify event time")
5858
private String confProp = "{}";
5959

60-
@OptionRequired(description = "Savepoint restore path")
61-
private String savePointPath;
62-
63-
@OptionRequired(description = "Flag indicating whether non restored state is allowed if the savepoint")
64-
private String allowNonRestoredState = "false";
65-
6660
@OptionRequired(description = "flink jar path for submit of perjob mode")
6761
private String flinkJarPath;
6862

@@ -147,22 +141,6 @@ public void setConfProp(String confProp) {
147141
this.confProp = confProp;
148142
}
149143

150-
public String getSavePointPath() {
151-
return savePointPath;
152-
}
153-
154-
public void setSavePointPath(String savePointPath) {
155-
this.savePointPath = savePointPath;
156-
}
157-
158-
public String getAllowNonRestoredState() {
159-
return allowNonRestoredState;
160-
}
161-
162-
public void setAllowNonRestoredState(String allowNonRestoredState) {
163-
this.allowNonRestoredState = allowNonRestoredState;
164-
}
165-
166144
public String getFlinkJarPath() {
167145
return flinkJarPath;
168146
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package com.dtstack.flink.sql.launcher;
2222

23+
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2324
import com.google.common.collect.Lists;
2425
import com.alibaba.fastjson.JSON;
2526
import com.alibaba.fastjson.TypeReference;
@@ -29,6 +30,7 @@
2930
import com.dtstack.flink.sql.option.OptionParser;
3031
import com.dtstack.flink.sql.option.Options;
3132
import com.dtstack.flink.sql.util.PluginUtil;
33+
import org.apache.commons.io.Charsets;
3234
import org.apache.commons.lang.BooleanUtils;
3335
import org.apache.commons.lang.StringUtils;
3436
import org.apache.flink.client.program.ClusterClient;
@@ -44,9 +46,11 @@
4446
import java.io.FileInputStream;
4547
import java.io.IOException;
4648
import java.io.InputStreamReader;
49+
import java.net.URLDecoder;
4750
import java.util.LinkedList;
4851
import java.util.List;
4952
import java.util.Map;
53+
import java.util.Properties;
5054

5155
/**
5256
* Date: 2017/2/20
@@ -75,6 +79,10 @@ public static void main(String[] args) throws Exception {
7579
String mode = launcherOptions.getMode();
7680
List<String> argList = optionParser.getProgramExeArgList();
7781

82+
String confProp = launcherOptions.getConfProp();
83+
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
84+
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
85+
7886
if(mode.equals(ClusterMode.local.name())) {
7987
String[] localArgs = argList.toArray(new String[argList.size()]);
8088
Main.main(localArgs);
@@ -86,8 +94,10 @@ public static void main(String[] args) throws Exception {
8694
String[] remoteArgs = argList.toArray(new String[argList.size()]);
8795
PackagedProgram program = new PackagedProgram(jarFile, Lists.newArrayList(), remoteArgs);
8896

89-
if(StringUtils.isNotBlank(launcherOptions.getSavePointPath())){
90-
program.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(launcherOptions.getSavePointPath(), BooleanUtils.toBoolean(launcherOptions.getAllowNonRestoredState())));
97+
String savePointPath = confProperties.getProperty(ConfigConstrant.SAVE_POINT_PATH_KEY);
98+
if(StringUtils.isNotBlank(savePointPath)){
99+
String allowNonRestoredState = confProperties.getOrDefault(ConfigConstrant.ALLOW_NON_RESTORED_STATE_KEY, "false").toString();
100+
program.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savePointPath, BooleanUtils.toBoolean(allowNonRestoredState)));
91101
}
92102

93103
if(mode.equals(ClusterMode.yarnPer.name())){

0 commit comments

Comments
 (0)