Skip to content

Commit 30d99a6

Browse files
committed
Merge remote-tracking branch 'origin/1.8_v3.9.2' into v1.8.0_dev
# Conflicts: # hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java # kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java # rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java # redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java
2 parents 86120e9 + 2a58e7b commit 30d99a6

File tree

9 files changed

+157
-102
lines changed

9 files changed

+157
-102
lines changed

core/src/main/java/com/dtstack/flink/sql/metric/MetricConstant.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public class MetricConstant {
4747

4848
public static final String DT_NUM_DIRTY_RECORDS_OUT = "dtNumDirtyRecordsOut";
4949

50+
public static final String DT_NUM_SIDE_PARSE_ERROR_RECORDS = "dtNumSideParseErrorRecords";
51+
5052
public static final String DT_NUM_RECORDS_OUT_RATE = "dtNumRecordsOutRate";
5153

5254
public static final String DT_EVENT_DELAY_GAUGE = "dtEventDelay";

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,19 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.enums.ECacheType;
24+
import com.dtstack.flink.sql.metric.MetricConstant;
2425
import com.dtstack.flink.sql.side.cache.AbsSideCache;
2526
import com.dtstack.flink.sql.side.cache.CacheObj;
2627
import com.dtstack.flink.sql.side.cache.LRUSideCache;
2728
import org.apache.calcite.sql.JoinType;
2829
import org.apache.flink.configuration.Configuration;
30+
import org.apache.flink.metrics.Counter;
2931
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3032
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
3133
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
3234
import org.apache.flink.types.Row;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
3337

3438
import java.util.Collection;
3539
import java.util.Collections;
@@ -44,25 +48,21 @@
4448
*/
4549

4650
public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> implements ISideReqRow {
47-
51+
private static final Logger LOG = LoggerFactory.getLogger(AsyncReqRow.class);
4852
private static final long serialVersionUID = 2098635244857937717L;
4953

5054
protected SideInfo sideInfo;
55+
protected transient Counter parseErrorRecords;
5156

5257
public AsyncReqRow(SideInfo sideInfo){
5358
this.sideInfo = sideInfo;
5459
}
5560

5661
@Override
57-
public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
58-
StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
59-
try {
60-
if (null == future.get()) {
61-
new TimeoutException("Async function call has timed out.");
62-
}
63-
} catch (Exception e) {
64-
throw new Exception(e);
65-
}
62+
public void open(Configuration parameters) throws Exception {
63+
super.open(parameters);
64+
initCache();
65+
initMetric();
6666
}
6767

6868
private void initCache(){
@@ -78,10 +78,13 @@ private void initCache(){
7878
}else{
7979
throw new RuntimeException("not support side cache with type:" + sideTableInfo.getCacheType());
8080
}
81-
8281
sideCache.initCache();
8382
}
8483

84+
private void initMetric() {
85+
parseErrorRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_SIDE_PARSE_ERROR_RECORDS);
86+
}
87+
8588
protected CacheObj getFromCache(String key){
8689
return sideInfo.getSideCache().getFromCache(key);
8790
}
@@ -97,17 +100,41 @@ protected boolean openCache(){
97100
protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
98101
if(sideInfo.getJoinType() == JoinType.LEFT){
99102
//Reserved left table data
100-
Row row = fillData(input, null);
101-
resultFuture.complete(Collections.singleton(row));
103+
try {
104+
Row row = fillData(input, null);
105+
resultFuture.complete(Collections.singleton(row));
106+
} catch (Exception e) {
107+
dealFillDataError(resultFuture, e, input);
108+
}
102109
}else{
103110
resultFuture.complete(null);
104111
}
105112
}
106113

114+
protected void dealCacheData(String key, CacheObj missKeyObj) {
115+
if (openCache()) {
116+
putCache(key, missKeyObj);
117+
}
118+
}
119+
107120
@Override
108-
public void open(Configuration parameters) throws Exception {
109-
super.open(parameters);
110-
initCache();
121+
public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
122+
StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
123+
try {
124+
if (null == future.get()) {
125+
resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."));
126+
}
127+
} catch (Exception e) {
128+
resultFuture.completeExceptionally(new Exception(e));
129+
}
130+
}
131+
132+
133+
protected void dealFillDataError(ResultFuture<Row> resultFuture, Exception e, Object sourceData) {
134+
LOG.debug("source data {} join side table error ", sourceData);
135+
LOG.debug("async buid row error..{}", e);
136+
parseErrorRecords.inc();
137+
resultFuture.complete(Collections.emptyList());
111138
}
112139

113140
@Override

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
132132
dealMissKey(inputRow, resultFuture);
133133
return;
134134
}
135-
136135
refData.put(sideInfo.getEqualFieldList().get(i), equalObj);
137136
}
138137

@@ -146,12 +145,20 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
146145
dealMissKey(inputRow, resultFuture);
147146
return;
148147
}else if(ECacheContentType.SingleLine == val.getType()){
149-
Row row = fillData(inputRow, val);
150-
resultFuture.complete(Collections.singleton(row));
151-
}else if(ECacheContentType.MultiLine == val.getType()){
152-
for(Object one : (List)val.getContent()){
153-
Row row = fillData(inputRow, one);
148+
try {
149+
Row row = fillData(inputRow, val);
154150
resultFuture.complete(Collections.singleton(row));
151+
} catch (Exception e) {
152+
dealFillDataError(resultFuture, e, inputRow);
153+
}
154+
}else if(ECacheContentType.MultiLine == val.getType()){
155+
try {
156+
for(Object one : (List)val.getContent()){
157+
Row row = fillData(inputRow, one);
158+
resultFuture.complete(Collections.singleton(row));
159+
}
160+
} catch (Exception e) {
161+
dealFillDataError(resultFuture, e, inputRow);
155162
}
156163
}
157164
return;

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/AbsRowKeyModeDealer.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,13 @@ public AbsRowKeyModeDealer(Map<String, String> colRefType, String[] colNames, HB
7474

7575
protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
7676
if(joinType == JoinType.LEFT){
77-
//保留left 表数据
78-
Row row = fillData(input, null);
79-
resultFuture.complete(Collections.singleton(row));
77+
try {
78+
//保留left 表数据
79+
Row row = fillData(input, null);
80+
resultFuture.complete(Collections.singleton(row));
81+
} catch (Exception e) {
82+
resultFuture.completeExceptionally(e);
83+
}
8084
}else{
8185
resultFuture.complete(null);
8286
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/PreRowKeyModeDealerDealer.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -103,25 +103,28 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
103103
Object val = HbaseUtils.convertByte(keyValue.value(), colType);
104104
sideMap.put(mapKey, val);
105105
}
106-
107-
if (oneRow.size() > 0) {
108-
//The order of the fields defined in the data conversion table
109-
List<Object> sideVal = Lists.newArrayList();
110-
for (String key : colNames) {
111-
Object val = sideMap.get(key);
112-
if (val == null) {
113-
System.out.println("can't get data with column " + key);
114-
LOG.error("can't get data with column " + key);
106+
try {
107+
if (oneRow.size() > 0) {
108+
//The order of the fields defined in the data conversion table
109+
List<Object> sideVal = Lists.newArrayList();
110+
for (String key : colNames) {
111+
Object val = sideMap.get(key);
112+
if (val == null) {
113+
System.out.println("can't get data with column " + key);
114+
LOG.error("can't get data with column " + key);
115+
}
116+
117+
sideVal.add(val);
115118
}
116119

117-
sideVal.add(val);
118-
}
119-
120-
Row row = fillData(input, sideVal);
121-
if (openCache) {
122-
cacheContent.add(sideVal);
120+
Row row = fillData(input, sideVal);
121+
if (openCache) {
122+
cacheContent.add(sideVal);
123+
}
124+
rowList.add(row);
123125
}
124-
rowList.add(row);
126+
}catch (Exception e) {
127+
resultFuture.completeExceptionally(e);
125128
}
126129
} catch (Exception e) {
127130
resultFuture.complete(null);

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/RowKeyEqualModeDealer.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -79,28 +79,29 @@ public void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFu
7979
}
8080

8181
if(arg.size() > 0){
82-
//The order of the fields defined in the data conversion table
83-
List<Object> sideVal = Lists.newArrayList();
84-
for(String key : colNames){
85-
Object val = sideMap.get(key);
86-
if(val == null){
87-
System.out.println("can't get data with column " + key);
88-
LOG.error("can't get data with column " + key);
82+
try {
83+
//The order of the fields defined in the data conversion table
84+
List<Object> sideVal = Lists.newArrayList();
85+
for(String key : colNames){
86+
Object val = sideMap.get(key);
87+
if(val == null){
88+
System.out.println("can't get data with column " + key);
89+
LOG.error("can't get data with column " + key);
90+
}
91+
92+
sideVal.add(val);
8993
}
9094

91-
sideVal.add(val);
92-
}
93-
94-
Row row = fillData(input, sideVal);
95-
if(openCache){
96-
sideCache.putCache(rowKeyStr, CacheObj.buildCacheObj(ECacheContentType.SingleLine, row));
95+
Row row = fillData(input, sideVal);
96+
if(openCache){
97+
sideCache.putCache(rowKeyStr, CacheObj.buildCacheObj(ECacheContentType.SingleLine, row));
98+
}
99+
resultFuture.complete(Collections.singleton(row));
100+
} catch (Exception e) {
101+
resultFuture.completeExceptionally(e);
97102
}
98-
99-
resultFuture.complete(Collections.singleton(row));
100103
}else{
101-
102104
dealMissKey(input, resultFuture);
103-
104105
if(openCache){
105106
sideCache.putCache(rowKeyStr, CacheMissVal.getMissKeyObj());
106107
}

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -162,17 +162,25 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
162162
dealMissKey(inputRow, resultFuture);
163163
return;
164164
} else if (ECacheContentType.SingleLine == val.getType()) {
165-
Row row = fillData(inputRow, val);
166-
resultFuture.complete(Collections.singleton(row));
165+
try {
166+
Row row = fillData(inputRow, val);
167+
resultFuture.complete(Collections.singleton(row));
168+
} catch (Exception e) {
169+
dealFillDataError(resultFuture, e, inputRow);
170+
}
167171
} else if (ECacheContentType.MultiLine == val.getType()) {
168-
List<Row> rowList = Lists.newArrayList();
169-
for (Object jsonArray : (List) val.getContent()) {
170-
Row row = fillData(inputRow, jsonArray);
171-
rowList.add(row);
172+
try {
173+
List<Row> rowList = Lists.newArrayList();
174+
for (Object jsonArray : (List) val.getContent()) {
175+
Row row = fillData(inputRow, jsonArray);
176+
rowList.add(row);
177+
}
178+
resultFuture.complete(rowList);
179+
} catch (Exception e) {
180+
dealFillDataError(resultFuture, e, inputRow);
172181
}
173-
resultFuture.complete(rowList);
174182
} else {
175-
throw new RuntimeException("not support cache obj type " + val.getType());
183+
resultFuture.completeExceptionally(new RuntimeException("not support cache obj type " + val.getType()));
176184
}
177185
return;
178186
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -95,17 +95,16 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
9595
if (openCache()) {
9696
CacheObj val = getFromCache(key);
9797
if (val != null) {
98-
9998
if (ECacheContentType.MissVal == val.getType()) {
10099
dealMissKey(inputRow, resultFuture);
101100
return;
102101
} else if (ECacheContentType.MultiLine == val.getType()) {
103-
List<Row> rowList = Lists.newArrayList();
104-
for (Object jsonArray : (List) val.getContent()) {
105-
Row row = fillData(inputRow, jsonArray);
106-
rowList.add(row);
102+
try {
103+
List<Row> rowList = getRows(inputRow, null, (List) val.getContent());
104+
resultFuture.complete(rowList);
105+
} catch (Exception e) {
106+
dealFillDataError(resultFuture, e, inputRow);
107107
}
108-
resultFuture.complete(rowList);
109108
} else {
110109
resultFuture.completeExceptionally(new RuntimeException("not support cache obj type " + val.getType()));
111110
}
@@ -128,31 +127,19 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
128127
resultFuture.completeExceptionally(rs.cause());
129128
return;
130129
}
131-
132130
List<JsonArray> cacheContent = Lists.newArrayList();
133-
134-
int resultSize = rs.result().getResults().size();
135-
if (resultSize > 0) {
136-
List<Row> rowList = Lists.newArrayList();
137-
138-
for (JsonArray line : rs.result().getResults()) {
139-
Row row = fillData(inputRow, line);
140-
if (openCache()) {
141-
cacheContent.add(line);
142-
}
143-
rowList.add(row);
144-
}
145-
146-
if (openCache()) {
147-
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
131+
List<JsonArray> results = rs.result().getResults();
132+
if (results.size() > 0) {
133+
try {
134+
List<Row> rowList = getRows(inputRow, cacheContent, results);
135+
dealCacheData(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
136+
resultFuture.complete(rowList);
137+
} catch (Exception e){
138+
dealFillDataError(resultFuture, e, inputRow);
148139
}
149-
150-
resultFuture.complete(Collections.unmodifiableCollection(rowList));
151140
} else {
152141
dealMissKey(inputRow, resultFuture);
153-
if (openCache()) {
154-
putCache(key, CacheMissVal.getMissKeyObj());
155-
}
142+
dealCacheData(key, CacheMissVal.getMissKeyObj());
156143
}
157144

158145
// and close the connection
@@ -165,6 +152,18 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
165152
});
166153
}
167154

155+
protected List<Row> getRows(Row inputRow, List<JsonArray> cacheContent, List<JsonArray> results) {
156+
List<Row> rowList = Lists.newArrayList();
157+
for (JsonArray line : results) {
158+
Row row = fillData(inputRow, line);
159+
if (null != cacheContent && openCache()) {
160+
cacheContent.add(line);
161+
}
162+
rowList.add(row);
163+
}
164+
return rowList;
165+
}
166+
168167
@Override
169168
public Row fillData(Row input, Object line) {
170169
JsonArray jsonArray = (JsonArray) line;

0 commit comments

Comments
 (0)