
Commit debaa0c

Merge remote-tracking branch 'origin/v1.8.0_dev' into feat_1.8_codereview
2 parents aac3a27 + fe6f2e6 commit debaa0c

11 files changed: +811 / -277 lines


core/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -121,6 +121,11 @@
             <artifactId>junit</artifactId>
             <version>4.12</version>
         </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.1.7</version>
+        </dependency>

     </dependencies>
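The logback-classic artifact added above is what lets the rewritten Main.java below cast SLF4J's ILoggerFactory to a logback LoggerContext when adjusting log levels at runtime. That cast only succeeds when logback is the SLF4J binding actually on the classpath; a minimal sketch of the same idea with a defensive instanceof guard (the class and method names here are illustrative, not part of this codebase):

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import org.slf4j.ILoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogLevelUtil {

    /** Sets the root logger level, but only if logback is the active SLF4J binding. */
    public static void trySetRootLevel(String level) {
        ILoggerFactory factory = LoggerFactory.getILoggerFactory();
        if (factory instanceof LoggerContext) {
            // Level.toLevel falls back to the second argument for unrecognized strings.
            ((LoggerContext) factory).getLogger(Logger.ROOT_LOGGER_NAME)
                    .setLevel(Level.toLevel(level, Level.INFO));
        }
        // Under a different binding (e.g. slf4j-log4j12) the configured level is left untouched.
    }
}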

core/src/main/java/com/dtstack/flink/sql/GetPlan.java

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql;
+
+import com.dtstack.flink.sql.exec.ApiResult;
+import com.dtstack.flink.sql.exec.ExecuteProcessHelper;
+import com.dtstack.flink.sql.exec.ParamsInfo;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Obtains the execution plan of a SQL job in local mode
+ * Date: 2020/2/17
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public class GetPlan {
+
+    public static String getExecutionPlan(String[] args) {
+        try {
+            long start = System.currentTimeMillis();
+            ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
+            StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
+            String executionPlan = env.getExecutionPlan();
+            long end = System.currentTimeMillis();
+            return ApiResult.createSuccessResultJsonStr(executionPlan, end - start);
+        } catch (Exception e) {
+            return ApiResult.createErrorResultJsonStr(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
+}
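GetPlan feeds the same command-line arguments into ExecuteProcessHelper.parseParams as the rewritten Main below, but returns the plan as JSON instead of submitting the job. A hypothetical caller might look like the following sketch; the flag names are only inferred from the Options getters visible in the old Main (getMode, getName, getSql, getLocalSqlPluginPath) and should be checked against OptionParser:

public class GetPlanDemo {

    public static void main(String[] args) throws Exception {
        String[] jobArgs = {
                "-mode", "local",
                "-name", "planPreview",
                // the old Main URL-decodes the SQL option, so it is encoded here
                "-sql", java.net.URLEncoder.encode(
                        "INSERT INTO sinkTable SELECT * FROM sourceTable", "UTF-8"),
                "-localSqlPluginPath", "/opt/dtstack/flinkStreamSQL/plugins"
        };
        // Returns a JSON string: the execution plan plus timing on success,
        // or the full stack trace wrapped by ApiResult on failure.
        System.out.println(GetPlan.getExecutionPlan(jobArgs));
    }
}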

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 15 additions & 272 deletions
@@ -19,64 +19,16 @@

 package com.dtstack.flink.sql;

-import com.dtstack.flink.sql.config.CalciteConfig;
-import com.dtstack.flink.sql.classloader.ClassLoaderManager;
-import com.dtstack.flink.sql.enums.ClusterMode;
-import com.dtstack.flink.sql.enums.ECacheType;
-import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
-import com.dtstack.flink.sql.environment.StreamEnvConfigManager;
-import com.dtstack.flink.sql.exec.FlinkSQLExec;
-import com.dtstack.flink.sql.option.OptionParser;
-import com.dtstack.flink.sql.parser.CreateFuncParser;
-import com.dtstack.flink.sql.parser.CreateTmpTableParser;
-import com.dtstack.flink.sql.parser.InsertSqlParser;
-import com.dtstack.flink.sql.parser.SqlParser;
-import com.dtstack.flink.sql.parser.SqlTree;
-import com.dtstack.flink.sql.side.SideSqlExec;
-import com.dtstack.flink.sql.side.SideTableInfo;
-import com.dtstack.flink.sql.table.SourceTableInfo;
-import com.dtstack.flink.sql.table.TableInfo;
-import com.dtstack.flink.sql.table.TargetTableInfo;
-import com.dtstack.flink.sql.sink.StreamSinkFactory;
-import com.dtstack.flink.sql.source.StreamSourceFactory;
-import com.dtstack.flink.sql.util.DtStringUtil;
-import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
-import com.dtstack.flink.sql.function.FunctionManager;
-import com.dtstack.flink.sql.util.PluginUtil;
-import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.commons.io.Charsets;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
+
+
+import com.dtstack.flink.sql.exec.ExecuteProcessHelper;
+import com.dtstack.flink.sql.exec.ParamsInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.StreamQueryConfig;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.io.File;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.net.URLDecoder;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-
-import com.dtstack.flink.sql.option.Options;
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.LoggerContext;

 /**
  * Date: 2018/6/26
@@ -85,227 +37,18 @@
  */

 public class Main {
-
-    private static final String CLASS_FILE_NAME_FMT = "class_path_%d";
-
-    private static final ObjectMapper objMapper = new ObjectMapper();
-
     private static final Logger LOG = LoggerFactory.getLogger(Main.class);

-
     public static void main(String[] args) throws Exception {
-
-        OptionParser optionParser = new OptionParser(args);
-        Options options = optionParser.getOptions();
-        String sql = options.getSql();
-        String name = options.getName();
-        String addJarListStr = options.getAddjar();
-        String localSqlPluginPath = options.getLocalSqlPluginPath();
-        String remoteSqlPluginPath = options.getRemoteSqlPluginPath();
-        String pluginLoadMode = options.getPluginLoadMode();
-        String deployMode = options.getMode();
-        String confProp = options.getConfProp();
-
-        sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
-        SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
-
-        List<String> addJarFileList = Lists.newArrayList();
-        if (!Strings.isNullOrEmpty(addJarListStr)) {
-            addJarListStr = URLDecoder.decode(addJarListStr, Charsets.UTF_8.name());
-            addJarFileList = objMapper.readValue(addJarListStr, List.class);
-        }
-
-        confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
-        Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
-        StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
-        StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
-        StreamQueryConfig streamQueryConfig = StreamEnvConfigManager.getStreamQueryConfig(tableEnv, confProperties);
-
-        List<URL> jarURList = Lists.newArrayList();
-        SqlTree sqlTree = SqlParser.parseSql(sql);
-
-        //Get External jar to load
-        for (String addJarPath : addJarFileList) {
-            File tmpFile = new File(addJarPath);
-            jarURList.add(tmpFile.toURI().toURL());
-        }
-
-        Map<String, SideTableInfo> sideTableMap = Maps.newHashMap();
-        Map<String, Table> registerTableCache = Maps.newHashMap();
-
-        //register udf
-        registerUserDefinedFunction(sqlTree, jarURList, tableEnv);
-        //register table schema
-        Set<URL> classPathSets = registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode, sideTableMap, registerTableCache);
-        // cache classPathSets
-        registerPluginUrlToCachedFile(env, classPathSets);
-
-        sqlTranslation(localSqlPluginPath, tableEnv, sqlTree, sideTableMap, registerTableCache, streamQueryConfig);
-
-        if (env instanceof MyLocalStreamEnvironment) {
-            ((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
-        }
-
-        env.execute(name);
+        ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
+        StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
+        env.execute(paramsInfo.getName());
+        LOG.info("program {} execution success", paramsInfo.getName());
     }
-
-    private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv,SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap,Map<String, Table> registerTableCache, StreamQueryConfig queryConfig) throws Exception {
-        SideSqlExec sideSqlExec = new SideSqlExec();
-        sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
-        for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
-            sideSqlExec.registerTmpTable(result, sideTableMap, tableEnv, registerTableCache);
-        }
-
-        for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("exe-sql:\n" + result.getExecSql());
-            }
-            boolean isSide = false;
-            for (String tableName : result.getTargetTableList()) {
-                if (sqlTree.getTmpTableMap().containsKey(tableName)) {
-                    CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
-                    String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");
-
-                    SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
-                    String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
-                    tmp.setExecSql(tmpSql);
-                    sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);
-                } else {
-                    for (String sourceTable : result.getSourceTableList()) {
-                        if (sideTableMap.containsKey(sourceTable)) {
-                            isSide = true;
-                            break;
-                        }
-                    }
-                    if (isSide) {
-                        //sql-dimensional table contains the dimension table of execution
-                        sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig);
-                    }else{
-                        System.out.println("----------exec sql without dimension join-----------" );
-                        System.out.println("----------real sql exec is--------------------------");
-                        System.out.println(result.getExecSql());
-                        FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
-                        if(LOG.isInfoEnabled()){
-                            System.out.println();
-                            LOG.info("exec sql: " + result.getExecSql());
-                        }
-                    }
-                }
-            }
-        }
-
-
-    }
-
-
-    private static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarURList, TableEnvironment tableEnv)
-            throws IllegalAccessException, InvocationTargetException {
-        // the UDF and the tableEnv must be loaded by the same classloader
-        ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
-        URLClassLoader classLoader = null;
-        List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
-        for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
-            //classloader
-            if (classLoader == null) {
-                classLoader = ClassLoaderManager.loadExtraJar(jarURList, (URLClassLoader) levelClassLoader);
-            }
-            FunctionManager.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, classLoader);
-        }
-    }
-
-    /**
-     * Register source and sink tables with Flink and return the full paths of the plugin jars needed at runtime
-     * @param sqlTree
-     * @param env
-     * @param tableEnv
-     * @param localSqlPluginPath
-     * @param remoteSqlPluginPath
-     * @param pluginLoadMode plugin load mode: classpath or shipfile
-     * @param sideTableMap
-     * @param registerTableCache
-     * @return
-     * @throws Exception
-     */
-    private static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
-                                          String remoteSqlPluginPath, String pluginLoadMode, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
-        Set<URL> pluginClassPatshSets = Sets.newHashSet();
-        WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
-        for (TableInfo tableInfo : sqlTree.getTableInfoMap().values()) {
-
-            if (tableInfo instanceof SourceTableInfo) {
-
-                SourceTableInfo sourceTableInfo = (SourceTableInfo) tableInfo;
-                Table table = StreamSourceFactory.getStreamSource(sourceTableInfo, env, tableEnv, localSqlPluginPath);
-                tableEnv.registerTable(sourceTableInfo.getAdaptName(), table);
-                //Note --- parameter conversion function can not be used inside a function of the type of polymerization
-                //Create table in which the function is arranged only need adaptation sql
-                String adaptSql = sourceTableInfo.getAdaptSelectSql();
-                Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
-
-                RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
-                DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
-                        .map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
-                        .returns(typeInfo);
-
-                String fields = String.join(",", typeInfo.getFieldNames());
-
-                if (waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)) {
-                    adaptStream = waterMarkerAssigner.assignWaterMarker(adaptStream, typeInfo, sourceTableInfo);
-                    fields += ",ROWTIME.ROWTIME";
-                } else {
-                    fields += ",PROCTIME.PROCTIME";
-                }
-
-                Table regTable = tableEnv.fromDataStream(adaptStream, fields);
-                tableEnv.registerTable(tableInfo.getName(), regTable);
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("registe table {} success.", tableInfo.getName());
-                }
-                registerTableCache.put(tableInfo.getName(), regTable);
-
-                URL sourceTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
-                pluginClassPatshSets.add(sourceTablePathUrl);
-            } else if (tableInfo instanceof TargetTableInfo) {
-
-                TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
-                TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
-                tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
-
-                URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
-                pluginClassPatshSets.add(sinkTablePathUrl);
-            } else if (tableInfo instanceof SideTableInfo) {
-                String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
-                sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
-
-                URL sideTablePathUrl = PluginUtil.buildSidePathByLoadMode(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
-                pluginClassPatshSets.add(sideTablePathUrl);
-            } else {
-                throw new RuntimeException("not support table type:" + tableInfo.getType());
-            }
-        }
-        return pluginClassPatshSets;
-    }
-
-    /**
-     * In perjob mode, store the paths of the plugin jars the job depends on as cached files, so the outer layer can pass them to the jobgraph
-     * @param env
-     * @param classPathSet
-     */
-    private static void registerPluginUrlToCachedFile(StreamExecutionEnvironment env, Set<URL> classPathSet) {
-        int i = 0;
-        for (URL url : classPathSet) {
-            String classFileName = String.format(CLASS_FILE_NAME_FMT, i);
-            env.registerCachedFile(url.getPath(), classFileName, true);
-            i++;
-        }
-    }
-
-    private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception {
-        StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
-                StreamExecutionEnvironment.getExecutionEnvironment() :
-                new MyLocalStreamEnvironment();
-
-        StreamEnvConfigManager.streamExecutionEnvironmentConfig(env, confProperties);
-        return env;
+    private static void setLogLevel(String level){
+        LoggerContext loggerContext= (LoggerContext) LoggerFactory.getILoggerFactory();
+        // set the global log level
+        ch.qos.logback.classic.Logger logger = loggerContext.getLogger("root");
+        logger.setLevel(Level.toLevel(level, Level.INFO));
     }
 }
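setLogLevel is added as a private helper but no call site for it appears in this diff, so the wiring presumably lives in ExecuteProcessHelper or a follow-up commit. Note the Level.toLevel(level, Level.INFO) fallback: parsing is case-insensitive, and unknown strings silently degrade to INFO rather than failing. A small standalone check of that behavior (class name is illustrative):

import ch.qos.logback.classic.Level;

public class LevelFallbackDemo {

    public static void main(String[] args) {
        // Case-insensitive parse: "debug" and "DEBUG" both map to DEBUG.
        System.out.println(Level.toLevel("debug", Level.INFO));   // DEBUG
        // Unrecognized input falls back to the supplied default.
        System.out.println(Level.toLevel("verbose", Level.INFO)); // INFO
    }
}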
