Skip to content

Commit ecba3eb

Browse files
committed
udf classloader
1 parent 3ae5e2a commit ecba3eb

File tree

3 files changed

+49
-34
lines changed

3 files changed

+49
-34
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
package com.dtstack.flink.sql;
2222

23-
import com.dtstack.flink.sql.classloader.DtClassLoader;
23+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2424
import com.dtstack.flink.sql.enums.ECacheType;
2525
import com.dtstack.flink.sql.exec.FlinkSQLExec;
2626
import com.dtstack.flink.sql.parser.CreateFuncParser;
@@ -77,10 +77,7 @@
7777
import java.lang.reflect.InvocationTargetException;
7878
import java.lang.reflect.Method;
7979
import java.net.URL;
80-
import java.net.URLClassLoader;
8180
import java.net.URLDecoder;
82-
import java.util.ArrayList;
83-
import java.util.Arrays;
8481
import java.util.List;
8582
import java.util.Map;
8683
import java.util.Properties;
@@ -144,10 +141,6 @@ public static void main(String[] args) throws Exception {
144141
addJarFileList = objMapper.readValue(addJarListStr, List.class);
145142
}
146143

147-
ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
148-
DtClassLoader parentClassloader = new DtClassLoader(new URL[]{}, threadClassLoader);
149-
Thread.currentThread().setContextClassLoader(parentClassloader);
150-
151144
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
152145
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
153146
StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
@@ -166,7 +159,7 @@ public static void main(String[] args) throws Exception {
166159
Map<String, Table> registerTableCache = Maps.newHashMap();
167160

168161
//register udf
169-
registerUDF(sqlTree, jarURList, parentClassloader, tableEnv);
162+
registerUDF(sqlTree, jarURList, tableEnv);
170163
//register table schema
171164
registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, sideTableMap, registerTableCache);
172165

@@ -219,9 +212,7 @@ public static void main(String[] args) throws Exception {
219212
}
220213

221214
if(env instanceof MyLocalStreamEnvironment) {
222-
List<URL> urlList = new ArrayList<>();
223-
urlList.addAll(Arrays.asList(parentClassloader.getURLs()));
224-
((MyLocalStreamEnvironment) env).setClasspaths(urlList);
215+
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
225216
}
226217

227218
env.execute(name);
@@ -245,19 +236,12 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla
245236
}
246237
}
247238

248-
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLoader parentClassloader,
249-
StreamTableEnvironment tableEnv)
239+
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTableEnvironment tableEnv)
250240
throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
251241
//register urf
252-
URLClassLoader classLoader = null;
253242
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
254243
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
255-
//classloader
256-
if (classLoader == null) {
257-
classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
258-
}
259-
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(),
260-
tableEnv, classLoader);
244+
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, jarURList);
261245
}
262246
}
263247

core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderManager.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,16 @@
1919
package com.dtstack.flink.sql.classloader;
2020

2121
import com.dtstack.flink.sql.util.PluginUtil;
22+
import org.apache.commons.lang3.StringUtils;
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
2425

2526
import java.net.URL;
27+
import java.util.ArrayList;
28+
import java.util.Arrays;
29+
import java.util.Collections;
30+
import java.util.Comparator;
31+
import java.util.List;
2632
import java.util.Map;
2733
import java.util.concurrent.ConcurrentHashMap;
2834

@@ -42,6 +48,11 @@ public static <R> R newInstance(String pluginJarPath, DtSupplier<R> supplier) th
4248
return ClassLoaderCallBackMethod.callbackAndReset(supplier, classLoader);
4349
}
4450

51+
public static <R> R newInstance(List<URL> jarUrls, DtSupplier<R> supplier) throws Exception {
52+
ClassLoader classLoader = retrieveClassLoad(jarUrls);
53+
return ClassLoaderCallBackMethod.callbackAndReset(supplier, classLoader);
54+
}
55+
4556
private static DtClassLoader retrieveClassLoad(String pluginJarPath) {
4657
return pluginClassLoader.computeIfAbsent(pluginJarPath, k -> {
4758
try {
@@ -56,4 +67,29 @@ private static DtClassLoader retrieveClassLoad(String pluginJarPath) {
5667
}
5768
});
5869
}
70+
71+
private static DtClassLoader retrieveClassLoad(List<URL> jarUrls) {
72+
jarUrls.sort(Comparator.comparing(URL::toString));
73+
String jarUrlkey = StringUtils.join(jarUrls, "_");
74+
return pluginClassLoader.computeIfAbsent(jarUrlkey, k -> {
75+
try {
76+
URL[] urls = jarUrls.toArray(new URL[jarUrls.size()]);
77+
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
78+
DtClassLoader classLoader = new DtClassLoader(urls, parentClassLoader);
79+
LOG.info("jarUrl:{} create ClassLoad successful...", jarUrlkey);
80+
return classLoader;
81+
} catch (Throwable e) {
82+
LOG.error("retrieve ClassLoad happens error:{}", e);
83+
throw new RuntimeException("retrieve ClassLoad happens error");
84+
}
85+
});
86+
}
87+
88+
public static List<URL> getClassPath() {
89+
List<URL> classPaths = new ArrayList<>();
90+
for (Map.Entry<String, DtClassLoader> entry : pluginClassLoader.entrySet()) {
91+
classPaths.addAll(Arrays.asList(entry.getValue().getURLs()));
92+
}
93+
return classPaths;
94+
}
5995
}

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql.util;
2222

2323

24+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2425
import org.apache.commons.lang3.StringUtils;
2526
import org.apache.flink.api.common.typeinfo.TypeInformation;
2627
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -151,12 +152,11 @@ public static void setStreamTimeCharacteristic(StreamExecutionEnvironment env, P
151152
* TABLE|SCALA
152153
* 注册UDF到table env
153154
*/
154-
public static void registerUDF(String type, String classPath, String funcName, TableEnvironment tableEnv,
155-
ClassLoader classLoader){
155+
public static void registerUDF(String type, String classPath, String funcName, TableEnvironment tableEnv, List<URL> jarURList){
156156
if("SCALA".equalsIgnoreCase(type)){
157-
registerScalaUDF(classPath, funcName, tableEnv, classLoader);
157+
registerScalaUDF(classPath, funcName, tableEnv, jarURList);
158158
}else if("TABLE".equalsIgnoreCase(type)){
159-
registerTableUDF(classPath, funcName, tableEnv, classLoader);
159+
registerTableUDF(classPath, funcName, tableEnv, jarURList);
160160
}else{
161161
throw new RuntimeException("not support of UDF which is not in (TABLE, SCALA)");
162162
}
@@ -169,11 +169,9 @@ public static void registerUDF(String type, String classPath, String funcName, T
169169
* @param funcName
170170
* @param tableEnv
171171
*/
172-
public static void registerScalaUDF(String classPath, String funcName, TableEnvironment tableEnv,
173-
ClassLoader classLoader){
172+
public static void registerScalaUDF(String classPath, String funcName, TableEnvironment tableEnv, List<URL> jarURList){
174173
try{
175-
ScalarFunction udfFunc = Class.forName(classPath, false, classLoader)
176-
.asSubclass(ScalarFunction.class).newInstance();
174+
ScalarFunction udfFunc = ClassLoaderManager.newInstance(jarURList, (cl) -> cl.loadClass(classPath).asSubclass(ScalarFunction.class).newInstance());
177175
tableEnv.registerFunction(funcName, udfFunc);
178176
logger.info("register scala function:{} success.", funcName);
179177
}catch (Exception e){
@@ -189,12 +187,9 @@ public static void registerScalaUDF(String classPath, String funcName, TableEnvi
189187
* @param funcName
190188
* @param tableEnv
191189
*/
192-
public static void registerTableUDF(String classPath, String funcName, TableEnvironment tableEnv,
193-
ClassLoader classLoader){
190+
public static void registerTableUDF(String classPath, String funcName, TableEnvironment tableEnv, List<URL> jarURList){
194191
try {
195-
TableFunction udfFunc = Class.forName(classPath, false, classLoader)
196-
.asSubclass(TableFunction.class).newInstance();
197-
192+
TableFunction udfFunc = ClassLoaderManager.newInstance(jarURList, (cl) -> cl.loadClass(classPath).asSubclass(TableFunction.class).newInstance());
198193
if(tableEnv instanceof StreamTableEnvironment){
199194
((StreamTableEnvironment)tableEnv).registerFunction(funcName, udfFunc);
200195
}else if(tableEnv instanceof BatchTableEnvironment){

0 commit comments

Comments
 (0)