Skip to content

Commit 215acfc

Browse files
committed
[opt-33307][core] 调整日志信息和代码格式
1 parent c38da77 commit 215acfc

File tree

6 files changed

+154
-131
lines changed

6 files changed

+154
-131
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,13 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
170170
// cache classPathSets
171171
ExecuteProcessHelper.registerPluginUrlToCachedFile(env, classPathSets);
172172

173-
ExecuteProcessHelper.sqlTranslation(paramsInfo.getLocalSqlPluginPath(), paramsInfo.getPluginLoadMode(),tableEnv, sqlTree, sideTableMap, registerTableCache);
173+
ExecuteProcessHelper.sqlTranslation(
174+
paramsInfo.getLocalSqlPluginPath(),
175+
paramsInfo.getPluginLoadMode(),
176+
tableEnv,
177+
sqlTree,
178+
sideTableMap,
179+
registerTableCache);
174180

175181
if (env instanceof MyLocalStreamEnvironment) {
176182
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
@@ -282,8 +288,14 @@ public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrl
282288
* @return
283289
* @throws Exception
284290
*/
285-
public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
286-
String remoteSqlPluginPath, String pluginLoadMode, Map<String, AbstractSideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
291+
public static Set<URL> registerTable(SqlTree sqlTree,
292+
StreamExecutionEnvironment env,
293+
StreamTableEnvironment tableEnv,
294+
String localSqlPluginPath,
295+
String remoteSqlPluginPath,
296+
String pluginLoadMode,
297+
Map<String, AbstractSideTableInfo> sideTableMap,
298+
Map<String, Table> registerTableCache) throws Exception {
287299
Set<URL> pluginClassPathSets = Sets.newHashSet();
288300
WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
289301
for (AbstractTableInfo tableInfo : sqlTree.getTableInfoMap().values()) {

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 72 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@
1717
*/
1818

1919

20-
2120
package com.dtstack.flink.sql.parser;
2221

2322
import com.dtstack.flink.sql.util.DtStringUtil;
23+
import com.google.common.collect.Lists;
2424
import org.apache.calcite.sql.SqlBasicCall;
2525
import org.apache.calcite.sql.SqlJoin;
2626
import org.apache.calcite.sql.SqlKind;
27+
import org.apache.calcite.sql.SqlMatchRecognize;
2728
import org.apache.calcite.sql.SqlNode;
2829
import org.apache.calcite.sql.SqlSelect;
29-
import org.apache.calcite.sql.SqlMatchRecognize;
30-
import com.google.common.collect.Lists;
30+
3131
import java.util.List;
3232
import java.util.regex.Matcher;
3333
import java.util.regex.Pattern;
@@ -38,6 +38,7 @@
3838
* parser create tmp table sql
3939
* Date: 2018/6/26
4040
* Company: www.dtstack.com
41+
*
4142
* @author yanxi
4243
*/
4344
public class CreateTmpTableParser implements IParser {
@@ -51,27 +52,87 @@ public class CreateTmpTableParser implements IParser {
5152

5253
private static final Pattern EMPTYVIEW = Pattern.compile(EMPTY_STR);
5354

54-
private FlinkPlanner flinkPlanner = new FlinkPlanner();
55+
private final FlinkPlanner flinkPlanner = new FlinkPlanner();
5556

56-
public static CreateTmpTableParser newInstance(){
57+
public static CreateTmpTableParser newInstance() {
5758
return new CreateTmpTableParser();
5859
}
5960

61+
private static void parseNode(SqlNode sqlNode, CreateTmpTableParser.SqlParserResult sqlParseResult) {
62+
SqlKind sqlKind = sqlNode.getKind();
63+
switch (sqlKind) {
64+
case SELECT:
65+
SqlNode sqlFrom = ((SqlSelect) sqlNode).getFrom();
66+
if (sqlFrom.getKind() == IDENTIFIER) {
67+
sqlParseResult.addSourceTable(sqlFrom.toString());
68+
} else {
69+
parseNode(sqlFrom, sqlParseResult);
70+
}
71+
break;
72+
case JOIN:
73+
SqlNode leftNode = ((SqlJoin) sqlNode).getLeft();
74+
SqlNode rightNode = ((SqlJoin) sqlNode).getRight();
75+
76+
if (leftNode.getKind() == IDENTIFIER) {
77+
sqlParseResult.addSourceTable(leftNode.toString());
78+
} else {
79+
parseNode(leftNode, sqlParseResult);
80+
}
81+
82+
if (rightNode.getKind() == IDENTIFIER) {
83+
sqlParseResult.addSourceTable(rightNode.toString());
84+
} else {
85+
parseNode(rightNode, sqlParseResult);
86+
}
87+
break;
88+
case AS:
89+
//不解析column,所以 as 相关的都是表
90+
SqlNode identifierNode = ((SqlBasicCall) sqlNode).getOperands()[0];
91+
if (identifierNode.getKind() != IDENTIFIER) {
92+
parseNode(identifierNode, sqlParseResult);
93+
} else {
94+
sqlParseResult.addSourceTable(identifierNode.toString());
95+
}
96+
break;
97+
case UNION:
98+
SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
99+
SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
100+
if (unionLeft.getKind() == IDENTIFIER) {
101+
sqlParseResult.addSourceTable(unionLeft.toString());
102+
} else {
103+
parseNode(unionLeft, sqlParseResult);
104+
}
105+
if (unionRight.getKind() == IDENTIFIER) {
106+
sqlParseResult.addSourceTable(unionRight.toString());
107+
} else {
108+
parseNode(unionRight, sqlParseResult);
109+
}
110+
break;
111+
case MATCH_RECOGNIZE:
112+
SqlMatchRecognize node = (SqlMatchRecognize) sqlNode;
113+
sqlParseResult.addSourceTable(node.getTableRef().toString());
114+
break;
115+
default:
116+
//do nothing
117+
break;
118+
}
119+
}
120+
60121
@Override
61122
public boolean verify(String sql) {
62-
if (Pattern.compile(EMPTY_STR).matcher(sql).find()){
123+
if (Pattern.compile(EMPTY_STR).matcher(sql).find()) {
63124
return true;
64125
}
65126
return NONEMPTYVIEW.matcher(sql).find();
66127
}
67128

68129
@Override
69130
public void parseSql(String sql, SqlTree sqlTree) {
70-
if (NONEMPTYVIEW.matcher(sql).find()){
131+
if (NONEMPTYVIEW.matcher(sql).find()) {
71132
Matcher matcher = NONEMPTYVIEW.matcher(sql);
72133
String tableName = null;
73134
String selectSql = null;
74-
if(matcher.find()) {
135+
if (matcher.find()) {
75136
tableName = matcher.group(1);
76137
selectSql = "select " + matcher.group(2);
77138
}
@@ -92,12 +153,11 @@ public void parseSql(String sql, SqlTree sqlTree) {
92153
sqlTree.addTmpSql(sqlParseResult);
93154
sqlTree.addTmplTableInfo(tableName, sqlParseResult);
94155
} else {
95-
if (EMPTYVIEW.matcher(sql).find())
96-
{
156+
if (EMPTYVIEW.matcher(sql).find()) {
97157
Matcher matcher = EMPTYVIEW.matcher(sql);
98158
String tableName = null;
99159
String fieldsInfoStr = null;
100-
if (matcher.find()){
160+
if (matcher.find()) {
101161
tableName = matcher.group(1);
102162
fieldsInfoStr = matcher.group(2);
103163
}
@@ -106,68 +166,6 @@ public void parseSql(String sql, SqlTree sqlTree) {
106166
sqlParseResult.setTableName(tableName);
107167
sqlTree.addTmplTableInfo(tableName, sqlParseResult);
108168
}
109-
110-
}
111-
112-
}
113-
114-
private static void parseNode(SqlNode sqlNode, CreateTmpTableParser.SqlParserResult sqlParseResult){
115-
SqlKind sqlKind = sqlNode.getKind();
116-
switch (sqlKind){
117-
case SELECT:
118-
SqlNode sqlFrom = ((SqlSelect)sqlNode).getFrom();
119-
if(sqlFrom.getKind() == IDENTIFIER){
120-
sqlParseResult.addSourceTable(sqlFrom.toString());
121-
}else{
122-
parseNode(sqlFrom, sqlParseResult);
123-
}
124-
break;
125-
case JOIN:
126-
SqlNode leftNode = ((SqlJoin)sqlNode).getLeft();
127-
SqlNode rightNode = ((SqlJoin)sqlNode).getRight();
128-
129-
if(leftNode.getKind() == IDENTIFIER){
130-
sqlParseResult.addSourceTable(leftNode.toString());
131-
}else{
132-
parseNode(leftNode, sqlParseResult);
133-
}
134-
135-
if(rightNode.getKind() == IDENTIFIER){
136-
sqlParseResult.addSourceTable(rightNode.toString());
137-
}else{
138-
parseNode(rightNode, sqlParseResult);
139-
}
140-
break;
141-
case AS:
142-
//不解析column,所以 as 相关的都是表
143-
SqlNode identifierNode = ((SqlBasicCall)sqlNode).getOperands()[0];
144-
if(identifierNode.getKind() != IDENTIFIER){
145-
parseNode(identifierNode, sqlParseResult);
146-
}else {
147-
sqlParseResult.addSourceTable(identifierNode.toString());
148-
}
149-
break;
150-
case UNION:
151-
SqlNode unionLeft = ((SqlBasicCall)sqlNode).getOperands()[0];
152-
SqlNode unionRight = ((SqlBasicCall)sqlNode).getOperands()[1];
153-
if(unionLeft.getKind() == IDENTIFIER){
154-
sqlParseResult.addSourceTable(unionLeft.toString());
155-
}else{
156-
parseNode(unionLeft, sqlParseResult);
157-
}
158-
if(unionRight.getKind() == IDENTIFIER){
159-
sqlParseResult.addSourceTable(unionRight.toString());
160-
}else{
161-
parseNode(unionRight, sqlParseResult);
162-
}
163-
break;
164-
case MATCH_RECOGNIZE:
165-
SqlMatchRecognize node = (SqlMatchRecognize) sqlNode;
166-
sqlParseResult.addSourceTable(node.getTableRef().toString());
167-
break;
168-
default:
169-
//do nothing
170-
break;
171169
}
172170
}
173171

@@ -204,13 +202,12 @@ public void setFieldsInfoStr(String fieldsInfoStr) {
204202
this.fieldsInfoStr = fieldsInfoStr;
205203
}
206204

207-
public void addSourceTable(String sourceTable){
205+
public void addSourceTable(String sourceTable) {
208206
sourceTableList.add(sourceTable);
209207
}
210208

211209
public List<String> getSourceTableList() {
212210
return sourceTableList;
213211
}
214-
215212
}
216213
}

core/src/main/java/com/dtstack/flink/sql/parser/FlinkPlanner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
* Company: www.dtstack.com
4141
* @author maqi
4242
*/
43+
@Deprecated
4344
public class FlinkPlanner {
4445

4546
private final TableConfig tableConfig = new TableConfig();
@@ -54,7 +55,8 @@ public class FlinkPlanner {
5455
catalogManager,
5556
moduleManager);
5657
private final PlannerContext plannerContext =
57-
new PlannerContext(tableConfig,
58+
new PlannerContext(
59+
tableConfig,
5860
functionCatalog,
5961
catalogManager,
6062
asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)),

0 commit comments

Comments
 (0)