Skip to content

Commit fd6fa5f

Browse files
author
dapeng
committed
fix join重复tablename不注册的问题
1 parent b929fcc commit fd6fa5f

File tree

4 files changed

+49
-27
lines changed

4 files changed

+49
-27
lines changed

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class JoinInfo implements Serializable {
6969

7070
private String scope = "";
7171

72+
private String newTableName = null;
73+
7274
/**
7375
* 左表需要查询的字段信息和output的时候对应的列名称
7476
*/
@@ -96,13 +98,12 @@ public String getNonSideTable(){
9698
}
9799

98100
public String getNewTableName(){
99-
//兼容左边表是as 的情况
100-
String leftStr = leftTableName;
101-
leftStr = Strings.isNullOrEmpty(leftStr) ? leftTableAlias : leftStr;
102-
String newName = leftStr + "_" + rightTableName;
103-
return TableUtils.buildTableNameWithScope(newName, scope);
101+
return this.newTableName;
104102
}
105103

104+
public void setNewTableName(String newTableName){
105+
this.newTableName = newTableName;
106+
}
106107

107108
public String getNewTableAlias(){
108109
String newName = leftTableAlias + "_" + rightTableAlias;

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
9191
Set<Tuple2<String, String>> joinFieldSet,
9292
Map<String, String> tableRef,
9393
Map<String, String> fieldRef,
94-
String scope) {
94+
String scope,
95+
Set<String> joinTableNames) {
9596

9697
SqlNode leftNode = joinNode.getLeft();
9798
SqlNode rightNode = joinNode.getRight();
@@ -108,13 +109,13 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
108109
if (leftNode.getKind() == JOIN) {
109110
//处理连续join
110111
dealNestJoin(joinNode, sideTableSet, queueInfo, parentWhere, parentSelectList,
111-
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
112+
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope, joinTableNames);
112113
leftNode = joinNode.getLeft();
113114
}
114115

115116
if (leftNode.getKind() == AS) {
116117
AliasInfo aliasInfo = (AliasInfo) sideSQLParser.parseSql(leftNode, sideTableSet, queueInfo,
117-
parentWhere, parentSelectList, parentGroupByList, scope);
118+
parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
118119
leftTbName = aliasInfo.getName();
119120
leftTbAlias = aliasInfo.getAlias();
120121
} else if(leftNode.getKind() == IDENTIFIER){
@@ -126,7 +127,7 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
126127
Preconditions.checkState(!leftIsSide, "side-table must be at the right of join operator");
127128

128129
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo,
129-
parentWhere, parentSelectList, parentGroupByList, scope);
130+
parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
130131
rightTableName = rightTableNameAndAlias.f0;
131132
rightTableAlias = rightTableNameAndAlias.f1;
132133

@@ -150,7 +151,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
150151
tableInfo.setJoinType(joinType);
151152
tableInfo.setCondition(joinNode.getCondition());
152153
tableInfo.setScope(scope);
153-
154+
tableInfo.setNewTableName(TableUtils.buildTableNameWithScope(leftTbName, leftTbAlias, rightTableName, scope, joinTableNames));
155+
joinTableNames.add(tableInfo.getNewTableName());
154156
TableUtils.replaceJoinFieldRefTableName(joinNode.getCondition(), fieldRef);
155157

156158
//extract 需要查询的字段信息
@@ -262,20 +264,21 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
262264
Set<Tuple2<String, String>> joinFieldSet,
263265
Map<String, String> tableRef,
264266
Map<String, String> fieldRef,
265-
String scope){
267+
String scope,
268+
Set<String> joinTableNames){
266269

267270
SqlJoin leftJoinNode = (SqlJoin) joinNode.getLeft();
268271
SqlNode parentRightJoinNode = joinNode.getRight();
269272
SqlNode rightNode = leftJoinNode.getRight();
270273

271274
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo,
272-
parentWhere, parentSelectList, parentGroupByList, scope);
275+
parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
273276
Tuple2<String, String> parentRightJoinInfo = parseRightNode(parentRightJoinNode, sideTableSet,
274-
queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
277+
queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
275278
boolean parentRightIsSide = checkIsSideTable(parentRightJoinInfo.f0, sideTableSet);
276279

277280
JoinInfo joinInfo = dealJoinNode(leftJoinNode, sideTableSet, queueInfo, parentWhere, parentSelectList,
278-
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
281+
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope, joinTableNames);
279282

280283
String rightTableName = rightTableNameAndAlias.f0;
281284
boolean rightIsSide = checkIsSideTable(rightTableName, sideTableSet);
@@ -670,12 +673,12 @@ private void extractSelectField(SqlNode selectNode,
670673

671674
private Tuple2<String, String> parseRightNode(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object> queueInfo,
672675
SqlNode parentWhere, SqlNodeList selectList, SqlNodeList parentGroupByList,
673-
String scope) {
676+
String scope, Set<String> joinTableNames) {
674677
Tuple2<String, String> tabName = new Tuple2<>("", "");
675678
if(sqlNode.getKind() == IDENTIFIER){
676679
tabName.f0 = sqlNode.toString();
677680
}else{
678-
AliasInfo aliasInfo = (AliasInfo)sideSQLParser.parseSql(sqlNode, sideTableSet, queueInfo, parentWhere, selectList, parentGroupByList, scope);
681+
AliasInfo aliasInfo = (AliasInfo)sideSQLParser.parseSql(sqlNode, sideTableSet, queueInfo, parentWhere, selectList, parentGroupByList, scope, joinTableNames);
679682
tabName.f0 = aliasInfo.getName();
680683
tabName.f1 = aliasInfo.getAlias();
681684
}

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet, String
7171
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
7272
SqlNode sqlNode = flinkPlanner.parse(exeSql);
7373

74-
parseSql(sqlNode, sideTableSet, queueInfo, null, null, null, scope);
74+
parseSql(sqlNode, sideTableSet, queueInfo, null, null, null, scope, Sets.newHashSet());
7575
queueInfo.offer(sqlNode);
7676
return queueInfo;
7777
}
@@ -92,31 +92,32 @@ public Object parseSql(SqlNode sqlNode,
9292
SqlNode parentWhere,
9393
SqlNodeList parentSelectList,
9494
SqlNodeList parentGroupByList,
95-
String scope){
95+
String scope,
96+
Set<String> joinTableNames){
9697
SqlKind sqlKind = sqlNode.getKind();
9798
switch (sqlKind){
9899
case WITH: {
99100
SqlWith sqlWith = (SqlWith) sqlNode;
100101
SqlNodeList sqlNodeList = sqlWith.withList;
101102
for (SqlNode withAsTable : sqlNodeList) {
102103
SqlWithItem sqlWithItem = (SqlWithItem) withAsTable;
103-
parseSql(sqlWithItem.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
104+
parseSql(sqlWithItem.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
104105
queueInfo.add(sqlWithItem);
105106
}
106-
parseSql(sqlWith.body, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
107+
parseSql(sqlWith.body, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
107108
break;
108109
}
109110
case INSERT:
110111
SqlNode sqlSource = ((SqlInsert)sqlNode).getSource();
111-
return parseSql(sqlSource, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
112+
return parseSql(sqlSource, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
112113
case SELECT:
113114
SqlNode sqlFrom = ((SqlSelect)sqlNode).getFrom();
114115
SqlNode sqlWhere = ((SqlSelect)sqlNode).getWhere();
115116
SqlNodeList selectList = ((SqlSelect)sqlNode).getSelectList();
116117
SqlNodeList groupByList = ((SqlSelect) sqlNode).getGroup();
117118

118119
if(sqlFrom.getKind() != IDENTIFIER){
119-
Object result = parseSql(sqlFrom, sideTableSet, queueInfo, sqlWhere, selectList, groupByList, scope);
120+
Object result = parseSql(sqlFrom, sideTableSet, queueInfo, sqlWhere, selectList, groupByList, scope, joinTableNames);
120121
if(result instanceof JoinInfo){
121122
return TableUtils.dealSelectResultWithJoinInfo((JoinInfo) result, (SqlSelect) sqlNode, queueInfo);
122123
}else if(result instanceof AliasInfo){
@@ -138,7 +139,7 @@ public Object parseSql(SqlNode sqlNode,
138139
Map<String, String> tableRef = Maps.newHashMap();
139140
Map<String, String> fieldRef = Maps.newHashMap();
140141
return joinNodeDealer.dealJoinNode((SqlJoin) sqlNode, sideTableSet, queueInfo,
141-
parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
142+
parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef, scope, joinTableNames);
142143
case AS:
143144
SqlNode info = ((SqlBasicCall)sqlNode).getOperands()[0];
144145
SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1];
@@ -147,7 +148,7 @@ public Object parseSql(SqlNode sqlNode,
147148
if(info.getKind() == IDENTIFIER){
148149
infoStr = info.toString();
149150
} else {
150-
infoStr = parseSql(info, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope).toString();
151+
infoStr = parseSql(info, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames).toString();
151152
}
152153

153154
AliasInfo aliasInfo = new AliasInfo();
@@ -160,12 +161,12 @@ public Object parseSql(SqlNode sqlNode,
160161
SqlNode unionLeft = ((SqlBasicCall)sqlNode).getOperands()[0];
161162
SqlNode unionRight = ((SqlBasicCall)sqlNode).getOperands()[1];
162163

163-
parseSql(unionLeft, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
164-
parseSql(unionRight, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
164+
parseSql(unionLeft, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
165+
parseSql(unionRight, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
165166
break;
166167
case ORDER_BY:
167168
SqlOrderBy sqlOrderBy = (SqlOrderBy) sqlNode;
168-
parseSql(sqlOrderBy.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
169+
parseSql(sqlOrderBy.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
169170

170171
case LITERAL:
171172
return LITERAL.toString();

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,15 @@
4141
import org.apache.calcite.sql.SqlSelect;
4242
import org.apache.calcite.sql.fun.SqlCase;
4343
import org.apache.calcite.sql.parser.SqlParserPos;
44+
import org.apache.commons.collections.CollectionUtils;
4445
import org.apache.commons.lang3.StringUtils;
4546
import org.apache.flink.table.api.Table;
4647

4748
import java.util.List;
4849
import java.util.Map;
4950
import java.util.Queue;
5051
import java.util.Set;
52+
import java.util.stream.Collector;
5153

5254
import static org.apache.calcite.sql.SqlKind.*;
5355
import static org.apache.calcite.sql.SqlKind.CASE;
@@ -710,4 +712,19 @@ public static String buildTableNameWithScope(String tableName, String scope){
710712
return tableName + "_" + scope;
711713
}
712714

715+
public static String buildTableNameWithScope(String leftTableName, String leftTableAlias, String rightTableName, String scope, Set<String> existTableNames){
716+
//兼容左边表是as 的情况
717+
String leftStr = Strings.isNullOrEmpty(leftTableName) ? leftTableAlias : leftTableName;
718+
String newName = leftStr + "_" + rightTableName;
719+
String newTableName = TableUtils.buildTableNameWithScope(newName, scope);
720+
if (CollectionUtils.isEmpty(existTableNames)) {
721+
return newName;
722+
}
723+
if (!existTableNames.contains(newName)) {
724+
return newName;
725+
}
726+
return newName + "_" + System.currentTimeMillis();
727+
728+
}
729+
713730
}

0 commit comments

Comments
 (0)