Skip to content

Commit 7f19daa

Browse files
committed
初步完成flink 多维表join 改造
1 parent 0859b55 commit 7f19daa

File tree

5 files changed

+168
-191
lines changed

5 files changed

+168
-191
lines changed

core/src/main/java/com/dtstack/flink/sql/side/FieldReplaceInfo.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.google.common.collect.HashBasedTable;
24+
import org.apache.commons.lang3.StringUtils;
2425

2526
/**
26-
* Reason:
27+
* 用于记录转换之后的表和原来表直接字段的关联关系
2728
* Date: 2018/8/30
2829
* Company: www.dtstack.com
2930
* @author xuchao
@@ -37,6 +38,8 @@ public class FieldReplaceInfo {
3738

3839
private String targetTableAlias = null;
3940

41+
private FieldReplaceInfo preNode = null;
42+
4043
public void setMappingTable(HashBasedTable<String, String, String> mappingTable) {
4144
this.mappingTable = mappingTable;
4245
}
@@ -57,7 +60,39 @@ public String getTargetTableAlias() {
5760
return targetTableAlias;
5861
}
5962

63+
public FieldReplaceInfo getPreNode() {
64+
return preNode;
65+
}
66+
67+
public void setPreNode(FieldReplaceInfo preNode) {
68+
this.preNode = preNode;
69+
}
70+
6071
public void setTargetTableAlias(String targetTableAlias) {
6172
this.targetTableAlias = targetTableAlias;
6273
}
74+
75+
/**
76+
* 根据原始的tableName + fieldName 获取转换之后的fieldName
77+
* @param tableName
78+
* @param fieldName
79+
* @return
80+
*/
81+
public String getTargetFieldName(String tableName, String fieldName){
82+
String targetFieldName = mappingTable.get(tableName, fieldName);
83+
if(StringUtils.isNotBlank(targetFieldName)){
84+
return targetFieldName;
85+
}
86+
87+
if(preNode == null){
88+
return null;
89+
}
90+
91+
String preNodeTargetFieldName = preNode.getTargetFieldName(tableName, fieldName);
92+
if(StringUtils.isBlank(preNodeTargetFieldName)){
93+
return null;
94+
}
95+
96+
return mappingTable.get(preNode.getTargetTableName(), preNodeTargetFieldName);
97+
}
6398
}

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public class JoinInfo implements Serializable {
4242
//左表是否是维表
4343
private boolean leftIsSideTable;
4444

45+
private boolean leftIsTmpTable = false;
46+
4547
//右表是否是维表
4648
private boolean rightIsSideTable;
4749

@@ -64,8 +66,6 @@ public class JoinInfo implements Serializable {
6466
private SqlNode selectNode;
6567

6668
private JoinType joinType;
67-
// 左边是中间转换表,做表映射关系,给替换属性名称使用
68-
private Map<String, String> leftTabMapping;
6969

7070
public String getSideTableName(){
7171
if(leftIsSideTable){
@@ -194,4 +194,12 @@ public JoinType getJoinType() {
194194
public void setJoinType(JoinType joinType) {
195195
this.joinType = joinType;
196196
}
197+
198+
public boolean isLeftIsTmpTable() {
199+
return leftIsTmpTable;
200+
}
201+
202+
public void setLeftIsTmpTable(boolean leftIsTmpTable) {
203+
this.leftIsTmpTable = leftIsTmpTable;
204+
}
197205
}

core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,15 @@ public class ParserJoinField {
5151
*/
5252
public static List<FieldInfo> getRowTypeInfo(SqlNode sqlNode, JoinScope scope, boolean getAll){
5353

54-
if(sqlNode.getKind() != SqlKind.SELECT){
55-
throw new RuntimeException("------not select node--------\n" + sqlNode.toString());
56-
}
57-
5854
List<FieldInfo> fieldInfoList = Lists.newArrayList();
5955
if(getAll){
6056
return getAllField(scope);
6157
}
6258

59+
if(sqlNode.getKind() != SqlKind.SELECT){
60+
throw new RuntimeException("------not select node--------\n" + sqlNode.toString());
61+
}
62+
6363
SqlSelect sqlSelect = (SqlSelect)sqlNode;
6464
SqlNodeList sqlNodeList = sqlSelect.getSelectList();
6565
for(SqlNode fieldNode : sqlNodeList.getList()){
@@ -107,6 +107,7 @@ public static List<FieldInfo> getRowTypeInfo(SqlNode sqlNode, JoinScope scope, b
107107

108108
return fieldInfoList;
109109
}
110+
110111
//TODO 丢弃多余的PROCTIME
111112
private static List<FieldInfo> getAllField(JoinScope scope){
112113
Iterator prefixId = scope.getChildren().iterator();

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 60 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.config.CalciteConfig;
24-
import com.dtstack.flink.sql.util.ParseUtils;
25-
import com.google.common.collect.HashBasedTable;
24+
import com.google.common.base.Strings;
25+
import com.google.common.collect.Maps;
26+
import com.google.common.collect.Queues;
2627
import org.apache.calcite.sql.JoinType;
2728
import org.apache.calcite.sql.SqlAsOperator;
2829
import org.apache.calcite.sql.SqlBasicCall;
@@ -42,17 +43,10 @@
4243
import org.apache.calcite.sql.parser.SqlParserPos;
4344
import org.apache.commons.lang3.StringUtils;
4445
import org.apache.flink.api.java.tuple.Tuple2;
45-
import com.google.common.base.Strings;
46-
import com.google.common.collect.Lists;
47-
import com.google.common.collect.Maps;
48-
import com.google.common.collect.Queues;
4946
import org.apache.flink.table.api.Table;
5047
import org.slf4j.Logger;
5148
import org.slf4j.LoggerFactory;
5249

53-
import java.util.HashMap;
54-
import java.util.Iterator;
55-
import java.util.List;
5650
import java.util.Map;
5751
import java.util.Queue;
5852
import java.util.Set;
@@ -71,13 +65,6 @@ public class SideSQLParser {
7165

7266
private Map<String, Table> localTableCache = Maps.newHashMap();
7367
private final char SPLIT = '_';
74-
private String tempSQL = "SELECT * FROM TMP";
75-
76-
/** 处理连续join时,中间表存储子表字段映射 */
77-
private Map<String, HashBasedTable<String, String, String>> midTableFileNameMapping = Maps.newHashMap();
78-
/** 处理连续join时,原始表与中间表的映射 */
79-
private Map<String, List<Tuple2<String, String>>> midTableNameMapping = Maps.newHashMap();
80-
8168

8269
public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws SqlParseException {
8370
System.out.println("---exeSql---");
@@ -88,14 +75,6 @@ public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws
8875
Queue<Object> queueInfo = Queues.newLinkedBlockingQueue();
8976
SqlParser sqlParser = SqlParser.create(exeSql, CalciteConfig.MYSQL_LEX_CONFIG);
9077
SqlNode sqlNode = sqlParser.parseStmt();
91-
SqlNode original = sqlNode;
92-
93-
try {
94-
checkAndReplaceMultiJoin(sqlNode, sideTableSet);
95-
} catch (Exception e) {
96-
sqlNode = original;
97-
LOG.error("checkAndReplaceMultiJoin method error ", e);
98-
}
9978

10079
parseSql(sqlNode, sideTableSet, queueInfo);
10180
queueInfo.offer(sqlNode);
@@ -126,7 +105,7 @@ private void checkAndReplaceMultiJoin(SqlNode sqlNode, Set<String> sideTableSet)
126105
}
127106
break;
128107
case JOIN:
129-
convertMultiJoinToNestQuery((SqlJoin) sqlNode, sideTableSet);
108+
convertSideJoinToNewQuery((SqlJoin) sqlNode, sideTableSet);
130109
break;
131110
case AS:
132111
SqlNode info = ((SqlBasicCall) sqlNode).getOperands()[0];
@@ -213,10 +192,6 @@ private Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object>
213192
return "";
214193
}
215194

216-
private boolean isMultiJoinSqlNode(SqlNode sqlNode) {
217-
return sqlNode.getKind() == JOIN && ((SqlJoin) sqlNode).getLeft().getKind() == JOIN;
218-
}
219-
220195
private AliasInfo getSqlNodeAliasInfo(SqlNode sqlNode) {
221196
SqlNode info = ((SqlBasicCall) sqlNode).getOperands()[0];
222197
SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1];
@@ -228,49 +203,18 @@ private AliasInfo getSqlNodeAliasInfo(SqlNode sqlNode) {
228203
return aliasInfo;
229204
}
230205

231-
private void convertMultiJoinToNestQuery(SqlNode sqlNode, Set<String> sideTableSet) {
232-
SqlKind sqlKind = sqlNode.getKind();
233-
switch (sqlKind) {
234-
case JOIN:
235-
checkAndReplaceMultiJoin(((SqlJoin) sqlNode).getRight(), sideTableSet);
236-
checkAndReplaceMultiJoin(((SqlJoin) sqlNode).getLeft(), sideTableSet);
237-
if (isMultiJoinSqlNode(sqlNode)) {
238-
AliasInfo rightTableAliasInfo = getSqlNodeAliasInfo(((SqlJoin) sqlNode).getRight());
239-
String rightTableName = StringUtils.isEmpty(rightTableAliasInfo.getName()) ? rightTableAliasInfo.getAlias() : rightTableAliasInfo.getName();
240-
if (sideTableSet.contains(rightTableName)) {
241-
List<Tuple2<String, String>> joinIncludeSourceTable = Lists.newArrayList();
242-
ParseUtils.parseLeftNodeTableName(((SqlJoin) sqlNode).getLeft(), joinIncludeSourceTable, sideTableSet);
243-
244-
// last source table alias name + side or child query table alias name + _0
245-
String leftFirstTableAlias = joinIncludeSourceTable.get(0).f0;
246-
String internalTableName = buildInternalTableName(leftFirstTableAlias, SPLIT, rightTableName) + "_0";
247-
midTableNameMapping.put(internalTableName, joinIncludeSourceTable);
248-
249-
// select * from xxx
250-
SqlNode newSource = buildSelectByLeftNode(((SqlJoin) sqlNode).getLeft());
251-
// ( select * from xxx) as xxx_0
252-
SqlBasicCall newAsNode = buildAsSqlNode(internalTableName, newSource);
253-
((SqlJoin) sqlNode).setLeft(newAsNode);
254-
255-
String asNodeAlias = buildInternalTableName(internalTableName, SPLIT, rightTableName);
256-
buildAsSqlNode(asNodeAlias, sqlNode);
257-
258-
HashBasedTable<String, String, String> mappingTable = HashBasedTable.create();
259-
Set<String> sourceTableName = localTableCache.keySet();
260-
joinIncludeSourceTable.stream().filter((Tuple2<String, String> tabName) -> {
261-
return null != tabName.f1 && sourceTableName.contains(tabName.f1);
262-
}).forEach((Tuple2<String, String> tabName ) -> {
263-
String realTableName = tabName.f1;
264-
String tableAlias = tabName.f0;
265-
Table table = localTableCache.get(realTableName);
266-
ParseUtils.fillFieldNameMapping(mappingTable, table.getSchema().getFieldNames(), tableAlias);
267-
268-
});
269-
// ParseUtils.replaceJoinConditionTabName(((SqlJoin) sqlNode).getCondition(), mappingTable, internalTableName);
270-
System.out.println("");
271-
}
272-
}
273-
break;
206+
/**
207+
* 将和维表关联的join 替换为一个新的查询
208+
* @param sqlNode
209+
* @param sideTableSet
210+
*/
211+
private void convertSideJoinToNewQuery(SqlJoin sqlNode, Set<String> sideTableSet) {
212+
checkAndReplaceMultiJoin(sqlNode.getLeft(), sideTableSet);
213+
checkAndReplaceMultiJoin(sqlNode.getRight(), sideTableSet);
214+
215+
AliasInfo rightTableAliasInfo = getSqlNodeAliasInfo(sqlNode.getRight());
216+
if(sideTableSet.contains(rightTableAliasInfo.getName())){
217+
//构建新的查询
274218
}
275219
}
276220

@@ -288,15 +232,21 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
288232
SqlNode leftNode = joinNode.getLeft();
289233
SqlNode rightNode = joinNode.getRight();
290234
JoinType joinType = joinNode.getJoinType();
235+
291236
String leftTbName = "";
292237
String leftTbAlias = "";
293238
String rightTableName = "";
294239
String rightTableAlias = "";
240+
boolean leftTbisTmp = false;
241+
295242
Tuple2<String, String> rightTableNameAndAlias = null;
296243
if(leftNode.getKind() == IDENTIFIER){
297244
leftTbName = leftNode.toString();
298245
} else if (leftNode.getKind() == JOIN) {
299-
System.out.println(leftNode.toString());
246+
//处理连续join
247+
SqlBasicCall sqlBasicCall = dealNestJoin((SqlJoin) leftNode, sideTableSet, queueInfo);
248+
leftTbName = sqlBasicCall.getOperands()[0].toString();
249+
leftTbisTmp = true;
300250
} else if (leftNode.getKind() == AS) {
301251
AliasInfo aliasInfo = (AliasInfo) parseSql(leftNode, sideTableSet, queueInfo);
302252
leftTbName = aliasInfo.getName();
@@ -320,19 +270,6 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
320270
throw new RuntimeException("side join not support join type of right[current support inner join and left join]");
321271
}
322272

323-
Iterator iterator = ((HashMap) midTableNameMapping).values().iterator();
324-
while (iterator.hasNext()) {
325-
List<Tuple2<String, String>> next = (List) iterator.next();
326-
String finalRightTableAlias = rightTableAlias;
327-
String finalRightTableName = rightTableName;
328-
next.forEach(tp2 -> {
329-
if (tp2.f1 == null && tp2.f0 == finalRightTableAlias) {
330-
tp2.f1 = finalRightTableName;
331-
}
332-
});
333-
}
334-
335-
336273
JoinInfo tableInfo = new JoinInfo();
337274
tableInfo.setLeftTableName(leftTbName);
338275
tableInfo.setRightTableName(rightTableName);
@@ -341,11 +278,15 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
341278
} else {
342279
tableInfo.setLeftTableAlias(leftTbAlias);
343280
}
281+
344282
if (StringUtils.isEmpty(rightTableAlias)){
345283
tableInfo.setRightTableAlias(rightTableName);
346284
} else {
347285
tableInfo.setRightTableAlias(rightTableAlias);
348286
}
287+
288+
tableInfo.setLeftIsTmpTable(leftTbisTmp);
289+
349290
tableInfo.setLeftIsSideTable(leftIsSide);
350291
tableInfo.setRightIsSideTable(rightIsSide);
351292
tableInfo.setLeftNode(leftNode);
@@ -355,6 +296,24 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
355296
return tableInfo;
356297
}
357298

299+
//构建新的查询
300+
private SqlBasicCall dealNestJoin(SqlJoin joinNode, Set<String> sideTableSet, Queue<Object> queueInfo){
301+
SqlNode rightNode = joinNode.getRight();
302+
303+
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo);
304+
String rightTableName = rightTableNameAndAlias.f0;
305+
boolean rightIsSide = checkIsSideTable(rightTableName, sideTableSet);
306+
307+
if(!rightIsSide){
308+
return null;
309+
}
310+
311+
JoinInfo joinInfo = dealJoinNode(joinNode, sideTableSet, queueInfo);
312+
queueInfo.offer(joinInfo);
313+
return buildAsNodeByJoinInfo(joinInfo, null, null);
314+
315+
}
316+
358317
private Tuple2<String, String> parseRightNode(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object> queueInfo) {
359318
Tuple2<String, String> tabName = new Tuple2<>("", "");
360319
if(sqlNode.getKind() == IDENTIFIER){
@@ -367,19 +326,25 @@ private Tuple2<String, String> parseRightNode(SqlNode sqlNode, Set<String> sideT
367326
return tabName;
368327
}
369328

370-
private SqlNode buildSelectByLeftNode(SqlNode leftNode) {
371-
SqlParser sqlParser = SqlParser.create(tempSQL, CalciteConfig.MYSQL_LEX_CONFIG);
372-
SqlNode sqlNode = null;
373-
try {
374-
sqlNode = sqlParser.parseStmt();
375-
}catch (Exception e) {
376-
LOG.error("tmp sql parse error..", e);
329+
private Tuple2<String, String> parseLeftNode(SqlNode sqlNode){
330+
Tuple2<String, String> tabName = new Tuple2<>("", "");
331+
if(sqlNode.getKind() == IDENTIFIER){
332+
tabName.f0 = sqlNode.toString();
333+
tabName.f1 = sqlNode.toString();
334+
}else if (sqlNode.getKind() == AS){
335+
SqlNode info = ((SqlBasicCall)sqlNode).getOperands()[0];
336+
SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1];
337+
338+
tabName.f0 = info.toString();
339+
tabName.f1 = alias.toString();
340+
}else {
341+
throw new RuntimeException("");
377342
}
378343

379-
((SqlSelect) sqlNode).setFrom(leftNode);
380-
return sqlNode;
344+
return tabName;
381345
}
382346

347+
383348
/**
384349
*
385350
* @param joinInfo
@@ -452,14 +417,6 @@ private boolean checkIsSideTable(String tableName, Set<String> sideTableList){
452417
return false;
453418
}
454419

455-
public Map<String, HashBasedTable<String, String, String>> getMidTableFileNameMapping() {
456-
return midTableFileNameMapping;
457-
}
458-
459-
public Map<String, List<Tuple2<String, String>>> getMidTableNameMapping() {
460-
return midTableNameMapping;
461-
}
462-
463420
public void setLocalTableCache(Map<String, Table> localTableCache) {
464421
this.localTableCache = localTableCache;
465422
}

0 commit comments

Comments
 (0)