Skip to content

Commit 74b2a9b

Browse files
committed
support get execution plan
1 parent e37e0bb commit 74b2a9b

File tree

4 files changed

+457
-200
lines changed

4 files changed

+457
-200
lines changed
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql;
20+
21+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
22+
import com.dtstack.flink.sql.config.CalciteConfig;
23+
import com.dtstack.flink.sql.enums.ClusterMode;
24+
import com.dtstack.flink.sql.enums.ECacheType;
25+
import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
26+
import com.dtstack.flink.sql.environment.StreamEnvConfigManager;
27+
import com.dtstack.flink.sql.exec.FlinkSQLExec;
28+
import com.dtstack.flink.sql.function.FunctionManager;
29+
import com.dtstack.flink.sql.parser.CreateFuncParser;
30+
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
31+
import com.dtstack.flink.sql.parser.InsertSqlParser;
32+
import com.dtstack.flink.sql.parser.SqlTree;
33+
import com.dtstack.flink.sql.side.SideSqlExec;
34+
import com.dtstack.flink.sql.side.SideTableInfo;
35+
import com.dtstack.flink.sql.sink.StreamSinkFactory;
36+
import com.dtstack.flink.sql.source.StreamSourceFactory;
37+
import com.dtstack.flink.sql.table.SourceTableInfo;
38+
import com.dtstack.flink.sql.table.TableInfo;
39+
import com.dtstack.flink.sql.table.TargetTableInfo;
40+
import com.dtstack.flink.sql.util.DtStringUtil;
41+
import com.dtstack.flink.sql.util.PluginUtil;
42+
import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
43+
import com.fasterxml.jackson.databind.ObjectMapper;
44+
import com.google.common.collect.Sets;
45+
import org.apache.calcite.sql.SqlInsert;
46+
import org.apache.calcite.sql.SqlNode;
47+
import org.apache.flink.api.common.typeinfo.TypeInformation;
48+
import org.apache.flink.api.java.tuple.Tuple2;
49+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
50+
import org.apache.flink.streaming.api.datastream.DataStream;
51+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
52+
import org.apache.flink.table.api.StreamQueryConfig;
53+
import org.apache.flink.table.api.Table;
54+
import org.apache.flink.table.api.TableEnvironment;
55+
import org.apache.flink.table.api.java.StreamTableEnvironment;
56+
import org.apache.flink.table.sinks.TableSink;
57+
import org.apache.flink.types.Row;
58+
import org.slf4j.Logger;
59+
import org.slf4j.LoggerFactory;
60+
61+
import java.lang.reflect.InvocationTargetException;
62+
import java.net.URL;
63+
import java.net.URLClassLoader;
64+
import java.util.List;
65+
import java.util.Map;
66+
import java.util.Properties;
67+
import java.util.Set;
68+
69+
/**
70+
* 提取任务执行时共同的流程方法
71+
* Date: 2020/2/17
72+
* Company: www.dtstack.com
73+
* @author maqi
74+
*/
75+
public class CommonProcess {
76+
77+
private static final String CLASS_FILE_NAME_FMT = "class_path_%d";
78+
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
79+
80+
public static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv, SqlTree sqlTree, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache, StreamQueryConfig queryConfig) throws Exception {
81+
SideSqlExec sideSqlExec = new SideSqlExec();
82+
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
83+
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
84+
sideSqlExec.registerTmpTable(result, sideTableMap, tableEnv, registerTableCache);
85+
}
86+
87+
for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
88+
if (LOG.isInfoEnabled()) {
89+
LOG.info("exe-sql:\n" + result.getExecSql());
90+
}
91+
boolean isSide = false;
92+
for (String tableName : result.getTargetTableList()) {
93+
if (sqlTree.getTmpTableMap().containsKey(tableName)) {
94+
CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
95+
String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");
96+
97+
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
98+
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
99+
tmp.setExecSql(tmpSql);
100+
sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);
101+
} else {
102+
for (String sourceTable : result.getSourceTableList()) {
103+
if (sideTableMap.containsKey(sourceTable)) {
104+
isSide = true;
105+
break;
106+
}
107+
}
108+
if (isSide) {
109+
//sql-dimensional table contains the dimension table of execution
110+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig);
111+
} else {
112+
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
113+
if (LOG.isInfoEnabled()) {
114+
LOG.info("exec sql: " + result.getExecSql());
115+
}
116+
}
117+
}
118+
}
119+
}
120+
}
121+
122+
public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv)
123+
throws IllegalAccessException, InvocationTargetException {
124+
// udf和tableEnv须由同一个类加载器加载
125+
ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
126+
URLClassLoader classLoader = null;
127+
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
128+
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
129+
//classloader
130+
if (classLoader == null) {
131+
classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) levelClassLoader);
132+
}
133+
FunctionManager.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, classLoader);
134+
}
135+
}
136+
137+
/**
138+
* 向Flink注册源表和结果表,返回执行时插件包的全路径
139+
* @param sqlTree
140+
* @param env
141+
* @param tableEnv
142+
* @param localSqlPluginPath
143+
* @param remoteSqlPluginPath
144+
* @param pluginLoadMode 插件加载模式 classpath or shipfile
145+
* @param sideTableMap
146+
* @param registerTableCache
147+
* @return
148+
* @throws Exception
149+
*/
150+
public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
151+
String remoteSqlPluginPath, String pluginLoadMode, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
152+
Set<URL> pluginClassPatshSets = Sets.newHashSet();
153+
WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
154+
for (TableInfo tableInfo : sqlTree.getTableInfoMap().values()) {
155+
156+
if (tableInfo instanceof SourceTableInfo) {
157+
158+
SourceTableInfo sourceTableInfo = (SourceTableInfo) tableInfo;
159+
Table table = StreamSourceFactory.getStreamSource(sourceTableInfo, env, tableEnv, localSqlPluginPath);
160+
tableEnv.registerTable(sourceTableInfo.getAdaptName(), table);
161+
//Note --- parameter conversion function can not be used inside a function of the type of polymerization
162+
//Create table in which the function is arranged only need adaptation sql
163+
String adaptSql = sourceTableInfo.getAdaptSelectSql();
164+
Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
165+
166+
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
167+
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
168+
.map((Tuple2<Boolean, Row> f0) -> {
169+
return f0.f1;
170+
})
171+
.returns(typeInfo);
172+
173+
String fields = String.join(",", typeInfo.getFieldNames());
174+
175+
if (waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)) {
176+
adaptStream = waterMarkerAssigner.assignWaterMarker(adaptStream, typeInfo, sourceTableInfo);
177+
fields += ",ROWTIME.ROWTIME";
178+
} else {
179+
fields += ",PROCTIME.PROCTIME";
180+
}
181+
182+
Table regTable = tableEnv.fromDataStream(adaptStream, fields);
183+
tableEnv.registerTable(tableInfo.getName(), regTable);
184+
if (LOG.isInfoEnabled()) {
185+
LOG.info("registe table {} success.", tableInfo.getName());
186+
}
187+
registerTableCache.put(tableInfo.getName(), regTable);
188+
189+
URL sourceTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
190+
pluginClassPatshSets.add(sourceTablePathUrl);
191+
} else if (tableInfo instanceof TargetTableInfo) {
192+
193+
TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
194+
TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
195+
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
196+
197+
URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
198+
pluginClassPatshSets.add(sinkTablePathUrl);
199+
} else if (tableInfo instanceof SideTableInfo) {
200+
String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
201+
sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
202+
203+
URL sideTablePathUrl = PluginUtil.buildSidePathByLoadMode(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
204+
pluginClassPatshSets.add(sideTablePathUrl);
205+
} else {
206+
throw new RuntimeException("not support table type:" + tableInfo.getType());
207+
}
208+
}
209+
return pluginClassPatshSets;
210+
}
211+
212+
/**
213+
* perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph
214+
* @param env
215+
* @param classPathSet
216+
*/
217+
public static void registerPluginUrlToCachedFile(StreamExecutionEnvironment env, Set<URL> classPathSet) {
218+
int i = 0;
219+
for (URL url : classPathSet) {
220+
String classFileName = String.format(CLASS_FILE_NAME_FMT, i);
221+
env.registerCachedFile(url.getPath(), classFileName, true);
222+
i++;
223+
}
224+
}
225+
226+
public static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception {
227+
StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
228+
StreamExecutionEnvironment.getExecutionEnvironment() :
229+
new MyLocalStreamEnvironment();
230+
231+
StreamEnvConfigManager.streamExecutionEnvironmentConfig(env, confProperties);
232+
return env;
233+
}
234+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql;
20+
21+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
22+
import com.dtstack.flink.sql.enums.ClusterMode;
23+
import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
24+
import com.dtstack.flink.sql.environment.StreamEnvConfigManager;
25+
import com.dtstack.flink.sql.option.OptionParser;
26+
import com.dtstack.flink.sql.option.Options;
27+
import com.dtstack.flink.sql.parser.SqlParser;
28+
import com.dtstack.flink.sql.parser.SqlTree;
29+
import com.dtstack.flink.sql.side.SideTableInfo;
30+
import com.dtstack.flink.sql.util.PluginUtil;
31+
import com.fasterxml.jackson.databind.ObjectMapper;
32+
import com.fasterxml.jackson.databind.node.ObjectNode;
33+
import com.google.common.base.Strings;
34+
import com.google.common.collect.Lists;
35+
import com.google.common.collect.Maps;
36+
import org.apache.commons.io.Charsets;
37+
import org.apache.commons.lang.exception.ExceptionUtils;
38+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
39+
import org.apache.flink.table.api.StreamQueryConfig;
40+
import org.apache.flink.table.api.Table;
41+
import org.apache.flink.table.api.java.StreamTableEnvironment;
42+
43+
import java.io.File;
44+
import java.net.URL;
45+
import java.net.URLDecoder;
46+
import java.util.Arrays;
47+
import java.util.List;
48+
import java.util.Map;
49+
import java.util.Properties;
50+
import java.util.Set;
51+
52+
/**
53+
* 获取sql任务的执行计划
54+
* Date: 2020/2/17
55+
* Company: www.dtstack.com
56+
* @author maqi
57+
*/
58+
public class GetPlan {
59+
public static final String STATUS_KEY = "status";
60+
public static final String MSG_KEY = "msg";
61+
public static final Integer FAIL = 0;
62+
public static final Integer SUCCESS = 1;
63+
64+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
65+
66+
public static String getExecutionPlan(String[] args) {
67+
try {
68+
Arrays.stream(args).forEach(System.out::println);
69+
OptionParser optionParser = new OptionParser(args);
70+
Options options = optionParser.getOptions();
71+
String sql = options.getSql();
72+
String addJarListStr = options.getAddjar();
73+
String localSqlPluginPath = options.getLocalSqlPluginPath();
74+
String deployMode = ClusterMode.local.name();
75+
String confProp = options.getConfProp();
76+
77+
sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
78+
SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
79+
80+
List<String> addJarFileList = Lists.newArrayList();
81+
if (!Strings.isNullOrEmpty(addJarListStr)) {
82+
addJarListStr = URLDecoder.decode(addJarListStr, Charsets.UTF_8.name());
83+
addJarFileList = OBJECT_MAPPER.readValue(addJarListStr, List.class);
84+
}
85+
86+
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
87+
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
88+
StreamExecutionEnvironment env = CommonProcess.getStreamExeEnv(confProperties, deployMode);
89+
90+
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
91+
StreamQueryConfig streamQueryConfig = StreamEnvConfigManager.getStreamQueryConfig(tableEnv, confProperties);
92+
93+
List<URL> jarUrlList = Lists.newArrayList();
94+
SqlTree sqlTree = SqlParser.parseSql(sql);
95+
96+
//Get External jar to load
97+
for (String addJarPath : addJarFileList) {
98+
File tmpFile = new File(addJarPath);
99+
jarUrlList.add(tmpFile.toURI().toURL());
100+
}
101+
102+
Map<String, SideTableInfo> sideTableMap = Maps.newHashMap();
103+
Map<String, Table> registerTableCache = Maps.newHashMap();
104+
105+
//register udf
106+
CommonProcess.registerUserDefinedFunction(sqlTree, jarUrlList, tableEnv);
107+
//register table schema
108+
Set<URL> classPathSets = CommonProcess.registerTable(sqlTree, env, tableEnv, localSqlPluginPath, null, null, sideTableMap, registerTableCache);
109+
// cache classPathSets
110+
CommonProcess.registerPluginUrlToCachedFile(env, classPathSets);
111+
112+
CommonProcess.sqlTranslation(localSqlPluginPath, tableEnv, sqlTree, sideTableMap, registerTableCache, streamQueryConfig);
113+
114+
((MyLocalStreamEnvironment)env).setClasspaths(ClassLoaderManager.getClassPath());
115+
116+
String executionPlan = env.getExecutionPlan();
117+
return getJsonStr(SUCCESS, executionPlan);
118+
} catch (Exception e) {
119+
return getJsonStr(FAIL, ExceptionUtils.getFullStackTrace(e));
120+
}
121+
}
122+
123+
public static String getJsonStr(int status, String msg) {
124+
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
125+
objectNode.put(STATUS_KEY, status);
126+
objectNode.put(MSG_KEY, msg);
127+
return objectNode.toString();
128+
}
129+
}

0 commit comments

Comments
 (0)