
Commit 69f0d75

Merge branch '1.8_test_mergeGetplannew' into '1.8_test_3.10.x'

1.8 test merge getplannew. See merge request !232

2 parents: 7850cf7 + 7e2e65b

20 files changed: +1725 −711 lines


core/pom.xml

Lines changed: 7 additions & 0 deletions
@@ -20,6 +20,7 @@
     <calcite.server.version>1.16.0</calcite.server.version>
     <jackson.version>2.7.9</jackson.version>
     <guava.version>19.0</guava.version>
+    <logback.version>1.1.7</logback.version>
   </properties>
 
   <dependencies>
@@ -122,6 +123,12 @@
       <version>4.12</version>
     </dependency>
 
+    <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+      <version>${logback.version}</version>
+    </dependency>
+
   </dependencies>
 
   <build>
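The new logback-classic dependency supplies the runtime backend for the SLF4J API that classes in this module (Main, for example) already log through, so no call sites change. A minimal sketch of the pattern; the class name is illustrative:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingSketch {
        // SLF4J facade; logback-classic provides the implementation at runtime.
        private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

        public static void main(String[] args) {
            LOG.info("program {} execution success", "example-job");
        }
    }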
core/src/main/java/com/dtstack/flink/sql/GetPlan.java

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql;
+
+import com.dtstack.flink.sql.exec.ApiResult;
+import com.dtstack.flink.sql.exec.ExecuteProcessHelper;
+import com.dtstack.flink.sql.exec.ParamsInfo;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Get the execution plan of a SQL job in local mode.
+ * Date: 2020/2/17
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public class GetPlan {
+
+    public static String getExecutionPlan(String[] args) {
+        try {
+            long start = System.currentTimeMillis();
+            ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
+            StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
+            String executionPlan = env.getExecutionPlan();
+            long end = System.currentTimeMillis();
+            return ApiResult.createSuccessResultJsonStr(executionPlan, end - start);
+        } catch (Exception e) {
+            return ApiResult.createErrorResultJsonStr(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
+}
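A sketch of how a caller might use the new entry point. The flag names are assumptions inferred from the option getters (getSql(), getName(), getMode(), getLocalSqlPluginPath()) in the old Main below, not confirmed CLI flags:

    // Hypothetical invocation; flag names and values are illustrative.
    String[] args = {
            "-sql", "<url-encoded sql>",
            "-name", "planOnlyJob",
            "-mode", "local",
            "-localSqlPluginPath", "/opt/dtstack/sqlplugins"
    };
    // Returns a JSON string wrapping either the plan or the full error stack trace.
    String planJson = GetPlan.getExecutionPlan(args);
    System.out.println(planJson);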

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 8 additions & 269 deletions
@@ -19,64 +19,14 @@
 
 package com.dtstack.flink.sql;
 
-import com.dtstack.flink.sql.config.CalciteConfig;
-import com.dtstack.flink.sql.classloader.ClassLoaderManager;
-import com.dtstack.flink.sql.enums.ClusterMode;
-import com.dtstack.flink.sql.enums.ECacheType;
-import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
-import com.dtstack.flink.sql.environment.StreamEnvConfigManager;
-import com.dtstack.flink.sql.exec.FlinkSQLExec;
-import com.dtstack.flink.sql.option.OptionParser;
-import com.dtstack.flink.sql.parser.CreateFuncParser;
-import com.dtstack.flink.sql.parser.CreateTmpTableParser;
-import com.dtstack.flink.sql.parser.InsertSqlParser;
-import com.dtstack.flink.sql.parser.SqlParser;
-import com.dtstack.flink.sql.parser.SqlTree;
-import com.dtstack.flink.sql.side.SideSqlExec;
-import com.dtstack.flink.sql.side.SideTableInfo;
-import com.dtstack.flink.sql.table.SourceTableInfo;
-import com.dtstack.flink.sql.table.TableInfo;
-import com.dtstack.flink.sql.table.TargetTableInfo;
-import com.dtstack.flink.sql.sink.StreamSinkFactory;
-import com.dtstack.flink.sql.source.StreamSourceFactory;
-import com.dtstack.flink.sql.util.DtStringUtil;
-import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
-import com.dtstack.flink.sql.function.FunctionManager;
-import com.dtstack.flink.sql.util.PluginUtil;
-import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.commons.io.Charsets;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
+
+
+import com.dtstack.flink.sql.exec.ExecuteProcessHelper;
+import com.dtstack.flink.sql.exec.ParamsInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.StreamQueryConfig;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.net.URLDecoder;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-
-import com.dtstack.flink.sql.option.Options;
 
 /**
  * Date: 2018/6/26
@@ -85,223 +35,12 @@
  */
 
 public class Main {
-
-    private static final String CLASS_FILE_NAME_FMT = "class_path_%d";
-
-    private static final ObjectMapper objMapper = new ObjectMapper();
-
     private static final Logger LOG = LoggerFactory.getLogger(Main.class);
 
-
     public static void main(String[] args) throws Exception {
-
-        OptionParser optionParser = new OptionParser(args);
-        Options options = optionParser.getOptions();
-        String sql = options.getSql();
-        String name = options.getName();
-        String addJarListStr = options.getAddjar();
-        String localSqlPluginPath = options.getLocalSqlPluginPath();
-        String remoteSqlPluginPath = options.getRemoteSqlPluginPath();
-        String pluginLoadMode = options.getPluginLoadMode();
-        String deployMode = options.getMode();
-        String confProp = options.getConfProp();
-
-        sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
-        SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
-
-        List<String> addJarFileList = Lists.newArrayList();
-        if (!Strings.isNullOrEmpty(addJarListStr)) {
-            addJarListStr = URLDecoder.decode(addJarListStr, Charsets.UTF_8.name());
-            addJarFileList = objMapper.readValue(addJarListStr, List.class);
-        }
-
-        confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
-        Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
-        StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
-        StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
-        StreamQueryConfig streamQueryConfig = StreamEnvConfigManager.getStreamQueryConfig(tableEnv, confProperties);
-
-        List<URL> jarURList = Lists.newArrayList();
-        SqlTree sqlTree = SqlParser.parseSql(sql);
-
-        //Get External jar to load
-        for (String addJarPath : addJarFileList) {
-            File tmpFile = new File(addJarPath);
-            jarURList.add(tmpFile.toURI().toURL());
-        }
-
-        Map<String, SideTableInfo> sideTableMap = Maps.newHashMap();
-        Map<String, Table> registerTableCache = Maps.newHashMap();
-
-        //register udf
-        registerUserDefinedFunction(sqlTree, jarURList, tableEnv);
-        //register table schema
-        Set<URL> classPathSets = registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode, sideTableMap, registerTableCache);
-        // cache classPathSets
-        registerPluginUrlToCachedFile(env, classPathSets);
-
-        sqlTranslation(localSqlPluginPath, tableEnv, sqlTree, sideTableMap, registerTableCache, streamQueryConfig);
-
-        if (env instanceof MyLocalStreamEnvironment) {
-            ((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
-        }
-
-        env.execute(name);
-    }
-
-    private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv, SqlTree sqlTree, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache, StreamQueryConfig queryConfig) throws Exception {
-        SideSqlExec sideSqlExec = new SideSqlExec();
-        sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
-        for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
-            sideSqlExec.registerTmpTable(result, sideTableMap, tableEnv, registerTableCache);
-        }
-
-        for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("exe-sql:\n" + result.getExecSql());
-            }
-            boolean isSide = false;
-            for (String tableName : result.getTargetTableList()) {
-                if (sqlTree.getTmpTableMap().containsKey(tableName)) {
-                    CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
-                    String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");
-
-                    SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
-                    String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
-                    tmp.setExecSql(tmpSql);
-                    sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);
-                } else {
-                    for (String sourceTable : result.getSourceTableList()) {
-                        if (sideTableMap.containsKey(sourceTable)) {
-                            isSide = true;
-                            break;
-                        }
-                    }
-                    if (isSide) {
-                        //sql-dimensional table contains the dimension table of execution
-                        sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig);
-                    } else {
-                        FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
-                        if (LOG.isInfoEnabled()) {
-                            LOG.info("exec sql: " + result.getExecSql());
-                        }
-                    }
-                }
-            }
-        }
-
-
-    }
-
-
-    private static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarURList, TableEnvironment tableEnv)
-            throws IllegalAccessException, InvocationTargetException {
-        // the UDF and the tableEnv must be loaded by the same class loader
-        ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
-        URLClassLoader classLoader = null;
-        List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
-        for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
-            //classloader
-            if (classLoader == null) {
-                classLoader = ClassLoaderManager.loadExtraJar(jarURList, (URLClassLoader) levelClassLoader);
-            }
-            FunctionManager.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, classLoader);
-        }
-    }
-
-    /**
-     * Register source and sink tables with Flink; returns the full paths of the plugin packages used at runtime.
-     * @param sqlTree
-     * @param env
-     * @param tableEnv
-     * @param localSqlPluginPath
-     * @param remoteSqlPluginPath
-     * @param pluginLoadMode plugin load mode: classpath or shipfile
-     * @param sideTableMap
-     * @param registerTableCache
-     * @return
-     * @throws Exception
-     */
-    private static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
-                                          String remoteSqlPluginPath, String pluginLoadMode, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
-        Set<URL> pluginClassPatshSets = Sets.newHashSet();
-        WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
-        for (TableInfo tableInfo : sqlTree.getTableInfoMap().values()) {
-
-            if (tableInfo instanceof SourceTableInfo) {
-
-                SourceTableInfo sourceTableInfo = (SourceTableInfo) tableInfo;
-                Table table = StreamSourceFactory.getStreamSource(sourceTableInfo, env, tableEnv, localSqlPluginPath);
-                tableEnv.registerTable(sourceTableInfo.getAdaptName(), table);
-                //Note --- parameter conversion function can not be used inside a function of the type of polymerization
-                //Create table in which the function is arranged only need adaptation sql
-                String adaptSql = sourceTableInfo.getAdaptSelectSql();
-                Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
-
-                RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
-                DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
-                        .map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
-                        .returns(typeInfo);
-
-                String fields = String.join(",", typeInfo.getFieldNames());
-
-                if (waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)) {
-                    adaptStream = waterMarkerAssigner.assignWaterMarker(adaptStream, typeInfo, sourceTableInfo);
-                    fields += ",ROWTIME.ROWTIME";
-                } else {
-                    fields += ",PROCTIME.PROCTIME";
-                }
-
-                Table regTable = tableEnv.fromDataStream(adaptStream, fields);
-                tableEnv.registerTable(tableInfo.getName(), regTable);
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("registe table {} success.", tableInfo.getName());
-                }
-                registerTableCache.put(tableInfo.getName(), regTable);
-
-                URL sourceTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
-                pluginClassPatshSets.add(sourceTablePathUrl);
-            } else if (tableInfo instanceof TargetTableInfo) {
-
-                TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
-                TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
-                tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
-
-                URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
-                pluginClassPatshSets.add(sinkTablePathUrl);
-            } else if (tableInfo instanceof SideTableInfo) {
-                String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
-                sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
-
-                URL sideTablePathUrl = PluginUtil.buildSidePathByLoadMode(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
-                pluginClassPatshSets.add(sideTablePathUrl);
-            } else {
-                throw new RuntimeException("not support table type:" + tableInfo.getType());
-            }
-        }
-        return pluginClassPatshSets;
-    }
-
-    /**
-     * In perjob mode, store the plugin jar paths the job depends on in a cached file; the paths are passed on to the jobgraph externally.
-     * @param env
-     * @param classPathSet
-     */
-    private static void registerPluginUrlToCachedFile(StreamExecutionEnvironment env, Set<URL> classPathSet) {
-        int i = 0;
-        for (URL url : classPathSet) {
-            String classFileName = String.format(CLASS_FILE_NAME_FMT, i);
-            env.registerCachedFile(url.getPath(), classFileName, true);
-            i++;
-        }
-    }
-
-    private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception {
-        StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
-                StreamExecutionEnvironment.getExecutionEnvironment() :
-                new MyLocalStreamEnvironment();
-
-        StreamEnvConfigManager.streamExecutionEnvironmentConfig(env, confProperties);
-        return env;
+        ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
+        StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
+        env.execute(paramsInfo.getName());
+        LOG.info("program {} execution success", paramsInfo.getName());
     }
 }
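With this refactor, Main and GetPlan share one setup path: ExecuteProcessHelper.parseParams builds a ParamsInfo from the arguments, and ExecuteProcessHelper.getStreamExecution assembles the configured StreamExecutionEnvironment (the table registration, UDF loading, and SQL translation the deleted code did inline). Only the final step differs, as this sketch shows; the method calls come from this diff, while the planOnly switch is illustrative:

    ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
    StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);

    boolean planOnly = false; // hypothetical switch, not part of this commit
    if (planOnly) {
        // GetPlan returns this plan, JSON-wrapped via ApiResult
        System.out.println(env.getExecutionPlan());
    } else {
        // Main's path: submit the job under its configured name
        env.execute(paramsInfo.getName());
    }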
