1919package com .dtstack .flink .sql .launcher ;
2020
2121import org .apache .commons .lang .StringUtils ;
22+ import org .apache .flink .client .deployment .ClusterRetrieveException ;
2223import org .apache .flink .client .deployment .StandaloneClusterDescriptor ;
2324import org .apache .flink .client .program .ClusterClient ;
2425import org .apache .flink .client .program .StandaloneClusterClient ;
26+ import org .apache .flink .client .program .rest .RestClusterClient ;
2527import org .apache .flink .configuration .ConfigConstants ;
2628import org .apache .flink .configuration .Configuration ;
2729import org .apache .flink .configuration .GlobalConfiguration ;
2830import org .apache .flink .core .fs .FileSystem ;
2931import org .apache .flink .yarn .AbstractYarnClusterDescriptor ;
3032import org .apache .flink .yarn .YarnClusterClient ;
3133import org .apache .flink .yarn .YarnClusterDescriptor ;
34+ import org .apache .hadoop .yarn .api .records .ApplicationId ;
3235import org .apache .hadoop .yarn .api .records .ApplicationReport ;
3336import org .apache .hadoop .yarn .api .records .YarnApplicationState ;
3437import org .apache .hadoop .yarn .client .api .YarnClient ;
5558 */
5659public class ClusterClientFactory {
5760
58- public static ClusterClient createClusterClient (Properties props ) {
61+ public static ClusterClient createClusterClient (Properties props ) throws ClusterRetrieveException {
5962 String clientType = props .getProperty (OPTION_MODE );
6063 if (clientType .equals (ClusterMode .MODE_STANDALONE )) {
6164 return createStandaloneClient (props );
@@ -65,20 +68,20 @@ public static ClusterClient createClusterClient(Properties props) {
6568 throw new IllegalArgumentException ("Unsupported cluster client type: " );
6669 }
6770
68- public static StandaloneClusterClient createStandaloneClient (Properties props ) {
71+ public static RestClusterClient createStandaloneClient (Properties props ) throws ClusterRetrieveException {
6972 String flinkConfDir = props .getProperty (LauncherOptions .OPTION_FLINK_CONF_DIR );
7073 Configuration config = GlobalConfiguration .loadConfiguration (flinkConfDir );
7174 StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor (config );
72- StandaloneClusterClient clusterClient = descriptor .retrieve (null );
75+ RestClusterClient clusterClient = descriptor .retrieve (null );
7376 clusterClient .setDetached (true );
7477 return clusterClient ;
7578 }
7679
77- public static YarnClusterClient createYarnClient (Properties props ) {
80+ public static ClusterClient createYarnClient (Properties props ) {
7881 String flinkConfDir = props .getProperty (LauncherOptions .OPTION_FLINK_CONF_DIR );
7982 Configuration config = GlobalConfiguration .loadConfiguration (flinkConfDir );
8083 String yarnConfDir = props .getProperty (LauncherOptions .OPTION_YARN_CONF_DIR );
81- org . apache . hadoop . conf . Configuration yarnConf = new YarnConfiguration ();
84+ YarnConfiguration yarnConf = new YarnConfiguration ();
8285 if (StringUtils .isNotBlank (yarnConfDir )) {
8386 try {
8487
@@ -96,6 +99,7 @@ public boolean accept(File dir, String name) {
9699 return false ;
97100 }
98101 });
102+
99103 if (xmlFileList != null ) {
100104 for (File xmlFile : xmlFileList ) {
101105 yarnConf .addResource (xmlFile .toURI ().toURL ());
@@ -105,7 +109,7 @@ public boolean accept(File dir, String name) {
105109 YarnClient yarnClient = YarnClient .createYarnClient ();
106110 yarnClient .init (yarnConf );
107111 yarnClient .start ();
108- String applicationId = null ;
112+ ApplicationId applicationId = null ;
109113
110114 Set <String > set = new HashSet <>();
111115 set .add ("Apache Flink" );
@@ -129,24 +133,22 @@ public boolean accept(File dir, String name) {
129133 if (thisMemory > maxMemory || thisMemory == maxMemory && thisCores > maxCores ) {
130134 maxMemory = thisMemory ;
131135 maxCores = thisCores ;
132- applicationId = report .getApplicationId (). toString () ;
136+ applicationId = report .getApplicationId ();
133137 }
134138
135139 }
136140
137- if (org .apache .commons .lang3 .StringUtils .isEmpty (applicationId )) {
141+ if (org .apache .commons .lang3 .StringUtils .isEmpty (applicationId . toString () )) {
138142 throw new RuntimeException ("No flink session found on yarn cluster." );
139143 }
140144
141- yarnClient .stop ();
142-
143- AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor (config , "." );
145+ AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor (config , yarnConf , "." , yarnClient , false );
144146 Field confField = AbstractYarnClusterDescriptor .class .getDeclaredField ("conf" );
145147 confField .setAccessible (true );
146148 haYarnConf (yarnConf );
147149 confField .set (clusterDescriptor , yarnConf );
148150
149- YarnClusterClient clusterClient = clusterDescriptor .retrieve (applicationId );
151+ ClusterClient clusterClient = clusterDescriptor .retrieve (applicationId );
150152 clusterClient .setDetached (true );
151153 return clusterClient ;
152154 }
0 commit comments