Skip to content

Commit a59e859

Browse files
author
xuchao
committed
[tmp][core]修改对维表回测流的支持
1 parent 4f189b6 commit a59e859

File tree

5 files changed

+140
-662
lines changed

5 files changed

+140
-662
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,8 @@ public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.
202202
private static void sqlTranslation(String localSqlPluginPath,
203203
String pluginLoadMode,
204204
StreamTableEnvironment tableEnv,
205-
SqlTree sqlTree,Map<String, AbstractSideTableInfo> sideTableMap,
205+
SqlTree sqlTree,
206+
Map<String, AbstractSideTableInfo> sideTableMap,
206207
Map<String, Table> registerTableCache) throws Exception {
207208

208209
SideSqlExec sideSqlExec = new SideSqlExec();
@@ -242,7 +243,8 @@ private static void sqlTranslation(String localSqlPluginPath,
242243
} else {
243244
LOG.info("----------exec sql without dimension join-----------");
244245
LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());
245-
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
246+
247+
FlinkSQLExec.sqlInsert(tableEnv, result.getExecSql(), SideSqlExec.getDimTableNewTable().keySet() );
246248
if (LOG.isInfoEnabled()) {
247249
LOG.info("exec sql: " + result.getExecSql());
248250
}

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
package com.dtstack.flink.sql.exec;
2020

21+
import com.dtstack.flink.sql.util.TableUtils;
2122
import org.apache.calcite.sql.SqlIdentifier;
23+
import org.apache.calcite.sql.SqlNode;
2224
import org.apache.flink.sql.parser.dml.RichSqlInsert;
2325
import org.apache.flink.table.api.Table;
2426
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
@@ -32,6 +34,7 @@
3234
import org.apache.flink.table.planner.delegation.PlannerBase;
3335
import org.apache.flink.table.planner.delegation.StreamPlanner;
3436
import org.apache.flink.table.planner.operations.SqlToOperationConverter;
37+
import org.apache.flink.table.planner.plan.QueryOperationConverter;
3538
import org.apache.flink.table.sinks.TableSink;
3639
import org.slf4j.Logger;
3740
import org.slf4j.LoggerFactory;
@@ -41,6 +44,7 @@
4144
import java.lang.reflect.InvocationTargetException;
4245
import java.lang.reflect.Method;
4346
import java.util.Arrays;
47+
import java.util.Collection;
4448

4549

4650
/**
@@ -51,14 +55,62 @@
5155
public class FlinkSQLExec {
5256
private static final Logger LOG = LoggerFactory.getLogger(FlinkSQLExec.class);
5357

54-
public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
58+
public static void sqlInsert(StreamTableEnvironment tableEnv, SqlNode sqlNode, Collection<String> newRegisterTableList) throws Exception{
59+
boolean isGroupByTimeWindow = TableUtils.checkIsGroupByTimeWindow(sqlNode, newRegisterTableList);
60+
if(!isGroupByTimeWindow){
61+
QueryOperationConverter.setProducesUpdates(true);
62+
}
63+
64+
sqlInsert(tableEnv, (RichSqlInsert) sqlNode);
65+
QueryOperationConverter.setProducesUpdates(false);
66+
}
67+
68+
public static void sqlInsert(StreamTableEnvironment tableEnv, String stmt, Collection<String> newRegisterTableList) throws Exception{
5569
StreamTableEnvironmentImpl tableEnvImpl = ((StreamTableEnvironmentImpl) tableEnv);
5670
StreamPlanner streamPlanner = (StreamPlanner) tableEnvImpl.getPlanner();
5771
FlinkPlannerImpl flinkPlanner = streamPlanner.createFlinkPlanner();
5872

5973
RichSqlInsert insert = (RichSqlInsert) flinkPlanner.validate(flinkPlanner.parser().parse(stmt));
60-
TableImpl queryResult = extractQueryTableFromInsertCaluse(tableEnvImpl, flinkPlanner, insert);
74+
boolean isGroupByTimeWindow = TableUtils.checkIsGroupByTimeWindow(insert, newRegisterTableList);
75+
if(!isGroupByTimeWindow){
76+
QueryOperationConverter.setProducesUpdates(true);
77+
}
6178

79+
sqlInsert(tableEnv, insert);
80+
QueryOperationConverter.setProducesUpdates(false);
81+
}
82+
83+
public static Table sqlQuery(StreamTableEnvironment tableEnv, SqlNode sqlNode, Collection<String> newRegisterTableList){
84+
boolean isGroupByTimeWindow = TableUtils.checkIsGroupByTimeWindow(sqlNode, newRegisterTableList);
85+
if(!isGroupByTimeWindow){
86+
QueryOperationConverter.setProducesUpdates(true);
87+
}
88+
89+
Table resultTable = tableEnv.sqlQuery(sqlNode.toString());
90+
QueryOperationConverter.setProducesUpdates(false);
91+
return resultTable;
92+
}
93+
94+
public static void insertInto(StreamTableEnvironment tableEnv,
95+
SqlNode sqlNode,
96+
String targetTableName,
97+
Table fromTable,
98+
Collection<String> newRegisterTableList){
99+
boolean isGroupByTimeWindow = TableUtils.checkIsGroupByTimeWindow(sqlNode, newRegisterTableList);
100+
if(!isGroupByTimeWindow){
101+
QueryOperationConverter.setProducesUpdates(true);
102+
}
103+
104+
tableEnv.insertInto(targetTableName, fromTable);
105+
QueryOperationConverter.setProducesUpdates(false);
106+
}
107+
108+
public static void sqlInsert(StreamTableEnvironment tableEnv, RichSqlInsert insert) throws Exception {
109+
StreamTableEnvironmentImpl tableEnvImpl = ((StreamTableEnvironmentImpl) tableEnv);
110+
StreamPlanner streamPlanner = (StreamPlanner) tableEnvImpl.getPlanner();
111+
FlinkPlannerImpl flinkPlanner = streamPlanner.createFlinkPlanner();
112+
113+
TableImpl queryResult = extractQueryTableFromInsertCaluse(tableEnvImpl, flinkPlanner, insert);
62114
String targetTableName = ((SqlIdentifier) insert.getTargetTable()).names.get(0);
63115
TableSink tableSink = getTableSinkByPlanner(streamPlanner, targetTableName);
64116

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import java.time.LocalDateTime;
6969
import java.util.ArrayList;
7070
import java.util.Arrays;
71+
import java.util.Collection;
7172
import java.util.LinkedList;
7273
import java.util.List;
7374
import java.util.Map;
@@ -101,6 +102,9 @@ public class SideSqlExec {
101102

102103
private Map<String, Table> localTableCache = Maps.newHashMap();
103104

105+
//维表重新注册之后的名字缓存
106+
private static Map<String, Table> dimTableNewTable = Maps.newHashMap();
107+
104108
public void exec(String sql,
105109
Map<String, AbstractSideTableInfo> sideTableMap,
106110
StreamTableEnvironment tableEnv,
@@ -136,7 +140,8 @@ public void exec(String sql,
136140

137141

138142
if (pollSqlNode.getKind() == INSERT) {
139-
FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString());
143+
Collection<String> newRegisterTableList = dimTableNewTable.keySet();
144+
FlinkSQLExec.sqlInsert(tableEnv, pollSqlNode, newRegisterTableList);
140145
if (LOG.isInfoEnabled()) {
141146
LOG.info("----------real exec sql-----------\n{}", pollSqlNode.toString());
142147
}
@@ -148,18 +153,19 @@ public void exec(String sql,
148153
SqlWithItem sqlWithItem = (SqlWithItem) pollSqlNode;
149154
String TableAlias = sqlWithItem.name.toString();
150155
Table table = tableEnv.sqlQuery(sqlWithItem.query.toString());
151-
tableEnv.registerTable(TableAlias, table);
156+
tableEnv.createTemporaryView(TableAlias, table);
152157

153158
} else if (pollSqlNode.getKind() == SELECT) {
154159
Preconditions.checkState(createView != null, "select sql must included by create view");
155-
Table table = tableEnv.sqlQuery(pollObj.toString());
160+
Collection<String> newRegisterTableList = dimTableNewTable.keySet();
161+
Table table = FlinkSQLExec.sqlQuery(tableEnv, pollSqlNode, newRegisterTableList);
156162

157163
if (createView.getFieldsInfoStr() == null) {
158164
tableEnv.registerTable(createView.getTableName(), table);
159165
} else {
160166
if (checkFieldsInfo(createView, table)) {
161167
table = table.as(tmpFields);
162-
tableEnv.registerTable(createView.getTableName(), table);
168+
tableEnv.createTemporaryView(createView.getTableName(), table);
163169
} else {
164170
throw new RuntimeException("Fields mismatch");
165171
}
@@ -170,7 +176,7 @@ public void exec(String sql,
170176

171177
} else if (pollObj instanceof JoinInfo) {
172178
LOG.info("----------exec join info----------\n{}", pollObj.toString());
173-
joinFun(pollObj, localTableCache, sideTableMap, tableEnv);
179+
joinFun(pollObj, localTableCache, dimTableNewTable,sideTableMap, tableEnv);
174180
}
175181
}
176182

@@ -441,6 +447,7 @@ protected void dealAsSourceTable(StreamTableEnvironment tableEnv,
441447

442448
private void joinFun(Object pollObj,
443449
Map<String, Table> localTableCache,
450+
Map<String, Table> dimTableNewTable,
444451
Map<String, AbstractSideTableInfo> sideTableMap,
445452
StreamTableEnvironment tableEnv) throws Exception {
446453
JoinInfo joinInfo = (JoinInfo) pollObj;
@@ -525,7 +532,6 @@ private void joinFun(Object pollObj,
525532
RowTypeInfo typeInfo = new RowTypeInfo(fieldDataTypes, targetTable.getSchema().getFieldNames());
526533

527534
DataStream<BaseRow> adaptStream = tableEnv.toRetractStream(targetTable, typeInfo)
528-
.filter(f -> f.f0)
529535
.map(f -> RowDataConvert.convertToBaseRow(f));
530536

531537
//join side table before keyby ===> Reducing the size of each dimension table cache of async
@@ -565,6 +571,7 @@ private void joinFun(Object pollObj,
565571
Table joinTable = tableEnv.fromDataStream(dsOut);
566572
tableEnv.createTemporaryView(targetTableName, joinTable);
567573
localTableCache.put(joinInfo.getNewTableName(), joinTable);
574+
dimTableNewTable.put(joinInfo.getNewTableName(), joinTable);
568575
}
569576
}
570577

@@ -593,4 +600,8 @@ private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Tab
593600
return true;
594601
}
595602

603+
public static Map<String, Table> getDimTableNewTable(){
604+
return dimTableNewTable;
605+
}
606+
596607
}

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.calcite.sql.SqlBasicCall;
3333
import org.apache.calcite.sql.SqlDataTypeSpec;
3434
import org.apache.calcite.sql.SqlIdentifier;
35+
import org.apache.calcite.sql.SqlInsert;
3536
import org.apache.calcite.sql.SqlJoin;
3637
import org.apache.calcite.sql.SqlKind;
3738
import org.apache.calcite.sql.SqlLiteral;
@@ -45,6 +46,7 @@
4546
import org.apache.commons.lang3.StringUtils;
4647
import org.apache.flink.table.api.Table;
4748

49+
import java.util.Collection;
4850
import java.util.List;
4951
import java.util.Map;
5052
import java.util.Queue;
@@ -733,4 +735,67 @@ public static String buildTableNameWithScope(String leftTableName, String leftTa
733735
return TableUtils.buildTableNameWithScope(newName, scope) + "_" + System.currentTimeMillis();
734736
}
735737

738+
/**
739+
* 判断目标查询是否是基于新构建出来的表的窗口group by
740+
* @param sqlNode
741+
* @param newRegisterTableList
742+
* @return
743+
*/
744+
public static boolean checkIsGroupByTimeWindow(SqlNode sqlNode, Collection<String> newRegisterTableList) {
745+
SqlKind sqlKind = sqlNode.getKind();
746+
switch (sqlKind) {
747+
case SELECT:
748+
SqlSelect selectNode = (SqlSelect) sqlNode;
749+
SqlNodeList groupNodeList = selectNode.getGroup();
750+
if ( groupNodeList == null || groupNodeList.size() == 0) {
751+
return false;
752+
}
753+
754+
SqlNode fromNode = selectNode.getFrom();
755+
if (fromNode.getKind() != IDENTIFIER
756+
&& fromNode.getKind() != AS) {
757+
return false;
758+
}
759+
760+
if(selectNode.getFrom().getKind() == AS){
761+
SqlNode asNode = ((SqlBasicCall) selectNode.getFrom()).getOperands()[0];
762+
if(asNode.getKind() != IDENTIFIER){
763+
return false;
764+
}
765+
766+
fromNode = asNode;
767+
}
768+
769+
String tableName = fromNode.toString();
770+
for (SqlNode node : groupNodeList.getList()) {
771+
if (node.getKind() == OTHER_FUNCTION) {
772+
String functionName = ((SqlBasicCall) node).getOperator().toString().toLowerCase();
773+
boolean isTimeGroupByFunction = checkIsTimeGroupByFunction(functionName);
774+
if(isTimeGroupByFunction && newRegisterTableList.contains(tableName)){
775+
return true;
776+
}
777+
}
778+
}
779+
780+
return false;
781+
case INSERT:
782+
return checkIsGroupByTimeWindow(((SqlInsert) sqlNode).getSource(), newRegisterTableList);
783+
case UNION:
784+
SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
785+
SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
786+
return checkIsGroupByTimeWindow(unionLeft, newRegisterTableList)
787+
|| checkIsGroupByTimeWindow(unionRight, newRegisterTableList);
788+
789+
default:
790+
return false;
791+
792+
}
793+
}
794+
795+
public static boolean checkIsTimeGroupByFunction(String functionName ){
796+
return functionName.equalsIgnoreCase("tumble")
797+
|| functionName.equalsIgnoreCase("session")
798+
|| functionName.equalsIgnoreCase("hop");
799+
}
800+
736801
}

0 commit comments

Comments
 (0)