Skip to content

Commit ccc0ccd

Browse files
committed
extract common method
1 parent 74b2a9b commit ccc0ccd

File tree

6 files changed

+183
-391
lines changed

6 files changed

+183
-391
lines changed

core/src/main/java/com/dtstack/flink/sql/CommonProcess.java

Lines changed: 0 additions & 234 deletions
This file was deleted.

core/src/main/java/com/dtstack/flink/sql/GetPlan.java

Lines changed: 5 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -18,39 +18,15 @@
1818

1919
package com.dtstack.flink.sql;
2020

21-
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
22-
import com.dtstack.flink.sql.enums.ClusterMode;
23-
import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
24-
import com.dtstack.flink.sql.environment.StreamEnvConfigManager;
25-
import com.dtstack.flink.sql.option.OptionParser;
26-
import com.dtstack.flink.sql.option.Options;
27-
import com.dtstack.flink.sql.parser.SqlParser;
28-
import com.dtstack.flink.sql.parser.SqlTree;
29-
import com.dtstack.flink.sql.side.SideTableInfo;
30-
import com.dtstack.flink.sql.util.PluginUtil;
21+
import com.dtstack.flink.sql.exec.BuildProcess;
22+
import com.dtstack.flink.sql.exec.ParamsInfo;
3123
import com.fasterxml.jackson.databind.ObjectMapper;
3224
import com.fasterxml.jackson.databind.node.ObjectNode;
33-
import com.google.common.base.Strings;
34-
import com.google.common.collect.Lists;
35-
import com.google.common.collect.Maps;
36-
import org.apache.commons.io.Charsets;
3725
import org.apache.commons.lang.exception.ExceptionUtils;
3826
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
39-
import org.apache.flink.table.api.StreamQueryConfig;
40-
import org.apache.flink.table.api.Table;
41-
import org.apache.flink.table.api.java.StreamTableEnvironment;
42-
43-
import java.io.File;
44-
import java.net.URL;
45-
import java.net.URLDecoder;
46-
import java.util.Arrays;
47-
import java.util.List;
48-
import java.util.Map;
49-
import java.util.Properties;
50-
import java.util.Set;
5127

5228
/**
53-
* 获取sql任务的执行计划
29+
* local模式获取sql任务的执行计划
5430
* Date: 2020/2/17
5531
* Company: www.dtstack.com
5632
* @author maqi
@@ -65,54 +41,8 @@ public class GetPlan {
6541

6642
public static String getExecutionPlan(String[] args) {
6743
try {
68-
Arrays.stream(args).forEach(System.out::println);
69-
OptionParser optionParser = new OptionParser(args);
70-
Options options = optionParser.getOptions();
71-
String sql = options.getSql();
72-
String addJarListStr = options.getAddjar();
73-
String localSqlPluginPath = options.getLocalSqlPluginPath();
74-
String deployMode = ClusterMode.local.name();
75-
String confProp = options.getConfProp();
76-
77-
sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
78-
SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
79-
80-
List<String> addJarFileList = Lists.newArrayList();
81-
if (!Strings.isNullOrEmpty(addJarListStr)) {
82-
addJarListStr = URLDecoder.decode(addJarListStr, Charsets.UTF_8.name());
83-
addJarFileList = OBJECT_MAPPER.readValue(addJarListStr, List.class);
84-
}
85-
86-
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
87-
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
88-
StreamExecutionEnvironment env = CommonProcess.getStreamExeEnv(confProperties, deployMode);
89-
90-
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
91-
StreamQueryConfig streamQueryConfig = StreamEnvConfigManager.getStreamQueryConfig(tableEnv, confProperties);
92-
93-
List<URL> jarUrlList = Lists.newArrayList();
94-
SqlTree sqlTree = SqlParser.parseSql(sql);
95-
96-
//Get External jar to load
97-
for (String addJarPath : addJarFileList) {
98-
File tmpFile = new File(addJarPath);
99-
jarUrlList.add(tmpFile.toURI().toURL());
100-
}
101-
102-
Map<String, SideTableInfo> sideTableMap = Maps.newHashMap();
103-
Map<String, Table> registerTableCache = Maps.newHashMap();
104-
105-
//register udf
106-
CommonProcess.registerUserDefinedFunction(sqlTree, jarUrlList, tableEnv);
107-
//register table schema
108-
Set<URL> classPathSets = CommonProcess.registerTable(sqlTree, env, tableEnv, localSqlPluginPath, null, null, sideTableMap, registerTableCache);
109-
// cache classPathSets
110-
CommonProcess.registerPluginUrlToCachedFile(env, classPathSets);
111-
112-
CommonProcess.sqlTranslation(localSqlPluginPath, tableEnv, sqlTree, sideTableMap, registerTableCache, streamQueryConfig);
113-
114-
((MyLocalStreamEnvironment)env).setClasspaths(ClassLoaderManager.getClassPath());
115-
44+
ParamsInfo paramsInfo = BuildProcess.parseParams(args);
45+
StreamExecutionEnvironment env = BuildProcess.getStreamExecution(paramsInfo);
11646
String executionPlan = env.getExecutionPlan();
11747
return getJsonStr(SUCCESS, executionPlan);
11848
} catch (Exception e) {

0 commit comments

Comments
 (0)