Commit fe6f2e6

Merge branch 'feat_1.8_mergeGetPlanNew' into 'v1.8.0_dev'

flinksql execution plan. See merge request !229

2 parents: a690319 + e922674

9 files changed: +784 −278 lines
core/src/main/java/com/dtstack/flink/sql/GetPlan.java (new file)

Lines changed: 47 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql;

import com.dtstack.flink.sql.exec.ApiResult;
import com.dtstack.flink.sql.exec.ExecuteProcessHelper;
import com.dtstack.flink.sql.exec.ParamsInfo;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Obtains the execution plan of a SQL job in local mode
 * Date: 2020/2/17
 * Company: www.dtstack.com
 * @author maqi
 */
public class GetPlan {

    public static String getExecutionPlan(String[] args) {
        try {
            long start = System.currentTimeMillis();
            // reuse the same parsing and job-assembly pipeline as Main
            ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
            StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
            // build the JSON plan without executing the job
            String executionPlan = env.getExecutionPlan();
            long end = System.currentTimeMillis();
            return ApiResult.createSuccessResultJsonStr(executionPlan, end - start);
        } catch (Exception e) {
            return ApiResult.createErrorResultJsonStr(ExceptionUtils.getFullStackTrace(e));
        }
    }
}
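
For orientation, a hypothetical caller sketch follows. GetPlan.getExecutionPlan accepts the same argument array that Main.main consumes and always returns a JSON string: the plan plus elapsed time on success, the full stack trace on failure. The flag spellings and the URL-encoding of the SQL are assumptions inferred from the OptionParser getters and URLDecoder calls in the old Main.java shown below; the plugin path is a placeholder.

import java.net.URLEncoder;

// Hypothetical caller; flag names (-sql, -name, -mode, -localSqlPluginPath)
// are guessed from the OptionParser getters deleted from Main.java below
// and may not match the real CLI spelling.
public class GetPlanDemo {
    public static void main(String[] args) throws Exception {
        String sql = "CREATE TABLE MyTable(id INT, name VARCHAR) WITH (type='kafka');";
        String[] planArgs = {
                "-sql", URLEncoder.encode(sql, "UTF-8"), // the old Main URL-decoded the sql argument
                "-name", "planDemo",
                "-mode", "local",                        // GetPlan's javadoc names local mode
                "-localSqlPluginPath", "/opt/flinkStreamSQL/plugins" // placeholder path
        };
        // JSON string: execution plan + cost on success, stack trace on failure
        System.out.println(GetPlan.getExecutionPlan(planArgs));
    }
}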

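ApiResult itself is referenced here but not included in this excerpt of the diff. To make the contract concrete, here is a minimal stand-in sketch of what the two factories could look like; the JSON field names are invented for illustration and are not taken from the real class.

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for com.dtstack.flink.sql.exec.ApiResult (not shown in
// this commit). All field names below are illustrative assumptions.
public class ApiResultSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // wraps a payload and the elapsed time into a success JSON string
    public static String createSuccessResultJsonStr(Object data, long costMs) {
        Map<String, Object> result = new HashMap<>();
        result.put("success", true);
        result.put("data", data);    // here: the execution-plan JSON
        result.put("cost", costMs);  // wall-clock milliseconds
        return toJson(result);
    }

    // wraps an error message (e.g. a full stack trace) into an error JSON string
    public static String createErrorResultJsonStr(String errorMsg) {
        Map<String, Object> result = new HashMap<>();
        result.put("success", false);
        result.put("errorMsg", errorMsg);
        return toJson(result);
    }

    private static String toJson(Map<String, Object> result) {
        try {
            return MAPPER.writeValueAsString(result);
        } catch (Exception e) {
            return "{\"success\":false,\"errorMsg\":\"serialization failed\"}";
        }
    }
}
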
core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 8 additions & 273 deletions
@@ -19,64 +19,14 @@

 package com.dtstack.flink.sql;

-import com.dtstack.flink.sql.config.CalciteConfig;
-import com.dtstack.flink.sql.classloader.ClassLoaderManager;
-import com.dtstack.flink.sql.enums.ClusterMode;
-import com.dtstack.flink.sql.enums.ECacheType;
-import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
-import com.dtstack.flink.sql.environment.StreamEnvConfigManager;
-import com.dtstack.flink.sql.exec.FlinkSQLExec;
-import com.dtstack.flink.sql.option.OptionParser;
-import com.dtstack.flink.sql.parser.CreateFuncParser;
-import com.dtstack.flink.sql.parser.CreateTmpTableParser;
-import com.dtstack.flink.sql.parser.InsertSqlParser;
-import com.dtstack.flink.sql.parser.SqlParser;
-import com.dtstack.flink.sql.parser.SqlTree;
-import com.dtstack.flink.sql.side.SideSqlExec;
-import com.dtstack.flink.sql.side.SideTableInfo;
-import com.dtstack.flink.sql.table.SourceTableInfo;
-import com.dtstack.flink.sql.table.TableInfo;
-import com.dtstack.flink.sql.table.TargetTableInfo;
-import com.dtstack.flink.sql.sink.StreamSinkFactory;
-import com.dtstack.flink.sql.source.StreamSourceFactory;
-import com.dtstack.flink.sql.util.DtStringUtil;
-import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
-import com.dtstack.flink.sql.function.FunctionManager;
-import com.dtstack.flink.sql.util.PluginUtil;
-import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.commons.io.Charsets;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
+
+
+import com.dtstack.flink.sql.exec.ExecuteProcessHelper;
+import com.dtstack.flink.sql.exec.ParamsInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.StreamQueryConfig;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.io.File;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.net.URLDecoder;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-
-import com.dtstack.flink.sql.option.Options;
 import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.LoggerContext;

@@ -87,228 +37,13 @@
  */

 public class Main {
-
-    private static final String CLASS_FILE_NAME_FMT = "class_path_%d";
-
-    private static final ObjectMapper objMapper = new ObjectMapper();
-
     private static final Logger LOG = LoggerFactory.getLogger(Main.class);

-
     public static void main(String[] args) throws Exception {
-
-        OptionParser optionParser = new OptionParser(args);
-        Options options = optionParser.getOptions();
-        String sql = options.getSql();
-        String name = options.getName();
-        String addJarListStr = options.getAddjar();
-        String localSqlPluginPath = options.getLocalSqlPluginPath();
-        String remoteSqlPluginPath = options.getRemoteSqlPluginPath();
-        String pluginLoadMode = options.getPluginLoadMode();
-        String deployMode = options.getMode();
-        String confProp = options.getConfProp();
-        sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
-        SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
-        setLogLevel(options.getLogLevel());
-
-        List<String> addJarFileList = Lists.newArrayList();
-        if (!Strings.isNullOrEmpty(addJarListStr)) {
-            addJarListStr = URLDecoder.decode(addJarListStr, Charsets.UTF_8.name());
-            addJarFileList = objMapper.readValue(addJarListStr, List.class);
-        }
-
-        confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
-        Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
-        StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
-        StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
-        StreamQueryConfig streamQueryConfig = StreamEnvConfigManager.getStreamQueryConfig(tableEnv, confProperties);
-
-        List<URL> jarURList = Lists.newArrayList();
-        SqlTree sqlTree = SqlParser.parseSql(sql);
-
-        //Get External jar to load
-        for (String addJarPath : addJarFileList) {
-            File tmpFile = new File(addJarPath);
-            jarURList.add(tmpFile.toURI().toURL());
-        }
-
-        Map<String, SideTableInfo> sideTableMap = Maps.newHashMap();
-        Map<String, Table> registerTableCache = Maps.newHashMap();
-
-        //register udf
-        registerUserDefinedFunction(sqlTree, jarURList, tableEnv);
-        //register table schema
-        Set<URL> classPathSets = registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode, sideTableMap, registerTableCache);
-        // cache classPathSets
-        registerPluginUrlToCachedFile(env, classPathSets);
-
-        sqlTranslation(localSqlPluginPath, tableEnv, sqlTree, sideTableMap, registerTableCache, streamQueryConfig);
-
-        if (env instanceof MyLocalStreamEnvironment) {
-            ((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
-        }
-
-        env.execute(name);
-    }
-
-    private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv,SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap,Map<String, Table> registerTableCache, StreamQueryConfig queryConfig) throws Exception {
-        SideSqlExec sideSqlExec = new SideSqlExec();
-        sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
-        for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
-            sideSqlExec.registerTmpTable(result, sideTableMap, tableEnv, registerTableCache);
-        }
-
-        for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("exe-sql:\n" + result.getExecSql());
-            }
-            boolean isSide = false;
-            for (String tableName : result.getTargetTableList()) {
-                if (sqlTree.getTmpTableMap().containsKey(tableName)) {
-                    CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
-                    String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");
-
-                    SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
-                    String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
-                    tmp.setExecSql(tmpSql);
-                    sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);
-                } else {
-                    for (String sourceTable : result.getSourceTableList()) {
-                        if (sideTableMap.containsKey(sourceTable)) {
-                            isSide = true;
-                            break;
-                        }
-                    }
-                    if (isSide) {
-                        //sql-dimensional table contains the dimension table of execution
-                        sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig);
-                    }else{
-                        System.out.println("----------exec sql without dimension join-----------" );
-                        System.out.println("----------real sql exec is--------------------------");
-                        System.out.println(result.getExecSql());
-                        FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
-                        if(LOG.isInfoEnabled()){
-                            System.out.println();
-                            LOG.info("exec sql: " + result.getExecSql());
-                        }
-                    }
-                }
-            }
-        }
-
-
-    }
-
-
-    private static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarURList, TableEnvironment tableEnv)
-            throws IllegalAccessException, InvocationTargetException {
-        // the udf and the tableEnv must be loaded by the same classloader
-        ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
-        URLClassLoader classLoader = null;
-        List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
-        for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
-            //classloader
-            if (classLoader == null) {
-                classLoader = ClassLoaderManager.loadExtraJar(jarURList, (URLClassLoader) levelClassLoader);
-            }
-            FunctionManager.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, classLoader);
-        }
-    }
-
-    /**
-     * Register source and sink tables with Flink; return the full paths of the plugin packages used at runtime
-     * @param sqlTree
-     * @param env
-     * @param tableEnv
-     * @param localSqlPluginPath
-     * @param remoteSqlPluginPath
-     * @param pluginLoadMode plugin load mode: classpath or shipfile
-     * @param sideTableMap
-     * @param registerTableCache
-     * @return
-     * @throws Exception
-     */
-    private static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
-                                          String remoteSqlPluginPath, String pluginLoadMode, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
-        Set<URL> pluginClassPatshSets = Sets.newHashSet();
-        WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
-        for (TableInfo tableInfo : sqlTree.getTableInfoMap().values()) {
-
-            if (tableInfo instanceof SourceTableInfo) {
-
-                SourceTableInfo sourceTableInfo = (SourceTableInfo) tableInfo;
-                Table table = StreamSourceFactory.getStreamSource(sourceTableInfo, env, tableEnv, localSqlPluginPath);
-                tableEnv.registerTable(sourceTableInfo.getAdaptName(), table);
-                //Note --- parameter conversion function can not be used inside a function of the type of polymerization
-                //Create table in which the function is arranged only need adaptation sql
-                String adaptSql = sourceTableInfo.getAdaptSelectSql();
-                Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
-
-                RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
-                DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
-                        .map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
-                        .returns(typeInfo);
-
-                String fields = String.join(",", typeInfo.getFieldNames());
-
-                if (waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)) {
-                    adaptStream = waterMarkerAssigner.assignWaterMarker(adaptStream, typeInfo, sourceTableInfo);
-                    fields += ",ROWTIME.ROWTIME";
-                } else {
-                    fields += ",PROCTIME.PROCTIME";
-                }
-
-                Table regTable = tableEnv.fromDataStream(adaptStream, fields);
-                tableEnv.registerTable(tableInfo.getName(), regTable);
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("registe table {} success.", tableInfo.getName());
-                }
-                registerTableCache.put(tableInfo.getName(), regTable);
-
-                URL sourceTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
-                pluginClassPatshSets.add(sourceTablePathUrl);
-            } else if (tableInfo instanceof TargetTableInfo) {
-
-                TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
-                TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
-                tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
-
-                URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
-                pluginClassPatshSets.add(sinkTablePathUrl);
-            } else if (tableInfo instanceof SideTableInfo) {
-                String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
-                sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
-
-                URL sideTablePathUrl = PluginUtil.buildSidePathByLoadMode(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
-                pluginClassPatshSets.add(sideTablePathUrl);
-            } else {
-                throw new RuntimeException("not support table type:" + tableInfo.getType());
-            }
-        }
-        return pluginClassPatshSets;
-    }
-
-    /**
-     * In perjob mode, store the paths of the plugin packages the job depends on in a cache file, and pass them to the jobgraph from the outside
-     * @param env
-     * @param classPathSet
-     */
-    private static void registerPluginUrlToCachedFile(StreamExecutionEnvironment env, Set<URL> classPathSet) {
-        int i = 0;
-        for (URL url : classPathSet) {
-            String classFileName = String.format(CLASS_FILE_NAME_FMT, i);
-            env.registerCachedFile(url.getPath(), classFileName, true);
-            i++;
-        }
-    }
-
-    private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception {
-        StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
-                StreamExecutionEnvironment.getExecutionEnvironment() :
-                new MyLocalStreamEnvironment();
-
-        StreamEnvConfigManager.streamExecutionEnvironmentConfig(env, confProperties);
-        return env;
+        ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
+        StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
+        env.execute(paramsInfo.getName());
+        LOG.info("program {} execution success", paramsInfo.getName());
     }
     private static void setLogLevel(String level){
         LoggerContext loggerContext= (LoggerContext) LoggerFactory.getILoggerFactory();
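
The net effect of the refactor: Main.main and GetPlan.getExecutionPlan now share one setup path through ExecuteProcessHelper, with Main calling env.execute() and GetPlan calling env.getExecutionPlan(). The latter is a stock Flink API that serializes the composed job graph to JSON without running it. A self-contained illustration against plain Flink (no dtstack classes involved):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Standalone illustration of the Flink API GetPlan builds on: compose a
// trivial pipeline, then ask the environment for its JSON execution plan
// instead of calling execute().
public class ExecutionPlanDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
           .map(i -> i * 2)
           .print();
        // JSON description of the job graph; nothing is executed
        System.out.println(env.getExecutionPlan());
    }
}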
