Skip to content

Commit 6e3fe25

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.0.x_30324' into 1.10_release_4.0.x
2 parents e5bfe5c + bb822cf commit 6e3fe25

File tree

3 files changed

+43
-19
lines changed

3 files changed

+43
-19
lines changed

core/src/main/java/com/dtstack/flink/sql/GetPlan.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.dtstack.flink.sql.exec.ParamsInfo;
2424
import org.apache.commons.lang.exception.ExceptionUtils;
2525
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2628

2729
/**
2830
* local模式获取sql任务的执行计划
@@ -32,15 +34,19 @@
3234
*/
3335
public class GetPlan {
3436

37+
private static final Logger LOG = LoggerFactory.getLogger(GetPlan.class);
38+
3539
public static String getExecutionPlan(String[] args) {
3640
try {
3741
long start = System.currentTimeMillis();
3842
ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
43+
paramsInfo.setGetPlan(true);
3944
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
4045
String executionPlan = env.getExecutionPlan();
4146
long end = System.currentTimeMillis();
4247
return ApiResult.createSuccessResultJsonStr(executionPlan, end - start);
4348
} catch (Exception e) {
49+
LOG.error("Get plan error", e);
4450
return ApiResult.createErrorResultJsonStr(ExceptionUtils.getFullStackTrace(e));
4551
}
4652
}

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,6 @@
1818

1919
package com.dtstack.flink.sql.exec;
2020

21-
import com.dtstack.flink.sql.parser.CreateFuncParser;
22-
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
23-
import com.dtstack.flink.sql.parser.FlinkPlanner;
24-
import com.dtstack.flink.sql.parser.InsertSqlParser;
25-
import com.dtstack.flink.sql.parser.SqlParser;
26-
import com.dtstack.flink.sql.parser.SqlTree;
27-
import org.apache.flink.api.common.typeinfo.TypeInformation;
28-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
29-
import org.apache.flink.streaming.api.datastream.DataStream;
30-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
31-
import org.apache.flink.table.api.*;
32-
import org.apache.flink.table.api.java.StreamTableEnvironment;
33-
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
34-
import org.apache.flink.table.sinks.TableSink;
35-
3621
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
3722
import com.dtstack.flink.sql.enums.ClusterMode;
3823
import com.dtstack.flink.sql.enums.ECacheType;
@@ -42,8 +27,14 @@
4227
import com.dtstack.flink.sql.function.FunctionManager;
4328
import com.dtstack.flink.sql.option.OptionParser;
4429
import com.dtstack.flink.sql.option.Options;
45-
import com.dtstack.flink.sql.side.SideSqlExec;
30+
import com.dtstack.flink.sql.parser.CreateFuncParser;
31+
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
32+
import com.dtstack.flink.sql.parser.FlinkPlanner;
33+
import com.dtstack.flink.sql.parser.InsertSqlParser;
34+
import com.dtstack.flink.sql.parser.SqlParser;
35+
import com.dtstack.flink.sql.parser.SqlTree;
4636
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
37+
import com.dtstack.flink.sql.side.SideSqlExec;
4738
import com.dtstack.flink.sql.sink.StreamSinkFactory;
4839
import com.dtstack.flink.sql.source.StreamSourceFactory;
4940
import com.dtstack.flink.sql.table.AbstractSourceTableInfo;
@@ -62,6 +53,17 @@
6253
import org.apache.calcite.sql.SqlNode;
6354
import org.apache.commons.io.Charsets;
6455
import org.apache.commons.lang3.StringUtils;
56+
import org.apache.flink.api.common.typeinfo.TypeInformation;
57+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
58+
import org.apache.flink.streaming.api.datastream.DataStream;
59+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
60+
import org.apache.flink.table.api.EnvironmentSettings;
61+
import org.apache.flink.table.api.Table;
62+
import org.apache.flink.table.api.TableConfig;
63+
import org.apache.flink.table.api.TableEnvironment;
64+
import org.apache.flink.table.api.java.StreamTableEnvironment;
65+
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
66+
import org.apache.flink.table.sinks.TableSink;
6567
import org.slf4j.Logger;
6668
import org.slf4j.LoggerFactory;
6769

@@ -71,13 +73,13 @@
7173
import java.net.URLClassLoader;
7274
import java.net.URLDecoder;
7375
import java.time.ZoneId;
76+
import java.util.ArrayList;
7477
import java.util.Arrays;
7578
import java.util.List;
7679
import java.util.Map;
7780
import java.util.Properties;
7881
import java.util.Set;
7982
import java.util.TimeZone;
80-
import java.util.ArrayList;
8183

8284
/**
8385
* 任务执行时的流程方法
@@ -158,7 +160,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
158160
Map<String, Table> registerTableCache = Maps.newHashMap();
159161

160162
//register udf
161-
ExecuteProcessHelper.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv);
163+
ExecuteProcessHelper.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv, paramsInfo.isGetPlan());
162164
//register table schema
163165
Set<URL> classPathSets = ExecuteProcessHelper.registerTable(sqlTree, env, tableEnv, paramsInfo.getLocalSqlPluginPath(),
164166
paramsInfo.getRemoteSqlPluginPath(), paramsInfo.getPluginLoadMode(), sideTableMap, registerTableCache);
@@ -243,13 +245,19 @@ private static void sqlTranslation(String localSqlPluginPath,
243245
}
244246
}
245247

246-
public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv)
248+
public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv, boolean getPlan)
247249
throws IllegalAccessException, InvocationTargetException {
248250
// udf和tableEnv须由同一个类加载器加载
249251
ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
250252
URLClassLoader classLoader = null;
251253
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
252254
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
255+
// 构建plan的情况下,udf和tableEnv不需要是同一个类加载器
256+
if (getPlan) {
257+
URL[] urls = jarUrlList.toArray(new URL[0]);
258+
classLoader = URLClassLoader.newInstance(urls);
259+
}
260+
253261
//classloader
254262
if (classLoader == null) {
255263
classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) levelClassLoader);

core/src/main/java/com/dtstack/flink/sql/exec/ParamsInfo.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.net.URL;
2323
import java.util.List;
24+
import java.util.Objects;
2425
import java.util.Properties;
2526

2627
/**
@@ -39,6 +40,7 @@ public class ParamsInfo {
3940
private String pluginLoadMode;
4041
private String deployMode;
4142
private Properties confProp;
43+
private boolean getPlan = false;
4244

4345
public ParamsInfo(String sql, String name, List<URL> jarUrlList, String localSqlPluginPath,
4446
String remoteSqlPluginPath, String pluginLoadMode, String deployMode, Properties confProp) {
@@ -52,6 +54,14 @@ public ParamsInfo(String sql, String name, List<URL> jarUrlList, String localSql
5254
this.confProp = confProp;
5355
}
5456

57+
public boolean isGetPlan() {
58+
return getPlan;
59+
}
60+
61+
public void setGetPlan(boolean getPlan) {
62+
this.getPlan = getPlan;
63+
}
64+
5565
public String getSql() {
5666
return sql;
5767
}

0 commit comments

Comments
 (0)