Skip to content

Commit c81cbb2

Browse files
author
dapeng
committed
Merge remote-tracking branch 'origin/feat_1.8_asyncException' into feat_1.8_asyncException_mergeDev
# Conflicts: # rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java
2 parents b0443af + 393c98e commit c81cbb2

File tree

2 files changed

+4
-8
lines changed

2 files changed

+4
-8
lines changed

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import java.util.concurrent.ScheduledFuture;
5252
import java.util.concurrent.ThreadPoolExecutor;
5353
import java.util.concurrent.TimeUnit;
54-
import java.util.concurrent.atomic.AtomicLong;
5554

5655
/**
5756
* All interfaces inherit naming rules: type + "AsyncReqRow" such as == "MysqlAsyncReqRow
@@ -65,7 +64,6 @@ public abstract class BaseAsyncReqRow extends RichAsyncFunction<CRow, CRow> impl
6564
private static final Logger LOG = LoggerFactory.getLogger(BaseAsyncReqRow.class);
6665
private static final long serialVersionUID = 2098635244857937717L;
6766
private RuntimeContext runtimeContext;
68-
private final static AtomicLong FAIL_NUM = new AtomicLong(0);
6967
private static int TIMEOUT_LOG_FLUSH_NUM = 10;
7068
private int timeOutNum = 0;
7169
protected BaseSideInfo sideInfo;
@@ -258,9 +256,9 @@ protected void cancelTimerWhenComplete(ResultFuture<CRow> resultFuture, Schedule
258256
}
259257

260258
protected void dealFillDataError(CRow input, ResultFuture<CRow> resultFuture, Throwable e) {
261-
if(FAIL_NUM.incrementAndGet() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)){
259+
parseErrorRecords.inc();
260+
if(parseErrorRecords.getCount() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)){
262261
LOG.info("dealFillDataError", e);
263-
parseErrorRecords.inc();
264262
resultFuture.completeExceptionally(e);
265263
} else {
266264
dealMissKey(input, resultFuture);

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,6 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
8484

8585
private final static AtomicBoolean CONN_STATUS = new AtomicBoolean(true);
8686

87-
private Logger logger = LoggerFactory.getLogger(getClass());
88-
8987
public RdbAsyncReqRow(BaseSideInfo sideInfo) {
9088
super(sideInfo);
9189
init(sideInfo);
@@ -137,7 +135,7 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
137135
try {
138136
if(conn.failed()){
139137
if(failCounter.getAndIncrement() % 1000 == 0){
140-
logger.error("getConnection error", conn.cause());
138+
LOG.error("getConnection error", conn.cause());
141139
}
142140
if(failCounter.get() >= sideInfo.getSideTableInfo().getAsyncFailMaxNum(3L)){
143141
dealFillDataError(input, resultFuture, conn.cause());
@@ -161,7 +159,7 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
161159
try {
162160
latch.wait();
163161
} catch (InterruptedException e) {
164-
logger.error("", e);
162+
LOG.error("", e);
165163
}
166164
}
167165

0 commit comments

Comments
 (0)