|
18 | 18 |
|
19 | 19 | package com.dtstack.flink.sql.exec; |
20 | 20 |
|
| 21 | +import com.dtstack.flink.sql.parser.*; |
21 | 22 | import org.apache.flink.api.common.typeinfo.TypeInformation; |
22 | 23 | import org.apache.flink.api.java.tuple.Tuple2; |
23 | 24 | import org.apache.flink.api.java.typeutils.RowTypeInfo; |
|
27 | 28 | import org.apache.flink.table.api.Table; |
28 | 29 | import org.apache.flink.table.api.TableEnvironment; |
29 | 30 | import org.apache.flink.table.api.java.StreamTableEnvironment; |
| 31 | +import org.apache.flink.table.calcite.FlinkPlannerImpl; |
30 | 32 | import org.apache.flink.table.sinks.TableSink; |
31 | 33 | import org.apache.flink.types.Row; |
32 | 34 |
|
33 | 35 | import com.dtstack.flink.sql.classloader.ClassLoaderManager; |
34 | | -import com.dtstack.flink.sql.config.CalciteConfig; |
35 | 36 | import com.dtstack.flink.sql.enums.ClusterMode; |
36 | 37 | import com.dtstack.flink.sql.enums.ECacheType; |
37 | 38 | import com.dtstack.flink.sql.enums.EPluginLoadMode; |
|
40 | 41 | import com.dtstack.flink.sql.function.FunctionManager; |
41 | 42 | import com.dtstack.flink.sql.option.OptionParser; |
42 | 43 | import com.dtstack.flink.sql.option.Options; |
43 | | -import com.dtstack.flink.sql.parser.CreateFuncParser; |
44 | | -import com.dtstack.flink.sql.parser.CreateTmpTableParser; |
45 | | -import com.dtstack.flink.sql.parser.InsertSqlParser; |
46 | | -import com.dtstack.flink.sql.parser.SqlParser; |
47 | | -import com.dtstack.flink.sql.parser.SqlTree; |
48 | 44 | import com.dtstack.flink.sql.side.SideSqlExec; |
49 | 45 | import com.dtstack.flink.sql.side.SideTableInfo; |
50 | 46 | import com.dtstack.flink.sql.sink.StreamSinkFactory; |
@@ -210,7 +206,8 @@ private static void sqlTranslation(String localSqlPluginPath, |
210 | 206 | CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName); |
211 | 207 | String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", ""); |
212 | 208 |
|
213 | | - SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt(); |
| 209 | + FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner(); |
| 210 | + SqlNode sqlNode = flinkPlanner.parse(realSql); |
214 | 211 | String tmpSql = ((SqlInsert) sqlNode).getSource().toString(); |
215 | 212 | tmp.setExecSql(tmpSql); |
216 | 213 | sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp); |
|
0 commit comments