
Commit 72b4ec2

Author: xuchao

Handle the data inconsistency caused by reusing the same dimension (side) table across statements
1 parent b59ef71 commit 72b4ec2

7 files changed: 98 additions & 45 deletions

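Why this commit exists: when one job contains several statements that join the same dimension (side) table, the intermediate join result derives its registered table name purely from the two table names involved, so every statement produces the same generated name and later statements silently reuse the first statement's registration, returning data that does not match expectations. The fix threads a per-statement scope id through the whole SQL parsing path and appends it to the generated names. A sketch of the colliding statement pair (all table names here are invented for illustration):

    // Both statements join the same side table dimUser. Before this commit
    // each derived the same intermediate table name ("orders_dimUser"); with
    // the per-statement scope the generated names become distinct. The exact
    // suffix format comes from TableUtils.buildTableNameWithScope, defined in
    // the seventh changed file, which is not shown on this page.
    String stmt0 = "INSERT INTO sinkA SELECT o.id, u.name FROM orders o JOIN dimUser u ON o.uid = u.id";
    String stmt1 = "INSERT INTO sinkB SELECT o.id, u.level FROM orders o JOIN dimUser u ON o.uid = u.id";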

core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java

Lines changed: 24 additions & 9 deletions
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -93,34 +94,48 @@ public JobExecutionResult execute(String jobName) throws Exception {
         // transform the streaming program into a JobGraph
         StreamGraph streamGraph = getStreamGraph();
         streamGraph.setJobName(jobName);
+        return execute(streamGraph);
+    }
+
+    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {

         JobGraph jobGraph = streamGraph.getJobGraph();
         jobGraph.setClasspaths(classpaths);
+        jobGraph.setAllowQueuedScheduling(true);

         Configuration configuration = new Configuration();
         configuration.addAll(jobGraph.getJobConfiguration());
-
-        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "512M");
-        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
+        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

         // add (and override) the settings with what the user defined
         configuration.addAll(this.conf);

-        MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
-        configBuilder.setConfiguration(configuration);
+        if (!configuration.contains(RestOptions.BIND_PORT)) {
+            configuration.setString(RestOptions.BIND_PORT, "0");
+        }
+
+        int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
+
+        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+                .setConfiguration(configuration)
+                .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
+                .build();

         if (LOG.isInfoEnabled()) {
             LOG.info("Running job on local embedded Flink mini cluster");
         }

-        MiniCluster exec = new MiniCluster(configBuilder.build());
+        MiniCluster miniCluster = new MiniCluster(cfg);
+
         try {
-            exec.start();
-            return exec.executeJobBlocking(jobGraph);
+            miniCluster.start();
+            configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
+
+            return miniCluster.executeJobBlocking(jobGraph);
         }
         finally {
             transformations.clear();
-            exec.closeAsync();
+            miniCluster.close();
         }
     }
 }
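The MyLocalStreamEnvironment change is independent plumbing for local runs: execute(String) is split so a prebuilt StreamGraph can be submitted, managed memory is set to "0" (sized automatically), the REST endpoint binds to an ephemeral port unless one is configured, and the slot count honours a user setting before falling back to the job's maximum parallelism. A minimal standalone sketch of the new MiniCluster setup, assuming the Flink 1.8-era APIs this repository builds against:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestOptions;
    import org.apache.flink.configuration.TaskManagerOptions;
    import org.apache.flink.runtime.minicluster.MiniCluster;
    import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

    public class MiniClusterSketch {
        public static void main(String[] args) throws Exception {
            Configuration configuration = new Configuration();
            // "0" lets the OS pick a free port, so concurrent local runs
            // no longer collide on the default REST port.
            if (!configuration.contains(RestOptions.BIND_PORT)) {
                configuration.setString(RestOptions.BIND_PORT, "0");
            }
            // honour a user-configured slot count; 4 is an arbitrary fallback here
            int numSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);

            MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
                    .setConfiguration(configuration)
                    .setNumSlotsPerTaskManager(numSlots)
                    .build();

            MiniCluster miniCluster = new MiniCluster(cfg);
            try {
                miniCluster.start();
                // the chosen port is only known after start()
                int restPort = miniCluster.getRestAddress().get().getPort();
                System.out.println("MiniCluster REST endpoint on port " + restPort);
            } finally {
                miniCluster.close();
            }
        }
    }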

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 9 additions & 3 deletions
@@ -194,14 +194,18 @@ private static void sqlTranslation(String localSqlPluginPath,

         SideSqlExec sideSqlExec = new SideSqlExec();
         sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
+
+        int scope = 0;
         for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
-            sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result);
+            sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result, scope + "");
+            scope++;
         }

         for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
             if (LOG.isInfoEnabled()) {
                 LOG.info("exe-sql:\n" + result.getExecSql());
             }
+
             boolean isSide = false;
             for (String tableName : result.getTargetTableList()) {
                 if (sqlTree.getTmpTableMap().containsKey(tableName)) {
@@ -212,7 +216,7 @@ private static void sqlTranslation(String localSqlPluginPath,
                     SqlNode sqlNode = flinkPlanner.parse(realSql);
                     String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
                     tmp.setExecSql(tmpSql);
-                    sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp);
+                    sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp, scope + "");
                 } else {
                     for (String sourceTable : result.getSourceTableList()) {
                         if (sideTableMap.containsKey(sourceTable)) {
@@ -222,7 +226,7 @@ private static void sqlTranslation(String localSqlPluginPath,
                     }
                     if (isSide) {
                         //sql-dimensional table contains the dimension table of execution
-                        sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null);
+                        sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null, null);
                     } else {
                         System.out.println("----------exec sql without dimension join-----------");
                         System.out.println("----------real sql exec is--------------------------");
@@ -233,6 +237,8 @@ private static void sqlTranslation(String localSqlPluginPath,
                 }
             }
+
+            scope++;
         }
     }
 }
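The driver now numbers statements: every temporary-view statement and every INSERT statement gets its own scope id, passed to SideSqlExec.exec as a string. Note that the plain side-join branch still passes a null scope, so only the paths that register intermediate tables by generated name are scoped. A compressed sketch of the numbering scheme (the exec wrapper and statement lists are hypothetical stand-ins):

    // scope "0", "1", ... one fresh id per top-level statement
    int scope = 0;
    for (String view : tmpViewStatements) {        // hypothetical list of CREATE VIEW statements
        sideSqlExec(view, String.valueOf(scope));  // hypothetical exec(sql, scope) wrapper
        scope++;
    }
    for (String insert : insertStatements) {       // hypothetical list of INSERT statements
        sideSqlExec(insert, String.valueOf(scope));
        scope++;
    }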

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 15 additions & 2 deletions
@@ -20,6 +20,7 @@

 package com.dtstack.flink.sql.side;

+import com.dtstack.flink.sql.util.TableUtils;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Maps;
 import org.apache.calcite.sql.JoinType;
@@ -66,6 +67,8 @@ public class JoinInfo implements Serializable {

     private JoinType joinType;

+    private String scope = "";
+
     /**
      * fields of the left table that need to be queried and the corresponding column names in the output
      */
@@ -96,12 +99,14 @@ public String getNewTableName(){
         // handle the case where the left table is aliased with AS
         String leftStr = leftTableName;
         leftStr = Strings.isNullOrEmpty(leftStr) ? leftTableAlias : leftStr;
-        return leftStr + "_" + rightTableName;
+        String newName = leftStr + "_" + rightTableName;
+        return TableUtils.buildTableNameWithScope(newName, scope);
     }


     public String getNewTableAlias(){
-        return leftTableAlias + "_" + rightTableAlias;
+        String newName = leftTableAlias + "_" + rightTableAlias;
+        return TableUtils.buildTableNameWithScope(newName, scope);
     }

     public boolean isLeftIsSideTable() {
@@ -233,6 +238,14 @@ public HashBasedTable<String, String, String> getTableFieldRef(){
         return mappingTable;
     }

+    public String getScope() {
+        return scope;
+    }
+
+    public void setScope(String scope) {
+        this.scope = scope;
+    }
+
     @Override
     public String toString() {
         return "JoinInfo{" +

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 22 additions & 11 deletions
@@ -89,7 +89,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
                                  SqlNodeList parentGroupByList,
                                  Set<Tuple2<String, String>> joinFieldSet,
                                  Map<String, String> tableRef,
-                                 Map<String, String> fieldRef) {
+                                 Map<String, String> fieldRef,
+                                 String scope) {

         SqlNode leftNode = joinNode.getLeft();
         SqlNode rightNode = joinNode.getRight();
@@ -105,13 +106,14 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,

         if (leftNode.getKind() == JOIN) {
             // handle consecutive joins
-            dealNestJoin(joinNode, sideTableSet,
-                    queueInfo, parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef);
+            dealNestJoin(joinNode, sideTableSet, queueInfo, parentWhere, parentSelectList,
+                    parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
             leftNode = joinNode.getLeft();
         }

         if (leftNode.getKind() == AS) {
-            AliasInfo aliasInfo = (AliasInfo) sideSQLParser.parseSql(leftNode, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
+            AliasInfo aliasInfo = (AliasInfo) sideSQLParser.parseSql(leftNode, sideTableSet, queueInfo,
+                    parentWhere, parentSelectList, parentGroupByList, scope);
             leftTbName = aliasInfo.getName();
             leftTbAlias = aliasInfo.getAlias();
         } else if(leftNode.getKind() == IDENTIFIER){
@@ -122,7 +124,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
         boolean leftIsSide = checkIsSideTable(leftTbName, sideTableSet);
         Preconditions.checkState(!leftIsSide, "side-table must be at the right of join operator");

-        Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
+        Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo,
+                parentWhere, parentSelectList, parentGroupByList, scope);
         rightTableName = rightTableNameAndAlias.f0;
         rightTableAlias = rightTableNameAndAlias.f1;

@@ -145,6 +148,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
         tableInfo.setRightNode(rightNode);
         tableInfo.setJoinType(joinType);
         tableInfo.setCondition(joinNode.getCondition());
+        tableInfo.setScope(scope);
+
         TableUtils.replaceJoinFieldRefTableName(joinNode.getCondition(), fieldRef);

         // extract the fields that need to be queried
@@ -255,16 +260,21 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
                                   SqlNodeList parentGroupByList,
                                   Set<Tuple2<String, String>> joinFieldSet,
                                   Map<String, String> tableRef,
-                                  Map<String, String> fieldRef){
+                                  Map<String, String> fieldRef,
+                                  String scope){

         SqlJoin leftJoinNode = (SqlJoin) joinNode.getLeft();
         SqlNode parentRightJoinNode = joinNode.getRight();
         SqlNode rightNode = leftJoinNode.getRight();
-        Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
-        Tuple2<String, String> parentRightJoinInfo = parseRightNode(parentRightJoinNode, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
+
+        Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo,
+                parentWhere, parentSelectList, parentGroupByList, scope);
+        Tuple2<String, String> parentRightJoinInfo = parseRightNode(parentRightJoinNode, sideTableSet,
+                queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
         boolean parentRightIsSide = checkIsSideTable(parentRightJoinInfo.f0, sideTableSet);

-        JoinInfo joinInfo = dealJoinNode(leftJoinNode, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef);
+        JoinInfo joinInfo = dealJoinNode(leftJoinNode, sideTableSet, queueInfo, parentWhere, parentSelectList,
+                parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);

         String rightTableName = rightTableNameAndAlias.f0;
         boolean rightIsSide = checkIsSideTable(rightTableName, sideTableSet);
@@ -616,12 +626,13 @@ private void extractSelectField(SqlNode selectNode,


     private Tuple2<String, String> parseRightNode(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object> queueInfo,
-                                                  SqlNode parentWhere, SqlNodeList selectList, SqlNodeList parentGroupByList) {
+                                                  SqlNode parentWhere, SqlNodeList selectList, SqlNodeList parentGroupByList,
+                                                  String scope) {
         Tuple2<String, String> tabName = new Tuple2<>("", "");
         if(sqlNode.getKind() == IDENTIFIER){
             tabName.f0 = sqlNode.toString();
         }else{
-            AliasInfo aliasInfo = (AliasInfo)sideSQLParser.parseSql(sqlNode, sideTableSet, queueInfo, parentWhere, selectList, parentGroupByList);
+            AliasInfo aliasInfo = (AliasInfo)sideSQLParser.parseSql(sqlNode, sideTableSet, queueInfo, parentWhere, selectList, parentGroupByList, scope);
             tabName.f0 = aliasInfo.getName();
             tabName.f1 = aliasInfo.getAlias();
         }

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 13 additions & 12 deletions
@@ -61,7 +61,7 @@ public class SideSQLParser {

     private Map<String, Table> localTableCache = Maps.newHashMap();

-    public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws SqlParseException {
+    public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet, String scope) throws SqlParseException {
         System.out.println("----------exec original Sql----------");
         System.out.println(exeSql);
         LOG.info("----------exec original Sql----------");
@@ -71,7 +71,7 @@ public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws
         FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
         SqlNode sqlNode = flinkPlanner.parse(exeSql);

-        parseSql(sqlNode, sideTableSet, queueInfo, null, null, null);
+        parseSql(sqlNode, sideTableSet, queueInfo, null, null, null, scope);
         queueInfo.offer(sqlNode);
         return queueInfo;
     }
@@ -91,31 +91,32 @@ public Object parseSql(SqlNode sqlNode,
                            Queue<Object> queueInfo,
                            SqlNode parentWhere,
                            SqlNodeList parentSelectList,
-                           SqlNodeList parentGroupByList){
+                           SqlNodeList parentGroupByList,
+                           String scope){
         SqlKind sqlKind = sqlNode.getKind();
         switch (sqlKind){
             case WITH: {
                 SqlWith sqlWith = (SqlWith) sqlNode;
                 SqlNodeList sqlNodeList = sqlWith.withList;
                 for (SqlNode withAsTable : sqlNodeList) {
                     SqlWithItem sqlWithItem = (SqlWithItem) withAsTable;
-                    parseSql(sqlWithItem.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
+                    parseSql(sqlWithItem.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
                     queueInfo.add(sqlWithItem);
                 }
-                parseSql(sqlWith.body, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
+                parseSql(sqlWith.body, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
                 break;
             }
             case INSERT:
                 SqlNode sqlSource = ((SqlInsert)sqlNode).getSource();
-                return parseSql(sqlSource, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
+                return parseSql(sqlSource, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
             case SELECT:
                 SqlNode sqlFrom = ((SqlSelect)sqlNode).getFrom();
                 SqlNode sqlWhere = ((SqlSelect)sqlNode).getWhere();
                 SqlNodeList selectList = ((SqlSelect)sqlNode).getSelectList();
                 SqlNodeList groupByList = ((SqlSelect) sqlNode).getGroup();

                 if(sqlFrom.getKind() != IDENTIFIER){
-                    Object result = parseSql(sqlFrom, sideTableSet, queueInfo, sqlWhere, selectList, groupByList);
+                    Object result = parseSql(sqlFrom, sideTableSet, queueInfo, sqlWhere, selectList, groupByList, scope);
                     if(result instanceof JoinInfo){
                         return TableUtils.dealSelectResultWithJoinInfo((JoinInfo) result, (SqlSelect) sqlNode, queueInfo);
                     }else if(result instanceof AliasInfo){
@@ -137,7 +138,7 @@ public Object parseSql(SqlNode sqlNode,
                 Map<String, String> tableRef = Maps.newHashMap();
                 Map<String, String> fieldRef = Maps.newHashMap();
                 return joinNodeDealer.dealJoinNode((SqlJoin) sqlNode, sideTableSet, queueInfo,
-                        parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef);
+                        parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
             case AS:
                 SqlNode info = ((SqlBasicCall)sqlNode).getOperands()[0];
                 SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1];
@@ -146,7 +147,7 @@ public Object parseSql(SqlNode sqlNode,
                 if(info.getKind() == IDENTIFIER){
                     infoStr = info.toString();
                 } else {
-                    infoStr = parseSql(info, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList).toString();
+                    infoStr = parseSql(info, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope).toString();
                 }

                 AliasInfo aliasInfo = new AliasInfo();
@@ -159,12 +160,12 @@ public Object parseSql(SqlNode sqlNode,
                 SqlNode unionLeft = ((SqlBasicCall)sqlNode).getOperands()[0];
                 SqlNode unionRight = ((SqlBasicCall)sqlNode).getOperands()[1];

-                parseSql(unionLeft, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
-                parseSql(unionRight, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
+                parseSql(unionLeft, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
+                parseSql(unionRight, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
                 break;
             case ORDER_BY:
                 SqlOrderBy sqlOrderBy = (SqlOrderBy) sqlNode;
-                parseSql(sqlOrderBy.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
+                parseSql(sqlOrderBy.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);

             case LITERAL:
                 return LITERAL.toString();

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 5 additions & 4 deletions
@@ -101,7 +101,8 @@ public void exec(String sql,
                      StreamTableEnvironment tableEnv,
                      Map<String, Table> tableCache,
                      StreamQueryConfig queryConfig,
-                     CreateTmpTableParser.SqlParserResult createView) throws Exception {
+                     CreateTmpTableParser.SqlParserResult createView,
+                     String scope) throws Exception {
         if(localSqlPluginPath == null){
             throw new RuntimeException("need to set localSqlPluginPath");
         }
@@ -121,7 +122,7 @@ public void exec(String sql,

         SideSQLParser sideSQLParser = new SideSQLParser();
         sideSQLParser.setLocalTableCache(localTableCache);
-        Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet());
+        Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet(), scope);
         Object pollObj = null;

         while((pollObj = exeQueue.poll()) != null){
@@ -452,9 +453,9 @@ private void joinFun(Object pollObj,
         replaceInfo.setTargetTableName(targetTableName);
         replaceInfo.setTargetTableAlias(targetTableAlias);

-        if (!tableEnv.isRegistered(joinInfo.getNewTableName())){
+        if (!tableEnv.isRegistered(targetTableName)){
             Table joinTable = tableEnv.fromDataStream(dsOut);
-            tableEnv.registerTable(joinInfo.getNewTableName(), joinTable);
+            tableEnv.registerTable(targetTableName, joinTable);
             localTableCache.put(joinInfo.getNewTableName(), joinTable);
         }
     }
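Finally, SideSqlExec registers the join output under targetTableName, the name the rewritten query actually references, instead of joinInfo.getNewTableName(); with scoped names in play, checking one name and registering under another could skip a registration that downstream statements expect. The fixed idiom, restated with comments (a sketch, not new behaviour):

    // the existence check and the registration must share one key, or a second
    // statement that reuses the side table can pass the check yet leave the
    // expected name unregistered
    if (!tableEnv.isRegistered(targetTableName)) {
        Table joinTable = tableEnv.fromDataStream(dsOut);
        tableEnv.registerTable(targetTableName, joinTable);
        // the local cache still keys on the generated (now scope-suffixed) name
        localTableCache.put(joinInfo.getNewTableName(), joinTable);
    }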
