Skip to content

Commit d236c01

Browse files
committed
Merge branch 'feat_async_mergedDev' into 'v1.8.0_dev'
Feat async merged dev See merge request !250
2 parents 70a2c6b + 4099312 commit d236c01

File tree

9 files changed

+37
-12
lines changed

9 files changed

+37
-12
lines changed

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ private void insertWrite(Row row) {
219219
}
220220
} catch (Exception e) {
221221
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0){
222-
LOG.error("record insert failed ..", row.toString().substring(0, 100));
222+
LOG.error("record insert failed, total dirty num:{}, current record:{}", outDirtyRecords.getCount(), row.toString());
223223
LOG.error("", e);
224224
}
225225

clickhouse/clickhouse-side/clickhouse-all-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
<configuration>
4242
<artifactSet>
4343
<excludes>
44-
44+
<exclude>org.slf4j</exclude>
4545
</excludes>
4646
</artifactSet>
4747
<filters>

clickhouse/clickhouse-side/clickhouse-async-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
<configuration>
4242
<artifactSet>
4343
<excludes>
44-
44+
<exclude>org.slf4j</exclude>
4545
</excludes>
4646
</artifactSet>
4747
<filters>

clickhouse/clickhouse-sink/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
<configuration>
4141
<artifactSet>
4242
<excludes>
43-
43+
<exclude>org.slf4j</exclude>
4444
</excludes>
4545
</artifactSet>
4646
<filters>

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ public abstract class AsyncReqRow extends RichAsyncFunction<CRow, CRow> implemen
5151
private static final Logger LOG = LoggerFactory.getLogger(AsyncReqRow.class);
5252
private static final long serialVersionUID = 2098635244857937717L;
5353

54+
private static int TIMEOUT_LOG_FLUSH_NUM = 10;
55+
private int timeOutNum = 0;
56+
5457
protected SideInfo sideInfo;
5558
protected transient Counter parseErrorRecords;
5659

@@ -119,13 +122,16 @@ protected void dealCacheData(String key, CacheObj missKeyObj) {
119122

120123
@Override
121124
public void timeout(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
122-
StreamRecordQueueEntry<CRow> future = (StreamRecordQueueEntry<CRow>)resultFuture;
123-
try {
124-
if (null == future.get()) {
125-
resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."));
126-
}
127-
} catch (Exception e) {
128-
resultFuture.completeExceptionally(new Exception(e));
125+
126+
if(timeOutNum % TIMEOUT_LOG_FLUSH_NUM == 0){
127+
LOG.info("Async function call has timed out. input:{}, timeOutNum:{}",input.toString(), timeOutNum);
128+
}
129+
130+
timeOutNum ++;
131+
if(timeOutNum > sideInfo.getSideTableInfo().getAsyncTimeoutNumLimit()){
132+
resultFuture.completeExceptionally(new Exception("Async function call timedoutNum beyond limit."));
133+
} else {
134+
resultFuture.complete(null);
129135
}
130136
}
131137

core/src/main/java/com/dtstack/flink/sql/side/SideTableInfo.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public abstract class SideTableInfo extends TableInfo implements Serializable {
5353

5454
public static final String ASYNC_TIMEOUT_KEY = "asyncTimeout";
5555

56+
public static final String ASYNC_TIMEOUT_NUM_KEY = "asyncTimeoutNum";
57+
5658
private String cacheType = "none";//None or LRU or ALL
5759

5860
private int cacheSize = 10000;
@@ -63,6 +65,8 @@ public abstract class SideTableInfo extends TableInfo implements Serializable {
6365

6466
private int asyncTimeout=10000;
6567

68+
private int asyncTimeoutNumLimit = Integer.MAX_VALUE;
69+
6670
private boolean partitionedJoin = false;
6771

6872
private String cacheMode="ordered";
@@ -143,4 +147,13 @@ public void setPredicateInfoes(List<PredicateInfo> predicateInfoes) {
143147
public List<PredicateInfo> getPredicateInfoes() {
144148
return predicateInfoes;
145149
}
150+
151+
public int getAsyncTimeoutNumLimit() {
152+
return asyncTimeoutNumLimit;
153+
}
154+
155+
public void setAsyncTimeoutNumLimit(int asyncTimeoutNumLimit) {
156+
this.asyncTimeoutNumLimit = asyncTimeoutNumLimit;
157+
}
158+
146159
}

core/src/main/java/com/dtstack/flink/sql/table/AbsSideTableParser.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ protected void parseCacheProp(SideTableInfo sideTableInfo, Map<String, Object> p
108108
}
109109
sideTableInfo.setAsyncTimeout(asyncTimeout);
110110
}
111+
if(props.containsKey(SideTableInfo.ASYNC_TIMEOUT_NUM_KEY.toLowerCase())){
112+
Integer asyncTimeoutNum = MathUtil.getIntegerVal(props.get(SideTableInfo.ASYNC_TIMEOUT_NUM_KEY.toLowerCase()));
113+
if (asyncTimeoutNum > 0){
114+
sideTableInfo.setAsyncTimeoutNumLimit(asyncTimeoutNum);
115+
}
116+
}
111117
}
112118
}
113119
}

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void writeRecord(Tuple2 record) throws IOException {
142142
outRecords.inc();
143143
} catch (KuduException e) {
144144
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0){
145-
LOG.error("record insert failed ..{}", row.toString().substring(0, 100));
145+
LOG.error("record insert failed, total dirty record:{} current row:{}", outDirtyRecords.getCount(), row.toString());
146146
LOG.error("", e);
147147
}
148148
outDirtyRecords.inc();

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Whitespace-only changes.

0 commit comments

Comments
 (0)