Skip to content

Commit 2079ce4

Browse files
author
dapeng
committed
添加對countdown的捕獲
1 parent a8947ec commit 2079ce4

File tree

1 file changed

+11
-12
lines changed

1 file changed

+11
-12
lines changed

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -119,19 +119,18 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
119119
while(!finishFlag.get()){
120120
CountDownLatch latch = new CountDownLatch(1);
121121
rdbSqlClient.getConnection(conn -> {
122-
if(conn.failed()){
123-
if(failCounter.getAndIncrement() % 1000 == 0){
124-
logger.error("getConnection error", conn.cause());
125-
}
126-
if(failCounter.get() >= sideInfo.getSideTableInfo().getAsyncFailMaxNum(3)){
127-
resultFuture.completeExceptionally(conn.cause());
128-
finishFlag.set(true);
129-
}
130-
latch.countDown();
131-
conn.result().close();
132-
return;
133-
}
134122
try {
123+
if(conn.failed()){
124+
if(failCounter.getAndIncrement() % 1000 == 0){
125+
logger.error("getConnection error", conn.cause());
126+
}
127+
if(failCounter.get() >= sideInfo.getSideTableInfo().getAsyncFailMaxNum(3)){
128+
resultFuture.completeExceptionally(conn.cause());
129+
finishFlag.set(true);
130+
}
131+
conn.result().close();
132+
return;
133+
}
135134
CONN_STATUS.set(true);
136135
ScheduledFuture<?> timerFuture = registerTimer(input, resultFuture);
137136
cancelTimerWhenComplete(resultFuture, timerFuture);

0 commit comments

Comments
 (0)