Skip to content

Commit 1a375e8

Browse files
WTZ468071157WTZ468071157
authored andcommitted
use RestClusterClient
1 parent 53f4f40 commit 1a375e8

File tree

2 files changed

+7
-9
lines changed

2 files changed

+7
-9
lines changed

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
* to you under the Apache License, Version 2.0 (the
77
* "License"); you may not use this file except in compliance
88
* with the License. You may obtain a copy of the License at
9-
*
9+
* <p>
1010
* http://www.apache.org/licenses/LICENSE-2.0
11-
*
11+
* <p>
1212
* Unless required by applicable law or agreed to in writing, software
1313
* distributed under the License is distributed on an "AS IS" BASIS,
1414
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,6 +26,8 @@
2626
import org.apache.commons.lang.StringUtils;
2727
import org.apache.flink.client.program.ClusterClient;
2828
import org.apache.flink.client.program.MiniClusterClient;
29+
import org.apache.flink.client.program.rest.RestClusterClient;
30+
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
2931
import org.apache.flink.configuration.Configuration;
3032
import org.apache.flink.configuration.GlobalConfiguration;
3133
import org.apache.flink.configuration.JobManagerOptions;
@@ -70,10 +72,7 @@ public static ClusterClient createClusterClient(Options launcherOptions) throws
7072
public static ClusterClient createStandaloneClient(Options launcherOptions) throws Exception {
7173
String flinkConfDir = launcherOptions.getFlinkconf();
7274
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
73-
MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
74-
configBuilder.setConfiguration(config);
75-
MiniCluster miniCluster = new MiniCluster(configBuilder.build());
76-
MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
75+
RestClusterClient clusterClient = new RestClusterClient<>(config, "clusterClient");
7776
LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo();
7877
InetSocketAddress address = AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress());
7978
config.setString(JobManagerOptions.ADDRESS, address.getAddress().getHostName());
@@ -122,7 +121,7 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
122121
} catch (Exception e) {
123122
throw new RuntimeException(e);
124123
}
125-
}else{
124+
} else {
126125
throw new RuntimeException("yarn mode must set param of 'yarnconf'!!!");
127126
}
128127
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,7 @@ public static void main(String[] args) throws Exception {
8989
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
9090
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
9191

92-
// LOG.info("current mode is {}", mode);
93-
System.out.println("current mode is " + mode);
92+
LOG.info("current mode is {}", mode);
9493

9594
if (mode.equals(ClusterMode.local.name())) {
9695
String[] localArgs = argList.toArray(new String[0]);

0 commit comments

Comments
 (0)