Skip to content

Commit abbd32c

Browse files
authored
Merge pull request #201 from zhihui-ge/master
yarn session模式下可以指定applicationID提交任务
2 parents 171ceb3 + 62ce160 commit abbd32c

File tree

3 files changed

+45
-2
lines changed

3 files changed

+45
-2
lines changed

README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ mvn clean package -Dmaven.test.skip
5858
#### 1.4.1 启动命令
5959

6060
```
61-
sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -addjar \["udf.jar\"\] -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp \{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000\}
61+
sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -addjar \["udf.jar\"\] -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp \{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000\} -yarnSessionConf \{\"yid\":\"application_1564971615273_38182\"}
6262
```
6363

6464
#### 1.4.2 命令行参数选项
@@ -149,6 +149,11 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
149149
* 描述:per_job 模式下指定的yarn queue
150150
* 必选:否
151151
* 默认值:false
152+
153+
* **yarnSessionConf**
154+
* 描述:yarn session 模式下指定的运行的一些参数,[可参考](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html),目前只支持指定yid
155+
* 必选:否
156+
* 默认值:false
152157

153158
## 2 结构
154159
### 2.1 源表插件

core/src/main/java/com/dtstack/flink/sql/option/Options.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ public class Options {
6868
@OptionRequired(description = "yarn queue")
6969
private String queue = "default";
7070

71+
@OptionRequired(description = "yarn session configuration,such as yid")
72+
private String yarnSessionConf = "{}";
73+
7174
public String getMode() {
7275
return mode;
7376
}
@@ -171,4 +174,12 @@ public String getQueue() {
171174
public void setQueue(String queue) {
172175
this.queue = queue;
173176
}
177+
178+
public String getYarnSessionConf() {
179+
return yarnSessionConf;
180+
}
181+
182+
public void setYarnSessionConf(String yarnSessionConf) {
183+
this.yarnSessionConf = yarnSessionConf;
184+
}
174185
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151

5252
import com.dtstack.flink.sql.enums.ClusterMode;
5353
import org.apache.hadoop.yarn.exceptions.YarnException;
54+
import org.apache.hadoop.yarn.util.StringHelper;
55+
5456
import java.io.IOException;
5557
import java.util.stream.Collectors;
5658
import java.util.stream.Stream;
@@ -113,7 +115,15 @@ public static ClusterClient createYarnClient(Options launcherOptions, String mod
113115
ApplicationId applicationId = null;
114116
ClusterClient clusterClient = null;
115117
if(mode.equals(ClusterMode.yarn.name())) {//on yarn cluster mode
116-
applicationId = getYarnClusterApplicationId(yarnClient);
118+
String yarnSessionConf = launcherOptions.getYarnSessionConf();
119+
yarnSessionConf = URLDecoder.decode(yarnSessionConf, Charsets.UTF_8.toString());
120+
Properties yarnSessionConfProperties = PluginUtil.jsonStrToObject(yarnSessionConf, Properties.class);
121+
String yid = yarnSessionConfProperties.get("yid").toString();
122+
if(StringUtils.isNotBlank(yid)){
123+
applicationId = toApplicationId(yid);
124+
}else{
125+
applicationId = getYarnClusterApplicationId(yarnClient);
126+
}
117127
System.out.println("applicationId="+applicationId.toString());
118128

119129
AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
@@ -249,4 +259,21 @@ private static org.apache.hadoop.conf.Configuration haYarnConf(org.apache.hadoop
249259
return yarnConf;
250260
}
251261

262+
private static ApplicationId toApplicationId(String appIdStr) {
263+
Iterator<String> it = StringHelper._split(appIdStr).iterator();
264+
if (!(it.next()).equals("application")) {
265+
throw new IllegalArgumentException("Invalid ApplicationId prefix: " + appIdStr + ". The valid ApplicationId should start with prefix " + "application");
266+
} else {
267+
try {
268+
return toApplicationId(it);
269+
} catch (NumberFormatException e) {
270+
throw new IllegalArgumentException("Invalid AppAttemptId: " + appIdStr, e);
271+
}
272+
}
273+
}
274+
275+
private static ApplicationId toApplicationId(Iterator<String> it) throws NumberFormatException {
276+
return ApplicationId.newInstance(Long.parseLong(it.next()), Integer.parseInt(it.next()));
277+
}
278+
252279
}

0 commit comments

Comments
 (0)