Skip to content

Commit 2027b41

Browse files
committed
parse where case
1 parent 3d317bc commit 2027b41

File tree

4 files changed

+52
-32
lines changed

4 files changed

+52
-32
lines changed

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.calcite.sql.parser.SqlParser;
2727
import org.apache.commons.lang3.StringUtils;
2828
import com.google.common.collect.Lists;
29+
import org.apache.flink.table.calcite.FlinkPlannerImpl;
2930

3031
import java.util.List;
3132

@@ -53,18 +54,9 @@ public static InsertSqlParser newInstance(){
5354

5455
@Override
5556
public void parseSql(String sql, SqlTree sqlTree) {
56-
SqlParser.Config config = SqlParser
57-
.configBuilder()
58-
.setLex(Lex.MYSQL)
59-
.build();
60-
61-
SqlParser sqlParser = SqlParser.create(sql,config);
62-
SqlNode sqlNode = null;
63-
try {
64-
sqlNode = sqlParser.parseStmt();
65-
} catch (SqlParseException e) {
66-
throw new RuntimeException("", e);
67-
}
57+
58+
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
59+
SqlNode sqlNode = flinkPlanner.parse(sql);
6860

6961
SqlParseResult sqlParseResult = new SqlParseResult();
7062
parseNode(sqlNode, sqlParseResult);

core/src/main/java/com/dtstack/flink/sql/side/SidePredicatesParser.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -138,16 +138,19 @@ private void extractPredicateInfo(SqlNode whereNode, List<PredicateInfo> predica
138138

139139
private void fillPredicateInfoToList(SqlBasicCall whereNode, List<PredicateInfo> predicatesInfoList, String operatorName, SqlKind operatorKind,
140140
int fieldIndex, int conditionIndex) {
141-
SqlIdentifier fieldFullPath = (SqlIdentifier) whereNode.getOperands()[fieldIndex];
142-
if (fieldFullPath.names.size() == 2) {
143-
String ownerTable = fieldFullPath.names.get(0);
144-
String fieldName = fieldFullPath.names.get(1);
145-
String content = (operatorKind == SqlKind.BETWEEN) ? whereNode.getOperands()[conditionIndex].toString() + " AND " +
146-
whereNode.getOperands()[2].toString() : whereNode.getOperands()[conditionIndex].toString();
141+
SqlNode sqlNode = whereNode.getOperands()[fieldIndex];
142+
if (sqlNode.getKind() == SqlKind.IDENTIFIER) {
143+
SqlIdentifier fieldFullPath = (SqlIdentifier) sqlNode;
144+
if (fieldFullPath.names.size() == 2) {
145+
String ownerTable = fieldFullPath.names.get(0);
146+
String fieldName = fieldFullPath.names.get(1);
147+
String content = (operatorKind == SqlKind.BETWEEN) ? whereNode.getOperands()[conditionIndex].toString() + " AND " +
148+
whereNode.getOperands()[2].toString() : whereNode.getOperands()[conditionIndex].toString();
147149

148-
PredicateInfo predicateInfo = PredicateInfo.builder().setOperatorName(operatorName).setOperatorKind(operatorKind.toString())
149-
.setOwnerTable(ownerTable).setFieldName(fieldName).setCondition(content).build();
150-
predicatesInfoList.add(predicateInfo);
150+
PredicateInfo predicateInfo = PredicateInfo.builder().setOperatorName(operatorName).setOperatorKind(operatorKind.toString())
151+
.setOwnerTable(ownerTable).setFieldName(fieldName).setCondition(content).build();
152+
predicatesInfoList.add(predicateInfo);
153+
}
151154
}
152155
}
153156

core/src/main/java/com/dtstack/flink/sql/util/FieldReplaceUtil.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,22 @@ private static SqlNode replaceNodeInfo(SqlNode groupNode,
146146
}
147147

148148
return sqlBasicCall;
149-
}else{
149+
} else if (groupNode.getKind() == CASE) {
150+
SqlCase sqlCase = (SqlCase) groupNode;
151+
152+
for (int i = 0; i < sqlCase.getWhenOperands().size(); i++) {
153+
SqlNode sqlNode = sqlCase.getWhenOperands().getList().get(i);
154+
SqlNode replaceNode = replaceSelectFieldName(sqlNode, oldTbName, newTbName, mappingField);
155+
sqlCase.getWhenOperands().set(i,replaceNode);
156+
}
157+
158+
for (int i = 0; i < sqlCase.getThenOperands().size(); i++) {
159+
SqlNode sqlNode = sqlCase.getThenOperands().getList().get(i);
160+
SqlNode replaceNode = replaceSelectFieldName(sqlNode, oldTbName, newTbName, mappingField);
161+
sqlCase.getThenOperands().set(i,replaceNode);
162+
}
163+
return sqlCase;
164+
} else {
150165
return groupNode;
151166
}
152167
}

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,12 @@ private static void replaceConditionNode(SqlNode selectNode, String oldTbName, S
568568
}else if(selectNode.getKind() == OTHER){
569569
//不处理
570570
return;
571-
}else{
571+
} else if (selectNode.getKind() == CASE) {
572+
SqlCase sqlCase = (SqlCase) selectNode;
573+
574+
sqlCase.getWhenOperands().getList().forEach(sqlNode -> replaceConditionNode(sqlNode, oldTbName, newTbName, fieldReplaceRef));
575+
sqlCase.getThenOperands().getList().forEach(sqlNode -> replaceConditionNode(sqlNode, oldTbName, newTbName, fieldReplaceRef));
576+
} else {
572577
throw new RuntimeException(String.format("not support node kind of %s to replace name now.", selectNode.getKind()));
573578
}
574579
}
@@ -579,14 +584,14 @@ private static void replaceConditionNode(SqlNode selectNode, String oldTbName, S
579584
* @param fieldInfos
580585
*/
581586
public static void getConditionRefTable(SqlNode selectNode, Set<String> fieldInfos) {
582-
if(selectNode.getKind() == IDENTIFIER){
587+
if (selectNode.getKind() == IDENTIFIER) {
583588
SqlIdentifier sqlIdentifier = (SqlIdentifier) selectNode;
584589

585590
fieldInfos.add(sqlIdentifier.toString());
586591
return;
587-
}else if(selectNode.getKind() == LITERAL || selectNode.getKind() == LITERAL_CHAIN){//字面含义
592+
} else if (selectNode.getKind() == LITERAL || selectNode.getKind() == LITERAL_CHAIN) {//字面含义
588593
return;
589-
}else if( AGGREGATE.contains(selectNode.getKind())
594+
} else if (AGGREGATE.contains(selectNode.getKind())
590595
|| AVG_AGG_FUNCTIONS.contains(selectNode.getKind())
591596
|| COMPARISON.contains(selectNode.getKind())
592597
|| selectNode.getKind() == OTHER_FUNCTION
@@ -616,26 +621,31 @@ public static void getConditionRefTable(SqlNode selectNode, Set<String> fieldInf
616621
|| selectNode.getKind() == TIMESTAMP_DIFF
617622
|| selectNode.getKind() == LIKE
618623

619-
){
624+
) {
620625
SqlBasicCall sqlBasicCall = (SqlBasicCall) selectNode;
621-
for(int i=0; i<sqlBasicCall.getOperands().length; i++){
626+
for (int i = 0; i < sqlBasicCall.getOperands().length; i++) {
622627
SqlNode sqlNode = sqlBasicCall.getOperands()[i];
623-
if(sqlNode instanceof SqlLiteral){
628+
if (sqlNode instanceof SqlLiteral) {
624629
continue;
625630
}
626631

627-
if(sqlNode instanceof SqlDataTypeSpec){
632+
if (sqlNode instanceof SqlDataTypeSpec) {
628633
continue;
629634
}
630635

631636
getConditionRefTable(sqlNode, fieldInfos);
632637
}
633638

634639
return;
635-
}else if(selectNode.getKind() == OTHER){
640+
} else if (selectNode.getKind() == OTHER) {
636641
//不处理
637642
return;
638-
}else{
643+
} else if (selectNode.getKind() == CASE) {
644+
SqlCase sqlCase = (SqlCase) selectNode;
645+
646+
sqlCase.getWhenOperands().getList().forEach(sqlNode -> getConditionRefTable(sqlNode, fieldInfos));
647+
sqlCase.getThenOperands().getList().forEach(sqlNode -> getConditionRefTable(sqlNode, fieldInfos));
648+
} else {
639649
throw new RuntimeException(String.format("not support node kind of %s to replace name now.", selectNode.getKind()));
640650
}
641651
}

0 commit comments

Comments
 (0)