Skip to content

Commit aa9845a

Browse files
author
dapeng
committed
查询根据失败次数和joinType输出
1 parent 2079ce4 commit aa9845a

File tree

4 files changed

+27
-19
lines changed

4 files changed

+27
-19
lines changed

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,11 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements
6868

6969
private int asyncTimeout=10000;
7070

71-
private int asyncTimeoutNumLimit = Integer.MAX_VALUE;
72-
7371
private boolean partitionedJoin = false;
7472

7573
private String cacheMode="ordered";
7674

77-
private Integer asyncFailMaxNum;
75+
private Long asyncFailMaxNum;
7876

7977
private List<PredicateInfo> predicateInfoes = Lists.newArrayList();
8078

@@ -153,19 +151,11 @@ public List<PredicateInfo> getPredicateInfoes() {
153151
return predicateInfoes;
154152
}
155153

156-
public int getAsyncTimeoutNumLimit() {
157-
return asyncTimeoutNumLimit;
158-
}
159-
160-
public void setAsyncTimeoutNumLimit(int asyncTimeoutNumLimit) {
161-
this.asyncTimeoutNumLimit = asyncTimeoutNumLimit;
162-
}
163-
164-
public Integer getAsyncFailMaxNum(Integer defaultValue) {
154+
public Long getAsyncFailMaxNum(Long defaultValue) {
165155
return Objects.isNull(asyncFailMaxNum) ? defaultValue : asyncFailMaxNum;
166156
}
167157

168-
public void setAsyncFailMaxNum(Integer asyncFailMaxNum) {
158+
public void setAsyncFailMaxNum(Long asyncFailMaxNum) {
169159
this.asyncFailMaxNum = asyncFailMaxNum;
170160
}
171161

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void timeout(CRow input, ResultFuture<CRow> resultFuture) throws Exceptio
149149
resultFuture.complete(null);
150150
return;
151151
}
152-
if(timeOutNum > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Integer.MAX_VALUE)){
152+
if(timeOutNum > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)){
153153
resultFuture.completeExceptionally(new Exception("Async function call timedoutNum beyond limit."));
154154
return;
155155
}

core/src/main/java/com/dtstack/flink/sql/table/AbstractSideTableParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ protected void parseCacheProp(AbstractSideTableInfo sideTableInfo, Map<String, O
116116
}
117117

118118
if(props.containsKey(AbstractSideTableInfo.ASYNC_FAIL_MAX_NUM_KEY.toLowerCase())){
119-
Integer asyncFailNum = MathUtil.getIntegerVal(props.get(AbstractSideTableInfo.ASYNC_FAIL_MAX_NUM_KEY.toLowerCase()));
119+
Long asyncFailNum = MathUtil.getLongVal(props.get(AbstractSideTableInfo.ASYNC_FAIL_MAX_NUM_KEY.toLowerCase()));
120120
if (asyncFailNum > 0){
121121
sideTableInfo.setAsyncFailMaxNum(asyncFailNum);
122122
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.vertx.ext.sql.SQLClient;
3131
import io.vertx.ext.sql.SQLConnection;
3232
import com.google.common.collect.Lists;
33+
import org.apache.calcite.sql.JoinType;
3334
import org.apache.commons.lang.exception.ExceptionUtils;
3435
import org.apache.commons.lang3.StringUtils;
3536
import org.apache.flink.streaming.api.functions.async.ResultFuture;
@@ -38,16 +39,19 @@
3839
import org.apache.flink.types.Row;
3940
import org.slf4j.Logger;
4041
import org.slf4j.LoggerFactory;
42+
import scala.Int;
4143

4244
import java.sql.Timestamp;
4345
import java.util.Collection;
4446
import java.util.List;
4547
import java.util.Map;
4648
import java.util.concurrent.CountDownLatch;
49+
import java.util.concurrent.ExecutionException;
4750
import java.util.concurrent.Future;
4851
import java.util.concurrent.ScheduledFuture;
4952
import java.util.concurrent.atomic.AtomicBoolean;
5053
import java.util.concurrent.atomic.AtomicInteger;
54+
import java.util.concurrent.atomic.AtomicLong;
5155
import java.util.concurrent.atomic.AtomicReference;
5256

5357
/**
@@ -81,6 +85,8 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
8185

8286
private final static AtomicBoolean CONN_STATUS = new AtomicBoolean(true);
8387

88+
private final static AtomicLong TIMOUT_NUM = new AtomicLong(0);
89+
8490
private Logger logger = LoggerFactory.getLogger(getClass());
8591

8692
public RdbAsyncReqRow(BaseSideInfo sideInfo) {
@@ -114,7 +120,7 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, Resul
114120
}
115121

116122
private void connectWithRetry(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture, SQLClient rdbSqlClient) {
117-
AtomicInteger failCounter = new AtomicInteger(0);
123+
AtomicLong failCounter = new AtomicLong(0);
118124
AtomicBoolean finishFlag = new AtomicBoolean(false);
119125
while(!finishFlag.get()){
120126
CountDownLatch latch = new CountDownLatch(1);
@@ -124,8 +130,8 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
124130
if(failCounter.getAndIncrement() % 1000 == 0){
125131
logger.error("getConnection error", conn.cause());
126132
}
127-
if(failCounter.get() >= sideInfo.getSideTableInfo().getAsyncFailMaxNum(3)){
128-
resultFuture.completeExceptionally(conn.cause());
133+
if(failCounter.get() >= sideInfo.getSideTableInfo().getAsyncFailMaxNum(3L)){
134+
outByJoinType(resultFuture, conn.cause());
129135
finishFlag.set(true);
130136
}
131137
conn.result().close();
@@ -202,8 +208,12 @@ private void handleQuery(SQLConnection connection,Map<String, Object> inputParam
202208
JsonArray params = new JsonArray(Lists.newArrayList(inputParams.values()));
203209
connection.queryWithParams(sideInfo.getSqlCondition(), params, rs -> {
204210
if (rs.failed()) {
211+
if(TIMOUT_NUM.incrementAndGet() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)){
212+
outByJoinType(resultFuture, rs.cause());
213+
return;
214+
}
205215
LOG.error("Cannot retrieve the data from the database", rs.cause());
206-
resultFuture.completeExceptionally(rs.cause());
216+
resultFuture.complete(null);
207217
return;
208218
}
209219

@@ -242,4 +252,12 @@ private void handleQuery(SQLConnection connection,Map<String, Object> inputParam
242252
});
243253
}
244254

255+
private void outByJoinType(ResultFuture<CRow> resultFuture, Throwable e){
256+
if(sideInfo.getJoinType() == JoinType.LEFT){
257+
resultFuture.complete(null);
258+
return;
259+
}
260+
resultFuture.completeExceptionally(e);
261+
}
262+
245263
}

0 commit comments

Comments
 (0)