Skip to content

Commit 1ec65d0

Browse files
author
gituser
committed
Merge branch '1.8_release_3.10.x' into 1.8_test_4.0.x
2 parents 781bf35 + ceafe44 commit 1ec65d0

File tree

16 files changed

+181
-100
lines changed

16 files changed

+181
-100
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ private static void sqlTranslation(String localSqlPluginPath,
229229
}
230230
if (isSide) {
231231
//sql-dimensional table contains the dimension table of execution
232-
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null, null);
232+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null, String.valueOf(scope));
233233
} else {
234234
LOG.info("----------exec sql without dimension join-----------");
235235
LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());
@@ -321,7 +321,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
321321
URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractTargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
322322
pluginClassPathSets.add(sinkTablePathUrl);
323323
} else if (tableInfo instanceof AbstractSideTableInfo) {
324-
String sideOperator = ECacheType.ALL.name().equals(((AbstractSideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
324+
String sideOperator = ECacheType.ALL.name().equalsIgnoreCase(((AbstractSideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
325325
sideTableMap.put(tableInfo.getName(), (AbstractSideTableInfo) tableInfo);
326326

327327
URL sideTablePathUrl = PluginUtil.buildSidePathByLoadMode(tableInfo.getType(), sideOperator, AbstractSideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class JoinInfo implements Serializable {
6969

7070
private String scope = "";
7171

72+
private String newTableName = null;
73+
7274
/**
7375
* 左表需要查询的字段信息和output的时候对应的列名称
7476
*/
@@ -96,13 +98,12 @@ public String getNonSideTable(){
9698
}
9799

98100
public String getNewTableName(){
99-
//兼容左边表是as 的情况
100-
String leftStr = leftTableName;
101-
leftStr = Strings.isNullOrEmpty(leftStr) ? leftTableAlias : leftStr;
102-
String newName = leftStr + "_" + rightTableName;
103-
return TableUtils.buildTableNameWithScope(newName, scope);
101+
return this.newTableName;
104102
}
105103

104+
public void setNewTableName(String newTableName){
105+
this.newTableName = newTableName;
106+
}
106107

107108
public String getNewTableAlias(){
108109
String newName = leftTableAlias + "_" + rightTableAlias;

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
9191
Set<Tuple2<String, String>> joinFieldSet,
9292
Map<String, String> tableRef,
9393
Map<String, String> fieldRef,
94-
String scope) {
94+
String scope,
95+
Set<String> joinTableNames) {
9596

9697
SqlNode leftNode = joinNode.getLeft();
9798
SqlNode rightNode = joinNode.getRight();
@@ -108,13 +109,13 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
108109
if (leftNode.getKind() == JOIN) {
109110
//处理连续join
110111
dealNestJoin(joinNode, sideTableSet, queueInfo, parentWhere, parentSelectList,
111-
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
112+
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope, joinTableNames);
112113
leftNode = joinNode.getLeft();
113114
}
114115

115116
if (leftNode.getKind() == AS) {
116117
AliasInfo aliasInfo = (AliasInfo) sideSQLParser.parseSql(leftNode, sideTableSet, queueInfo,
117-
parentWhere, parentSelectList, parentGroupByList, scope);
118+
parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
118119
leftTbName = aliasInfo.getName();
119120
leftTbAlias = aliasInfo.getAlias();
120121
} else if(leftNode.getKind() == IDENTIFIER){
@@ -126,7 +127,7 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
126127
Preconditions.checkState(!leftIsSide, "side-table must be at the right of join operator");
127128

128129
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo,
129-
parentWhere, parentSelectList, parentGroupByList, scope);
130+
parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
130131
rightTableName = rightTableNameAndAlias.f0;
131132
rightTableAlias = rightTableNameAndAlias.f1;
132133

@@ -150,7 +151,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
150151
tableInfo.setJoinType(joinType);
151152
tableInfo.setCondition(joinNode.getCondition());
152153
tableInfo.setScope(scope);
153-
154+
tableInfo.setNewTableName(TableUtils.buildTableNameWithScope(leftTbName, leftTbAlias, rightTableName, scope, joinTableNames));
155+
joinTableNames.add(tableInfo.getNewTableName());
154156
TableUtils.replaceJoinFieldRefTableName(joinNode.getCondition(), fieldRef);
155157

156158
//extract 需要查询的字段信息
@@ -262,20 +264,21 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
262264
Set<Tuple2<String, String>> joinFieldSet,
263265
Map<String, String> tableRef,
264266
Map<String, String> fieldRef,
265-
String scope){
267+
String scope,
268+
Set<String> joinTableNames){
266269

267270
SqlJoin leftJoinNode = (SqlJoin) joinNode.getLeft();
268271
SqlNode parentRightJoinNode = joinNode.getRight();
269272
SqlNode rightNode = leftJoinNode.getRight();
270273

271274
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo,
272-
parentWhere, parentSelectList, parentGroupByList, scope);
275+
parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
273276
Tuple2<String, String> parentRightJoinInfo = parseRightNode(parentRightJoinNode, sideTableSet,
274-
queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
277+
queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
275278
boolean parentRightIsSide = checkIsSideTable(parentRightJoinInfo.f0, sideTableSet);
276279

277280
JoinInfo joinInfo = dealJoinNode(leftJoinNode, sideTableSet, queueInfo, parentWhere, parentSelectList,
278-
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
281+
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope, joinTableNames);
279282

280283
String rightTableName = rightTableNameAndAlias.f0;
281284
boolean rightIsSide = checkIsSideTable(rightTableName, sideTableSet);
@@ -670,12 +673,12 @@ private void extractSelectField(SqlNode selectNode,
670673

671674
private Tuple2<String, String> parseRightNode(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object> queueInfo,
672675
SqlNode parentWhere, SqlNodeList selectList, SqlNodeList parentGroupByList,
673-
String scope) {
676+
String scope, Set<String> joinTableNames) {
674677
Tuple2<String, String> tabName = new Tuple2<>("", "");
675678
if(sqlNode.getKind() == IDENTIFIER){
676679
tabName.f0 = sqlNode.toString();
677680
}else{
678-
AliasInfo aliasInfo = (AliasInfo)sideSQLParser.parseSql(sqlNode, sideTableSet, queueInfo, parentWhere, selectList, parentGroupByList, scope);
681+
AliasInfo aliasInfo = (AliasInfo)sideSQLParser.parseSql(sqlNode, sideTableSet, queueInfo, parentWhere, selectList, parentGroupByList, scope, joinTableNames);
679682
tabName.f0 = aliasInfo.getName();
680683
tabName.f1 = aliasInfo.getAlias();
681684
}

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet, String
7171
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
7272
SqlNode sqlNode = flinkPlanner.parse(exeSql);
7373

74-
parseSql(sqlNode, sideTableSet, queueInfo, null, null, null, scope);
74+
parseSql(sqlNode, sideTableSet, queueInfo, null, null, null, scope, Sets.newHashSet());
7575
queueInfo.offer(sqlNode);
7676
return queueInfo;
7777
}
@@ -92,31 +92,32 @@ public Object parseSql(SqlNode sqlNode,
9292
SqlNode parentWhere,
9393
SqlNodeList parentSelectList,
9494
SqlNodeList parentGroupByList,
95-
String scope){
95+
String scope,
96+
Set<String> joinTableNames){
9697
SqlKind sqlKind = sqlNode.getKind();
9798
switch (sqlKind){
9899
case WITH: {
99100
SqlWith sqlWith = (SqlWith) sqlNode;
100101
SqlNodeList sqlNodeList = sqlWith.withList;
101102
for (SqlNode withAsTable : sqlNodeList) {
102103
SqlWithItem sqlWithItem = (SqlWithItem) withAsTable;
103-
parseSql(sqlWithItem.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
104+
parseSql(sqlWithItem.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
104105
queueInfo.add(sqlWithItem);
105106
}
106-
parseSql(sqlWith.body, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
107+
parseSql(sqlWith.body, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
107108
break;
108109
}
109110
case INSERT:
110111
SqlNode sqlSource = ((SqlInsert)sqlNode).getSource();
111-
return parseSql(sqlSource, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
112+
return parseSql(sqlSource, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
112113
case SELECT:
113114
SqlNode sqlFrom = ((SqlSelect)sqlNode).getFrom();
114115
SqlNode sqlWhere = ((SqlSelect)sqlNode).getWhere();
115116
SqlNodeList selectList = ((SqlSelect)sqlNode).getSelectList();
116117
SqlNodeList groupByList = ((SqlSelect) sqlNode).getGroup();
117118

118119
if(sqlFrom.getKind() != IDENTIFIER){
119-
Object result = parseSql(sqlFrom, sideTableSet, queueInfo, sqlWhere, selectList, groupByList, scope);
120+
Object result = parseSql(sqlFrom, sideTableSet, queueInfo, sqlWhere, selectList, groupByList, scope, joinTableNames);
120121
if(result instanceof JoinInfo){
121122
return TableUtils.dealSelectResultWithJoinInfo((JoinInfo) result, (SqlSelect) sqlNode, queueInfo);
122123
}else if(result instanceof AliasInfo){
@@ -138,7 +139,7 @@ public Object parseSql(SqlNode sqlNode,
138139
Map<String, String> tableRef = Maps.newHashMap();
139140
Map<String, String> fieldRef = Maps.newHashMap();
140141
return joinNodeDealer.dealJoinNode((SqlJoin) sqlNode, sideTableSet, queueInfo,
141-
parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
142+
parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef, scope, joinTableNames);
142143
case AS:
143144
SqlNode info = ((SqlBasicCall)sqlNode).getOperands()[0];
144145
SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1];
@@ -147,7 +148,7 @@ public Object parseSql(SqlNode sqlNode,
147148
if(info.getKind() == IDENTIFIER){
148149
infoStr = info.toString();
149150
} else {
150-
infoStr = parseSql(info, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope).toString();
151+
infoStr = parseSql(info, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames).toString();
151152
}
152153

153154
AliasInfo aliasInfo = new AliasInfo();
@@ -160,12 +161,12 @@ public Object parseSql(SqlNode sqlNode,
160161
SqlNode unionLeft = ((SqlBasicCall)sqlNode).getOperands()[0];
161162
SqlNode unionRight = ((SqlBasicCall)sqlNode).getOperands()[1];
162163

163-
parseSql(unionLeft, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
164-
parseSql(unionRight, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
164+
parseSql(unionLeft, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
165+
parseSql(unionRight, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
165166
break;
166167
case ORDER_BY:
167168
SqlOrderBy sqlOrderBy = (SqlOrderBy) sqlNode;
168-
parseSql(sqlOrderBy.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
169+
parseSql(sqlOrderBy.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
169170

170171
case LITERAL:
171172
return LITERAL.toString();

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public abstract class AbstractTableParser {
4444
private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
4545

4646
private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
47-
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(.+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
47+
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(.+?)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
4848
private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
4949
private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");
5050

@@ -84,7 +84,7 @@ public boolean dealKeyPattern(String fieldRow, AbstractTableInfo tableInfo){
8484

8585
public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
8686

87-
List<String> fieldRows = DtStringUtil.splitIgnoreQuota(fieldsInfo, ',');
87+
List<String> fieldRows = DtStringUtil.splitField(fieldsInfo);
8888

8989
for (String fieldRow : fieldRows) {
9090
fieldRow = fieldRow.trim();

core/src/main/java/com/dtstack/flink/sql/util/ClassUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class ClassUtil {
3737
public static Class<?> stringConvertClass(String str) {
3838

3939
// 这部分主要是告诉Class转TypeInfomation的方法,字段是Array类型
40-
String lowerStr = str.toLowerCase();
40+
String lowerStr = str.toLowerCase().trim();
4141
if (lowerStr.startsWith("array")) {
4242
return Array.newInstance(Integer.class, 0).getClass();
4343
}

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,61 @@ public class DtStringUtil {
4646

4747
private static ObjectMapper objectMapper = new ObjectMapper();
4848

49-
5049
/**
5150
* Split the specified string delimiter --- ignored quotes delimiter
5251
* @param str
5352
* @param delimiter
5453
* @return
5554
*/
56-
public static List<String> splitIgnoreQuota(String str, char delimiter){
55+
public static List<String> splitIgnoreQuota(String str, char delimiter) {
56+
List<String> tokensList = new ArrayList<>();
57+
boolean inQuotes = false;
58+
boolean inSingleQuotes = false;
59+
int bracketLeftNum = 0;
60+
StringBuilder b = new StringBuilder();
61+
char[] chars = str.toCharArray();
62+
int idx = 0;
63+
for (char c : chars) {
64+
char flag = 0;
65+
if (idx > 0) {
66+
flag = chars[idx - 1];
67+
}
68+
if (c == delimiter) {
69+
if (inQuotes) {
70+
b.append(c);
71+
} else if (inSingleQuotes) {
72+
b.append(c);
73+
} else if (bracketLeftNum > 0) {
74+
b.append(c);
75+
} else {
76+
tokensList.add(b.toString());
77+
b = new StringBuilder();
78+
}
79+
} else if (c == '\"' && '\\' != flag && !inSingleQuotes) {
80+
inQuotes = !inQuotes;
81+
b.append(c);
82+
} else if (c == '\'' && '\\' != flag && !inQuotes) {
83+
inSingleQuotes = !inSingleQuotes;
84+
b.append(c);
85+
} else if (c == '(' && !inSingleQuotes && !inQuotes) {
86+
bracketLeftNum++;
87+
b.append(c);
88+
} else if (c == ')' && !inSingleQuotes && !inQuotes) {
89+
bracketLeftNum--;
90+
b.append(c);
91+
} else {
92+
b.append(c);
93+
}
94+
idx++;
95+
}
96+
97+
tokensList.add(b.toString());
98+
99+
return tokensList;
100+
}
101+
102+
public static List<String> splitField(String str) {
103+
final char delimiter = ',';
57104
List<String> tokensList = new ArrayList<>();
58105
boolean inQuotes = false;
59106
boolean inSingleQuotes = false;
@@ -106,7 +153,6 @@ public static List<String> splitIgnoreQuota(String str, char delimiter){
106153
return tokensList;
107154
}
108155

109-
110156
public static String replaceIgnoreQuota(String str, String oriStr, String replaceStr){
111157
String splitPatternStr = oriStr + "(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)(?=(?:[^']*'[^']*')*[^']*$)";
112158
return str.replaceAll(splitPatternStr, replaceStr);

0 commit comments

Comments
 (0)