Skip to content

Commit 04550b0

Browse files
author
dapeng
committed
处理异常策略
1 parent 330be96 commit 04550b0

File tree

3 files changed

+21
-34
lines changed

3 files changed

+21
-34
lines changed

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements
5454

5555
public static final String ASYNC_TIMEOUT_KEY = "asyncTimeout";
5656

57-
public static final String ASYNC_TIMEOUT_NUM_KEY = "asyncTimeoutNum";
58-
5957
public static final String ASYNC_FAIL_MAX_NUM_KEY = "asyncFailMaxNum";
6058

6159
private String cacheType = "none";//None or LRU or ALL

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.concurrent.ScheduledFuture;
5252
import java.util.concurrent.ThreadPoolExecutor;
5353
import java.util.concurrent.TimeUnit;
54+
import java.util.concurrent.atomic.AtomicLong;
5455

5556
/**
5657
* All interfaces inherit naming rules: type + "AsyncReqRow" such as == "MysqlAsyncReqRow
@@ -64,10 +65,9 @@ public abstract class BaseAsyncReqRow extends RichAsyncFunction<CRow, CRow> impl
6465
private static final Logger LOG = LoggerFactory.getLogger(BaseAsyncReqRow.class);
6566
private static final long serialVersionUID = 2098635244857937717L;
6667
private RuntimeContext runtimeContext;
67-
68+
private final static AtomicLong FAIL_NUM = new AtomicLong(0);
6869
private static int TIMEOUT_LOG_FLUSH_NUM = 10;
6970
private int timeOutNum = 0;
70-
7171
protected BaseSideInfo sideInfo;
7272
protected transient Counter parseErrorRecords;
7373

@@ -125,7 +125,7 @@ protected void dealMissKey(CRow input, ResultFuture<CRow> resultFuture){
125125
Row row = fillData(input.row(), null);
126126
resultFuture.complete(Collections.singleton(new CRow(row, input.change())));
127127
} catch (Exception e) {
128-
dealFillDataError(resultFuture, e, input);
128+
dealFillDataError(input, resultFuture, e);
129129
}
130130
}else{
131131
resultFuture.complete(null);
@@ -205,7 +205,7 @@ private void invokeWithCache(Map<String, Object> inputParams, CRow input, Result
205205
Row row = fillData(input.row(), val);
206206
resultFuture.complete(Collections.singleton(new CRow(row, input.change())));
207207
} catch (Exception e) {
208-
dealFillDataError(resultFuture, e, input);
208+
dealFillDataError(input, resultFuture, e);
209209
}
210210
} else if (ECacheContentType.MultiLine == val.getType()) {
211211
try {
@@ -216,7 +216,7 @@ private void invokeWithCache(Map<String, Object> inputParams, CRow input, Result
216216
}
217217
resultFuture.complete(rowList);
218218
} catch (Exception e) {
219-
dealFillDataError(resultFuture, e, input);
219+
dealFillDataError(input, resultFuture, e);
220220
}
221221
} else {
222222
resultFuture.completeExceptionally(new RuntimeException("not support cache obj type " + val.getType()));
@@ -256,11 +256,14 @@ protected void cancelTimerWhenComplete(ResultFuture<CRow> resultFuture, Schedule
256256
}
257257
}
258258

259-
protected void dealFillDataError(ResultFuture<CRow> resultFuture, Exception e, Object sourceData) {
260-
LOG.debug("source data {} join side table error ", sourceData);
261-
LOG.debug("async buid row error..{}", e);
262-
parseErrorRecords.inc();
263-
resultFuture.complete(Collections.emptyList());
259+
protected void dealFillDataError(CRow input, ResultFuture<CRow> resultFuture, Throwable e) {
260+
if(FAIL_NUM.incrementAndGet() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)){
261+
LOG.info("dealFillDataError", e);
262+
parseErrorRecords.inc();
263+
resultFuture.completeExceptionally(e);
264+
} else {
265+
dealMissKey(input, resultFuture);
266+
}
264267
}
265268

266269
@Override

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package com.dtstack.flink.sql.side.rdb.async;
2121

2222
import com.dtstack.flink.sql.enums.ECacheContentType;
23-
import com.dtstack.flink.sql.metric.MetricConstant;
2423
import com.dtstack.flink.sql.side.BaseAsyncReqRow;
2524
import com.dtstack.flink.sql.side.BaseSideInfo;
2625
import com.dtstack.flink.sql.side.CacheMissVal;
@@ -30,9 +29,7 @@
3029
import io.vertx.core.json.JsonArray;
3130
import io.vertx.ext.sql.SQLClient;
3231
import io.vertx.ext.sql.SQLConnection;
33-
import org.apache.calcite.sql.JoinType;
3432
import org.apache.commons.lang3.StringUtils;
35-
import org.apache.flink.metrics.Counter;
3633
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3734
import org.apache.flink.table.runtime.types.CRow;
3835
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
@@ -79,12 +76,8 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
7976

8077
private final static AtomicBoolean CONN_STATUS = new AtomicBoolean(true);
8178

82-
private final static AtomicLong FAIL_NUM = new AtomicLong(0);
83-
8479
private Logger logger = LoggerFactory.getLogger(getClass());
8580

86-
private Counter counter = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_SIDE_PARSE_ERROR_RECORDS);
87-
8881
public RdbAsyncReqRow(BaseSideInfo sideInfo) {
8982
super(sideInfo);
9083
}
@@ -98,7 +91,12 @@ protected void preInvoke(CRow input, ResultFuture<CRow> resultFuture){
9891
@Override
9992
public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture) throws Exception {
10093

94+
AtomicLong networkLogCounter = new AtomicLong(0L);
10195
while (!CONN_STATUS.get()){//network is unhealth
96+
//todo:统一计数
97+
if(networkLogCounter.getAndIncrement() % 1000 == 0){
98+
LOG.info("network unhealth to block task");
99+
}
102100
Thread.sleep(100);
103101
}
104102
rdbSqlClient.getConnection(conn -> {
@@ -127,12 +125,7 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
127125
logger.error("getConnection error", conn.cause());
128126
}
129127
if(failCounter.get() >= sideInfo.getSideTableInfo().getAsyncFailMaxNum(3L)){
130-
if(FAIL_NUM.incrementAndGet() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)){
131-
counter.inc();
132-
resultFuture.completeExceptionally(conn.cause());
133-
} else {
134-
dealMissKey(input, resultFuture);
135-
}
128+
dealFillDataError(input, resultFuture, conn.cause());
136129
finishFlag.set(true);
137130
}
138131
conn.result().close();
@@ -144,8 +137,7 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
144137
handleQuery(conn.result(), inputParams, input, resultFuture);
145138
finishFlag.set(true);
146139
} catch (Exception e) {
147-
dealFillDataError(resultFuture, e, null);
148-
logger.error("", e);
140+
dealFillDataError(input, resultFuture, e);
149141
} finally {
150142
latch.countDown();
151143
}
@@ -210,13 +202,7 @@ private void handleQuery(SQLConnection connection,Map<String, Object> inputParam
210202
JsonArray params = new JsonArray(Lists.newArrayList(inputParams.values()));
211203
connection.queryWithParams(sideInfo.getSqlCondition(), params, rs -> {
212204
if (rs.failed()) {
213-
if(FAIL_NUM.incrementAndGet() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)){
214-
LOG.error("Cannot retrieve the data from the database", rs.cause());
215-
counter.inc();
216-
resultFuture.completeExceptionally(rs.cause());
217-
} else {
218-
dealMissKey(input, resultFuture);
219-
}
205+
dealFillDataError(input, resultFuture, rs.cause());
220206
return;
221207
}
222208

0 commit comments

Comments
 (0)