Skip to content

Commit 912b0ba

Browse files
committed
Add with support
1 parent 8e570ff commit 912b0ba

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@
2929
import org.apache.calcite.sql.SqlJoin;
3030
import org.apache.calcite.sql.SqlKind;
3131
import org.apache.calcite.sql.SqlNode;
32+
import org.apache.calcite.sql.SqlNodeList;
3233
import org.apache.calcite.sql.SqlOperator;
3334
import org.apache.calcite.sql.SqlSelect;
35+
import org.apache.calcite.sql.SqlWith;
36+
import org.apache.calcite.sql.SqlWithItem;
3437
import org.apache.calcite.sql.parser.SqlParseException;
3538
import org.apache.calcite.sql.parser.SqlParser;
3639
import org.apache.calcite.sql.parser.SqlParserPos;
@@ -66,6 +69,17 @@ public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws
6669
private Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object> queueInfo){
6770
SqlKind sqlKind = sqlNode.getKind();
6871
switch (sqlKind){
72+
case WITH: {
73+
SqlWith sqlWith = (SqlWith) sqlNode;
74+
SqlNodeList sqlNodeList = sqlWith.withList;
75+
for (SqlNode withAsTable : sqlNodeList) {
76+
SqlWithItem sqlWithItem = (SqlWithItem) withAsTable;
77+
parseSql(sqlWithItem.query, sideTableSet, queueInfo);
78+
queueInfo.add(sqlWithItem);
79+
}
80+
parseSql(sqlWith.body, sideTableSet, queueInfo);
81+
break;
82+
}
6983
case INSERT:
7084
SqlNode sqlSource = ((SqlInsert)sqlNode).getSource();
7185
return parseSql(sqlSource, sideTableSet, queueInfo);

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.calcite.sql.SqlNode;
3434
import org.apache.calcite.sql.SqlNodeList;
3535
import org.apache.calcite.sql.SqlSelect;
36+
import org.apache.calcite.sql.SqlWithItem;
3637
import org.apache.calcite.sql.fun.SqlCase;
3738
import org.apache.calcite.sql.parser.SqlParseException;
3839
import org.apache.calcite.sql.parser.SqlParserPos;
@@ -101,6 +102,11 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
101102
Table table = tableEnv.sql(aliasInfo.getName());
102103
tableEnv.registerTable(aliasInfo.getAlias(), table);
103104
localTableCache.put(aliasInfo.getAlias(), table);
105+
} else if (pollSqlNode.getKind() == WITH_ITEM) {
106+
SqlWithItem sqlWithItem = (SqlWithItem) pollSqlNode;
107+
String TableAlias = sqlWithItem.name.toString();
108+
Table table = tableEnv.sqlQuery(sqlWithItem.query.toString());
109+
tableEnv.registerTable(TableAlias, table);
104110
}
105111

106112
}else if (pollObj instanceof JoinInfo){

0 commit comments

Comments
 (0)