Skip to content

Commit 1470167

Browse files
committed
fix async side parse error
1 parent 812a110 commit 1470167

File tree

9 files changed

+166
-109
lines changed

9 files changed

+166
-109
lines changed

core/src/main/java/com/dtstack/flink/sql/metric/MetricConstant.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,8 @@ public class MetricConstant {
4747

4848
public static final String DT_NUM_DIRTY_RECORDS_OUT = "dtNumDirtyRecordsOut";
4949

50+
public static final String DT_NUM_SIDE_PARSE_ERROR_RECORDS = "dtNumSideParseErrorRecords";
51+
5052
public static final String DT_NUM_RECORDS_OUT_RATE = "dtNumRecordsOutRate";
5153

5254
public static final String DT_EVENT_DELAY_GAUGE = "dtEventDelay";

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 37 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -21,15 +21,19 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.enums.ECacheType;
24+
import com.dtstack.flink.sql.metric.MetricConstant;
2425
import com.dtstack.flink.sql.side.cache.AbsSideCache;
2526
import com.dtstack.flink.sql.side.cache.CacheObj;
2627
import com.dtstack.flink.sql.side.cache.LRUSideCache;
2728
import org.apache.calcite.sql.JoinType;
2829
import org.apache.flink.configuration.Configuration;
30+
import org.apache.flink.metrics.Counter;
2931
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3032
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
3133
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
3234
import org.apache.flink.types.Row;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
3337

3438
import java.util.Collection;
3539
import java.util.Collections;
@@ -44,25 +48,21 @@
4448
*/
4549

4650
public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> implements ISideReqRow {
47-
51+
private static final Logger LOG = LoggerFactory.getLogger(AsyncReqRow.class);
4852
private static final long serialVersionUID = 2098635244857937717L;
4953

5054
protected SideInfo sideInfo;
55+
protected transient Counter parseErrorRecords;
5156

5257
/**
 * Creates the async side-table request function.
 *
 * @param sideInfo side-table join description; stored on this instance and
 *                 read later by the cache and join helpers of this class
 */
public AsyncReqRow(SideInfo sideInfo) {
    this.sideInfo = sideInfo;
}
5560

5661
@Override
57-
public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
58-
StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
59-
try {
60-
if (null == future.get()) {
61-
new TimeoutException("Async function call has timed out.");
62-
}
63-
} catch (Exception e) {
64-
throw new Exception(e);
65-
}
62+
public void open(Configuration parameters) throws Exception {
63+
super.open(parameters);
64+
initCache();
65+
initMetric();
6666
}
6767

6868
private void initCache(){
@@ -78,10 +78,13 @@ private void initCache(){
7878
}else{
7979
throw new RuntimeException("not support side cache with type:" + sideTableInfo.getCacheType());
8080
}
81-
8281
sideCache.initCache();
8382
}
8483

84+
private void initMetric() {
85+
parseErrorRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_SIDE_PARSE_ERROR_RECORDS);
86+
}
87+
8588
protected CacheObj getFromCache(String key){
8689
return sideInfo.getSideCache().getFromCache(key);
8790
}
@@ -97,17 +100,35 @@ protected boolean openCache(){
97100
protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
98101
if(sideInfo.getJoinType() == JoinType.LEFT){
99102
//Reserved left table data
100-
Row row = fillData(input, null);
101-
resultFuture.complete(Collections.singleton(row));
103+
try {
104+
Row row = fillData(input, null);
105+
resultFuture.complete(Collections.singleton(row));
106+
} catch (Exception e) {
107+
dealFillDataError(resultFuture, e, input);
108+
}
102109
}else{
103110
resultFuture.complete(null);
104111
}
105112
}
106113

107114
@Override
108-
public void open(Configuration parameters) throws Exception {
109-
super.open(parameters);
110-
initCache();
115+
public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
116+
StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
117+
try {
118+
if (null == future.get()) {
119+
resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."));
120+
}
121+
} catch (Exception e) {
122+
resultFuture.completeExceptionally(new Exception(e));
123+
}
124+
}
125+
126+
127+
protected void dealFillDataError(ResultFuture<Row> resultFuture, Exception e, Object sourceData) {
128+
LOG.debug("source data {} join side table error ", sourceData);
129+
LOG.debug("async buid row error..{}", e);
130+
parseErrorRecords.inc();
131+
resultFuture.complete(Collections.emptyList());
111132
}
112133

113134
@Override

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 18 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -123,12 +123,13 @@ public void open(Configuration parameters) throws Exception {
123123

124124
@Override
125125
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
126+
Row inputRow = Row.copy(input);
126127
Map<String, Object> refData = Maps.newHashMap();
127128
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
128129
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
129-
Object equalObj = input.getField(conValIndex);
130+
Object equalObj = inputRow.getField(conValIndex);
130131
if(equalObj == null){
131-
dealMissKey(input, resultFuture);
132+
dealMissKey(inputRow, resultFuture);
132133
return;
133134
}
134135

@@ -142,22 +143,30 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
142143
CacheObj val = getFromCache(rowKeyStr);
143144
if(val != null){
144145
if(ECacheContentType.MissVal == val.getType()){
145-
dealMissKey(input, resultFuture);
146+
dealMissKey(inputRow, resultFuture);
146147
return;
147148
}else if(ECacheContentType.SingleLine == val.getType()){
148-
Row row = fillData(input, val);
149-
resultFuture.complete(Collections.singleton(row));
150-
}else if(ECacheContentType.MultiLine == val.getType()){
151-
for(Object one : (List)val.getContent()){
152-
Row row = fillData(input, one);
149+
try {
150+
Row row = fillData(inputRow, val);
153151
resultFuture.complete(Collections.singleton(row));
152+
} catch (Exception e) {
153+
dealFillDataError(resultFuture, e, inputRow);
154+
}
155+
}else if(ECacheContentType.MultiLine == val.getType()){
156+
try {
157+
for(Object one : (List)val.getContent()){
158+
Row row = fillData(inputRow, one);
159+
resultFuture.complete(Collections.singleton(row));
160+
}
161+
} catch (Exception e) {
162+
dealFillDataError(resultFuture, e, inputRow);
154163
}
155164
}
156165
return;
157166
}
158167
}
159168

160-
rowKeyMode.asyncGetData(tableName, rowKeyStr, input, resultFuture, sideInfo.getSideCache());
169+
rowKeyMode.asyncGetData(tableName, rowKeyStr, inputRow, resultFuture, sideInfo.getSideCache());
161170
}
162171

163172
@Override

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/AbsRowKeyModeDealer.java

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -74,9 +74,13 @@ public AbsRowKeyModeDealer(Map<String, String> colRefType, String[] colNames, HB
7474

7575
protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
7676
if(joinType == JoinType.LEFT){
77-
//保留left 表数据
78-
Row row = fillData(input, null);
79-
resultFuture.complete(Collections.singleton(row));
77+
try {
78+
//保留left 表数据
79+
Row row = fillData(input, null);
80+
resultFuture.complete(Collections.singleton(row));
81+
} catch (Exception e) {
82+
resultFuture.completeExceptionally(e);
83+
}
8084
}else{
8185
resultFuture.complete(null);
8286
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/PreRowKeyModeDealerDealer.java

Lines changed: 19 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -103,25 +103,28 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
103103
Object val = HbaseUtils.convertByte(keyValue.value(), colType);
104104
sideMap.put(mapKey, val);
105105
}
106-
107-
if (oneRow.size() > 0) {
108-
//The order of the fields defined in the data conversion table
109-
List<Object> sideVal = Lists.newArrayList();
110-
for (String key : colNames) {
111-
Object val = sideMap.get(key);
112-
if (val == null) {
113-
System.out.println("can't get data with column " + key);
114-
LOG.error("can't get data with column " + key);
106+
try {
107+
if (oneRow.size() > 0) {
108+
//The order of the fields defined in the data conversion table
109+
List<Object> sideVal = Lists.newArrayList();
110+
for (String key : colNames) {
111+
Object val = sideMap.get(key);
112+
if (val == null) {
113+
System.out.println("can't get data with column " + key);
114+
LOG.error("can't get data with column " + key);
115+
}
116+
117+
sideVal.add(val);
115118
}
116119

117-
sideVal.add(val);
118-
}
119-
120-
Row row = fillData(input, sideVal);
121-
if (openCache) {
122-
cacheContent.add(sideVal);
120+
Row row = fillData(input, sideVal);
121+
if (openCache) {
122+
cacheContent.add(sideVal);
123+
}
124+
rowList.add(row);
123125
}
124-
rowList.add(row);
126+
}catch (Exception e) {
127+
resultFuture.completeExceptionally(e);
125128
}
126129
} catch (Exception e) {
127130
resultFuture.complete(null);

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/RowKeyEqualModeDealer.java

Lines changed: 18 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -79,28 +79,29 @@ public void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFu
7979
}
8080

8181
if(arg.size() > 0){
82-
//The order of the fields defined in the data conversion table
83-
List<Object> sideVal = Lists.newArrayList();
84-
for(String key : colNames){
85-
Object val = sideMap.get(key);
86-
if(val == null){
87-
System.out.println("can't get data with column " + key);
88-
LOG.error("can't get data with column " + key);
82+
try {
83+
//The order of the fields defined in the data conversion table
84+
List<Object> sideVal = Lists.newArrayList();
85+
for(String key : colNames){
86+
Object val = sideMap.get(key);
87+
if(val == null){
88+
System.out.println("can't get data with column " + key);
89+
LOG.error("can't get data with column " + key);
90+
}
91+
92+
sideVal.add(val);
8993
}
9094

91-
sideVal.add(val);
92-
}
93-
94-
Row row = fillData(input, sideVal);
95-
if(openCache){
96-
sideCache.putCache(rowKeyStr, CacheObj.buildCacheObj(ECacheContentType.SingleLine, row));
95+
Row row = fillData(input, sideVal);
96+
if(openCache){
97+
sideCache.putCache(rowKeyStr, CacheObj.buildCacheObj(ECacheContentType.SingleLine, row));
98+
}
99+
resultFuture.complete(Collections.singleton(row));
100+
} catch (Exception e) {
101+
resultFuture.completeExceptionally(e);
97102
}
98-
99-
resultFuture.complete(Collections.singleton(row));
100103
}else{
101-
102104
dealMissKey(input, resultFuture);
103-
104105
if(openCache){
105106
sideCache.putCache(rowKeyStr, CacheMissVal.getMissKeyObj());
106107
}

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java

Lines changed: 19 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -121,6 +121,7 @@ private void connKuDu() throws KuduException {
121121

122122
@Override
123123
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
124+
Row inputRow = Row.copy(input);
124125
//scannerBuilder 设置为null重新加载过滤条件
125126
scannerBuilder = null;
126127
connKuDu();
@@ -144,22 +145,29 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
144145
//判断数据是否已经加载到缓存中
145146
CacheObj val = getFromCache(key);
146147
if (val != null) {
147-
148148
if (ECacheContentType.MissVal == val.getType()) {
149-
dealMissKey(input, resultFuture);
149+
dealMissKey(inputRow, resultFuture);
150150
return;
151151
} else if (ECacheContentType.SingleLine == val.getType()) {
152-
Row row = fillData(input, val);
153-
resultFuture.complete(Collections.singleton(row));
152+
try {
153+
Row row = fillData(inputRow, val);
154+
resultFuture.complete(Collections.singleton(row));
155+
} catch (Exception e) {
156+
dealFillDataError(resultFuture, e, inputRow);
157+
}
154158
} else if (ECacheContentType.MultiLine == val.getType()) {
155-
List<Row> rowList = Lists.newArrayList();
156-
for (Object jsonArray : (List) val.getContent()) {
157-
Row row = fillData(input, jsonArray);
158-
rowList.add(row);
159+
try {
160+
List<Row> rowList = Lists.newArrayList();
161+
for (Object jsonArray : (List) val.getContent()) {
162+
Row row = fillData(inputRow, jsonArray);
163+
rowList.add(row);
164+
}
165+
resultFuture.complete(rowList);
166+
} catch (Exception e) {
167+
dealFillDataError(resultFuture, e, inputRow);
159168
}
160-
resultFuture.complete(rowList);
161169
} else {
162-
throw new RuntimeException("not support cache obj type " + val.getType());
170+
resultFuture.completeExceptionally(new RuntimeException("not support cache obj type " + val.getType()));
163171
}
164172
return;
165173
}
@@ -169,7 +177,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
169177
List<Row> rowList = Lists.newArrayList();
170178
Deferred<RowResultIterator> data = asyncKuduScanner.nextRows();
171179
//从之前的同步修改为调用异步的Callback
172-
data.addCallbackDeferring(new GetListRowCB(input, cacheContent, rowList, asyncKuduScanner, resultFuture, key));
180+
data.addCallbackDeferring(new GetListRowCB(inputRow, cacheContent, rowList, asyncKuduScanner, resultFuture, key));
173181
}
174182

175183

0 commit comments

Comments
 (0)