Skip to content

Commit c91046b

Browse files
author
杨思枢_思枢
committed
Merge branch 'v1.8.0_dev_classloader' into 'v1.8.0_dev'
V1.8.0 dev classloader See merge request !89
2 parents 4700955 + 496e9ad commit c91046b

File tree

11 files changed

+272
-135
lines changed

11 files changed

+272
-135
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
package com.dtstack.flink.sql;
2222

23-
import com.dtstack.flink.sql.classloader.DtClassLoader;
23+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2424
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2525
import com.dtstack.flink.sql.enums.ClusterMode;
2626
import com.dtstack.flink.sql.enums.ECacheType;
@@ -75,10 +75,7 @@
7575
import java.lang.reflect.InvocationTargetException;
7676
import java.lang.reflect.Method;
7777
import java.net.URL;
78-
import java.net.URLClassLoader;
7978
import java.net.URLDecoder;
80-
import java.util.ArrayList;
81-
import java.util.Arrays;
8279
import java.util.List;
8380
import java.util.Map;
8481
import java.util.Properties;
@@ -126,10 +123,6 @@ public static void main(String[] args) throws Exception {
126123
addJarFileList = objMapper.readValue(addJarListStr, List.class);
127124
}
128125

129-
ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
130-
DtClassLoader parentClassloader = new DtClassLoader(new URL[]{}, threadClassLoader);
131-
Thread.currentThread().setContextClassLoader(parentClassloader);
132-
133126
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
134127
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
135128
StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
@@ -148,16 +141,14 @@ public static void main(String[] args) throws Exception {
148141
Map<String, Table> registerTableCache = Maps.newHashMap();
149142

150143
//register udf
151-
registerUDF(sqlTree, jarURList, parentClassloader, tableEnv);
144+
registerUDF(sqlTree, jarURList, tableEnv);
152145
//register table schema
153146
registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, sideTableMap, registerTableCache);
154147

155148
sqlTranslation(options,tableEnv,sqlTree,sideTableMap,registerTableCache);
156149

157150
if(env instanceof MyLocalStreamEnvironment) {
158-
List<URL> urlList = new ArrayList<>();
159-
urlList.addAll(Arrays.asList(parentClassloader.getURLs()));
160-
((MyLocalStreamEnvironment) env).setClasspaths(urlList);
151+
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
161152
}
162153

163154
env.execute(name);
@@ -224,19 +215,12 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla
224215
}
225216
}
226217

227-
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLoader parentClassloader,
228-
StreamTableEnvironment tableEnv)
218+
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTableEnvironment tableEnv)
229219
throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
230-
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
231-
if (funcList.isEmpty()) {
232-
return;
233-
}
234-
//load jar
235-
URLClassLoader classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
236220
//register urf
221+
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
237222
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
238-
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(),
239-
tableEnv, classLoader);
223+
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, jarURList);
240224
}
241225
}
242226

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.classloader;
20+
21+
import com.dtstack.flink.sql.util.PluginUtil;
22+
import org.apache.commons.lang3.StringUtils;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.net.URL;
27+
import java.util.ArrayList;
28+
import java.util.Arrays;
29+
import java.util.Comparator;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.concurrent.ConcurrentHashMap;
33+
34+
/**
35+
* company: www.dtstack.com
36+
* author: toutian
37+
* create: 2019/10/14
38+
*/
39+
public class ClassLoaderManager {
40+
41+
private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderManager.class);
42+
43+
private static Map<String, DtClassLoader> pluginClassLoader = new ConcurrentHashMap<>();
44+
45+
public static <R> R newInstance(String pluginJarPath, ClassLoaderSupplier<R> supplier) throws Exception {
46+
ClassLoader classLoader = retrieveClassLoad(pluginJarPath);
47+
return ClassLoaderSupplierCallBack.callbackAndReset(supplier, classLoader);
48+
}
49+
50+
public static <R> R newInstance(List<URL> jarUrls, ClassLoaderSupplier<R> supplier) throws Exception {
51+
ClassLoader classLoader = retrieveClassLoad(jarUrls);
52+
return ClassLoaderSupplierCallBack.callbackAndReset(supplier, classLoader);
53+
}
54+
55+
private static DtClassLoader retrieveClassLoad(String pluginJarPath) {
56+
return pluginClassLoader.computeIfAbsent(pluginJarPath, k -> {
57+
try {
58+
URL[] urls = PluginUtil.getPluginJarUrls(pluginJarPath);
59+
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
60+
DtClassLoader classLoader = new DtClassLoader(urls, parentClassLoader);
61+
LOG.info("pluginJarPath:{} create ClassLoad successful...", pluginJarPath);
62+
return classLoader;
63+
} catch (Throwable e) {
64+
LOG.error("retrieve ClassLoad happens error:{}", e);
65+
throw new RuntimeException("retrieve ClassLoad happens error");
66+
}
67+
});
68+
}
69+
70+
private static DtClassLoader retrieveClassLoad(List<URL> jarUrls) {
71+
jarUrls.sort(Comparator.comparing(URL::toString));
72+
String jarUrlkey = StringUtils.join(jarUrls, "_");
73+
return pluginClassLoader.computeIfAbsent(jarUrlkey, k -> {
74+
try {
75+
URL[] urls = jarUrls.toArray(new URL[jarUrls.size()]);
76+
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
77+
DtClassLoader classLoader = new DtClassLoader(urls, parentClassLoader);
78+
LOG.info("jarUrl:{} create ClassLoad successful...", jarUrlkey);
79+
return classLoader;
80+
} catch (Throwable e) {
81+
LOG.error("retrieve ClassLoad happens error:{}", e);
82+
throw new RuntimeException("retrieve ClassLoad happens error");
83+
}
84+
});
85+
}
86+
87+
public static List<URL> getClassPath() {
88+
List<URL> classPaths = new ArrayList<>();
89+
for (Map.Entry<String, DtClassLoader> entry : pluginClassLoader.entrySet()) {
90+
classPaths.addAll(Arrays.asList(entry.getValue().getURLs()));
91+
}
92+
return classPaths;
93+
}
94+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.classloader;
21+
22+
/**
23+
* Represents a supplier of results.
24+
*
25+
* <p>There is no requirement that a new or distinct result be returned each
26+
* time the supplier is invoked.
27+
*
28+
* <p>This is a <a href="package-summary.html">functional interface</a>
29+
* whose functional method is {@link #get()}.
30+
*
31+
* @param <T> the type of results supplied by this supplier
32+
*
33+
* @since 1.8
34+
*/
35+
@FunctionalInterface
36+
public interface ClassLoaderSupplier<T> {
37+
38+
/**
39+
* Gets a result.
40+
*
41+
* @return a result
42+
*/
43+
T get(ClassLoader cl) throws Exception;
44+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.classloader;
21+
22+
/**
23+
* company: www.dtstack.com
24+
* author: toutian
25+
* create: 2019/10/14
26+
*/
27+
public class ClassLoaderSupplierCallBack {
28+
29+
public static <R> R callbackAndReset(ClassLoaderSupplier<R> supplier, ClassLoader toSetClassLoader) throws Exception {
30+
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
31+
Thread.currentThread().setContextClassLoader(toSetClassLoader);
32+
try {
33+
return supplier.get(toSetClassLoader);
34+
} finally {
35+
Thread.currentThread().setContextClassLoader(oldClassLoader);
36+
}
37+
}
38+
39+
40+
}

core/src/main/java/com/dtstack/flink/sql/side/StreamSideFactory.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.side;
2221

23-
import com.dtstack.flink.sql.classloader.DtClassLoader;
22+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2423
import com.dtstack.flink.sql.enums.ECacheType;
2524
import com.dtstack.flink.sql.table.AbsSideTableParser;
2625
import com.dtstack.flink.sql.table.AbsTableParser;
@@ -30,6 +29,7 @@
3029
* get specify side parser
3130
* Date: 2018/7/25
3231
* Company: www.dtstack.com
32+
*
3333
* @author xuchao
3434
*/
3535

@@ -40,18 +40,15 @@ public class StreamSideFactory {
4040
public static AbsTableParser getSqlParser(String pluginType, String sqlRootDir, String cacheType) throws Exception {
4141

4242
String sideOperator = ECacheType.ALL.name().equals(cacheType) ? "all" : "async";
43-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
4443
String pluginJarPath = PluginUtil.getSideJarFileDirPath(pluginType, sideOperator, "side", sqlRootDir);
45-
46-
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
47-
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
4844
String className = PluginUtil.getSqlParserClassName(pluginType, CURR_TYPE);
4945

50-
Class<?> sideParser = dtClassLoader.loadClass(className);
51-
if(!AbsSideTableParser.class.isAssignableFrom(sideParser)){
52-
throw new RuntimeException("class " + sideParser.getName() + " not subClass of AbsSideTableParser");
53-
}
54-
55-
return sideParser.asSubclass(AbsTableParser.class).newInstance();
46+
return ClassLoaderManager.newInstance(pluginJarPath, (cl) -> {
47+
Class<?> sideParser = cl.loadClass(className);
48+
if (!AbsSideTableParser.class.isAssignableFrom(sideParser)) {
49+
throw new RuntimeException("class " + sideParser.getName() + " not subClass of AbsSideTableParser");
50+
}
51+
return sideParser.asSubclass(AbsTableParser.class).newInstance();
52+
});
5653
}
5754
}

core/src/main/java/com/dtstack/flink/sql/side/operator/SideAsyncOperator.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.operator;
2121

22-
import com.dtstack.flink.sql.classloader.DtClassLoader;
22+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2323
import com.dtstack.flink.sql.side.AsyncReqRow;
2424
import com.dtstack.flink.sql.side.FieldInfo;
2525
import com.dtstack.flink.sql.side.JoinInfo;
@@ -51,14 +51,13 @@ public class SideAsyncOperator {
5151

5252
private static AsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo,
5353
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) throws Exception {
54-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
5554
String pathOfType = String.format(PATH_FORMAT, sideType);
5655
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
57-
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
58-
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
5956
String className = PluginUtil.getSqlSideClassName(sideType, "side", OPERATOR_TYPE);
60-
return dtClassLoader.loadClass(className).asSubclass(AsyncReqRow.class)
61-
.getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class).newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
57+
return ClassLoaderManager.newInstance(pluginJarPath, (cl) ->
58+
cl.loadClass(className).asSubclass(AsyncReqRow.class)
59+
.getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
60+
.newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
6261
}
6362

6463
public static DataStream getSideJoinDataStream(DataStream inputStream, String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,

core/src/main/java/com/dtstack/flink/sql/side/operator/SideWithAllCacheOperator.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.operator;
2121

22-
import com.dtstack.flink.sql.classloader.DtClassLoader;
22+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2323
import com.dtstack.flink.sql.side.AllReqRow;
2424
import com.dtstack.flink.sql.side.FieldInfo;
2525
import com.dtstack.flink.sql.side.JoinInfo;
@@ -28,7 +28,6 @@
2828
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2929
import org.apache.flink.streaming.api.datastream.DataStream;
3030

31-
import java.net.MalformedURLException;
3231
import java.util.List;
3332

3433
/**
@@ -49,18 +48,13 @@ private static AllReqRow loadFlatMap(String sideType, String sqlRootDir, RowType
4948
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList,
5049
SideTableInfo sideTableInfo) throws Exception {
5150

52-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
5351
String pathOfType = String.format(PATH_FORMAT, sideType);
5452
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
55-
56-
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
57-
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
5853
String className = PluginUtil.getSqlSideClassName(sideType, "side", OPERATOR_TYPE);
5954

60-
return dtClassLoader.loadClass(className).asSubclass(AllReqRow.class).getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
61-
.newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
62-
63-
55+
return ClassLoaderManager.newInstance(pluginJarPath, (cl) -> cl.loadClass(className).asSubclass(AllReqRow.class)
56+
.getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
57+
.newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
6458
}
6559

6660
public static DataStream getSideJoinDataStream(DataStream inputStream, String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,

0 commit comments

Comments
 (0)