Skip to content

Commit 3d66e82

Browse files
committed
yarn session submit
1 parent fd117c4 commit 3d66e82

File tree

5 files changed

+133
-68
lines changed

5 files changed

+133
-68
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@
5454
import com.google.common.collect.Maps;
5555
import com.google.common.collect.Sets;
5656
import com.fasterxml.jackson.databind.ObjectMapper;
57+
import org.apache.flink.client.program.ContextEnvironment;
5758
import org.apache.flink.streaming.api.datastream.DataStream;
59+
import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
5860
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
5961
import org.apache.flink.table.api.EnvironmentSettings;
6062
import org.apache.flink.table.api.Table;
@@ -66,10 +68,12 @@
6668
import org.slf4j.LoggerFactory;
6769

6870
import java.io.File;
71+
import java.lang.reflect.Field;
6972
import java.lang.reflect.InvocationTargetException;
7073
import java.net.URL;
7174
import java.net.URLClassLoader;
7275
import java.net.URLDecoder;
76+
import java.util.ArrayList;
7377
import java.util.List;
7478
import java.util.Map;
7579
import java.util.Properties;

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 52 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.flink.runtime.minicluster.MiniCluster;
3636
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
3737
import org.apache.flink.runtime.util.LeaderConnectionInfo;
38+
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
3839
import org.apache.flink.yarn.YarnClusterDescriptor;
3940
import org.apache.hadoop.yarn.api.records.ApplicationId;
4041
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -62,7 +63,7 @@ public static ClusterClient createClusterClient(Options launcherOptions) throws
6263
if (mode.equals(ClusterMode.standalone.name())) {
6364
return createStandaloneClient(launcherOptions);
6465
} else if (mode.equals(ClusterMode.yarn.name())) {
65-
// return createYarnSessionClient(launcherOptions);
66+
return createYarnSessionClient(launcherOptions);
6667
}
6768
throw new IllegalArgumentException("Unsupported cluster client type: ");
6869
}
@@ -77,50 +78,56 @@ public static ClusterClient createStandaloneClient(Options launcherOptions) thro
7778
return clusterClient;
7879
}
7980

80-
// public static ClusterClient createYarnSessionClient(Options launcherOptions) {
81-
// String flinkConfDir = StringUtils.isEmpty(launcherOptions.getFlinkconf()) ? "" : launcherOptions.getFlinkconf();
82-
// Configuration config = StringUtils.isEmpty(flinkConfDir) ? new Configuration() : GlobalConfiguration.loadConfiguration(flinkConfDir);
83-
// String yarnConfDir = launcherOptions.getYarnconf();
84-
//
85-
// if (StringUtils.isNotBlank(yarnConfDir)) {
86-
// try {
87-
// config.setString(ConfigConstants.PATH_HADOOP_CONFIG, yarnConfDir);
88-
// FileSystem.initialize(config, null);
89-
//
90-
// YarnConfiguration yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
91-
// YarnClient yarnClient = YarnClient.createYarnClient();
92-
// yarnClient.init(yarnConf);
93-
// yarnClient.start();
94-
// ApplicationId applicationId = null;
95-
//
96-
// String yarnSessionConf = launcherOptions.getYarnSessionConf();
97-
// yarnSessionConf = URLDecoder.decode(yarnSessionConf, Charsets.UTF_8.toString());
98-
// Properties yarnSessionConfProperties = PluginUtil.jsonStrToObject(yarnSessionConf, Properties.class);
99-
// Object yid = yarnSessionConfProperties.get("yid");
100-
//
101-
// if (null != yid) {
102-
// applicationId = toApplicationId(yid.toString());
103-
// } else {
104-
// applicationId = getYarnClusterApplicationId(yarnClient);
105-
// }
106-
//
107-
// System.out.println("applicationId=" + applicationId.toString());
108-
//
109-
// if (StringUtils.isEmpty(applicationId.toString())) {
110-
// throw new RuntimeException("No flink session found on yarn cluster.");
111-
// }
112-
//
113-
// AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(config, yarnConf, flinkConfDir, yarnClient, false);
114-
// ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId);
115-
// clusterClient.setDetached(true);
116-
// return clusterClient;
117-
// } catch (Exception e) {
118-
// throw new RuntimeException(e);
119-
// }
120-
// }else{
121-
// throw new RuntimeException("yarn mode must set param of 'yarnconf'!!!");
122-
// }
123-
// }
81+
public static ClusterClient createYarnSessionClient(Options launcherOptions) {
82+
String flinkConfDir = StringUtils.isEmpty(launcherOptions.getFlinkconf()) ? "" : launcherOptions.getFlinkconf();
83+
Configuration config = StringUtils.isEmpty(flinkConfDir) ? new Configuration() : GlobalConfiguration.loadConfiguration(flinkConfDir);
84+
String yarnConfDir = launcherOptions.getYarnconf();
85+
86+
if (StringUtils.isNotBlank(yarnConfDir)) {
87+
try {
88+
config.setString(ConfigConstants.PATH_HADOOP_CONFIG, yarnConfDir);
89+
FileSystem.initialize(config, null);
90+
91+
YarnConfiguration yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
92+
YarnClient yarnClient = YarnClient.createYarnClient();
93+
yarnClient.init(yarnConf);
94+
yarnClient.start();
95+
ApplicationId applicationId = null;
96+
97+
String yarnSessionConf = launcherOptions.getYarnSessionConf();
98+
yarnSessionConf = URLDecoder.decode(yarnSessionConf, Charsets.UTF_8.toString());
99+
Properties yarnSessionConfProperties = PluginUtil.jsonStrToObject(yarnSessionConf, Properties.class);
100+
Object yid = yarnSessionConfProperties.get("yid");
101+
102+
if (null != yid) {
103+
applicationId = toApplicationId(yid.toString());
104+
} else {
105+
applicationId = getYarnClusterApplicationId(yarnClient);
106+
}
107+
108+
System.out.println("applicationId=" + applicationId.toString());
109+
110+
if (StringUtils.isEmpty(applicationId.toString())) {
111+
throw new RuntimeException("No flink session found on yarn cluster.");
112+
}
113+
114+
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
115+
config,
116+
yarnConf,
117+
yarnClient,
118+
YarnClientYarnClusterInformationRetriever.create(yarnClient),
119+
false);
120+
121+
ClusterClientProvider<ApplicationId> retrieve = clusterDescriptor.retrieve(applicationId);
122+
ClusterClient<ApplicationId> clusterClient = retrieve.getClusterClient();
123+
return clusterClient;
124+
} catch (Exception e) {
125+
throw new RuntimeException(e);
126+
}
127+
}else{
128+
throw new RuntimeException("yarn mode must set param of 'yarnconf'!!!");
129+
}
130+
}
124131

125132

126133
private static ApplicationId getYarnClusterApplicationId(YarnClient yarnClient) throws Exception {

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql.launcher;
2222

2323
import com.dtstack.flink.sql.constrant.ConfigConstrant;
24+
import com.dtstack.flink.sql.launcher.utils.SubmitUtil;
2425
import com.google.common.collect.Lists;
2526
import com.alibaba.fastjson.JSON;
2627
import com.alibaba.fastjson.TypeReference;
@@ -33,11 +34,18 @@
3334
import org.apache.commons.io.Charsets;
3435
import org.apache.commons.lang.BooleanUtils;
3536
import org.apache.commons.lang.StringUtils;
37+
import org.apache.flink.api.common.JobExecutionResult;
38+
import org.apache.flink.api.common.JobID;
39+
import org.apache.flink.api.common.cache.DistributedCache;
40+
import org.apache.flink.client.ClientUtils;
41+
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
3642
import org.apache.flink.client.program.ClusterClient;
3743
import org.apache.flink.client.program.PackagedProgram;
3844
import org.apache.flink.client.program.PackagedProgramUtils;
3945
import org.apache.flink.configuration.Configuration;
4046
import org.apache.flink.configuration.GlobalConfiguration;
47+
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
48+
import org.apache.flink.core.execution.JobClient;
4149
import org.apache.flink.runtime.jobgraph.JobGraph;
4250
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
4351

@@ -46,6 +54,9 @@
4654
import java.io.FileInputStream;
4755
import java.io.IOException;
4856
import java.io.InputStreamReader;
57+
import java.io.UnsupportedEncodingException;
58+
import java.net.MalformedURLException;
59+
import java.net.URL;
4960
import java.net.URLDecoder;
5061
import java.util.LinkedList;
5162
import java.util.List;
@@ -106,17 +117,27 @@ public static void main(String[] args) throws Exception {
106117
.setSavepointRestoreSettings(savepointRestoreSettings)
107118
.build();
108119

109-
if(mode.equals(ClusterMode.yarnPer.name())){
110-
String flinkConfDir = launcherOptions.getFlinkconf();
111-
Configuration config = StringUtils.isEmpty(flinkConfDir) ? new Configuration() : GlobalConfiguration.loadConfiguration(flinkConfDir);
112-
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, config, 1,false);
120+
String flinkConfDir = launcherOptions.getFlinkconf();
121+
Configuration config = StringUtils.isEmpty(flinkConfDir) ? new Configuration() : GlobalConfiguration.loadConfiguration(flinkConfDir);
122+
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, config, 1,false);
123+
if (mode.equals(ClusterMode.yarnPer.name())) {
113124
PerJobSubmitter.submit(launcherOptions, jobGraph, config);
114125
} else {
115-
// ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions);
116-
// clusterClient.run(program, 1);
117-
// clusterClient.shutdown();
126+
ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions);
127+
removeSqlPluginAndFillUdfJar(jobGraph,launcherOptions.getAddjar());
128+
129+
JobExecutionResult jobExecutionResult = ClientUtils.submitJob(clusterClient, jobGraph);
130+
String jobID = jobExecutionResult.getJobID().toString();
131+
System.out.println("jobID:" + jobID);
118132
}
133+
}
119134

135+
private static void removeSqlPluginAndFillUdfJar(JobGraph jobGraph, String udfJar) throws UnsupportedEncodingException {
136+
jobGraph.getUserArtifacts().clear();
137+
jobGraph.getUserJars().clear();
138+
if (!StringUtils.isEmpty(udfJar)) {
139+
SubmitUtil.fillUserJarForJobGraph(udfJar, jobGraph);
140+
}
120141
}
121142

122143
private static String[] parseJson(String[] args) {

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,23 @@
1818

1919
package com.dtstack.flink.sql.launcher.perjob;
2020

21+
import com.dtstack.flink.sql.launcher.utils.SubmitUtil;
2122
import com.dtstack.flink.sql.option.Options;
2223
import com.dtstack.flink.sql.util.PluginUtil;
2324
import org.apache.commons.io.Charsets;
2425
import org.apache.commons.lang3.StringUtils;
2526
import org.apache.flink.client.deployment.ClusterSpecification;
26-
import org.apache.flink.client.program.ClusterClient;
2727
import org.apache.flink.client.program.ClusterClientProvider;
2828
import org.apache.flink.configuration.Configuration;
29-
import org.apache.flink.core.fs.Path;
3029
import org.apache.flink.runtime.jobgraph.JobGraph;
3130
import org.apache.flink.yarn.YarnClusterDescriptor;
3231
import org.apache.hadoop.yarn.api.records.ApplicationId;
3332
import org.slf4j.Logger;
3433
import org.slf4j.LoggerFactory;
3534
import java.net.URLDecoder;
36-
import java.util.Arrays;
37-
import java.util.List;
3835
import java.util.Properties;
3936

37+
4038
/**
4139
* per job mode submitter
4240
* Date: 2018/11/17
@@ -50,11 +48,7 @@ public class PerJobSubmitter {
5048

5149
public static String submit(Options launcherOptions, JobGraph jobGraph, Configuration flinkConfig) throws Exception {
5250
if (!StringUtils.isBlank(launcherOptions.getAddjar())) {
53-
String addjarPath = URLDecoder.decode(launcherOptions.getAddjar(), Charsets.UTF_8.toString());
54-
List<String> paths = getJarPaths(addjarPath);
55-
paths.forEach( path -> {
56-
jobGraph.addJar(new Path("file://" + path));
57-
});
51+
SubmitUtil.fillUserJarForJobGraph(launcherOptions.getAddjar(),jobGraph);
5852
}
5953

6054
String confProp = launcherOptions.getConfProp();
@@ -78,12 +72,6 @@ public static String submit(Options launcherOptions, JobGraph jobGraph, Configur
7872
return applicationId;
7973
}
8074

81-
private static List<String> getJarPaths(String addjarPath) {
82-
if (addjarPath.length() > 2) {
83-
addjarPath = addjarPath.substring(1,addjarPath.length()-1).replace("\"","");
84-
}
85-
List<String> paths = Arrays.asList(addjarPath.split(","));
86-
return paths;
87-
}
75+
8876

8977
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.launcher.utils;
20+
21+
import org.apache.commons.io.Charsets;
22+
import org.apache.flink.core.fs.Path;
23+
import org.apache.flink.runtime.jobgraph.JobGraph;
24+
25+
import java.io.UnsupportedEncodingException;
26+
import java.net.URLDecoder;
27+
import java.util.Arrays;
28+
import java.util.List;
29+
30+
/**
31+
* Date: 2020/3/3
32+
* Company: www.dtstack.com
33+
* @author maqi
34+
*/
35+
public class SubmitUtil {
36+
37+
public static void fillUserJarForJobGraph(String jarPath, JobGraph jobGraph) throws UnsupportedEncodingException {
38+
String addjarPath = URLDecoder.decode(jarPath, Charsets.UTF_8.toString());
39+
if (addjarPath.length() > 2) {
40+
addjarPath = addjarPath.substring(1, addjarPath.length() - 1).replace("\"", "");
41+
}
42+
List<String> paths = Arrays.asList(addjarPath.split(","));
43+
paths.forEach(path -> jobGraph.addJar(new Path("file://" + path)));
44+
}
45+
}

0 commit comments

Comments
 (0)