Skip to content

Commit a8947ec

Browse files
author
dapeng
committed
修改阻塞逻辑
1 parent 4e8a00e commit a8947ec

File tree

1 file changed

+17
-12
lines changed

1 file changed

+17
-12
lines changed

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.Collection;
4444
import java.util.List;
4545
import java.util.Map;
46+
import java.util.concurrent.CountDownLatch;
4647
import java.util.concurrent.Future;
4748
import java.util.concurrent.ScheduledFuture;
4849
import java.util.concurrent.atomic.AtomicBoolean;
@@ -116,9 +117,8 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
116117
AtomicInteger failCounter = new AtomicInteger(0);
117118
AtomicBoolean finishFlag = new AtomicBoolean(false);
118119
while(!finishFlag.get()){
119-
AtomicBoolean connFinish = new AtomicBoolean(false);
120+
CountDownLatch latch = new CountDownLatch(1);
120121
rdbSqlClient.getConnection(conn -> {
121-
connFinish.set(true);
122122
if(conn.failed()){
123123
if(failCounter.getAndIncrement() % 1000 == 0){
124124
logger.error("getConnection error", conn.cause());
@@ -127,22 +127,27 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
127127
resultFuture.completeExceptionally(conn.cause());
128128
finishFlag.set(true);
129129
}
130+
latch.countDown();
130131
conn.result().close();
131132
return;
132133
}
133-
CONN_STATUS.set(true);
134-
ScheduledFuture<?> timerFuture = registerTimer(input, resultFuture);
135-
cancelTimerWhenComplete(resultFuture, timerFuture);
136-
handleQuery(conn.result(), inputParams, input, resultFuture);
137-
finishFlag.set(true);
138-
});
139-
while(!connFinish.get()){
140134
try {
141-
Thread.sleep(50);
142-
} catch (InterruptedException e){
135+
CONN_STATUS.set(true);
136+
ScheduledFuture<?> timerFuture = registerTimer(input, resultFuture);
137+
cancelTimerWhenComplete(resultFuture, timerFuture);
138+
handleQuery(conn.result(), inputParams, input, resultFuture);
139+
finishFlag.set(true);
140+
} catch (Exception e) {
143141
logger.error("", e);
142+
} finally {
143+
latch.countDown();
144144
}
145-
145+
});
146+
//主线程阻塞
147+
try {
148+
latch.wait();
149+
} catch (InterruptedException e) {
150+
logger.error("", e);
146151
}
147152
}
148153

0 commit comments

Comments
 (0)