Skip to content

Commit 3f30625

Browse files
committed
fix conflict
1 parent 306c2f2 commit 3f30625

File tree

13 files changed

+565
-29
lines changed

13 files changed

+565
-29
lines changed

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.factory.DTThreadFactory;
24+
import org.apache.calcite.sql.JoinType;
2425
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2526
import org.apache.flink.api.java.tuple.Tuple2;
2627
import org.apache.flink.configuration.Configuration;
2728
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
2829
import org.apache.flink.types.Row;
30+
import org.apache.flink.util.Collector;
2931

3032
import java.sql.SQLException;
3133
import java.sql.Timestamp;
@@ -78,6 +80,14 @@ protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
7880
return obj;
7981
}
8082

83+
protected void sendOutputRow(Tuple2<Boolean, Row> value, Object sideInput, Collector<Tuple2<Boolean, Row>> out) {
84+
if (sideInput == null && sideInfo.getJoinType() != JoinType.LEFT) {
85+
return;
86+
}
87+
Row row = fillData(value.f1, sideInput);
88+
out.collect(Tuple2.of(value.f0, row));
89+
}
90+
8191
@Override
8292
public void close() throws Exception {
8393
if (null != es && !es.isShutdown()) {

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import com.google.common.collect.Maps;
3030
import org.apache.commons.collections.CollectionUtils;
3131
import org.apache.commons.lang3.StringUtils;
32+
import org.apache.flink.api.java.tuple.Tuple2;
3233
import org.apache.flink.api.java.typeutils.RowTypeInfo;
33-
import org.apache.flink.table.runtime.types.CRow;
3434
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
3535
import org.apache.flink.types.Row;
3636
import org.apache.flink.util.Collector;
@@ -72,10 +72,10 @@ public Elasticsearch6AllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<
7272
}
7373

7474
@Override
75-
public void flatMap(CRow value, Collector<CRow> out) throws Exception {
75+
public void flatMap(Tuple2<Boolean,Row> value, Collector<Tuple2<Boolean,Row>> out) throws Exception {
7676
List<Object> inputParams = Lists.newArrayList();
7777
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
78-
Object equalObj = value.row().getField(conValIndex);
78+
Object equalObj = value.f1.getField(conValIndex);
7979
if (equalObj == null) {
8080
sendOutputRow(value, null, out);
8181
return;

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/PreRowKeyModeDealerDealer.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import com.google.common.collect.Maps;
3030
import org.apache.calcite.sql.JoinType;
3131
import com.google.common.collect.Lists;
32+
import org.apache.flink.api.java.tuple.Tuple2;
3233
import org.apache.flink.streaming.api.functions.async.ResultFuture;
33-
import org.apache.flink.table.runtime.types.CRow;
3434
import org.apache.flink.types.Row;
3535
import org.hbase.async.BinaryPrefixComparator;
3636
import org.hbase.async.Bytes;
@@ -65,7 +65,7 @@ public PreRowKeyModeDealerDealer(Map<String, String> colRefType, String[] colNam
6565
}
6666

6767
@Override
68-
public void asyncGetData(String tableName, String rowKeyStr, CRow input, ResultFuture<CRow> resultFuture,
68+
public void asyncGetData(String tableName, String rowKeyStr, Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, Row>> resultFuture,
6969
AbstractSideCache sideCache) {
7070
Scanner prefixScanner = hBaseClient.newScanner(tableName);
7171
ScanFilter scanFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.UTF8(rowKeyStr)));
@@ -79,7 +79,8 @@ public void asyncGetData(String tableName, String rowKeyStr, CRow input, ResultF
7979
}
8080

8181

82-
private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr, CRow input, ResultFuture<CRow> resultFuture, AbstractSideCache sideCache) {
82+
private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr, Tuple2<Boolean,Row> input,
83+
ResultFuture<Tuple2<Boolean,Row> > resultFuture, AbstractSideCache sideCache) {
8384
if(args == null || args.size() == 0){
8485
dealMissKey(input, resultFuture);
8586
if (openCache) {
@@ -88,7 +89,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
8889
}
8990

9091
List<Object> cacheContent = Lists.newArrayList();
91-
List<CRow> rowList = Lists.newArrayList();
92+
List<Tuple2<Boolean,Row> > rowList = Lists.newArrayList();
9293

9394
for(List<KeyValue> oneRow : args){
9495
try {
@@ -117,11 +118,12 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
117118
sideVal.add(val);
118119
}
119120

120-
Row row = fillData(input.row(), sideVal);
121+
Row row = fillData(input.f1, sideVal);
121122
if (openCache) {
122123
cacheContent.add(sideVal);
123124
}
124-
rowList.add(new CRow(row, input.change()));
125+
126+
rowList.add(Tuple2.of(input.f0,row));
125127
}
126128
}catch (Exception e) {
127129
resultFuture.completeExceptionally(e);
@@ -144,7 +146,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
144146
return "";
145147
}
146148

147-
private String dealFail(Object arg2, CRow input, ResultFuture<CRow> resultFuture){
149+
private String dealFail(Object arg2, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture){
148150
LOG.error("record:" + input);
149151
LOG.error("get side record exception:" + arg2);
150152
resultFuture.complete(null);

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/RowKeyEqualModeDealer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import com.google.common.collect.Maps;
3030
import org.apache.calcite.sql.JoinType;
3131
import com.google.common.collect.Lists;
32+
import org.apache.flink.api.java.tuple.Tuple2;
3233
import org.apache.flink.streaming.api.functions.async.ResultFuture;
33-
import org.apache.flink.table.runtime.types.CRow;
3434
import org.apache.flink.types.Row;
3535
import org.hbase.async.GetRequest;
3636
import org.hbase.async.HBaseClient;
@@ -61,7 +61,7 @@ public RowKeyEqualModeDealer(Map<String, String> colRefType, String[] colNames,
6161

6262

6363
@Override
64-
public void asyncGetData(String tableName, String rowKeyStr, CRow input, ResultFuture<CRow> resultFuture,
64+
public void asyncGetData(String tableName, String rowKeyStr, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture,
6565
AbstractSideCache sideCache){
6666
//TODO 是否有查询多个col family 和多个col的方法
6767
GetRequest getRequest = new GetRequest(tableName, rowKeyStr);
@@ -93,11 +93,11 @@ public void asyncGetData(String tableName, String rowKeyStr, CRow input, ResultF
9393
sideVal.add(val);
9494
}
9595

96-
Row row = fillData(input.row(), sideVal);
96+
Row row = fillData(input.f1, sideVal);
9797
if(openCache){
9898
sideCache.putCache(rowKeyStr, CacheObj.buildCacheObj(ECacheContentType.SingleLine, row));
9999
}
100-
resultFuture.complete(Collections.singleton(new CRow(row, input.change())));
100+
resultFuture.complete(Collections.singleton(Tuple2.of(input.f0, row)));
101101
} catch (Exception e) {
102102
resultFuture.completeExceptionally(e);
103103
}

0 commit comments

Comments
 (0)