Skip to content

Commit 64c2050

Browse files
committed
deal rdb sink error
1 parent a7cff4d commit 64c2050

File tree

3 files changed

+135
-22
lines changed

3 files changed

+135
-22
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.enums;
20+
21+
import org.apache.commons.lang.StringUtils;
22+
import org.apache.commons.lang.exception.ExceptionUtils;
23+
24+
import java.util.Arrays;
25+
26+
/**
27+
* Date: 2020/4/2
28+
* Company: www.dtstack.com
29+
* @author maqi
30+
*/
31+
public enum EConnectionErrorCode {
32+
ERROR_NOT_MATCH(0, "错误信息未匹配", new String[]{}),
33+
CONN_DB_INVALID(1, "数据库连接失效,请重新打开", new String[]{"the last packet successfully received from the server was"}),
34+
CONN_DB_FAILED(2, "数据库连接失败,请检查用户名或密码是否正确", new String[]{"Access denied for user"}),
35+
DB_TABLE_NOT_EXIST(3, "操作的表名不存在", new String[]{"doesn't exist"});
36+
37+
private int code;
38+
private String description;
39+
private String[] baseErrorInfo;
40+
41+
EConnectionErrorCode(int code, String description, String[] baseErrorInfo) {
42+
this.code = code;
43+
this.description = description;
44+
this.baseErrorInfo = baseErrorInfo;
45+
}
46+
47+
48+
public static EConnectionErrorCode resolveErrorCodeFromException(Throwable e) {
49+
final String stackErrorMsg = ExceptionUtils.getFullStackTrace(e);
50+
return Arrays.stream(values())
51+
.filter(errorCode -> matchKnowError(errorCode, stackErrorMsg))
52+
.findAny()
53+
.orElse(ERROR_NOT_MATCH);
54+
}
55+
56+
public static boolean matchKnowError(EConnectionErrorCode errorCode, String errorMsg) {
57+
return Arrays.stream(errorCode.baseErrorInfo)
58+
.filter(baseInfo -> StringUtils.containsIgnoreCase(errorMsg, baseInfo))
59+
.findAny()
60+
.isPresent();
61+
}
62+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side;
20+
21+
import com.dtstack.flink.sql.enums.EConnectionErrorCode;
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
25+
/**
26+
* Date: 2020/4/2
27+
* Company: www.dtstack.com
28+
* @author maqi
29+
*/
30+
public class EConnectionErrorCodeTest {
31+
32+
@Test
33+
public void testResolveErrorCodeFromException(){
34+
EConnectionErrorCode errorCode =
35+
EConnectionErrorCode.resolveErrorCodeFromException(new Exception("The last packet successfully received from the server was 179 milliseconds"));
36+
37+
Assert.assertEquals(errorCode, EConnectionErrorCode.CONN_DB_INVALID);
38+
}
39+
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.sink.rdb.format;
2020

21+
import com.dtstack.flink.sql.enums.EConnectionErrorCode;
2122
import com.dtstack.flink.sql.sink.rdb.RdbSink;
2223
import com.dtstack.flink.sql.util.JDBCUtils;
2324
import org.apache.commons.lang3.StringUtils;
@@ -181,7 +182,7 @@ public void writeRecord(Tuple2 tuple2) {
181182
}
182183

183184

184-
private void insertWrite(Row row) {
185+
private void insertWrite(Row row) {
185186
try {
186187
if (batchNum == 1) {
187188
writeSingleRecord(row);
@@ -201,8 +202,6 @@ private void insertWrite(Row row) {
201202
} else {
202203
outDirtyRecords.inc(batchNum == 1 ? batchNum : rows.size());
203204
}
204-
205-
206205
}
207206

208207
}
@@ -213,7 +212,7 @@ private void writeSingleRecord(Row row) {
213212
upload.executeUpdate();
214213
dbConn.commit();
215214
} catch (SQLException e) {
216-
215+
dealSQLException(e);
217216
if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
218217
LOG.error("record insert failed,dirty record num:{}, current row:{}", outDirtyRecords.getCount(), row.toString());
219218
LOG.error("", e);
@@ -315,46 +314,59 @@ private void updatePreparedStmt(Row row, PreparedStatement pstmt) throws SQLExce
315314

316315
private synchronized void submitExecuteBatch() {
317316
try {
318-
checkConnectionOpen();
317+
regularlyCheckConnection();
319318
this.upload.executeBatch();
320319
dbConn.commit();
321320
} catch (SQLException e) {
321+
LOG.warn("submitExecuteBatch error {}", e);
322322
try {
323323
dbConn.rollback();
324324
} catch (SQLException e1) {
325-
LOG.error("rollback data error !", e);
325+
dealSQLException(e1);
326326
}
327-
328327
rows.forEach(this::writeSingleRecord);
329328
} finally {
330329
rows.clear();
331330
}
332331
}
333332

334-
private void checkConnectionOpen() {
335-
LOG.info("test db connection Valid check !");
333+
private void dealSQLException(Exception e) {
334+
EConnectionErrorCode errorCode = EConnectionErrorCode.resolveErrorCodeFromException(e);
335+
switch (errorCode) {
336+
case CONN_DB_INVALID:
337+
reconnection();
338+
break;
339+
case CONN_DB_FAILED:
340+
case DB_TABLE_NOT_EXIST:
341+
throw new RuntimeException(e);
342+
default:
343+
}
344+
}
345+
346+
347+
private void regularlyCheckConnection() throws SQLException {
336348
checkTimes++;
337349
if (checkTimes % CONNECTION_CHECK_FREQUENCY != 0) {
338350
return;
339351
}
340352
LOG.warn("db connection Valid check !");
341-
try {
342-
if (dbConn.isClosed() || !dbConn.isValid(100)) {
343-
LOG.info("db connection reconnect..");
344-
dbConn = establishConnection();
345-
upload = dbConn.prepareStatement(insertQuery);
346-
this.dbConn = dbConn;
347-
}
348-
} catch (SQLException e) {
349-
LOG.error("check connection open failed..", e);
350-
} catch (ClassNotFoundException e) {
351-
LOG.error("load jdbc class error when reconnect db..", e);
352-
} catch (IOException e) {
353-
LOG.error("kerberos authentication failed..", e);
353+
if (dbConn.isClosed() || !dbConn.isValid(100)) {
354+
reconnection();
354355
}
355356
checkTimes = 0;
356357
}
357358

359+
public void reconnection() throws RuntimeException {
360+
try {
361+
LOG.info("db connection reconnect..");
362+
dbConn = establishConnection();
363+
upload = dbConn.prepareStatement(insertQuery);
364+
this.dbConn = dbConn;
365+
} catch (Exception e) {
366+
throw new RuntimeException("connection open failed..", e);
367+
}
368+
}
369+
358370
/**
359371
* Executes prepared statement and closes all resources of this instance.
360372
*

0 commit comments

Comments
 (0)