Skip to content

Commit a7cff4d

Browse files
committed
rdb sink check connection
1 parent 260ddcd commit a7cff4d

File tree

1 file changed

+13
-5
lines changed

1 file changed

+13
-5
lines changed

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ public class RetractJDBCOutputFormat extends DtRichOutputFormat {
5151
private static final long serialVersionUID = 1L;
5252

5353
private static final Logger LOG = LoggerFactory.getLogger(RetractJDBCOutputFormat.class);
54-
54+
private static final int CONNECTION_CHECK_FREQUENCY = 100;
55+
private int checkTimes;
5556
private String username;
5657
private String password;
5758
private String drivername;
@@ -181,7 +182,6 @@ public void writeRecord(Tuple2 tuple2) {
181182

182183

183184
private void insertWrite(Row row) {
184-
checkConnectionOpen(dbConn);
185185
try {
186186
if (batchNum == 1) {
187187
writeSingleRecord(row);
@@ -315,6 +315,7 @@ private void updatePreparedStmt(Row row, PreparedStatement pstmt) throws SQLExce
315315

316316
private synchronized void submitExecuteBatch() {
317317
try {
318+
checkConnectionOpen();
318319
this.upload.executeBatch();
319320
dbConn.commit();
320321
} catch (SQLException e) {
@@ -330,11 +331,17 @@ private synchronized void submitExecuteBatch() {
330331
}
331332
}
332333

333-
private void checkConnectionOpen(Connection dbConn) {
334+
private void checkConnectionOpen() {
335+
LOG.info("test db connection Valid check !");
336+
checkTimes++;
337+
if (checkTimes % CONNECTION_CHECK_FREQUENCY != 0) {
338+
return;
339+
}
340+
LOG.warn("db connection Valid check !");
334341
try {
335-
if (dbConn.isClosed()) {
342+
if (dbConn.isClosed() || !dbConn.isValid(100)) {
336343
LOG.info("db connection reconnect..");
337-
dbConn= establishConnection();
344+
dbConn = establishConnection();
338345
upload = dbConn.prepareStatement(insertQuery);
339346
this.dbConn = dbConn;
340347
}
@@ -345,6 +352,7 @@ private void checkConnectionOpen(Connection dbConn) {
345352
} catch (IOException e) {
346353
LOG.error("kerberos authentication failed..", e);
347354
}
355+
checkTimes = 0;
348356
}
349357

350358
/**

0 commit comments

Comments
 (0)