Skip to content

Commit 698094e

Browse files
author
dapeng
committed
async的超时次数配置
1 parent 0dbf0d4 commit 698094e

File tree

3 files changed

+24
-3
lines changed

3 files changed

+24
-3
lines changed

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,13 @@ public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception
125125

126126
//TODO 需要添加数据指标
127127
if(timeOutNum % TIMEOUT_LOG_FLUSH_NUM == 0){
128-
LOG.info("Async function call has timed out. input:" + input.toString());
128+
LOG.info("Async function call has timed out. input:{}, timeOutNum:{}",input.toString(), timeOutNum);
129+
}
130+
timeOutNum ++;
131+
if(timeOutNum > sideInfo.getSideTableInfo().getAsyncTimeoutNumLimit()){
132+
resultFuture.complete(null);
129133
}
130134

131-
timeOutNum++;
132-
resultFuture.complete(null);
133135
}
134136

135137

core/src/main/java/com/dtstack/flink/sql/side/SideTableInfo.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public abstract class SideTableInfo extends TableInfo implements Serializable {
5353

5454
public static final String ASYNC_TIMEOUT_KEY = "asyncTimeout";
5555

56+
public static final String ASYNC_TIMEOUT_NUM_KEY = "asyncTimeoutNum";
57+
5658
private String cacheType = "none";//None or LRU or ALL
5759

5860
private int cacheSize = 10000;
@@ -63,6 +65,8 @@ public abstract class SideTableInfo extends TableInfo implements Serializable {
6365

6466
private int asyncTimeout=10000;
6567

68+
private int asyncTimeoutNumLimit = 0;
69+
6670
private boolean partitionedJoin = false;
6771

6872
private String cacheMode="ordered";
@@ -143,4 +147,13 @@ public void setPredicateInfoes(List<PredicateInfo> predicateInfoes) {
143147
public List<PredicateInfo> getPredicateInfoes() {
144148
return predicateInfoes;
145149
}
150+
151+
public int getAsyncTimeoutNumLimit() {
152+
return asyncTimeoutNumLimit;
153+
}
154+
155+
public void setAsyncTimeoutNumLimit(int asyncTimeoutNumLimit) {
156+
this.asyncTimeoutNumLimit = asyncTimeoutNumLimit;
157+
}
158+
146159
}

core/src/main/java/com/dtstack/flink/sql/table/AbsSideTableParser.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ protected void parseCacheProp(SideTableInfo sideTableInfo, Map<String, Object> p
108108
}
109109
sideTableInfo.setAsyncTimeout(asyncTimeout);
110110
}
111+
if(props.containsKey(SideTableInfo.ASYNC_TIMEOUT_NUM_KEY.toLowerCase())){
112+
Integer asyncTimeoutNum = MathUtil.getIntegerVal(props.get(SideTableInfo.ASYNC_TIMEOUT_NUM_KEY.toLowerCase()));
113+
if (asyncTimeoutNum > 0){
114+
sideTableInfo.setAsyncTimeout(asyncTimeoutNum);
115+
}
116+
}
111117
}
112118
}
113119
}

0 commit comments

Comments
 (0)