Skip to content

Commit 53b080e

Browse files
committed
add classpath to cache file
1 parent 0d14dc7 commit 53b080e

File tree

3 files changed

+21
-8
lines changed

3 files changed

+21
-8
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql;
2222

2323
import com.dtstack.flink.sql.classloader.DtClassLoader;
24+
import com.dtstack.flink.sql.enums.ECacheType;
2425
import com.dtstack.flink.sql.parser.CreateFuncParser;
2526
import com.dtstack.flink.sql.parser.InsertSqlParser;
2627
import com.dtstack.flink.sql.side.SideSqlExec;
@@ -83,6 +84,8 @@
8384

8485
public class Main {
8586

87+
private static final String CLASS_FILE_NAME_FMT = "class_path_%d";
88+
8689
private static final ObjectMapper objMapper = new ObjectMapper();
8790

8891
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
@@ -272,15 +275,22 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
272275
classPathSet.add( PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
273276
} else if(tableInfo instanceof SideTableInfo){
274277

278+
String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
275279
sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
276-
classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
280+
classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
277281
}else {
278282
throw new RuntimeException("not support table type:" + tableInfo.getType());
279283
}
280284
}
281285

282286
//The plug-in information corresponding to the table is loaded into the classPath env
283287
addEnvClassPath(env, classPathSet);
288+
int i = 0;
289+
for(URL url : classPathSet){
290+
String classFileName = String.format(CLASS_FILE_NAME_FMT, i);
291+
env.registerCachedFile(url.getPath(), classFileName, true);
292+
i++;
293+
}
284294
}
285295

286296
private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException {

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,18 @@ public static Properties stringToProperties(String str) throws IOException{
9494
return properties;
9595
}
9696

97-
public static URL getRemoteJarFilePath(String pluginType, String tableType,String remoteSqlRootDir) throws MalformedURLException {
97+
public static URL getRemoteJarFilePath(String pluginType, String tableType, String remoteSqlRootDir) throws MalformedURLException {
9898
String dirName = pluginType + tableType.toLowerCase();
9999
String jarName = String.format("%s-%s.jar", pluginType, tableType.toLowerCase());
100100
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
101101
}
102102

103+
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir) throws MalformedURLException {
104+
String dirName = pluginType + sideOperator + tableType.toLowerCase();
105+
String jarName = String.format("%s-%s-%s.jar", pluginType, sideOperator, tableType.toLowerCase());
106+
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
107+
}
108+
103109
public static String upperCaseFirstChar(String str){
104110
return str.substring(0, 1).toUpperCase() + str.substring(1);
105111
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public boolean accept(File dir, String name) {
107107
}
108108

109109
YarnClient yarnClient = YarnClient.createYarnClient();
110+
haYarnConf(yarnConf);
110111
yarnClient.init(yarnConf);
111112
yarnClient.start();
112113
ApplicationId applicationId = null;
@@ -138,16 +139,12 @@ public boolean accept(File dir, String name) {
138139

139140
}
140141

141-
if(org.apache.commons.lang3.StringUtils.isEmpty(applicationId.toString())) {
142+
if(StringUtils.isEmpty(applicationId.toString())) {
142143
throw new RuntimeException("No flink session found on yarn cluster.");
143144
}
144145

145-
AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(config, yarnConf, ".", yarnClient, false);
146-
Field confField = AbstractYarnClusterDescriptor.class.getDeclaredField("conf");
147-
confField.setAccessible(true);
148-
haYarnConf(yarnConf);
149-
confField.set(clusterDescriptor, yarnConf);
150146

147+
AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(config, yarnConf, ".", yarnClient, false);
151148
ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId);
152149
clusterClient.setDetached(true);
153150
return clusterClient;

0 commit comments

Comments
 (0)