Skip to content

Commit 00a4564

Browse files
WTZ468071157WTZ468071157
authored andcommitted
添加对high-availability.cluster.id的判断,为空则为applicationId
1 parent 5b49c0a commit 00a4564

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
import org.apache.hadoop.yarn.client.api.YarnClient;
4545
import org.apache.hadoop.yarn.conf.YarnConfiguration;
4646
import org.apache.hadoop.yarn.util.StringHelper;
47+
import org.slf4j.Logger;
48+
import org.slf4j.LoggerFactory;
4749

4850
import java.net.InetSocketAddress;
4951
import java.net.URLDecoder;
@@ -59,6 +61,8 @@
5961
*/
6062
public class ClusterClientFactory {
6163

64+
private static final Logger LOG = LoggerFactory.getLogger(ClusterClientFactory.class);
65+
6266
public static ClusterClient createClusterClient(Options launcherOptions) throws Exception {
6367
String mode = launcherOptions.getMode();
6468
if (mode.equals(ClusterMode.standalone.name())) {
@@ -100,6 +104,9 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
100104
String yarnSessionConf = launcherOptions.getYarnSessionConf();
101105
yarnSessionConf = URLDecoder.decode(yarnSessionConf, Charsets.UTF_8.toString());
102106
Properties yarnSessionConfProperties = PluginUtil.jsonStrToObject(yarnSessionConf, Properties.class);
107+
108+
LOG.info("current yarn config:\n{}", yarnSessionConfProperties);
109+
103110
Object yid = yarnSessionConfProperties.get("yid");
104111

105112
if (null != yid) {
@@ -108,12 +115,18 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
108115
applicationId = getYarnClusterApplicationId(yarnClient);
109116
}
110117

111-
Log.info("applicationId={}", applicationId.toString());
118+
LOG.info("applicationId={}", applicationId.toString());
119+
120+
if (config.getString("high-availability.cluster-id", null) == null) {
121+
config.setString("high-availability.cluster-id", applicationId.toString());
122+
}
112123

113124
if (StringUtils.isEmpty(applicationId.toString())) {
114125
throw new RuntimeException("No flink session found on yarn cluster.");
115126
}
116127

128+
LOG.info("current config detail:\n{}", config);
129+
117130
AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(config, yarnConf, flinkConfDir, yarnClient, false);
118131
ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId);
119132
clusterClient.setDetached(true);

0 commit comments

Comments
 (0)