Skip to content

Commit f5cd850

Browse files
authored
Merge pull request #15 from harbby/master
Add `with` support
2 parents abbd32c + 912b0ba commit f5cd850

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,12 @@
2929
import org.apache.calcite.sql.SqlJoin;
3030
import org.apache.calcite.sql.SqlKind;
3131
import org.apache.calcite.sql.SqlNode;
32+
import org.apache.calcite.sql.SqlNodeList;
3233
import org.apache.calcite.sql.SqlOperator;
3334
import org.apache.calcite.sql.SqlOrderBy;
3435
import org.apache.calcite.sql.SqlSelect;
36+
import org.apache.calcite.sql.SqlWith;
37+
import org.apache.calcite.sql.SqlWithItem;
3538
import org.apache.calcite.sql.parser.SqlParseException;
3639
import org.apache.calcite.sql.parser.SqlParser;
3740
import org.apache.calcite.sql.parser.SqlParserPos;
@@ -70,6 +73,17 @@ public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws
7073
private Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object> queueInfo){
7174
SqlKind sqlKind = sqlNode.getKind();
7275
switch (sqlKind){
76+
case WITH: {
77+
SqlWith sqlWith = (SqlWith) sqlNode;
78+
SqlNodeList sqlNodeList = sqlWith.withList;
79+
for (SqlNode withAsTable : sqlNodeList) {
80+
SqlWithItem sqlWithItem = (SqlWithItem) withAsTable;
81+
parseSql(sqlWithItem.query, sideTableSet, queueInfo);
82+
queueInfo.add(sqlWithItem);
83+
}
84+
parseSql(sqlWith.body, sideTableSet, queueInfo);
85+
break;
86+
}
7387
case INSERT:
7488
SqlNode sqlSource = ((SqlInsert)sqlNode).getSource();
7589
return parseSql(sqlSource, sideTableSet, queueInfo);

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.calcite.sql.SqlOperator;
4040
import org.apache.calcite.sql.SqlOrderBy;
4141
import org.apache.calcite.sql.SqlSelect;
42+
import org.apache.calcite.sql.SqlWithItem;
4243
import org.apache.calcite.sql.fun.SqlCase;
4344
import org.apache.calcite.sql.parser.SqlParseException;
4445
import org.apache.calcite.sql.parser.SqlParserPos;
@@ -120,6 +121,11 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
120121
Table table = tableEnv.sqlQuery(aliasInfo.getName());
121122
tableEnv.registerTable(aliasInfo.getAlias(), table);
122123
localTableCache.put(aliasInfo.getAlias(), table);
124+
} else if (pollSqlNode.getKind() == WITH_ITEM) {
125+
SqlWithItem sqlWithItem = (SqlWithItem) pollSqlNode;
126+
String TableAlias = sqlWithItem.name.toString();
127+
Table table = tableEnv.sqlQuery(sqlWithItem.query.toString());
128+
tableEnv.registerTable(TableAlias, table);
123129
}
124130

125131
}else if (pollObj instanceof JoinInfo){

0 commit comments

Comments
 (0)