
Commit 06f0cc1

[hotfix-32959][core] A join on a dim table followed by a GROUP BY needs retraction, but a windowed GROUP BY does not. NOTE: UDAFs must override the retract method.
1 parent f64705e commit 06f0cc1
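
The NOTE in the commit message deserves a concrete illustration. Below is a minimal sketch, assuming a custom count-style UDAF built on Flink's AggregateFunction; the class, accumulator, and field names are invented for illustration and are not part of this commit:

import org.apache.flink.table.functions.AggregateFunction;

// Hypothetical accumulator holding a single running count.
class CountAcc {
    public long count = 0L;
}

// Hypothetical UDAF: because a dim-table group by now emits retractions,
// the function must implement retract() in addition to accumulate().
public class RetractableCountUdaf extends AggregateFunction<Long, CountAcc> {

    @Override
    public CountAcc createAccumulator() {
        return new CountAcc();
    }

    @Override
    public Long getValue(CountAcc acc) {
        return acc.count;
    }

    // Called for every incoming row.
    public void accumulate(CountAcc acc, Long value) {
        if (value != null) {
            acc.count += 1;
        }
    }

    // Called for every retracted row; without this method the job fails
    // once the upstream operator starts producing updates.
    public void retract(CountAcc acc, Long value) {
        if (value != null) {
            acc.count -= 1;
        }
    }
}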

File tree

3 files changed: +145 −15 lines


core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 8 additions & 8 deletions
@@ -56,8 +56,8 @@ public class FlinkSQLExec {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkSQLExec.class);
 
     public static void sqlInsert(StreamTableEnvironment tableEnv, SqlNode sqlNode, Collection<String> newRegisterTableList) throws Exception{
-        boolean isGroupByTimeWindow = TableUtils.checkIsGroupByTimeWindow(sqlNode, newRegisterTableList);
-        if(!isGroupByTimeWindow){
+        boolean isGroupByTimeWindow = TableUtils.checkIsDimTableGroupBy(sqlNode, newRegisterTableList);
+        if(isGroupByTimeWindow){
             QueryOperationConverter.setProducesUpdates(true);
         }
 
@@ -71,8 +71,8 @@ public static void sqlInsert(StreamTableEnvironment tableEnv, String stmt, Colle
         FlinkPlannerImpl flinkPlanner = streamPlanner.createFlinkPlanner();
 
         RichSqlInsert insert = (RichSqlInsert) flinkPlanner.validate(flinkPlanner.parser().parse(stmt));
-        boolean isGroupByTimeWindow = TableUtils.checkIsGroupByTimeWindow(insert, newRegisterTableList);
-        if(!isGroupByTimeWindow){
+        boolean isGroupByTimeWindow = TableUtils.checkIsDimTableGroupBy(insert, newRegisterTableList);
+        if(isGroupByTimeWindow){
             QueryOperationConverter.setProducesUpdates(true);
         }

@@ -81,8 +81,8 @@ public static void sqlInsert(StreamTableEnvironment tableEnv, String stmt, Colle
     }
 
     public static Table sqlQuery(StreamTableEnvironment tableEnv, SqlNode sqlNode, Collection<String> newRegisterTableList){
-        boolean isGroupByTimeWindow = TableUtils.checkIsGroupByTimeWindow(sqlNode, newRegisterTableList);
-        if(!isGroupByTimeWindow){
+        boolean isGroupByTimeWindow = TableUtils.checkIsDimTableGroupBy(sqlNode, newRegisterTableList);
+        if(isGroupByTimeWindow){
             QueryOperationConverter.setProducesUpdates(true);
         }

@@ -96,8 +96,8 @@ public static void insertInto(StreamTableEnvironment tableEnv,
                                   String targetTableName,
                                   Table fromTable,
                                   Collection<String> newRegisterTableList){
-        boolean isGroupByTimeWindow = TableUtils.checkIsGroupByTimeWindow(sqlNode, newRegisterTableList);
-        if(!isGroupByTimeWindow){
+        boolean isGroupByTimeWindow = TableUtils.checkIsDimTableGroupBy(sqlNode, newRegisterTableList);
+        if(isGroupByTimeWindow){
             QueryOperationConverter.setProducesUpdates(true);
         }

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 15 additions & 5 deletions
@@ -53,6 +53,7 @@
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.planner.plan.QueryOperationConverter;
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
 import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.LegacyLocalDateTimeTypeInfo;
@@ -147,13 +148,15 @@ public void exec(String sql,
                 }
 
             } else if (pollSqlNode.getKind() == AS) {
-                dealAsSourceTable(tableEnv, pollSqlNode, tableCache);
+                Collection<String> newRegisterTableList = dimTableNewTable.keySet();
+                dealAsSourceTable(tableEnv, pollSqlNode, tableCache, newRegisterTableList);
 
             } else if (pollSqlNode.getKind() == WITH_ITEM) {
                 SqlWithItem sqlWithItem = (SqlWithItem) pollSqlNode;
-                String TableAlias = sqlWithItem.name.toString();
-                Table table = tableEnv.sqlQuery(sqlWithItem.query.toString());
-                tableEnv.createTemporaryView(TableAlias, table);
+                String tableAlias = sqlWithItem.name.toString();
+                Collection<String> newRegisterTableList = dimTableNewTable.keySet();
+                Table table = FlinkSQLExec.sqlQuery(tableEnv, sqlWithItem.query, newRegisterTableList);
+                tableEnv.createTemporaryView(tableAlias, table);
 
             } else if (pollSqlNode.getKind() == SELECT) {
                 Preconditions.checkState(createView != null, "select sql must included by create view");
@@ -420,14 +423,21 @@ public List<String> getConditionFields(SqlNode conditionNode, String specifyTabl
 
     protected void dealAsSourceTable(StreamTableEnvironment tableEnv,
                                      SqlNode pollSqlNode,
-                                     Map<String, Table> tableCache) throws SqlParseException {
+                                     Map<String, Table> tableCache,
+                                     Collection<String> newRegisterTableList) throws SqlParseException {
 
         AliasInfo aliasInfo = parseASNode(pollSqlNode);
         if (localTableCache.containsKey(aliasInfo.getName())) {
             return;
         }
 
+        boolean isGroupByTimeWindow = TableUtils.checkIsDimTableGroupBy(pollSqlNode, newRegisterTableList);
+        if(isGroupByTimeWindow){
+            QueryOperationConverter.setProducesUpdates(true);
+        }
         Table table = tableEnv.sqlQuery(aliasInfo.getName());
+        QueryOperationConverter.setProducesUpdates(false);
+
         tableEnv.registerTable(aliasInfo.getAlias(), table);
         localTableCache.put(aliasInfo.getAlias(), table);

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 122 additions & 2 deletions
@@ -26,7 +26,6 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Lists;
 import org.apache.calcite.sql.SqlAsOperator;
 import org.apache.calcite.sql.SqlBasicCall;
@@ -54,9 +53,45 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.apache.calcite.sql.SqlKind.*;
+import static org.apache.calcite.sql.SqlKind.AGGREGATE;
+import static org.apache.calcite.sql.SqlKind.AND;
+import static org.apache.calcite.sql.SqlKind.AS;
+import static org.apache.calcite.sql.SqlKind.AVG_AGG_FUNCTIONS;
+import static org.apache.calcite.sql.SqlKind.BETWEEN;
 import static org.apache.calcite.sql.SqlKind.CASE;
+import static org.apache.calcite.sql.SqlKind.CAST;
+import static org.apache.calcite.sql.SqlKind.COALESCE;
+import static org.apache.calcite.sql.SqlKind.COMPARISON;
+import static org.apache.calcite.sql.SqlKind.CONTAINS;
+import static org.apache.calcite.sql.SqlKind.DIVIDE;
+import static org.apache.calcite.sql.SqlKind.EQUALS;
+import static org.apache.calcite.sql.SqlKind.HOP;
+import static org.apache.calcite.sql.SqlKind.HOP_END;
+import static org.apache.calcite.sql.SqlKind.HOP_START;
+import static org.apache.calcite.sql.SqlKind.IDENTIFIER;
+import static org.apache.calcite.sql.SqlKind.IS_NOT_NULL;
+import static org.apache.calcite.sql.SqlKind.IS_NULL;
+import static org.apache.calcite.sql.SqlKind.LIKE;
+import static org.apache.calcite.sql.SqlKind.LITERAL;
+import static org.apache.calcite.sql.SqlKind.LITERAL_CHAIN;
+import static org.apache.calcite.sql.SqlKind.MINUS;
+import static org.apache.calcite.sql.SqlKind.NOT_IN;
+import static org.apache.calcite.sql.SqlKind.OR;
 import static org.apache.calcite.sql.SqlKind.OTHER;
+import static org.apache.calcite.sql.SqlKind.OTHER_FUNCTION;
+import static org.apache.calcite.sql.SqlKind.PLUS;
+import static org.apache.calcite.sql.SqlKind.SELECT;
+import static org.apache.calcite.sql.SqlKind.SESSION;
+import static org.apache.calcite.sql.SqlKind.SESSION_END;
+import static org.apache.calcite.sql.SqlKind.SESSION_START;
+import static org.apache.calcite.sql.SqlKind.TIMES;
+import static org.apache.calcite.sql.SqlKind.TIMESTAMP_ADD;
+import static org.apache.calcite.sql.SqlKind.TIMESTAMP_DIFF;
+import static org.apache.calcite.sql.SqlKind.TRIM;
+import static org.apache.calcite.sql.SqlKind.TUMBLE;
+import static org.apache.calcite.sql.SqlKind.TUMBLE_END;
+import static org.apache.calcite.sql.SqlKind.TUMBLE_START;
+import static org.apache.calcite.sql.SqlKind.UNION;
 
 /**
  * Table parsing related utilities
@@ -798,4 +833,89 @@ public static boolean checkIsTimeGroupByFunction(String functionName ){
                 || functionName.equalsIgnoreCase("hop");
     }
 
+    /**
+     * Check whether the GROUP BY involves a dim table; if so, retraction is needed.
+     * No matter how many levels of sub-queries are nested, a single matching level is enough to require retraction.
+     *
+     * @param sqlNode the SQL statement
+     * @param newRegisterTableList the collection of dim tables
+     * @return true: retraction is needed; false: keep the native behavior
+     */
+    public static boolean checkIsDimTableGroupBy(SqlNode sqlNode, Collection<String> newRegisterTableList) {
+        // the dim table collection is empty
+        if (newRegisterTableList == null || newRegisterTableList.size() == 0) {
+            return false;
+        }
+        SqlKind sqlKind = sqlNode.getKind();
+        switch (sqlKind) {
+            case SELECT:
+                SqlSelect selectNode = (SqlSelect) sqlNode;
+                SqlNodeList groupNodeList = selectNode.getGroup();
+                SqlNode fromNode = selectNode.getFrom();
+
+                // 1. (sub query) group by
+                // 2. (sub query) as alias group by
+                // 3. tableName group by
+                // 4. tableName as alias group by
+
+                // (sub query) group by: case 1
+                if (fromNode.getKind() == SELECT) {
+                    return checkIsDimTableGroupBy(fromNode, newRegisterTableList);
+                }
+
+                // tableName as alias group by, (sub query) as alias group by, tableName group by
+                if (fromNode.getKind() == AS || fromNode.getKind() == IDENTIFIER) {
+                    SqlNode operand;
+                    // tableName as alias group by: case 4
+                    if (fromNode.getKind() == AS) {
+                        operand = ((SqlBasicCall) fromNode).getOperands()[0];
+                    } else {
+                        // tableName group by: case 3
+                        operand = fromNode;
+                    }
+                    // the innermost FROM is a table name, the group by field list is not empty,
+                    // and the table name is contained in the dim table collection
+                    if (operand.getKind() == IDENTIFIER
+                            && groupNodeList != null
+                            && groupNodeList.size() != 0
+                            && newRegisterTableList.contains(operand.toString())) {
+                        boolean isRetract = false;
+                        // check every group by field
+                        for (SqlNode node : groupNodeList.getList()) {
+                            // check whether the field is a function call
+                            if (node.getKind() == OTHER_FUNCTION) {
+                                String functionName = ((SqlBasicCall) node).getOperator().toString().toLowerCase();
+                                boolean isTimeGroupByFunction = checkIsTimeGroupByFunction(functionName);
+                                // as soon as a window is present no retraction is needed; return immediately
+                                if (isTimeGroupByFunction) {
+                                    return false;
+                                }
+                                // a non-window function needs retraction; keep iterating over the remaining fields
+                                isRetract = true;
+                            } else {
+                                // other cases need retraction; keep iterating over the remaining fields
+                                isRetract = true;
+                            }
+                        }
+                        return isRetract;
+                    } else {
+                        // (sub query) as alias group by: case 2
+                        // statements without a group by also fall into this branch, but eventually return false (no retraction)
+                        return checkIsDimTableGroupBy(fromNode, newRegisterTableList);
+                    }
+                }
+
+                return false;
+            case INSERT:
+                return checkIsDimTableGroupBy(((SqlInsert) sqlNode).getSource(), newRegisterTableList);
+            case UNION:
+                SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
+                SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
+                return checkIsDimTableGroupBy(unionLeft, newRegisterTableList)
+                        || checkIsDimTableGroupBy(unionRight, newRegisterTableList);
+            case AS:
+                SqlNode info = ((SqlBasicCall) sqlNode).getOperands()[0];
+                return checkIsDimTableGroupBy(info, newRegisterTableList);
+            default:
+                return false;
+        }
+    }
 }
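
To see what the new check classifies, here is a hedged, self-contained sketch that parses two statements with Calcite's default parser and feeds them to checkIsDimTableGroupBy. The table and column names are made up, and the dim-table collection is assumed to hold the name the side-join phase would normally register:

import com.dtstack.flink.sql.util.TableUtils;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;

import java.util.Collection;
import java.util.Collections;

public class DimGroupByCheckSketch {

    public static void main(String[] args) throws Exception {
        // Hypothetical name of the table produced by the dim-table (side) join.
        Collection<String> newRegisterTableList = Collections.singletonList("ORDER_WITH_DIM");

        // Plain group by over the joined result: expected to need retraction (true).
        SqlNode plainGroupBy = SqlParser.create(
                "SELECT USER_ID, COUNT(*) FROM ORDER_WITH_DIM GROUP BY USER_ID").parseStmt();

        // Tumbling-window group by: expected to keep the native behavior (false).
        SqlNode windowGroupBy = SqlParser.create(
                "SELECT USER_ID, COUNT(*) FROM ORDER_WITH_DIM "
                        + "GROUP BY USER_ID, TUMBLE(PROC_TIME, INTERVAL '10' SECOND)").parseStmt();

        System.out.println(TableUtils.checkIsDimTableGroupBy(plainGroupBy, newRegisterTableList));   // true
        System.out.println(TableUtils.checkIsDimTableGroupBy(windowGroupBy, newRegisterTableList));  // false
    }
}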
