Skip to content

Commit 05994ef

Browse files
committed
[fix] udf class conflict
1 parent 59e83f8 commit 05994ef

File tree

3 files changed

+26
-2
lines changed

3 files changed

+26
-2
lines changed

core/src/main/java/com/dtstack/flink/sql/GetPlan.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.dtstack.flink.sql.exec.ParamsInfo;
2424
import org.apache.commons.lang.exception.ExceptionUtils;
2525
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2628

2729
/**
2830
* local模式获取sql任务的执行计划
@@ -32,15 +34,19 @@
3234
*/
3335
public class GetPlan {
3436

37+
private static final Logger LOG = LoggerFactory.getLogger(GetPlan.class);
38+
3539
public static String getExecutionPlan(String[] args) {
3640
try {
3741
long start = System.currentTimeMillis();
3842
ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
43+
paramsInfo.setGetPlan(true);
3944
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
4045
String executionPlan = env.getExecutionPlan();
4146
long end = System.currentTimeMillis();
4247
return ApiResult.createSuccessResultJsonStr(executionPlan, end - start);
4348
} catch (Exception e) {
49+
LOG.error("Get plan error", e);
4450
return ApiResult.createErrorResultJsonStr(ExceptionUtils.getFullStackTrace(e));
4551
}
4652
}

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.exec;
2020

21+
import com.dtstack.flink.sql.GetPlan;
2122
import com.dtstack.flink.sql.parser.CreateFuncParser;
2223
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
2324
import com.dtstack.flink.sql.parser.FlinkPlanner;
@@ -74,6 +75,7 @@
7475
import java.util.Arrays;
7576
import java.util.List;
7677
import java.util.Map;
78+
import java.util.Objects;
7779
import java.util.Properties;
7880
import java.util.Set;
7981
import java.util.TimeZone;
@@ -158,7 +160,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
158160
Map<String, Table> registerTableCache = Maps.newHashMap();
159161

160162
//register udf
161-
ExecuteProcessHelper.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv);
163+
ExecuteProcessHelper.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv, paramsInfo.isGetPlan());
162164
//register table schema
163165
Set<URL> classPathSets = ExecuteProcessHelper.registerTable(sqlTree, env, tableEnv, paramsInfo.getLocalSqlPluginPath(),
164166
paramsInfo.getRemoteSqlPluginPath(), paramsInfo.getPluginLoadMode(), sideTableMap, registerTableCache);
@@ -243,13 +245,19 @@ private static void sqlTranslation(String localSqlPluginPath,
243245
}
244246
}
245247

246-
public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv)
248+
public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv, boolean getPlan)
247249
throws IllegalAccessException, InvocationTargetException {
248250
// udf和tableEnv须由同一个类加载器加载
249251
ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
250252
URLClassLoader classLoader = null;
251253
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
252254
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
255+
// 构建plan的情况下,udf和tableEnv不需要是同一个类加载器
256+
if (getPlan) {
257+
URL[] urls = jarUrlList.toArray(new URL[0]);
258+
classLoader = URLClassLoader.newInstance(urls);
259+
}
260+
253261
//classloader
254262
if (classLoader == null) {
255263
classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) levelClassLoader);

core/src/main/java/com/dtstack/flink/sql/exec/ParamsInfo.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.net.URL;
2323
import java.util.List;
24+
import java.util.Objects;
2425
import java.util.Properties;
2526

2627
/**
@@ -39,6 +40,7 @@ public class ParamsInfo {
3940
private String pluginLoadMode;
4041
private String deployMode;
4142
private Properties confProp;
43+
private boolean getPlan = false;
4244

4345
public ParamsInfo(String sql, String name, List<URL> jarUrlList, String localSqlPluginPath,
4446
String remoteSqlPluginPath, String pluginLoadMode, String deployMode, Properties confProp) {
@@ -52,6 +54,14 @@ public ParamsInfo(String sql, String name, List<URL> jarUrlList, String localSql
5254
this.confProp = confProp;
5355
}
5456

57+
public boolean isGetPlan() {
58+
return getPlan;
59+
}
60+
61+
public void setGetPlan(boolean getPlan) {
62+
this.getPlan = getPlan;
63+
}
64+
5565
public String getSql() {
5666
return sql;
5767
}

0 commit comments

Comments
 (0)