2222import com .dtstack .flink .sql .option .Options ;
2323import com .dtstack .flink .sql .util .PluginUtil ;
2424import com .esotericsoftware .minlog .Log ;
25+ import com .fasterxml .jackson .databind .ObjectMapper ;
2526import org .apache .commons .io .Charsets ;
2627import org .apache .commons .lang .StringUtils ;
2728import org .apache .flink .client .program .ClusterClient ;
@@ -63,6 +64,10 @@ public class ClusterClientFactory {
6364
6465 private static final Logger LOG = LoggerFactory .getLogger (ClusterClientFactory .class );
6566
67+ private static final String HA_CLUSTER_ID = "high-availability.cluster-id" ;
68+
69+ private static final String HADOOP_CONF = "fs.hdfs.hadoopconf" ;
70+
6671 public static ClusterClient createClusterClient (Options launcherOptions ) throws Exception {
6772 String mode = launcherOptions .getMode ();
6873 if (mode .equals (ClusterMode .standalone .name ())) {
@@ -92,7 +97,7 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
9297
9398 if (StringUtils .isNotBlank (yarnConfDir )) {
9499 try {
95- config .setString ("fs.hdfs.hadoopconf" , yarnConfDir );
100+ config .setString (HADOOP_CONF , yarnConfDir );
96101 FileSystem .initialize (config );
97102
98103 YarnConfiguration yarnConf = YarnConfLoader .getYarnConf (yarnConfDir );
@@ -105,8 +110,6 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
105110 yarnSessionConf = URLDecoder .decode (yarnSessionConf , Charsets .UTF_8 .toString ());
106111 Properties yarnSessionConfProperties = PluginUtil .jsonStrToObject (yarnSessionConf , Properties .class );
107112
108- LOG .info ("current yarn config:\n {}" , yarnSessionConfProperties );
109-
110113 Object yid = yarnSessionConfProperties .get ("yid" );
111114
112115 if (null != yid ) {
@@ -115,16 +118,15 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
115118 applicationId = getYarnClusterApplicationId (yarnClient );
116119 }
117120
118- LOG .info ("applicationId= {}" , applicationId .toString ());
121+ LOG .info ("current applicationId = {}" , applicationId .toString ());
119122
120123 if (StringUtils .isEmpty (applicationId .toString ())) {
121124 throw new RuntimeException ("No flink session found on yarn cluster." );
122125 }
123126
124- if (config .getString ("high-availability.cluster-id" , null ) == null ) {
125- config .setString ("high-availability.cluster-id" , applicationId .toString ());
127+ if (config .getString (HA_CLUSTER_ID , null ) == null ) {
128+ config .setString (HA_CLUSTER_ID , applicationId .toString ());
126129 }
127- LOG .info ("current config detail:\n {}" , config );
128130
129131 AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor (config , yarnConf , flinkConfDir , yarnClient , false );
130132 ClusterClient clusterClient = clusterDescriptor .retrieve (applicationId );
0 commit comments