Skip to content

Commit e5dbaf4

Browse files
committed
use flink planner
1 parent 0b24159 commit e5dbaf4

File tree

7 files changed

+76
-76
lines changed

7 files changed

+76
-76
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package com.dtstack.flink.sql;
2121

22-
import com.dtstack.flink.sql.config.CalciteConfig;
2322
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2423
import com.dtstack.flink.sql.enums.ClusterMode;
2524
import com.dtstack.flink.sql.enums.ECacheType;
@@ -29,6 +28,7 @@
2928
import com.dtstack.flink.sql.option.OptionParser;
3029
import com.dtstack.flink.sql.parser.CreateFuncParser;
3130
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
31+
import com.dtstack.flink.sql.parser.FlinkPlanner;
3232
import com.dtstack.flink.sql.parser.InsertSqlParser;
3333
import com.dtstack.flink.sql.parser.SqlParser;
3434
import com.dtstack.flink.sql.parser.SqlTree;
@@ -60,6 +60,7 @@
6060
import org.apache.flink.table.api.Table;
6161
import org.apache.flink.table.api.TableEnvironment;
6262
import org.apache.flink.table.api.java.StreamTableEnvironment;
63+
import org.apache.flink.table.calcite.FlinkPlannerImpl;
6364
import org.apache.flink.table.sinks.TableSink;
6465
import org.apache.flink.types.Row;
6566
import org.slf4j.Logger;
@@ -72,7 +73,6 @@
7273
import java.net.URLDecoder;
7374
import java.util.List;
7475
import java.util.Map;
75-
import java.util.Optional;
7676
import java.util.Properties;
7777
import java.util.Set;
7878

@@ -120,6 +120,8 @@ public static void main(String[] args) throws Exception {
120120
StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
121121
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
122122
StreamQueryConfig streamQueryConfig = StreamEnvConfigManager.getStreamQueryConfig(tableEnv, confProperties);
123+
// init global flinkPlanner
124+
FlinkPlanner.createFlinkPlanner(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
123125

124126
List<URL> jarURList = Lists.newArrayList();
125127
SqlTree sqlTree = SqlParser.parseSql(sql);
@@ -171,8 +173,9 @@ private static void sqlTranslation(String localSqlPluginPath,
171173
if (sqlTree.getTmpTableMap().containsKey(tableName)) {
172174
CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
173175
String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");
176+
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
174177

175-
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
178+
SqlNode sqlNode = flinkPlanner.parse(realSql);
176179
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
177180
tmp.setExecSql(tmpSql);
178181
sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp);

core/src/main/java/com/dtstack/flink/sql/config/CalciteConfig.java

Lines changed: 0 additions & 35 deletions
This file was deleted.

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,10 @@
2121
package com.dtstack.flink.sql.parser;
2222

2323
import com.dtstack.flink.sql.util.DtStringUtil;
24-
import org.apache.calcite.config.Lex;
2524
import org.apache.calcite.sql.*;
26-
import org.apache.calcite.sql.parser.SqlParseException;
27-
import org.apache.calcite.sql.parser.SqlParser;
2825
import com.google.common.collect.Lists;
26+
import org.apache.flink.table.calcite.FlinkPlannerImpl;
27+
2928
import java.util.List;
3029
import java.util.regex.Matcher;
3130
import java.util.regex.Pattern;
@@ -71,17 +70,12 @@ public void parseSql(String sql, SqlTree sqlTree) {
7170
tableName = matcher.group(1);
7271
selectSql = "select " + matcher.group(2);
7372
}
74-
75-
SqlParser.Config config = SqlParser
76-
.configBuilder()
77-
.setLex(Lex.MYSQL)
78-
.build();
79-
SqlParser sqlParser = SqlParser.create(selectSql,config);
73+
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
8074

8175
SqlNode sqlNode = null;
8276
try {
83-
sqlNode = sqlParser.parseStmt();
84-
} catch (SqlParseException e) {
77+
sqlNode = flinkPlanner.parse(selectSql);
78+
} catch (Exception e) {
8579
throw new RuntimeException("", e);
8680
}
8781

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.parser;
20+
21+
import org.apache.calcite.plan.RelOptPlanner;
22+
import org.apache.calcite.tools.FrameworkConfig;
23+
import org.apache.flink.table.calcite.FlinkPlannerImpl;
24+
import org.apache.flink.table.calcite.FlinkTypeFactory;
25+
26+
/**
27+
* Date: 2020/3/31
28+
* Company: www.dtstack.com
29+
* @author maqi
30+
*/
31+
public class FlinkPlanner {
32+
33+
public static volatile FlinkPlannerImpl flinkPlanner;
34+
35+
private FlinkPlanner() {
36+
}
37+
38+
public static FlinkPlannerImpl createFlinkPlanner(FrameworkConfig frameworkConfig, RelOptPlanner relOptPlanner, FlinkTypeFactory typeFactory) {
39+
if (flinkPlanner == null) {
40+
synchronized (FlinkPlanner.class) {
41+
if (flinkPlanner == null) {
42+
flinkPlanner = new FlinkPlannerImpl(frameworkConfig, relOptPlanner, typeFactory);
43+
}
44+
}
45+
}
46+
return flinkPlanner;
47+
}
48+
49+
public static FlinkPlannerImpl getFlinkPlanner() {
50+
return flinkPlanner;
51+
}
52+
}

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side;
2121

22-
import com.dtstack.flink.sql.config.CalciteConfig;
22+
import com.dtstack.flink.sql.parser.FlinkPlanner;
2323
import com.dtstack.flink.sql.util.ParseUtils;
2424
import com.dtstack.flink.sql.util.TableUtils;
2525
import com.google.common.base.Preconditions;
@@ -38,11 +38,11 @@
3838
import org.apache.calcite.sql.SqlOperator;
3939
import org.apache.calcite.sql.fun.SqlCase;
4040
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
41-
import org.apache.calcite.sql.parser.SqlParser;
4241
import org.apache.calcite.sql.parser.SqlParserPos;
4342
import org.apache.commons.collections.CollectionUtils;
4443
import org.apache.commons.lang3.StringUtils;
4544
import org.apache.flink.api.java.tuple.Tuple2;
45+
import org.apache.flink.table.calcite.FlinkPlannerImpl;
4646

4747
import java.util.List;
4848
import java.util.Map;
@@ -416,8 +416,9 @@ private void extractTemporaryQuery(SqlNode node, String tableAlias,
416416
node.toString(),
417417
extractConditionStr);
418418

419-
SqlParser sqlParser = SqlParser.create(tmpSelectSql, CalciteConfig.MYSQL_LEX_CONFIG);
420-
SqlNode sqlNode = sqlParser.parseStmt();
419+
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
420+
SqlNode sqlNode = flinkPlanner.parse(tmpSelectSql);
421+
421422
SqlBasicCall sqlBasicCall = buildAsSqlNode(tableAlias, sqlNode);
422423
queueInfo.offer(sqlBasicCall);
423424

core/src/main/java/com/dtstack/flink/sql/side/SidePredicatesParser.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package com.dtstack.flink.sql.side;
2020

21-
import com.dtstack.flink.sql.config.CalciteConfig;
21+
import com.dtstack.flink.sql.parser.FlinkPlanner;
2222
import com.google.common.collect.Lists;
2323
import com.google.common.collect.Maps;
2424
import org.apache.calcite.sql.SqlBasicCall;
@@ -30,11 +30,10 @@
3030
import org.apache.calcite.sql.SqlOperator;
3131
import org.apache.calcite.sql.SqlSelect;
3232
import org.apache.calcite.sql.parser.SqlParseException;
33-
import org.apache.calcite.sql.parser.SqlParser;
33+
import org.apache.flink.table.calcite.FlinkPlannerImpl;
3434

3535
import java.util.List;
3636
import java.util.Map;
37-
import java.util.stream.Collectors;
3837

3938
import static org.apache.calcite.sql.SqlKind.*;
4039

@@ -47,8 +46,8 @@
4746
*/
4847
public class SidePredicatesParser {
4948
public void fillPredicatesForSideTable(String exeSql, Map<String, SideTableInfo> sideTableMap) throws SqlParseException {
50-
SqlParser sqlParser = SqlParser.create(exeSql, CalciteConfig.MYSQL_LEX_CONFIG);
51-
SqlNode sqlNode = sqlParser.parseStmt();
49+
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
50+
SqlNode sqlNode = flinkPlanner.parse(exeSql);
5251
parseSql(sqlNode, sideTableMap, Maps.newHashMap());
5352
}
5453

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,43 +20,29 @@
2020

2121
package com.dtstack.flink.sql.side;
2222

23-
import com.dtstack.flink.sql.config.CalciteConfig;
23+
import com.dtstack.flink.sql.parser.FlinkPlanner;
2424
import com.dtstack.flink.sql.util.TableUtils;
25-
import com.google.common.base.Preconditions;
26-
import com.google.common.collect.Lists;
2725
import com.google.common.collect.Maps;
2826
import com.google.common.collect.Queues;
2927
import com.google.common.collect.Sets;
30-
import org.apache.calcite.sql.JoinType;
31-
import org.apache.calcite.sql.SqlAsOperator;
3228
import org.apache.calcite.sql.SqlBasicCall;
33-
import org.apache.calcite.sql.SqlBinaryOperator;
34-
import org.apache.calcite.sql.SqlDataTypeSpec;
3529
import org.apache.calcite.sql.SqlIdentifier;
3630
import org.apache.calcite.sql.SqlInsert;
3731
import org.apache.calcite.sql.SqlJoin;
3832
import org.apache.calcite.sql.SqlKind;
39-
import org.apache.calcite.sql.SqlLiteral;
4033
import org.apache.calcite.sql.SqlNode;
4134
import org.apache.calcite.sql.SqlNodeList;
42-
import org.apache.calcite.sql.SqlOperator;
4335
import org.apache.calcite.sql.SqlOrderBy;
4436
import org.apache.calcite.sql.SqlSelect;
4537
import org.apache.calcite.sql.SqlWith;
4638
import org.apache.calcite.sql.SqlWithItem;
47-
import org.apache.calcite.sql.fun.SqlCase;
48-
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
4939
import org.apache.calcite.sql.parser.SqlParseException;
50-
import org.apache.calcite.sql.parser.SqlParser;
51-
import org.apache.calcite.sql.parser.SqlParserPos;
52-
import org.apache.commons.collections.CollectionUtils;
53-
import org.apache.commons.lang3.StringUtils;
5440
import org.apache.flink.api.java.tuple.Tuple2;
5541
import org.apache.flink.table.api.Table;
42+
import org.apache.flink.table.calcite.FlinkPlannerImpl;
5643
import org.slf4j.Logger;
5744
import org.slf4j.LoggerFactory;
5845

59-
import java.util.List;
6046
import java.util.Map;
6147
import java.util.Queue;
6248
import java.util.Set;
@@ -82,8 +68,8 @@ public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws
8268
LOG.info(exeSql);
8369

8470
Queue<Object> queueInfo = Queues.newLinkedBlockingQueue();
85-
SqlParser sqlParser = SqlParser.create(exeSql, CalciteConfig.MYSQL_LEX_CONFIG);
86-
SqlNode sqlNode = sqlParser.parseStmt();
71+
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
72+
SqlNode sqlNode = flinkPlanner.parse(exeSql);
8773

8874
parseSql(sqlNode, sideTableSet, queueInfo, null, null, null);
8975
queueInfo.offer(sqlNode);

0 commit comments

Comments
 (0)