Skip to content

Commit 393c98e

Browse files
author
dapeng
committed
去除多余日志logger和采用全局计数器
1 parent 04550b0 commit 393c98e

File tree

2 files changed

+4
-10
lines changed

2 files changed

+4
-10
lines changed

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import java.util.concurrent.ScheduledFuture;
5252
import java.util.concurrent.ThreadPoolExecutor;
5353
import java.util.concurrent.TimeUnit;
54-
import java.util.concurrent.atomic.AtomicLong;
5554

5655
/**
5756
* All interfaces inherit naming rules: type + "AsyncReqRow" such as == "MysqlAsyncReqRow
@@ -65,7 +64,6 @@ public abstract class BaseAsyncReqRow extends RichAsyncFunction<CRow, CRow> impl
6564
private static final Logger LOG = LoggerFactory.getLogger(BaseAsyncReqRow.class);
6665
private static final long serialVersionUID = 2098635244857937717L;
6766
private RuntimeContext runtimeContext;
68-
private final static AtomicLong FAIL_NUM = new AtomicLong(0);
6967
private static int TIMEOUT_LOG_FLUSH_NUM = 10;
7068
private int timeOutNum = 0;
7169
protected BaseSideInfo sideInfo;
@@ -257,9 +255,9 @@ protected void cancelTimerWhenComplete(ResultFuture<CRow> resultFuture, Schedule
257255
}
258256

259257
protected void dealFillDataError(CRow input, ResultFuture<CRow> resultFuture, Throwable e) {
260-
if(FAIL_NUM.incrementAndGet() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)){
258+
parseErrorRecords.inc();
259+
if(parseErrorRecords.getCount() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)){
261260
LOG.info("dealFillDataError", e);
262-
parseErrorRecords.inc();
263261
resultFuture.completeExceptionally(e);
264262
} else {
265263
dealMissKey(input, resultFuture);

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,10 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
7676

7777
private final static AtomicBoolean CONN_STATUS = new AtomicBoolean(true);
7878

79-
private Logger logger = LoggerFactory.getLogger(getClass());
80-
8179
public RdbAsyncReqRow(BaseSideInfo sideInfo) {
8280
super(sideInfo);
8381
}
8482

85-
8683
@Override
8784
protected void preInvoke(CRow input, ResultFuture<CRow> resultFuture){
8885

@@ -93,7 +90,6 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, Resul
9390

9491
AtomicLong networkLogCounter = new AtomicLong(0L);
9592
while (!CONN_STATUS.get()){//network is unhealth
96-
//todo:统一计数
9793
if(networkLogCounter.getAndIncrement() % 1000 == 0){
9894
LOG.info("network unhealth to block task");
9995
}
@@ -122,7 +118,7 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
122118
try {
123119
if(conn.failed()){
124120
if(failCounter.getAndIncrement() % 1000 == 0){
125-
logger.error("getConnection error", conn.cause());
121+
LOG.error("getConnection error", conn.cause());
126122
}
127123
if(failCounter.get() >= sideInfo.getSideTableInfo().getAsyncFailMaxNum(3L)){
128124
dealFillDataError(input, resultFuture, conn.cause());
@@ -146,7 +142,7 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
146142
try {
147143
latch.wait();
148144
} catch (InterruptedException e) {
149-
logger.error("", e);
145+
LOG.error("", e);
150146
}
151147
}
152148

0 commit comments

Comments
 (0)