Skip to content

Commit 92e3056

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_3.10.x_24550' into 1.8_release_3.10.x
2 parents 8f0b06e + 75adb0b commit 92e3056

File tree

6 files changed

+163
-33
lines changed

6 files changed

+163
-33
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.enums;
20+
21+
import org.apache.commons.lang.StringUtils;
22+
import org.apache.commons.lang.exception.ExceptionUtils;
23+
24+
import java.util.Arrays;
25+
26+
/**
27+
* Date: 2020/4/2
28+
* Company: www.dtstack.com
29+
* @author maqi
30+
*/
31+
public enum EConnectionErrorCode {
32+
ERROR_NOT_MATCH(0, "错误信息未匹配", new String[]{}),
33+
CONN_DB_INVALID(1, "数据库连接失效,请重新打开", new String[]{"the last packet successfully received from the server was", "Zookeeper session has been expired"}),
34+
CONN_DB_FAILED(2, "数据库连接失败,请检查用户名或密码是否正确", new String[]{"Access denied for user"}),
35+
DB_TABLE_NOT_EXIST(3, "操作的表名不存在", new String[]{"doesn't exist"});
36+
37+
private int code;
38+
private String description;
39+
private String[] baseErrorInfo;
40+
41+
EConnectionErrorCode(int code, String description, String[] baseErrorInfo) {
42+
this.code = code;
43+
this.description = description;
44+
this.baseErrorInfo = baseErrorInfo;
45+
}
46+
47+
48+
public static EConnectionErrorCode resolveErrorCodeFromException(Throwable e) {
49+
final String stackErrorMsg = ExceptionUtils.getFullStackTrace(e);
50+
return Arrays.stream(values())
51+
.filter(errorCode -> matchKnowError(errorCode, stackErrorMsg))
52+
.findAny()
53+
.orElse(ERROR_NOT_MATCH);
54+
}
55+
56+
public static boolean matchKnowError(EConnectionErrorCode errorCode, String errorMsg) {
57+
return Arrays.stream(errorCode.baseErrorInfo)
58+
.filter(baseInfo -> StringUtils.containsIgnoreCase(errorMsg, baseInfo))
59+
.findAny()
60+
.isPresent();
61+
}
62+
}

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.calcite.sql.parser.SqlParser;
2727
import org.apache.commons.lang3.StringUtils;
2828
import com.google.common.collect.Lists;
29+
import org.apache.flink.table.calcite.FlinkPlannerImpl;
2930

3031
import java.util.List;
3132

@@ -53,18 +54,9 @@ public static InsertSqlParser newInstance(){
5354

5455
@Override
5556
public void parseSql(String sql, SqlTree sqlTree) {
56-
SqlParser.Config config = SqlParser
57-
.configBuilder()
58-
.setLex(Lex.MYSQL)
59-
.build();
60-
61-
SqlParser sqlParser = SqlParser.create(sql,config);
62-
SqlNode sqlNode = null;
63-
try {
64-
sqlNode = sqlParser.parseStmt();
65-
} catch (SqlParseException e) {
66-
throw new RuntimeException("", e);
67-
}
57+
58+
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
59+
SqlNode sqlNode = flinkPlanner.parse(sql);
6860

6961
SqlParseResult sqlParseResult = new SqlParseResult();
7062
parseNode(sqlNode, sqlParseResult);

core/src/main/java/com/dtstack/flink/sql/side/SidePredicatesParser.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.calcite.sql.SqlOperator;
3131
import org.apache.calcite.sql.SqlSelect;
3232
import org.apache.calcite.sql.parser.SqlParseException;
33+
import org.apache.commons.lang3.StringUtils;
3334
import org.apache.flink.table.calcite.FlinkPlannerImpl;
3435

3536
import java.util.List;
@@ -138,16 +139,23 @@ private void extractPredicateInfo(SqlNode whereNode, List<PredicateInfo> predica
138139

139140
private void fillPredicateInfoToList(SqlBasicCall whereNode, List<PredicateInfo> predicatesInfoList, String operatorName, SqlKind operatorKind,
140141
int fieldIndex, int conditionIndex) {
141-
SqlIdentifier fieldFullPath = (SqlIdentifier) whereNode.getOperands()[fieldIndex];
142-
if (fieldFullPath.names.size() == 2) {
143-
String ownerTable = fieldFullPath.names.get(0);
144-
String fieldName = fieldFullPath.names.get(1);
145-
String content = (operatorKind == SqlKind.BETWEEN) ? whereNode.getOperands()[conditionIndex].toString() + " AND " +
146-
whereNode.getOperands()[2].toString() : whereNode.getOperands()[conditionIndex].toString();
147-
148-
PredicateInfo predicateInfo = PredicateInfo.builder().setOperatorName(operatorName).setOperatorKind(operatorKind.toString())
149-
.setOwnerTable(ownerTable).setFieldName(fieldName).setCondition(content).build();
150-
predicatesInfoList.add(predicateInfo);
142+
SqlNode sqlNode = whereNode.getOperands()[fieldIndex];
143+
if (sqlNode.getKind() == SqlKind.IDENTIFIER) {
144+
SqlIdentifier fieldFullPath = (SqlIdentifier) sqlNode;
145+
if (fieldFullPath.names.size() == 2) {
146+
String ownerTable = fieldFullPath.names.get(0);
147+
String fieldName = fieldFullPath.names.get(1);
148+
String content = (operatorKind == SqlKind.BETWEEN) ? whereNode.getOperands()[conditionIndex].toString() + " AND " +
149+
whereNode.getOperands()[2].toString() : whereNode.getOperands()[conditionIndex].toString();
150+
151+
if (StringUtils.containsIgnoreCase(content,SqlKind.CASE.toString())) {
152+
return;
153+
}
154+
155+
PredicateInfo predicateInfo = PredicateInfo.builder().setOperatorName(operatorName).setOperatorKind(operatorKind.toString())
156+
.setOwnerTable(ownerTable).setFieldName(fieldName).setCondition(content).build();
157+
predicatesInfoList.add(predicateInfo);
158+
}
151159
}
152160
}
153161

core/src/main/java/com/dtstack/flink/sql/util/FieldReplaceUtil.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,22 @@ private static SqlNode replaceNodeInfo(SqlNode groupNode,
146146
}
147147

148148
return sqlBasicCall;
149-
}else{
149+
} else if (groupNode.getKind() == CASE) {
150+
SqlCase sqlCase = (SqlCase) groupNode;
151+
152+
for (int i = 0; i < sqlCase.getWhenOperands().size(); i++) {
153+
SqlNode sqlNode = sqlCase.getWhenOperands().getList().get(i);
154+
SqlNode replaceNode = replaceSelectFieldName(sqlNode, oldTbName, newTbName, mappingField);
155+
sqlCase.getWhenOperands().set(i,replaceNode);
156+
}
157+
158+
for (int i = 0; i < sqlCase.getThenOperands().size(); i++) {
159+
SqlNode sqlNode = sqlCase.getThenOperands().getList().get(i);
160+
SqlNode replaceNode = replaceSelectFieldName(sqlNode, oldTbName, newTbName, mappingField);
161+
sqlCase.getThenOperands().set(i,replaceNode);
162+
}
163+
return sqlCase;
164+
} else {
150165
return groupNode;
151166
}
152167
}

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,12 @@ private static void replaceConditionNode(SqlNode selectNode, String oldTbName, S
568568
}else if(selectNode.getKind() == OTHER){
569569
//不处理
570570
return;
571-
}else{
571+
} else if (selectNode.getKind() == CASE) {
572+
SqlCase sqlCase = (SqlCase) selectNode;
573+
574+
sqlCase.getWhenOperands().getList().forEach(sqlNode -> replaceConditionNode(sqlNode, oldTbName, newTbName, fieldReplaceRef));
575+
sqlCase.getThenOperands().getList().forEach(sqlNode -> replaceConditionNode(sqlNode, oldTbName, newTbName, fieldReplaceRef));
576+
} else {
572577
throw new RuntimeException(String.format("not support node kind of %s to replace name now.", selectNode.getKind()));
573578
}
574579
}
@@ -579,14 +584,14 @@ private static void replaceConditionNode(SqlNode selectNode, String oldTbName, S
579584
* @param fieldInfos
580585
*/
581586
public static void getConditionRefTable(SqlNode selectNode, Set<String> fieldInfos) {
582-
if(selectNode.getKind() == IDENTIFIER){
587+
if (selectNode.getKind() == IDENTIFIER) {
583588
SqlIdentifier sqlIdentifier = (SqlIdentifier) selectNode;
584589

585590
fieldInfos.add(sqlIdentifier.toString());
586591
return;
587-
}else if(selectNode.getKind() == LITERAL || selectNode.getKind() == LITERAL_CHAIN){//字面含义
592+
} else if (selectNode.getKind() == LITERAL || selectNode.getKind() == LITERAL_CHAIN) {//字面含义
588593
return;
589-
}else if( AGGREGATE.contains(selectNode.getKind())
594+
} else if (AGGREGATE.contains(selectNode.getKind())
590595
|| AVG_AGG_FUNCTIONS.contains(selectNode.getKind())
591596
|| COMPARISON.contains(selectNode.getKind())
592597
|| selectNode.getKind() == OTHER_FUNCTION
@@ -616,26 +621,31 @@ public static void getConditionRefTable(SqlNode selectNode, Set<String> fieldInf
616621
|| selectNode.getKind() == TIMESTAMP_DIFF
617622
|| selectNode.getKind() == LIKE
618623

619-
){
624+
) {
620625
SqlBasicCall sqlBasicCall = (SqlBasicCall) selectNode;
621-
for(int i=0; i<sqlBasicCall.getOperands().length; i++){
626+
for (int i = 0; i < sqlBasicCall.getOperands().length; i++) {
622627
SqlNode sqlNode = sqlBasicCall.getOperands()[i];
623-
if(sqlNode instanceof SqlLiteral){
628+
if (sqlNode instanceof SqlLiteral) {
624629
continue;
625630
}
626631

627-
if(sqlNode instanceof SqlDataTypeSpec){
632+
if (sqlNode instanceof SqlDataTypeSpec) {
628633
continue;
629634
}
630635

631636
getConditionRefTable(sqlNode, fieldInfos);
632637
}
633638

634639
return;
635-
}else if(selectNode.getKind() == OTHER){
640+
} else if (selectNode.getKind() == OTHER) {
636641
//不处理
637642
return;
638-
}else{
643+
} else if (selectNode.getKind() == CASE) {
644+
SqlCase sqlCase = (SqlCase) selectNode;
645+
646+
sqlCase.getWhenOperands().getList().forEach(sqlNode -> getConditionRefTable(sqlNode, fieldInfos));
647+
sqlCase.getThenOperands().getList().forEach(sqlNode -> getConditionRefTable(sqlNode, fieldInfos));
648+
} else {
639649
throw new RuntimeException(String.format("not support node kind of %s to replace name now.", selectNode.getKind()));
640650
}
641651
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side;
20+
21+
import com.dtstack.flink.sql.enums.EConnectionErrorCode;
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
25+
/**
26+
* Date: 2020/4/2
27+
* Company: www.dtstack.com
28+
* @author maqi
29+
*/
30+
public class EConnectionErrorCodeTest {
31+
32+
@Test
33+
public void testResolveErrorCodeFromException(){
34+
EConnectionErrorCode errorCode =
35+
EConnectionErrorCode.resolveErrorCodeFromException(new Exception("The last packet successfully received from the server was 179 milliseconds"));
36+
37+
EConnectionErrorCode ckSessionExpired =
38+
EConnectionErrorCode.resolveErrorCodeFromException(new Exception("Excepetion: Zookeeper session has been expired"));
39+
40+
Assert.assertEquals(errorCode, EConnectionErrorCode.CONN_DB_INVALID);
41+
Assert.assertEquals(ckSessionExpired, EConnectionErrorCode.CONN_DB_INVALID);
42+
}
43+
}

0 commit comments

Comments
 (0)