Skip to content

Commit fc23d5d

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_3.10.x_29266' into 1.8_release_3.10.x
2 parents 00a2d36 + d417e51 commit fc23d5d

File tree

5 files changed

+50
-43
lines changed

5 files changed

+50
-43
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ private static void sqlTranslation(String localSqlPluginPath,
229229
}
230230
if (isSide) {
231231
//sql-dimensional table contains the dimension table of execution
232-
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null, null);
232+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null, String.valueOf(scope));
233233
} else {
234234
LOG.info("----------exec sql without dimension join-----------");
235235
LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class JoinInfo implements Serializable {
6969

7070
private String scope = "";
7171

72+
private String newTableName = null;
73+
7274
/**
7375
* 左表需要查询的字段信息和output的时候对应的列名称
7476
*/
@@ -96,13 +98,12 @@ public String getNonSideTable(){
9698
}
9799

98100
public String getNewTableName(){
99-
//兼容左边表是as 的情况
100-
String leftStr = leftTableName;
101-
leftStr = Strings.isNullOrEmpty(leftStr) ? leftTableAlias : leftStr;
102-
String newName = leftStr + "_" + rightTableName;
103-
return TableUtils.buildTableNameWithScope(newName, scope);
101+
return this.newTableName;
104102
}
105103

104+
public void setNewTableName(String newTableName){
105+
this.newTableName = newTableName;
106+
}
106107

107108
public String getNewTableAlias(){
108109
String newName = leftTableAlias + "_" + rightTableAlias;

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
9191
Set<Tuple2<String, String>> joinFieldSet,
9292
Map<String, String> tableRef,
9393
Map<String, String> fieldRef,
94-
String scope) {
94+
String scope,
95+
Set<String> joinTableNames) {
9596

9697
SqlNode leftNode = joinNode.getLeft();
9798
SqlNode rightNode = joinNode.getRight();
@@ -108,13 +109,13 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
108109
if (leftNode.getKind() == JOIN) {
109110
//处理连续join
110111
dealNestJoin(joinNode, sideTableSet, queueInfo, parentWhere, parentSelectList,
111-
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
112+
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope, joinTableNames);
112113
leftNode = joinNode.getLeft();
113114
}
114115

115116
if (leftNode.getKind() == AS) {
116117
AliasInfo aliasInfo = (AliasInfo) sideSQLParser.parseSql(leftNode, sideTableSet, queueInfo,
117-
parentWhere, parentSelectList, parentGroupByList, scope);
118+
parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
118119
leftTbName = aliasInfo.getName();
119120
leftTbAlias = aliasInfo.getAlias();
120121
} else if(leftNode.getKind() == IDENTIFIER){
@@ -126,7 +127,7 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
126127
Preconditions.checkState(!leftIsSide, "side-table must be at the right of join operator");
127128

128129
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo,
129-
parentWhere, parentSelectList, parentGroupByList, scope);
130+
parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
130131
rightTableName = rightTableNameAndAlias.f0;
131132
rightTableAlias = rightTableNameAndAlias.f1;
132133

@@ -150,7 +151,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
150151
tableInfo.setJoinType(joinType);
151152
tableInfo.setCondition(joinNode.getCondition());
152153
tableInfo.setScope(scope);
153-
154+
tableInfo.setNewTableName(TableUtils.buildTableNameWithScope(leftTbName, leftTbAlias, rightTableName, scope, joinTableNames));
155+
joinTableNames.add(tableInfo.getNewTableName());
154156
TableUtils.replaceJoinFieldRefTableName(joinNode.getCondition(), fieldRef);
155157

156158
//extract 需要查询的字段信息
@@ -262,20 +264,21 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
262264
Set<Tuple2<String, String>> joinFieldSet,
263265
Map<String, String> tableRef,
264266
Map<String, String> fieldRef,
265-
String scope){
267+
String scope,
268+
Set<String> joinTableNames){
266269

267270
SqlJoin leftJoinNode = (SqlJoin) joinNode.getLeft();
268271
SqlNode parentRightJoinNode = joinNode.getRight();
269272
SqlNode rightNode = leftJoinNode.getRight();
270273

271274
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo,
272-
parentWhere, parentSelectList, parentGroupByList, scope);
275+
parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
273276
Tuple2<String, String> parentRightJoinInfo = parseRightNode(parentRightJoinNode, sideTableSet,
274-
queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
277+
queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
275278
boolean parentRightIsSide = checkIsSideTable(parentRightJoinInfo.f0, sideTableSet);
276279

277280
JoinInfo joinInfo = dealJoinNode(leftJoinNode, sideTableSet, queueInfo, parentWhere, parentSelectList,
278-
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
281+
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope, joinTableNames);
279282

280283
String rightTableName = rightTableNameAndAlias.f0;
281284
boolean rightIsSide = checkIsSideTable(rightTableName, sideTableSet);
@@ -670,12 +673,12 @@ private void extractSelectField(SqlNode selectNode,
670673

671674
private Tuple2<String, String> parseRightNode(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object> queueInfo,
672675
SqlNode parentWhere, SqlNodeList selectList, SqlNodeList parentGroupByList,
673-
String scope) {
676+
String scope, Set<String> joinTableNames) {
674677
Tuple2<String, String> tabName = new Tuple2<>("", "");
675678
if(sqlNode.getKind() == IDENTIFIER){
676679
tabName.f0 = sqlNode.toString();
677680
}else{
678-
AliasInfo aliasInfo = (AliasInfo)sideSQLParser.parseSql(sqlNode, sideTableSet, queueInfo, parentWhere, selectList, parentGroupByList, scope);
681+
AliasInfo aliasInfo = (AliasInfo)sideSQLParser.parseSql(sqlNode, sideTableSet, queueInfo, parentWhere, selectList, parentGroupByList, scope, joinTableNames);
679682
tabName.f0 = aliasInfo.getName();
680683
tabName.f1 = aliasInfo.getAlias();
681684
}

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet, String
7171
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
7272
SqlNode sqlNode = flinkPlanner.parse(exeSql);
7373

74-
parseSql(sqlNode, sideTableSet, queueInfo, null, null, null, scope);
74+
parseSql(sqlNode, sideTableSet, queueInfo, null, null, null, scope, Sets.newHashSet());
7575
queueInfo.offer(sqlNode);
7676
return queueInfo;
7777
}
@@ -92,31 +92,32 @@ public Object parseSql(SqlNode sqlNode,
9292
SqlNode parentWhere,
9393
SqlNodeList parentSelectList,
9494
SqlNodeList parentGroupByList,
95-
String scope){
95+
String scope,
96+
Set<String> joinTableNames){
9697
SqlKind sqlKind = sqlNode.getKind();
9798
switch (sqlKind){
9899
case WITH: {
99100
SqlWith sqlWith = (SqlWith) sqlNode;
100101
SqlNodeList sqlNodeList = sqlWith.withList;
101102
for (SqlNode withAsTable : sqlNodeList) {
102103
SqlWithItem sqlWithItem = (SqlWithItem) withAsTable;
103-
parseSql(sqlWithItem.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
104+
parseSql(sqlWithItem.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
104105
queueInfo.add(sqlWithItem);
105106
}
106-
parseSql(sqlWith.body, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
107+
parseSql(sqlWith.body, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
107108
break;
108109
}
109110
case INSERT:
110111
SqlNode sqlSource = ((SqlInsert)sqlNode).getSource();
111-
return parseSql(sqlSource, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
112+
return parseSql(sqlSource, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
112113
case SELECT:
113114
SqlNode sqlFrom = ((SqlSelect)sqlNode).getFrom();
114115
SqlNode sqlWhere = ((SqlSelect)sqlNode).getWhere();
115116
SqlNodeList selectList = ((SqlSelect)sqlNode).getSelectList();
116117
SqlNodeList groupByList = ((SqlSelect) sqlNode).getGroup();
117118

118119
if(sqlFrom.getKind() != IDENTIFIER){
119-
Object result = parseSql(sqlFrom, sideTableSet, queueInfo, sqlWhere, selectList, groupByList, scope);
120+
Object result = parseSql(sqlFrom, sideTableSet, queueInfo, sqlWhere, selectList, groupByList, scope, joinTableNames);
120121
if(result instanceof JoinInfo){
121122
return TableUtils.dealSelectResultWithJoinInfo((JoinInfo) result, (SqlSelect) sqlNode, queueInfo);
122123
}else if(result instanceof AliasInfo){
@@ -138,7 +139,7 @@ public Object parseSql(SqlNode sqlNode,
138139
Map<String, String> tableRef = Maps.newHashMap();
139140
Map<String, String> fieldRef = Maps.newHashMap();
140141
return joinNodeDealer.dealJoinNode((SqlJoin) sqlNode, sideTableSet, queueInfo,
141-
parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
142+
parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef, scope, joinTableNames);
142143
case AS:
143144
SqlNode info = ((SqlBasicCall)sqlNode).getOperands()[0];
144145
SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1];
@@ -147,7 +148,7 @@ public Object parseSql(SqlNode sqlNode,
147148
if(info.getKind() == IDENTIFIER){
148149
infoStr = info.toString();
149150
} else {
150-
infoStr = parseSql(info, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope).toString();
151+
infoStr = parseSql(info, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames).toString();
151152
}
152153

153154
AliasInfo aliasInfo = new AliasInfo();
@@ -160,12 +161,12 @@ public Object parseSql(SqlNode sqlNode,
160161
SqlNode unionLeft = ((SqlBasicCall)sqlNode).getOperands()[0];
161162
SqlNode unionRight = ((SqlBasicCall)sqlNode).getOperands()[1];
162163

163-
parseSql(unionLeft, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
164-
parseSql(unionRight, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
164+
parseSql(unionLeft, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
165+
parseSql(unionRight, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
165166
break;
166167
case ORDER_BY:
167168
SqlOrderBy sqlOrderBy = (SqlOrderBy) sqlNode;
168-
parseSql(sqlOrderBy.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
169+
parseSql(sqlOrderBy.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
169170

170171
case LITERAL:
171172
return LITERAL.toString();

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
import com.google.common.collect.HashBasedTable;
2727
import com.google.common.collect.HashBiMap;
2828
import com.google.common.collect.Lists;
29-
import com.google.common.collect.Sets;
30-
import com.typesafe.config.ConfigException;
3129
import org.apache.calcite.sql.SqlAsOperator;
3230
import org.apache.calcite.sql.SqlBasicCall;
3331
import org.apache.calcite.sql.SqlDataTypeSpec;
@@ -41,6 +39,7 @@
4139
import org.apache.calcite.sql.SqlSelect;
4240
import org.apache.calcite.sql.fun.SqlCase;
4341
import org.apache.calcite.sql.parser.SqlParserPos;
42+
import org.apache.commons.collections.CollectionUtils;
4443
import org.apache.commons.lang3.StringUtils;
4544
import org.apache.flink.table.api.Table;
4645

@@ -189,25 +188,13 @@ public static void extractSelectFieldToFieldInfo(SqlNode fieldNode, String fromN
189188
}
190189
}
191190

192-
public static String buildInternalTableName(String left, char split, String right) {
193-
StringBuilder sb = new StringBuilder();
194-
return sb.append(left).append(split).append(right).toString();
195-
}
196-
197191
public static SqlBasicCall buildAsNodeByJoinInfo(JoinInfo joinInfo, SqlNode sqlNode0, String tableAlias) {
198192
SqlOperator operator = new SqlAsOperator();
199193

200194
SqlParserPos sqlParserPos = new SqlParserPos(0, 0);
201195
String newTableName = joinInfo.getNewTableName();
202-
String lefTbAlias = joinInfo.getLeftTableAlias();
203196

204-
if(Strings.isNullOrEmpty(lefTbAlias)){
205-
Set<String> fromTableSet = Sets.newHashSet();
206-
TableUtils.getFromTableInfo(joinInfo.getLeftNode(), fromTableSet);
207-
lefTbAlias = StringUtils.join(fromTableSet, "_");
208-
}
209-
210-
String newTableAlias = !StringUtils.isEmpty(tableAlias) ? tableAlias : buildInternalTableName(lefTbAlias, SPLIT, joinInfo.getRightTableAlias());
197+
String newTableAlias = !StringUtils.isEmpty(tableAlias) ? tableAlias : joinInfo.getNewTableAlias();
211198

212199
if (null == sqlNode0) {
213200
sqlNode0 = new SqlIdentifier(newTableName, null, sqlParserPos);
@@ -710,4 +697,19 @@ public static String buildTableNameWithScope(String tableName, String scope){
710697
return tableName + "_" + scope;
711698
}
712699

700+
public static String buildTableNameWithScope(String leftTableName, String leftTableAlias, String rightTableName, String scope, Set<String> existTableNames){
701+
//兼容左边表是as 的情况
702+
String leftStr = Strings.isNullOrEmpty(leftTableName) ? leftTableAlias : leftTableName;
703+
String newName = leftStr + "_" + rightTableName;
704+
if (CollectionUtils.isEmpty(existTableNames)) {
705+
return TableUtils.buildTableNameWithScope(newName, scope);
706+
}
707+
708+
if (!existTableNames.contains(newName)) {
709+
return TableUtils.buildTableNameWithScope(newName, scope);
710+
}
711+
712+
return TableUtils.buildTableNameWithScope(newName, scope) + "_" + System.currentTimeMillis();
713+
}
714+
713715
}

0 commit comments

Comments
 (0)