Skip to content

Commit 6610ce0

Browse files
author
dapeng
committed
Merge branch 'feat_1.8_asyncException_mergeDev' into feat_1.8_asyncException_mergeTest
2 parents 4eddca1 + bbd8cf9 commit 6610ce0

File tree

1 file changed

+5
-6
lines changed

1 file changed

+5
-6
lines changed

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
8282

8383
private transient SQLClient rdbSqlClient;
8484

85-
private final static AtomicBoolean CONN_STATUS = new AtomicBoolean(true);
85+
private AtomicBoolean connectionStatus = new AtomicBoolean(true);
8686

8787
public RdbAsyncReqRow(BaseSideInfo sideInfo) {
8888
super(sideInfo);
@@ -105,7 +105,7 @@ protected void preInvoke(CRow input, ResultFuture<CRow> resultFuture){
105105
public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture) throws Exception {
106106

107107
AtomicLong networkLogCounter = new AtomicLong(0L);
108-
while (!CONN_STATUS.get()){//network is unhealth
108+
while (!connectionStatus.get()){//network is unhealth
109109
if(networkLogCounter.getAndIncrement() % 1000 == 0){
110110
LOG.info("network unhealth to block task");
111111
}
@@ -114,11 +114,11 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, Resul
114114
Map<String, Object> params = formatInputParam(inputParams);
115115
rdbSqlClient.getConnection(conn -> {
116116
if(conn.failed()){
117-
CONN_STATUS.set(false);
117+
connectionStatus.set(false);
118118
connectWithRetry(params, input, resultFuture, rdbSqlClient);
119119
return;
120120
}
121-
CONN_STATUS.set(true);
121+
connectionStatus.set(true);
122122
ScheduledFuture<?> timerFuture = registerTimer(input, resultFuture);
123123
cancelTimerWhenComplete(resultFuture, timerFuture);
124124
handleQuery(conn.result(), params, input, resultFuture);
@@ -141,10 +141,9 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
141141
resultFuture.completeExceptionally(conn.cause());
142142
finishFlag.set(true);
143143
}
144-
conn.result().close();
145144
return;
146145
}
147-
CONN_STATUS.set(true);
146+
connectionStatus.set(true);
148147
ScheduledFuture<?> timerFuture = registerTimer(input, resultFuture);
149148
cancelTimerWhenComplete(resultFuture, timerFuture);
150149
handleQuery(conn.result(), inputParams, input, resultFuture);

0 commit comments

Comments
 (0)