Skip to content

Commit 3ae5e2a

Browse files
committed
flinksql 150 classloader
1 parent 0655426 commit 3ae5e2a

File tree

9 files changed

+215
-90
lines changed

9 files changed

+215
-90
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.classloader;
21+
22+
/**
23+
* company: www.dtstack.com
24+
* author: toutian
25+
* create: 2019/10/14
26+
*/
27+
public class ClassLoaderCallBackMethod {
28+
29+
public static <R> R callbackAndReset(DtSupplier<R> supplier, ClassLoader toSetClassLoader) throws Exception {
30+
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
31+
Thread.currentThread().setContextClassLoader(toSetClassLoader);
32+
try {
33+
return supplier.get(toSetClassLoader);
34+
} finally {
35+
Thread.currentThread().setContextClassLoader(oldClassLoader);
36+
}
37+
}
38+
39+
40+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.classloader;
20+
21+
import com.dtstack.flink.sql.util.PluginUtil;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.net.URL;
26+
import java.util.Map;
27+
import java.util.concurrent.ConcurrentHashMap;
28+
29+
/**
30+
* company: www.dtstack.com
31+
* author: toutian
32+
* create: 2019/10/14
33+
*/
34+
public class ClassLoaderManager {
35+
36+
private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderManager.class);
37+
38+
private static Map<String, DtClassLoader> pluginClassLoader = new ConcurrentHashMap<>();
39+
40+
public static <R> R newInstance(String pluginJarPath, DtSupplier<R> supplier) throws Exception {
41+
ClassLoader classLoader = retrieveClassLoad(pluginJarPath);
42+
return ClassLoaderCallBackMethod.callbackAndReset(supplier, classLoader);
43+
}
44+
45+
private static DtClassLoader retrieveClassLoad(String pluginJarPath) {
46+
return pluginClassLoader.computeIfAbsent(pluginJarPath, k -> {
47+
try {
48+
URL[] urls = PluginUtil.getPluginJarUrls(pluginJarPath);
49+
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
50+
DtClassLoader classLoader = new DtClassLoader(urls, parentClassLoader);
51+
LOG.info("pluginJarPath:{} create ClassLoad successful...", pluginJarPath);
52+
return classLoader;
53+
} catch (Throwable e) {
54+
LOG.error("retrieve ClassLoad happens error:{}", e);
55+
throw new RuntimeException("retrieve ClassLoad happens error");
56+
}
57+
});
58+
}
59+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.classloader;
21+
22+
/**
23+
* Represents a supplier of results.
24+
*
25+
* <p>There is no requirement that a new or distinct result be returned each
26+
* time the supplier is invoked.
27+
*
28+
* <p>This is a <a href="package-summary.html">functional interface</a>
29+
* whose functional method is {@link #get()}.
30+
*
31+
* @param <T> the type of results supplied by this supplier
32+
*
33+
* @since 1.8
34+
*/
35+
@FunctionalInterface
36+
public interface DtSupplier<T> {
37+
38+
/**
39+
* Gets a result.
40+
*
41+
* @return a result
42+
*/
43+
T get(ClassLoader cl) throws Exception;
44+
}

core/src/main/java/com/dtstack/flink/sql/side/StreamSideFactory.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.side;
2221

23-
import com.dtstack.flink.sql.classloader.DtClassLoader;
22+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2423
import com.dtstack.flink.sql.enums.ECacheType;
2524
import com.dtstack.flink.sql.table.AbsSideTableParser;
2625
import com.dtstack.flink.sql.table.AbsTableParser;
@@ -30,6 +29,7 @@
3029
* get specify side parser
3130
* Date: 2018/7/25
3231
* Company: www.dtstack.com
32+
*
3333
* @author xuchao
3434
*/
3535

@@ -40,18 +40,15 @@ public class StreamSideFactory {
4040
public static AbsTableParser getSqlParser(String pluginType, String sqlRootDir, String cacheType) throws Exception {
4141

4242
String sideOperator = ECacheType.ALL.name().equals(cacheType) ? "all" : "async";
43-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
4443
String pluginJarPath = PluginUtil.getSideJarFileDirPath(pluginType, sideOperator, "side", sqlRootDir);
45-
46-
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
47-
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
4844
String className = PluginUtil.getSqlParserClassName(pluginType, CURR_TYPE);
4945

50-
Class<?> sideParser = dtClassLoader.loadClass(className);
51-
if(!AbsSideTableParser.class.isAssignableFrom(sideParser)){
52-
throw new RuntimeException("class " + sideParser.getName() + " not subClass of AbsSideTableParser");
53-
}
54-
55-
return sideParser.asSubclass(AbsTableParser.class).newInstance();
46+
return ClassLoaderManager.newInstance(pluginJarPath, (cl) -> {
47+
Class<?> sideParser = cl.loadClass(className);
48+
if (!AbsSideTableParser.class.isAssignableFrom(sideParser)) {
49+
throw new RuntimeException("class " + sideParser.getName() + " not subClass of AbsSideTableParser");
50+
}
51+
return sideParser.asSubclass(AbsTableParser.class).newInstance();
52+
});
5653
}
5754
}

core/src/main/java/com/dtstack/flink/sql/side/operator/SideAsyncOperator.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.operator;
2121

22-
import com.dtstack.flink.sql.classloader.DtClassLoader;
22+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2323
import com.dtstack.flink.sql.side.AsyncReqRow;
2424
import com.dtstack.flink.sql.side.FieldInfo;
2525
import com.dtstack.flink.sql.side.JoinInfo;
@@ -49,14 +49,13 @@ public class SideAsyncOperator {
4949

5050
private static AsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo,
5151
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) throws Exception {
52-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
5352
String pathOfType = String.format(PATH_FORMAT, sideType);
5453
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
55-
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
56-
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
5754
String className = PluginUtil.getSqlSideClassName(sideType, "side", "Async");
58-
return dtClassLoader.loadClass(className).asSubclass(AsyncReqRow.class)
59-
.getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class).newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
55+
return ClassLoaderManager.newInstance(pluginJarPath, (cl) ->
56+
cl.loadClass(className).asSubclass(AsyncReqRow.class)
57+
.getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
58+
.newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
6059
}
6160

6261
public static DataStream getSideJoinDataStream(DataStream inputStream, String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,

core/src/main/java/com/dtstack/flink/sql/side/operator/SideWithAllCacheOperator.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.operator;
2121

22-
import com.dtstack.flink.sql.classloader.DtClassLoader;
22+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2323
import com.dtstack.flink.sql.side.AllReqRow;
2424
import com.dtstack.flink.sql.side.FieldInfo;
2525
import com.dtstack.flink.sql.side.JoinInfo;
@@ -28,7 +28,6 @@
2828
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2929
import org.apache.flink.streaming.api.datastream.DataStream;
3030

31-
import java.net.MalformedURLException;
3231
import java.util.List;
3332

3433
/**
@@ -47,18 +46,13 @@ private static AllReqRow loadFlatMap(String sideType, String sqlRootDir, RowType
4746
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList,
4847
SideTableInfo sideTableInfo) throws Exception {
4948

50-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
5149
String pathOfType = String.format(PATH_FORMAT, sideType);
5250
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
53-
54-
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
55-
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
5651
String className = PluginUtil.getSqlSideClassName(sideType, "side", "All");
5752

58-
return dtClassLoader.loadClass(className).asSubclass(AllReqRow.class).getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
59-
.newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
60-
61-
53+
return ClassLoaderManager.newInstance(pluginJarPath, (cl) -> cl.loadClass(className).asSubclass(AllReqRow.class)
54+
.getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
55+
.newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
6256
}
6357

6458
public static DataStream getSideJoinDataStream(DataStream inputStream, String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,

core/src/main/java/com/dtstack/flink/sql/sink/StreamSinkFactory.java

Lines changed: 17 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,11 @@
2020

2121
package com.dtstack.flink.sql.sink;
2222

23-
import com.dtstack.flink.sql.classloader.DtClassLoader;
23+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2424
import com.dtstack.flink.sql.table.AbsTableParser;
2525
import com.dtstack.flink.sql.table.TargetTableInfo;
2626
import com.dtstack.flink.sql.util.DtStringUtil;
2727
import com.dtstack.flink.sql.util.PluginUtil;
28-
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
2928
import org.apache.flink.table.sinks.TableSink;
3029

3130
/**
@@ -42,51 +41,33 @@ public class StreamSinkFactory {
4241
private static final String DIR_NAME_FORMAT = "%ssink";
4342

4443
public static AbsTableParser getSqlParser(String pluginType, String sqlRootDir) throws Exception {
45-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
46-
47-
if(!(classLoader instanceof DtClassLoader)){
48-
throw new RuntimeException("it's not a correct classLoader instance, it's type must be DtClassLoader!");
49-
}
50-
51-
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
52-
5344
String pluginJarPath = PluginUtil.getJarFileDirPath(String.format(DIR_NAME_FORMAT, pluginType), sqlRootDir);
54-
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
5545
String typeNoVersion = DtStringUtil.getPluginTypeWithoutVersion(pluginType);
5646
String className = PluginUtil.getSqlParserClassName(typeNoVersion, CURR_TYPE);
57-
Class<?> targetParser = dtClassLoader.loadClass(className);
5847

59-
if(!AbsTableParser.class.isAssignableFrom(targetParser)){
60-
throw new RuntimeException("class " + targetParser.getName() + " not subClass of AbsTableParser");
61-
}
62-
63-
return targetParser.asSubclass(AbsTableParser.class).newInstance();
48+
return ClassLoaderManager.newInstance(pluginJarPath, (cl) -> {
49+
Class<?> targetParser = cl.loadClass(className);
50+
if(!AbsTableParser.class.isAssignableFrom(targetParser)){
51+
throw new RuntimeException("class " + targetParser.getName() + " not subClass of AbsTableParser");
52+
}
53+
return targetParser.asSubclass(AbsTableParser.class).newInstance();
54+
});
6455
}
6556

6657
public static TableSink getTableSink(TargetTableInfo targetTableInfo, String localSqlRootDir) throws Exception {
67-
68-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
69-
if(!(classLoader instanceof DtClassLoader)){
70-
throw new RuntimeException("it's not a correct classLoader instance, it's type must be DtClassLoader!");
71-
}
72-
73-
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
74-
7558
String pluginType = targetTableInfo.getType();
7659
String pluginJarDirPath = PluginUtil.getJarFileDirPath(String.format(DIR_NAME_FORMAT, pluginType), localSqlRootDir);
77-
78-
PluginUtil.addPluginJar(pluginJarDirPath, dtClassLoader);
79-
8060
String typeNoVersion = DtStringUtil.getPluginTypeWithoutVersion(pluginType);
8161
String className = PluginUtil.getGenerClassName(typeNoVersion, CURR_TYPE);
82-
Class<?> sinkClass = dtClassLoader.loadClass(className);
83-
84-
if(!IStreamSinkGener.class.isAssignableFrom(sinkClass)){
85-
throw new RuntimeException("class " + sinkClass + " not subClass of IStreamSinkGener");
86-
}
8762

88-
IStreamSinkGener streamSinkGener = sinkClass.asSubclass(IStreamSinkGener.class).newInstance();
89-
Object result = streamSinkGener.genStreamSink(targetTableInfo);
90-
return (TableSink) result;
63+
return ClassLoaderManager.newInstance(pluginJarDirPath, (cl) -> {
64+
Class<?> sinkClass = cl.loadClass(className);
65+
if(!IStreamSinkGener.class.isAssignableFrom(sinkClass)){
66+
throw new RuntimeException("class " + sinkClass + " not subClass of IStreamSinkGener");
67+
}
68+
IStreamSinkGener streamSinkGener = sinkClass.asSubclass(IStreamSinkGener.class).newInstance();
69+
Object result = streamSinkGener.genStreamSink(targetTableInfo);
70+
return (TableSink) result;
71+
});
9172
}
9273
}

0 commit comments

Comments
 (0)