Skip to content

Commit 4e8a00e

Browse files
author
dapeng
committed
阻塞逻辑重新调整
1 parent 267511a commit 4e8a00e

File tree

1 file changed

+32
-4
lines changed

1 file changed

+32
-4
lines changed

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.Collection;
4444
import java.util.List;
4545
import java.util.Map;
46+
import java.util.concurrent.Future;
4647
import java.util.concurrent.ScheduledFuture;
4748
import java.util.concurrent.atomic.AtomicBoolean;
4849
import java.util.concurrent.atomic.AtomicInteger;
@@ -77,6 +78,8 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
7778

7879
private transient SQLClient rdbSqlClient;
7980

81+
private final static AtomicBoolean CONN_STATUS = new AtomicBoolean(true);
82+
8083
private Logger logger = LoggerFactory.getLogger(getClass());
8184

8285
public RdbAsyncReqRow(BaseSideInfo sideInfo) {
@@ -91,13 +94,32 @@ protected void preInvoke(CRow input, ResultFuture<CRow> resultFuture){
9194

9295
@Override
9396
public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture) throws Exception {
97+
98+
while (!CONN_STATUS.get()){//network is unhealth
99+
Thread.sleep(100);
100+
}
101+
rdbSqlClient.getConnection(conn -> {
102+
if(conn.failed()){
103+
CONN_STATUS.set(false);
104+
connectWithRetry(inputParams, input, resultFuture, rdbSqlClient);
105+
return;
106+
}
107+
CONN_STATUS.set(true);
108+
ScheduledFuture<?> timerFuture = registerTimer(input, resultFuture);
109+
cancelTimerWhenComplete(resultFuture, timerFuture);
110+
handleQuery(conn.result(), inputParams, input, resultFuture);
111+
});
112+
113+
}
114+
115+
private void connectWithRetry(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture, SQLClient rdbSqlClient) {
94116
AtomicInteger failCounter = new AtomicInteger(0);
95117
AtomicBoolean finishFlag = new AtomicBoolean(false);
96118
while(!finishFlag.get()){
97-
AtomicBoolean connectFinish = new AtomicBoolean(false);
119+
AtomicBoolean connFinish = new AtomicBoolean(false);
98120
rdbSqlClient.getConnection(conn -> {
121+
connFinish.set(true);
99122
if(conn.failed()){
100-
connectFinish.set(true);
101123
if(failCounter.getAndIncrement() % 1000 == 0){
102124
logger.error("getConnection error", conn.cause());
103125
}
@@ -108,16 +130,22 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, Resul
108130
conn.result().close();
109131
return;
110132
}
133+
CONN_STATUS.set(true);
111134
ScheduledFuture<?> timerFuture = registerTimer(input, resultFuture);
112135
cancelTimerWhenComplete(resultFuture, timerFuture);
113136
handleQuery(conn.result(), inputParams, input, resultFuture);
114137
finishFlag.set(true);
115138
});
139+
while(!connFinish.get()){
140+
try {
141+
Thread.sleep(50);
142+
} catch (InterruptedException e){
143+
logger.error("", e);
144+
}
116145

117-
while(!connectFinish.get()){
118-
Thread.sleep(50);
119146
}
120147
}
148+
121149
}
122150

123151
@Override

0 commit comments

Comments
 (0)