1818
1919package com .dtstack .flink .sql .launcher ;
2020
21+ import com .dtstack .flink .sql .enums .ClusterMode ;
2122import com .dtstack .flink .sql .option .Options ;
2223import com .dtstack .flink .sql .util .PluginUtil ;
23- import com .dtstack .flink .sql .yarn .JobParameter ;
24- import com .dtstack .flink .sql .yarn .YarnClusterConfiguration ;
2524import org .apache .commons .io .Charsets ;
2625import org .apache .commons .lang .StringUtils ;
2726import org .apache .flink .client .program .ClusterClient ;
3736import org .apache .flink .runtime .util .LeaderConnectionInfo ;
3837import org .apache .flink .yarn .AbstractYarnClusterDescriptor ;
3938import org .apache .flink .yarn .YarnClusterDescriptor ;
40- import org .apache .hadoop .fs .Path ;
4139import org .apache .hadoop .yarn .api .records .ApplicationId ;
4240import org .apache .hadoop .yarn .api .records .ApplicationReport ;
4341import org .apache .hadoop .yarn .api .records .YarnApplicationState ;
4442import org .apache .hadoop .yarn .client .api .YarnClient ;
45- import org .apache .hadoop .yarn .client .api .YarnClientApplication ;
4643import org .apache .hadoop .yarn .conf .YarnConfiguration ;
47- import java .io .File ;
48- import java .net .InetSocketAddress ;
49- import java .net .URLDecoder ;
50- import java .util .*;
51-
52- import com .dtstack .flink .sql .enums .ClusterMode ;
53- import org .apache .hadoop .yarn .exceptions .YarnException ;
5444import org .apache .hadoop .yarn .util .StringHelper ;
5545
56- import java .io .IOException ;
57- import java .util .stream .Collectors ;
58- import java .util .stream .Stream ;
59- import static java .util .Objects .requireNonNull ;
46+ import java .net .InetSocketAddress ;
47+ import java .net .URLDecoder ;
48+ import java .util .EnumSet ;
49+ import java .util .HashSet ;
50+ import java .util .Iterator ;
51+ import java .util .List ;
52+ import java .util .Properties ;
53+ import java .util .Set ;
6054
6155/**
6256 * @author sishu.yss
@@ -65,10 +59,10 @@ public class ClusterClientFactory {
6559
6660 public static ClusterClient createClusterClient (Options launcherOptions ) throws Exception {
6761 String mode = launcherOptions .getMode ();
68- if (mode .equals (ClusterMode .standalone .name ())) {
62+ if (mode .equals (ClusterMode .standalone .name ())) {
6963 return createStandaloneClient (launcherOptions );
70- } else if (mode .equals (ClusterMode .yarn .name ())) {
71- return createYarnClient (launcherOptions , mode );
64+ } else if (mode .equals (ClusterMode .yarn .name ())) {
65+ return createYarnSessionClient (launcherOptions );
7266 }
7367 throw new IllegalArgumentException ("Unsupported cluster client type: " );
7468 }
@@ -88,122 +82,53 @@ public static ClusterClient createStandaloneClient(Options launcherOptions) thro
8882 return clusterClient ;
8983 }
9084
91- public static ClusterClient createYarnClient (Options launcherOptions , String mode ) {
92- String flinkConfDir = launcherOptions .getFlinkconf ();
93- Configuration flinkConf = GlobalConfiguration .loadConfiguration (flinkConfDir );
85+ public static ClusterClient createYarnSessionClient (Options launcherOptions ) {
86+ String flinkConfDir = StringUtils . isEmpty ( launcherOptions . getFlinkconf ()) ? "" : launcherOptions .getFlinkconf ();
87+ Configuration config = StringUtils . isEmpty ( flinkConfDir ) ? new Configuration () : GlobalConfiguration .loadConfiguration (flinkConfDir );
9488 String yarnConfDir = launcherOptions .getYarnconf ();
95- YarnConfiguration yarnConf ;
96- if (StringUtils .isNotBlank (yarnConfDir )) {
97- try {
98- flinkConf .setString (ConfigConstants .PATH_HADOOP_CONFIG , yarnConfDir );
99- FileSystem .initialize (flinkConf );
10089
101- File dir = new File (yarnConfDir );
102- if (dir .exists () && dir .isDirectory ()) {
103- yarnConf = loadYarnConfiguration (yarnConfDir );
104-
105- YarnClient yarnClient = YarnClient .createYarnClient ();
106- haYarnConf (yarnConf );
107- yarnClient .init (yarnConf );
108- yarnClient .start ();
109-
110- String confProp = launcherOptions .getConfProp ();
111- confProp = URLDecoder .decode (confProp , Charsets .UTF_8 .toString ());
112- System .out .println ("confProp=" +confProp );
113- Properties confProperties = PluginUtil .jsonStrToObject (confProp , Properties .class );
114-
115- ApplicationId applicationId = null ;
116- ClusterClient clusterClient = null ;
117- if (mode .equals (ClusterMode .yarn .name ())) {//on yarn cluster mode
118- String yarnSessionConf = launcherOptions .getYarnSessionConf ();
119- yarnSessionConf = URLDecoder .decode (yarnSessionConf , Charsets .UTF_8 .toString ());
120- Properties yarnSessionConfProperties = PluginUtil .jsonStrToObject (yarnSessionConf , Properties .class );
121- String yid = yarnSessionConfProperties .get ("yid" ).toString ();
122- if (StringUtils .isNotBlank (yid )){
123- applicationId = toApplicationId (yid );
124- }else {
125- applicationId = getYarnClusterApplicationId (yarnClient );
126- }
127- System .out .println ("applicationId=" +applicationId .toString ());
128-
129- AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor (
130- flinkConf , yarnConf , "." , yarnClient , false );
131- clusterClient = clusterDescriptor .retrieve (applicationId );
132-
133- System .out .println ("applicationId=" +applicationId .toString ()+" has retrieve!" );
134- } else {//on yarn per-job mode
135- applicationId = createApplication (yarnClient );
136- System .out .println ("applicationId=" +applicationId .toString ());
137-
138- YarnClusterConfiguration clusterConf = getYarnClusterConfiguration (flinkConf ,yarnConf ,flinkConfDir );
139- //jobmanager+taskmanager param
140- JobParameter appConf = new JobParameter (confProperties );
90+ if (StringUtils .isNotBlank (yarnConfDir )) {
91+ try {
92+ config .setString (ConfigConstants .PATH_HADOOP_CONFIG , yarnConfDir );
93+ FileSystem .initialize (config );
94+
95+ YarnConfiguration yarnConf = YarnConfLoader .getYarnConf (yarnConfDir );
96+ YarnClient yarnClient = YarnClient .createYarnClient ();
97+ yarnClient .init (yarnConf );
98+ yarnClient .start ();
99+ ApplicationId applicationId = null ;
100+
101+ String yarnSessionConf = launcherOptions .getYarnSessionConf ();
102+ yarnSessionConf = URLDecoder .decode (yarnSessionConf , Charsets .UTF_8 .toString ());
103+ Properties yarnSessionConfProperties = PluginUtil .jsonStrToObject (yarnSessionConf , Properties .class );
104+ Object yid = yarnSessionConfProperties .get ("yid" );
105+
106+ if (null != yid ) {
107+ applicationId = toApplicationId (yid .toString ());
108+ } else {
109+ applicationId = getYarnClusterApplicationId (yarnClient );
110+ }
141111
142- com .dtstack .flink .sql .yarn .YarnClusterDescriptor clusterDescriptor = new com .dtstack .flink .sql .yarn .YarnClusterDescriptor (
143- clusterConf , yarnClient , appConf ,applicationId , launcherOptions .getName (),null );
144- clusterClient = clusterDescriptor .deploy ();
112+ System .out .println ("applicationId=" + applicationId .toString ());
145113
146- System .out .println ("applicationId=" +applicationId .toString ()+" has deploy!" );
147- }
148- clusterClient .setDetached (true );
149- yarnClient .stop ();
150- return clusterClient ;
114+ if (StringUtils .isEmpty (applicationId .toString ())) {
115+ throw new RuntimeException ("No flink session found on yarn cluster." );
151116 }
152- } catch (Exception e ) {
117+
118+ AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor (config , yarnConf , flinkConfDir , yarnClient , false );
119+ ClusterClient clusterClient = clusterDescriptor .retrieve (applicationId );
120+ clusterClient .setDetached (true );
121+ return clusterClient ;
122+ } catch (Exception e ) {
153123 throw new RuntimeException (e );
154124 }
125+ }else {
126+ throw new RuntimeException ("yarn mode must set param of 'yarnconf'!!!" );
155127 }
156- throw new UnsupportedOperationException ("Haven't been developed yet!" );
157128 }
158129
159- private static YarnConfiguration loadYarnConfiguration (String yarnConfDir )
160- {
161- org .apache .hadoop .conf .Configuration hadoopConf = new org .apache .hadoop .conf .Configuration ();
162- hadoopConf .set ("fs.hdfs.impl" , "org.apache.hadoop.hdfs.DistributedFileSystem" );
163-
164- Stream .of ("yarn-site.xml" , "core-site.xml" , "hdfs-site.xml" ).forEach (file -> {
165- File site = new File (requireNonNull (yarnConfDir , "ENV HADOOP_CONF_DIR is not setting" ), file );
166- if (site .exists () && site .isFile ()) {
167- hadoopConf .addResource (new org .apache .hadoop .fs .Path (site .toURI ()));
168- }
169- else {
170- throw new RuntimeException (site + " not exists" );
171- }
172- });
173-
174- YarnConfiguration yarnConf = new YarnConfiguration (hadoopConf );
175- // try (PrintWriter pw = new PrintWriter(new FileWriter(yarnSite))) { //write local file
176- // yarnConf.writeXml(pw);
177- // }
178- return yarnConf ;
179- }
180-
181- public static YarnClusterConfiguration getYarnClusterConfiguration (Configuration flinkConf ,YarnConfiguration yarnConf ,String flinkConfDir )
182- {
183- Path flinkJar = new Path (getFlinkJarFile (flinkConfDir ).toURI ());
184- @ SuppressWarnings ("ConstantConditions" ) final Set <Path > resourcesToLocalize = Stream
185- .of ("flink-conf.yaml" , "log4j.properties" )
186- .map (x -> new Path (new File (flinkConfDir , x ).toURI ()))
187- .collect (Collectors .toSet ());
188-
189- return new YarnClusterConfiguration (flinkConf , yarnConf , "" , flinkJar , resourcesToLocalize );
190- }
191130
192- public static final String FLINK_DIST = "flink-dist" ;
193- private static File getFlinkJarFile (String flinkConfDir )
194- {
195- String errorMessage = "error not search " + FLINK_DIST + "*.jar" ;
196- File [] files = requireNonNull (new File (flinkConfDir , "/../lib" ).listFiles (), errorMessage );
197- Optional <File > file = Arrays .stream (files )
198- .filter (f -> f .getName ().startsWith (FLINK_DIST )).findFirst ();
199- return file .orElseThrow (() -> new IllegalArgumentException (errorMessage ));
200- }
201-
202- private static ApplicationId createApplication (YarnClient yarnClient )throws IOException , YarnException {
203- YarnClientApplication app = yarnClient .createApplication ();
204- return app .getApplicationSubmissionContext ().getApplicationId ();
205- }
206- private static ApplicationId getYarnClusterApplicationId (YarnClient yarnClient ) throws Exception {
131+ private static ApplicationId getYarnClusterApplicationId (YarnClient yarnClient ) throws Exception {
207132 ApplicationId applicationId = null ;
208133
209134 Set <String > set = new HashSet <>();
@@ -214,51 +139,31 @@ private static ApplicationId getYarnClusterApplicationId(YarnClient yarnClient)
214139
215140 int maxMemory = -1 ;
216141 int maxCores = -1 ;
217- for (ApplicationReport report : reportList ) {
218- if (!report .getName ().startsWith ("Flink session" )){
142+ for (ApplicationReport report : reportList ) {
143+ if (!report .getName ().startsWith ("Flink session" )) {
219144 continue ;
220145 }
221146
222- if (!report .getYarnApplicationState ().equals (YarnApplicationState .RUNNING )) {
147+ if (!report .getYarnApplicationState ().equals (YarnApplicationState .RUNNING )) {
223148 continue ;
224149 }
225150
226151 int thisMemory = report .getApplicationResourceUsageReport ().getNeededResources ().getMemory ();
227152 int thisCores = report .getApplicationResourceUsageReport ().getNeededResources ().getVirtualCores ();
228- if (thisMemory > maxMemory || thisMemory == maxMemory && thisCores > maxCores ) {
153+ if (thisMemory > maxMemory || thisMemory == maxMemory && thisCores > maxCores ) {
229154 maxMemory = thisMemory ;
230155 maxCores = thisCores ;
231156 applicationId = report .getApplicationId ();
232157 }
233158
234159 }
235160
236- if (StringUtils .isEmpty (applicationId .toString ())) {
161+ if (StringUtils .isEmpty (applicationId .toString ())) {
237162 throw new RuntimeException ("No flink session found on yarn cluster." );
238163 }
239164 return applicationId ;
240165 }
241166
242- /**
243- * 处理yarn HA的配置项
244- */
245- private static org .apache .hadoop .conf .Configuration haYarnConf (org .apache .hadoop .conf .Configuration yarnConf ) {
246- Iterator <Map .Entry <String , String >> iterator = yarnConf .iterator ();
247- while (iterator .hasNext ()) {
248- Map .Entry <String ,String > entry = iterator .next ();
249- String key = entry .getKey ();
250- String value = entry .getValue ();
251- if (key .startsWith ("yarn.resourcemanager.hostname." )) {
252- String rm = key .substring ("yarn.resourcemanager.hostname." .length ());
253- String addressKey = "yarn.resourcemanager.address." + rm ;
254- if (yarnConf .get (addressKey ) == null ) {
255- yarnConf .set (addressKey , value + ":" + YarnConfiguration .DEFAULT_RM_PORT );
256- }
257- }
258- }
259- return yarnConf ;
260- }
261-
262167 private static ApplicationId toApplicationId (String appIdStr ) {
263168 Iterator <String > it = StringHelper ._split (appIdStr ).iterator ();
264169 if (!(it .next ()).equals ("application" )) {
0 commit comments