Skip to content

Commit dab993c

Browse files
author
杨思枢_思枢
committed
Merge branch 'v1.5.0_dev_classloader' into 'v1.5.0_dev'
V1.5.0 dev classloader See merge request !88
2 parents 0655426 + 461b876 commit dab993c

File tree

11 files changed

+263
-124
lines changed

11 files changed

+263
-124
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
package com.dtstack.flink.sql;
2222

23-
import com.dtstack.flink.sql.classloader.DtClassLoader;
23+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2424
import com.dtstack.flink.sql.enums.ECacheType;
2525
import com.dtstack.flink.sql.exec.FlinkSQLExec;
2626
import com.dtstack.flink.sql.parser.CreateFuncParser;
@@ -77,10 +77,7 @@
7777
import java.lang.reflect.InvocationTargetException;
7878
import java.lang.reflect.Method;
7979
import java.net.URL;
80-
import java.net.URLClassLoader;
8180
import java.net.URLDecoder;
82-
import java.util.ArrayList;
83-
import java.util.Arrays;
8481
import java.util.List;
8582
import java.util.Map;
8683
import java.util.Properties;
@@ -144,10 +141,6 @@ public static void main(String[] args) throws Exception {
144141
addJarFileList = objMapper.readValue(addJarListStr, List.class);
145142
}
146143

147-
ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
148-
DtClassLoader parentClassloader = new DtClassLoader(new URL[]{}, threadClassLoader);
149-
Thread.currentThread().setContextClassLoader(parentClassloader);
150-
151144
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
152145
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
153146
StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
@@ -166,7 +159,7 @@ public static void main(String[] args) throws Exception {
166159
Map<String, Table> registerTableCache = Maps.newHashMap();
167160

168161
//register udf
169-
registerUDF(sqlTree, jarURList, parentClassloader, tableEnv);
162+
registerUDF(sqlTree, jarURList, tableEnv);
170163
//register table schema
171164
registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, sideTableMap, registerTableCache);
172165

@@ -219,9 +212,7 @@ public static void main(String[] args) throws Exception {
219212
}
220213

221214
if(env instanceof MyLocalStreamEnvironment) {
222-
List<URL> urlList = new ArrayList<>();
223-
urlList.addAll(Arrays.asList(parentClassloader.getURLs()));
224-
((MyLocalStreamEnvironment) env).setClasspaths(urlList);
215+
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
225216
}
226217

227218
env.execute(name);
@@ -245,19 +236,12 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla
245236
}
246237
}
247238

248-
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLoader parentClassloader,
249-
StreamTableEnvironment tableEnv)
239+
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTableEnvironment tableEnv)
250240
throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
251241
//register urf
252-
URLClassLoader classLoader = null;
253242
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
254243
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
255-
//classloader
256-
if (classLoader == null) {
257-
classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
258-
}
259-
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(),
260-
tableEnv, classLoader);
244+
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, jarURList);
261245
}
262246
}
263247

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.classloader;
20+
21+
import com.dtstack.flink.sql.util.PluginUtil;
22+
import org.apache.commons.lang3.StringUtils;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.net.URL;
27+
import java.util.ArrayList;
28+
import java.util.Arrays;
29+
import java.util.Comparator;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.concurrent.ConcurrentHashMap;
33+
34+
/**
35+
* company: www.dtstack.com
36+
* author: toutian
37+
* create: 2019/10/14
38+
*/
39+
public class ClassLoaderManager {
40+
41+
private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderManager.class);
42+
43+
private static Map<String, DtClassLoader> pluginClassLoader = new ConcurrentHashMap<>();
44+
45+
public static <R> R newInstance(String pluginJarPath, ClassLoaderSupplier<R> supplier) throws Exception {
46+
ClassLoader classLoader = retrieveClassLoad(pluginJarPath);
47+
return ClassLoaderSupplierCallBack.callbackAndReset(supplier, classLoader);
48+
}
49+
50+
public static <R> R newInstance(List<URL> jarUrls, ClassLoaderSupplier<R> supplier) throws Exception {
51+
ClassLoader classLoader = retrieveClassLoad(jarUrls);
52+
return ClassLoaderSupplierCallBack.callbackAndReset(supplier, classLoader);
53+
}
54+
55+
private static DtClassLoader retrieveClassLoad(String pluginJarPath) {
56+
return pluginClassLoader.computeIfAbsent(pluginJarPath, k -> {
57+
try {
58+
URL[] urls = PluginUtil.getPluginJarUrls(pluginJarPath);
59+
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
60+
DtClassLoader classLoader = new DtClassLoader(urls, parentClassLoader);
61+
LOG.info("pluginJarPath:{} create ClassLoad successful...", pluginJarPath);
62+
return classLoader;
63+
} catch (Throwable e) {
64+
LOG.error("retrieve ClassLoad happens error:{}", e);
65+
throw new RuntimeException("retrieve ClassLoad happens error");
66+
}
67+
});
68+
}
69+
70+
private static DtClassLoader retrieveClassLoad(List<URL> jarUrls) {
71+
jarUrls.sort(Comparator.comparing(URL::toString));
72+
String jarUrlkey = StringUtils.join(jarUrls, "_");
73+
return pluginClassLoader.computeIfAbsent(jarUrlkey, k -> {
74+
try {
75+
URL[] urls = jarUrls.toArray(new URL[jarUrls.size()]);
76+
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
77+
DtClassLoader classLoader = new DtClassLoader(urls, parentClassLoader);
78+
LOG.info("jarUrl:{} create ClassLoad successful...", jarUrlkey);
79+
return classLoader;
80+
} catch (Throwable e) {
81+
LOG.error("retrieve ClassLoad happens error:{}", e);
82+
throw new RuntimeException("retrieve ClassLoad happens error");
83+
}
84+
});
85+
}
86+
87+
public static List<URL> getClassPath() {
88+
List<URL> classPaths = new ArrayList<>();
89+
for (Map.Entry<String, DtClassLoader> entry : pluginClassLoader.entrySet()) {
90+
classPaths.addAll(Arrays.asList(entry.getValue().getURLs()));
91+
}
92+
return classPaths;
93+
}
94+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.classloader;
21+
22+
/**
23+
* Represents a supplier of results.
24+
*
25+
* <p>There is no requirement that a new or distinct result be returned each
26+
* time the supplier is invoked.
27+
*
28+
* <p>This is a <a href="package-summary.html">functional interface</a>
29+
* whose functional method is {@link #get()}.
30+
*
31+
* @param <T> the type of results supplied by this supplier
32+
*
33+
* @since 1.8
34+
*/
35+
@FunctionalInterface
36+
public interface ClassLoaderSupplier<T> {
37+
38+
/**
39+
* Gets a result.
40+
*
41+
* @return a result
42+
*/
43+
T get(ClassLoader cl) throws Exception;
44+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.classloader;
21+
22+
/**
23+
* company: www.dtstack.com
24+
* author: toutian
25+
* create: 2019/10/14
26+
*/
27+
public class ClassLoaderSupplierCallBack {
28+
29+
public static <R> R callbackAndReset(ClassLoaderSupplier<R> supplier, ClassLoader toSetClassLoader) throws Exception {
30+
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
31+
Thread.currentThread().setContextClassLoader(toSetClassLoader);
32+
try {
33+
return supplier.get(toSetClassLoader);
34+
} finally {
35+
Thread.currentThread().setContextClassLoader(oldClassLoader);
36+
}
37+
}
38+
39+
40+
}

core/src/main/java/com/dtstack/flink/sql/side/StreamSideFactory.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.side;
2221

23-
import com.dtstack.flink.sql.classloader.DtClassLoader;
22+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2423
import com.dtstack.flink.sql.enums.ECacheType;
2524
import com.dtstack.flink.sql.table.AbsSideTableParser;
2625
import com.dtstack.flink.sql.table.AbsTableParser;
@@ -30,6 +29,7 @@
3029
* get specify side parser
3130
* Date: 2018/7/25
3231
* Company: www.dtstack.com
32+
*
3333
* @author xuchao
3434
*/
3535

@@ -40,18 +40,15 @@ public class StreamSideFactory {
4040
public static AbsTableParser getSqlParser(String pluginType, String sqlRootDir, String cacheType) throws Exception {
4141

4242
String sideOperator = ECacheType.ALL.name().equals(cacheType) ? "all" : "async";
43-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
4443
String pluginJarPath = PluginUtil.getSideJarFileDirPath(pluginType, sideOperator, "side", sqlRootDir);
45-
46-
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
47-
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
4844
String className = PluginUtil.getSqlParserClassName(pluginType, CURR_TYPE);
4945

50-
Class<?> sideParser = dtClassLoader.loadClass(className);
51-
if(!AbsSideTableParser.class.isAssignableFrom(sideParser)){
52-
throw new RuntimeException("class " + sideParser.getName() + " not subClass of AbsSideTableParser");
53-
}
54-
55-
return sideParser.asSubclass(AbsTableParser.class).newInstance();
46+
return ClassLoaderManager.newInstance(pluginJarPath, (cl) -> {
47+
Class<?> sideParser = cl.loadClass(className);
48+
if (!AbsSideTableParser.class.isAssignableFrom(sideParser)) {
49+
throw new RuntimeException("class " + sideParser.getName() + " not subClass of AbsSideTableParser");
50+
}
51+
return sideParser.asSubclass(AbsTableParser.class).newInstance();
52+
});
5653
}
5754
}

core/src/main/java/com/dtstack/flink/sql/side/operator/SideAsyncOperator.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.operator;
2121

22-
import com.dtstack.flink.sql.classloader.DtClassLoader;
22+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2323
import com.dtstack.flink.sql.side.AsyncReqRow;
2424
import com.dtstack.flink.sql.side.FieldInfo;
2525
import com.dtstack.flink.sql.side.JoinInfo;
@@ -49,14 +49,13 @@ public class SideAsyncOperator {
4949

5050
private static AsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo,
5151
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) throws Exception {
52-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
5352
String pathOfType = String.format(PATH_FORMAT, sideType);
5453
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
55-
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
56-
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
5754
String className = PluginUtil.getSqlSideClassName(sideType, "side", "Async");
58-
return dtClassLoader.loadClass(className).asSubclass(AsyncReqRow.class)
59-
.getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class).newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
55+
return ClassLoaderManager.newInstance(pluginJarPath, (cl) ->
56+
cl.loadClass(className).asSubclass(AsyncReqRow.class)
57+
.getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
58+
.newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
6059
}
6160

6261
public static DataStream getSideJoinDataStream(DataStream inputStream, String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,

core/src/main/java/com/dtstack/flink/sql/side/operator/SideWithAllCacheOperator.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.operator;
2121

22-
import com.dtstack.flink.sql.classloader.DtClassLoader;
22+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2323
import com.dtstack.flink.sql.side.AllReqRow;
2424
import com.dtstack.flink.sql.side.FieldInfo;
2525
import com.dtstack.flink.sql.side.JoinInfo;
@@ -28,7 +28,6 @@
2828
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2929
import org.apache.flink.streaming.api.datastream.DataStream;
3030

31-
import java.net.MalformedURLException;
3231
import java.util.List;
3332

3433
/**
@@ -47,18 +46,13 @@ private static AllReqRow loadFlatMap(String sideType, String sqlRootDir, RowType
4746
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList,
4847
SideTableInfo sideTableInfo) throws Exception {
4948

50-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
5149
String pathOfType = String.format(PATH_FORMAT, sideType);
5250
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
53-
54-
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
55-
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
5651
String className = PluginUtil.getSqlSideClassName(sideType, "side", "All");
5752

58-
return dtClassLoader.loadClass(className).asSubclass(AllReqRow.class).getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
59-
.newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
60-
61-
53+
return ClassLoaderManager.newInstance(pluginJarPath, (cl) -> cl.loadClass(className).asSubclass(AllReqRow.class)
54+
.getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
55+
.newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
6256
}
6357

6458
public static DataStream getSideJoinDataStream(DataStream inputStream, String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,

0 commit comments

Comments
 (0)