Commit b0443af

Author: dapeng (committed)

Merge branch 'feat_1.8_asyncException' into feat_1.8_asyncException_mergeDev

# Conflicts:
#	rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

2 parents: a0c2236 + 04550b0

2 files changed: +20 -33 lines

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 13 additions & 10 deletions
@@ -51,6 +51,7 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * All interfaces inherit naming rules: type + "AsyncReqRow" such as == "MysqlAsyncReqRow
@@ -64,10 +65,9 @@ public abstract class BaseAsyncReqRow extends RichAsyncFunction<CRow, CRow> impl
     private static final Logger LOG = LoggerFactory.getLogger(BaseAsyncReqRow.class);
     private static final long serialVersionUID = 2098635244857937717L;
     private RuntimeContext runtimeContext;
-
+    private final static AtomicLong FAIL_NUM = new AtomicLong(0);
     private static int TIMEOUT_LOG_FLUSH_NUM = 10;
     private int timeOutNum = 0;
-
     protected BaseSideInfo sideInfo;
     protected transient Counter parseErrorRecords;
 
@@ -125,7 +125,7 @@ protected void dealMissKey(CRow input, ResultFuture<CRow> resultFuture){
                 Row row = fillData(input.row(), null);
                 resultFuture.complete(Collections.singleton(new CRow(row, input.change())));
             } catch (Exception e) {
-                dealFillDataError(resultFuture, e, input);
+                dealFillDataError(input, resultFuture, e);
             }
         }else{
             resultFuture.complete(null);
@@ -206,7 +206,7 @@ private void invokeWithCache(Map<String, Object> inputParams, CRow input, Result
                 Row row = fillData(input.row(), val);
                 resultFuture.complete(Collections.singleton(new CRow(row, input.change())));
             } catch (Exception e) {
-                dealFillDataError(resultFuture, e, input);
+                dealFillDataError(input, resultFuture, e);
             }
         } else if (ECacheContentType.MultiLine == val.getType()) {
             try {
@@ -217,7 +217,7 @@ private void invokeWithCache(Map<String, Object> inputParams, CRow input, Result
                 }
                 resultFuture.complete(rowList);
             } catch (Exception e) {
-                dealFillDataError(resultFuture, e, input);
+                dealFillDataError(input, resultFuture, e);
             }
         } else {
             resultFuture.completeExceptionally(new RuntimeException("not support cache obj type " + val.getType()));
@@ -257,11 +257,14 @@ protected void cancelTimerWhenComplete(ResultFuture<CRow> resultFuture, Schedule
         }
     }
 
-    protected void dealFillDataError(ResultFuture<CRow> resultFuture, Exception e, Object sourceData) {
-        LOG.debug("source data {} join side table error ", sourceData);
-        LOG.debug("async buid row error..{}", e);
-        parseErrorRecords.inc();
-        resultFuture.complete(Collections.emptyList());
+    protected void dealFillDataError(CRow input, ResultFuture<CRow> resultFuture, Throwable e) {
+        if(FAIL_NUM.incrementAndGet() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)){
+            LOG.info("dealFillDataError", e);
+            parseErrorRecords.inc();
+            resultFuture.completeExceptionally(e);
+        } else {
+            dealMissKey(input, resultFuture);
+        }
     }
 
     @Override
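The new dealFillDataError consolidates the fail-over logic that used to be copied into the RDB call sites below: every lookup error is first treated as a join miss, and only once the shared failure counter passes the configured maximum does the async future fail and surface the error to the job. The following is a minimal, self-contained sketch of that pattern, not the project's actual class; the config accessor stands in for sideInfo.getSideTableInfo().getAsyncFailMaxNum(defaultValue) and CompletableFuture stands in for Flink's ResultFuture.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the "tolerate up to N failures, then fail hard" pattern.
public class AsyncFailoverSketch {

    // Shared across parallel lookups, mirroring the static FAIL_NUM counter.
    private static final AtomicLong FAIL_NUM = new AtomicLong(0);

    private final Long configuredFailMax;   // null means "not configured"

    public AsyncFailoverSketch(Long configuredFailMax) {
        this.configuredFailMax = configuredFailMax;
    }

    // Stand-in for getAsyncFailMaxNum(defaultValue): the configured limit, or the default if unset.
    private long asyncFailMaxNum(long defaultValue) {
        return configuredFailMax == null ? defaultValue : configuredFailMax;
    }

    // Counterpart of dealFillDataError(input, resultFuture, e).
    public void dealError(CompletableFuture<Iterable<String>> resultFuture, Throwable e) {
        if (FAIL_NUM.incrementAndGet() > asyncFailMaxNum(Long.MAX_VALUE)) {
            // Too many failures: propagate the error so the task fails.
            resultFuture.completeExceptionally(e);
        } else {
            // Below the limit: behave like dealMissKey and emit an empty join result.
            resultFuture.complete(Collections.emptyList());
        }
    }
}

With this in the base class, the RDB side table below only has to call dealFillDataError from its connection and query callbacks instead of keeping its own FAIL_NUM counter and metrics counter, which is exactly what the second file in this commit removes.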

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 7 additions & 23 deletions
@@ -20,7 +20,6 @@
 package com.dtstack.flink.sql.side.rdb.async;
 
 import com.dtstack.flink.sql.enums.ECacheContentType;
-import com.dtstack.flink.sql.metric.MetricConstant;
 import com.dtstack.flink.sql.side.BaseAsyncReqRow;
 import com.dtstack.flink.sql.side.BaseSideInfo;
 import com.dtstack.flink.sql.side.CacheMissVal;
@@ -34,10 +33,7 @@
 import io.vertx.core.json.JsonObject;
 import io.vertx.ext.sql.SQLClient;
 import io.vertx.ext.sql.SQLConnection;
-import org.apache.calcite.sql.JoinType;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
@@ -88,12 +84,8 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
 
     private final static AtomicBoolean CONN_STATUS = new AtomicBoolean(true);
 
-    private final static AtomicLong FAIL_NUM = new AtomicLong(0);
-
     private Logger logger = LoggerFactory.getLogger(getClass());
 
-    private Counter counter = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_SIDE_PARSE_ERROR_RECORDS);
-
     public RdbAsyncReqRow(BaseSideInfo sideInfo) {
         super(sideInfo);
         init(sideInfo);
@@ -114,7 +106,11 @@ protected void preInvoke(CRow input, ResultFuture<CRow> resultFuture){
     @Override
     public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture) throws Exception {
 
+        AtomicLong networkLogCounter = new AtomicLong(0L);
         while (!CONN_STATUS.get()){//network is unhealth
+            if(networkLogCounter.getAndIncrement() % 1000 == 0){
+                LOG.info("network unhealth to block task");
+            }
             Thread.sleep(100);
         }
         Map<String, Object> params = formatInputParam(inputParams);
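The added networkLogCounter only logs on every 1000th pass through the wait loop, so a long outage still blocks the task but no longer writes a log line every 100 ms. Below is a standalone sketch of the same throttling idea, assuming SLF4J on the classpath as in the project; the ConnectionGate class and awaitHealthy method are illustrative names, not part of the codebase.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch: spin while a shared health flag is false, logging every 1000th check.
public class ConnectionGate {

    private static final Logger LOG = LoggerFactory.getLogger(ConnectionGate.class);

    // Mirrors CONN_STATUS, which other code flips back to true once the DB connection recovers.
    private static final AtomicBoolean CONN_STATUS = new AtomicBoolean(true);

    public static void awaitHealthy() throws InterruptedException {
        AtomicLong logCounter = new AtomicLong(0L);
        while (!CONN_STATUS.get()) {
            // Log once per 1000 iterations (roughly every 100 seconds at 100 ms per check).
            if (logCounter.getAndIncrement() % 1000 == 0) {
                LOG.info("network unhealthy, blocking task");
            }
            Thread.sleep(100);
        }
    }
}

Because the counter is a local created per call, the throttle resets on each invocation of handleAsyncInvoke rather than being shared across records.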
@@ -144,12 +140,7 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
                     logger.error("getConnection error", conn.cause());
                 }
                 if(failCounter.get() >= sideInfo.getSideTableInfo().getAsyncFailMaxNum(3L)){
-                    if(FAIL_NUM.incrementAndGet() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)){
-                        counter.inc();
-                        resultFuture.completeExceptionally(conn.cause());
-                    } else {
-                        dealMissKey(input, resultFuture);
-                    }
+                    dealFillDataError(input, resultFuture, conn.cause());
                     finishFlag.set(true);
                 }
                 conn.result().close();
@@ -161,8 +152,7 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
                 handleQuery(conn.result(), inputParams, input, resultFuture);
                 finishFlag.set(true);
             } catch (Exception e) {
-                dealFillDataError(resultFuture, e, null);
-                logger.error("", e);
+                dealFillDataError(input, resultFuture, e);
             } finally {
                 latch.countDown();
             }
@@ -228,13 +218,7 @@ private void handleQuery(SQLConnection connection, Map<String, Object> inputPara
         JsonArray params = new JsonArray(Lists.newArrayList(inputParams.values()));
         connection.queryWithParams(sideInfo.getSqlCondition(), params, rs -> {
             if (rs.failed()) {
-                if(FAIL_NUM.incrementAndGet() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)){
-                    LOG.error("Cannot retrieve the data from the database", rs.cause());
-                    counter.inc();
-                    resultFuture.completeExceptionally(rs.cause());
-                } else {
-                    dealMissKey(input, resultFuture);
-                }
+                dealFillDataError(input, resultFuture, rs.cause());
                 return;
             }
