Skip to content

Commit 9f39331

Browse files
committed
Merge branch 'v1.4.1'
2 parents 2dee18c + 72c7873 commit 9f39331

File tree

3 files changed

+24
-7
lines changed

3 files changed

+24
-7
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql;
2222

2323
import com.dtstack.flink.sql.classloader.DtClassLoader;
24+
import com.dtstack.flink.sql.enums.ECacheType;
2425
import com.dtstack.flink.sql.parser.CreateFuncParser;
2526
import com.dtstack.flink.sql.parser.InsertSqlParser;
2627
import com.dtstack.flink.sql.side.SideSqlExec;
@@ -272,8 +273,9 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
272273
classPathSet.add( PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
273274
} else if(tableInfo instanceof SideTableInfo){
274275

276+
String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
275277
sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
276-
classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
278+
classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
277279
}else {
278280
throw new RuntimeException("not support table type:" + tableInfo.getType());
279281
}

core/src/main/java/com/dtstack/flink/sql/side/StreamSideFactory.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.classloader.DtClassLoader;
24+
import com.dtstack.flink.sql.enums.ECacheType;
2425
import com.dtstack.flink.sql.table.AbsSideTableParser;
2526
import com.dtstack.flink.sql.table.AbsTableParser;
2627
import com.dtstack.flink.sql.util.PluginUtil;
@@ -36,14 +37,11 @@ public class StreamSideFactory {
3637

3738
private static final String CURR_TYPE = "side";
3839

39-
private static final String SIDE_DIR_TMPL = "%s%sside";
40-
4140
public static AbsTableParser getSqlParser(String pluginType, String sqlRootDir, String cacheType) throws Exception {
4241

43-
cacheType = cacheType == null ? "async" : cacheType;
44-
String sideDir = String.format(SIDE_DIR_TMPL, pluginType, cacheType);
42+
String sideOperator = ECacheType.ALL.name().equals(cacheType) ? "all" : "async";
4543
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
46-
String pluginJarPath = PluginUtil.getJarFileDirPath(sideDir, sqlRootDir);
44+
String pluginJarPath = PluginUtil.getSideJarFileDirPath(pluginType, sideOperator, "side", sqlRootDir);
4745

4846
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
4947
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,17 @@ public static String getJarFileDirPath(String type, String sqlRootDir){
6363
return jarPath;
6464
}
6565

66+
public static String getSideJarFileDirPath(String pluginType, String sideOperator, String tableType, String sqlRootDir) throws MalformedURLException {
67+
String dirName = sqlRootDir + SP + pluginType + sideOperator + tableType.toLowerCase();
68+
File jarFile = new File(dirName);
69+
70+
if(!jarFile.exists()){
71+
throw new RuntimeException(String.format("path %s not exists!!!", dirName));
72+
}
73+
74+
return dirName;
75+
}
76+
6677
public static String getGenerClassName(String pluginTypeName, String type) throws IOException {
6778
String pluginClassName = upperCaseFirstChar(pluginTypeName) + upperCaseFirstChar(type);
6879
return CLASS_PRE_STR + "." + type.toLowerCase() + "." + pluginTypeName + "." + pluginClassName;
@@ -94,12 +105,18 @@ public static Properties stringToProperties(String str) throws IOException{
94105
return properties;
95106
}
96107

97-
public static URL getRemoteJarFilePath(String pluginType, String tableType,String remoteSqlRootDir) throws MalformedURLException {
108+
public static URL getRemoteJarFilePath(String pluginType, String tableType, String remoteSqlRootDir) throws MalformedURLException {
98109
String dirName = pluginType + tableType.toLowerCase();
99110
String jarName = String.format("%s-%s.jar", pluginType, tableType.toLowerCase());
100111
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
101112
}
102113

114+
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir) throws MalformedURLException {
115+
String dirName = pluginType + sideOperator + tableType.toLowerCase();
116+
String jarName = String.format("%s-%s-%s.jar", pluginType, sideOperator, tableType.toLowerCase());
117+
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
118+
}
119+
103120
public static String upperCaseFirstChar(String str){
104121
return str.substring(0, 1).toUpperCase() + str.substring(1);
105122
}

0 commit comments

Comments
 (0)