Skip to content

Commit 2051383

Browse files
author
dapeng
committed
rdb 连接重试配置项
1 parent c81cbb2 commit 2051383

File tree

3 files changed

+19
-6
lines changed

3 files changed

+19
-6
lines changed

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements
5555
public static final String ASYNC_TIMEOUT_KEY = "asyncTimeout";
5656

5757
public static final String ASYNC_FAIL_MAX_NUM_KEY = "asyncFailMaxNum";
58-
58+
59+
public static final String CONNECT_RETRY_MAX_NUM_KEY = "connectRetryMaxNum";
60+
5961
public static final String ASYNC_REQ_POOL_KEY = "asyncPoolSize";
6062

6163
private String cacheType = "none";
@@ -79,6 +81,8 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements
7981

8082
private Long asyncFailMaxNum;
8183

84+
private Integer connectRetryMaxNum;
85+
8286
private List<PredicateInfo> predicateInfoes = Lists.newArrayList();
8387

8488
public RowTypeInfo getRowTypeInfo(){
@@ -164,6 +168,7 @@ public void setAsyncFailMaxNum(Long asyncFailMaxNum) {
164168
this.asyncFailMaxNum = asyncFailMaxNum;
165169
}
166170

171+
167172
public int getAsyncPoolSize() {
168173
return asyncPoolSize;
169174
}
@@ -172,6 +177,14 @@ public void setAsyncPoolSize(int asyncPoolSize) {
172177
this.asyncPoolSize = asyncPoolSize;
173178
}
174179

180+
181+
public Integer getConnectRetryMaxNum(Integer defaultValue) {
182+
return Objects.isNull(connectRetryMaxNum) ? defaultValue : connectRetryMaxNum;
183+
}
184+
185+
public void setConnectRetryMaxNum(Integer connectRetryMaxNum) {
186+
this.connectRetryMaxNum = connectRetryMaxNum;
187+
}
175188
@Override
176189
public String toString() {
177190
return "Cache Info{" +

core/src/main/java/com/dtstack/flink/sql/table/AbstractSideTableParser.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,10 @@ protected void parseCacheProp(AbstractSideTableInfo sideTableInfo, Map<String, O
110110
sideTableInfo.setAsyncTimeout(asyncTimeout);
111111
}
112112

113-
if(props.containsKey(AbstractSideTableInfo.ASYNC_FAIL_MAX_NUM_KEY.toLowerCase())){
114-
Long asyncFailNum = MathUtil.getLongVal(props.get(AbstractSideTableInfo.ASYNC_FAIL_MAX_NUM_KEY.toLowerCase()));
115-
if (asyncFailNum > 0){
116-
sideTableInfo.setAsyncFailMaxNum(asyncFailNum);
113+
if(props.containsKey(AbstractSideTableInfo.CONNECT_RETRY_MAX_NUM_KEY.toLowerCase())){
114+
Integer connectRetryMaxNum = MathUtil.getIntegerVal(props.get(AbstractSideTableInfo.CONNECT_RETRY_MAX_NUM_KEY.toLowerCase()));
115+
if (connectRetryMaxNum > 0){
116+
sideTableInfo.setConnectRetryMaxNum(connectRetryMaxNum);
117117
}
118118
}
119119
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
137137
if(failCounter.getAndIncrement() % 1000 == 0){
138138
LOG.error("getConnection error", conn.cause());
139139
}
140-
if(failCounter.get() >= sideInfo.getSideTableInfo().getAsyncFailMaxNum(3L)){
140+
if(failCounter.get() >= sideInfo.getSideTableInfo().getConnectRetryMaxNum(3)){
141141
dealFillDataError(input, resultFuture, conn.cause());
142142
finishFlag.set(true);
143143
}

0 commit comments

Comments
 (0)