Skip to content

Commit a8364e9

Browse files
committed
Merge branch 'v1.8.0_dev_bugfix_leftjoinnull' into 'v1.8.0_dev'
left join null nosql See merge request !135
2 parents d34c1e8 + f243567 commit a8364e9

File tree

7 files changed

+25
-7
lines changed

7 files changed

+25
-7
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,11 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
134134
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
135135
Object equalObj = value.getField(conValIndex);
136136
if (equalObj == null) {
137-
out.collect(null);
137+
if(sideInfo.getJoinType() == JoinType.LEFT){
138+
Row data = fillData(value, null);
139+
out.collect(data);
140+
}
141+
return;
138142
}
139143

140144
inputParams.add(equalObj);

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
170170
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
171171
Object equalObj = input.getField(conValIndex);
172172
if (equalObj == null) {
173-
resultFuture.complete(null);
173+
dealMissKey(input, resultFuture);
174+
return;
174175
}
175176
inputParams.add(equalObj);
176177
stringBuffer.append(sideInfo.getEqualFieldList().get(i))

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.dtstack.flink.sql.side.*;
2424
import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
25+
import org.apache.calcite.sql.JoinType;
2526
import org.apache.commons.collections.map.HashedMap;
2627
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2728
import com.google.common.collect.Maps;
@@ -119,7 +120,11 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
119120
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
120121
Object equalObj = value.getField(conValIndex);
121122
if(equalObj == null){
122-
out.collect(null);
123+
if(sideInfo.getJoinType() == JoinType.LEFT){
124+
Row data = fillData(value, null);
125+
out.collect(data);
126+
}
127+
return;
123128
}
124129
refData.put(sideInfo.getEqualFieldList().get(i), equalObj);
125130
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
128128
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
129129
Object equalObj = input.getField(conValIndex);
130130
if(equalObj == null){
131-
resultFuture.complete(null);
131+
dealMissKey(input, resultFuture);
132132
return;
133133
}
134134

mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,11 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
132132
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
133133
Object equalObj = value.getField(conValIndex);
134134
if (equalObj == null) {
135-
out.collect(null);
135+
if(sideInfo.getJoinType() == JoinType.LEFT){
136+
Row data = fillData(value, null);
137+
out.collect(data);
138+
}
139+
return;
136140
}
137141

138142
inputParams.add(equalObj);

mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
129129
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
130130
Object equalObj = input.getField(conValIndex);
131131
if (equalObj == null) {
132-
resultFuture.complete(null);
132+
dealMissKey(input, resultFuture);
133133
return;
134134
}
135135
basicDBObject.put(sideInfo.getEqualFieldList().get(i), equalObj);

redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,11 @@ public void flatMap(Row row, Collector<Row> out) throws Exception {
9393
for(Integer conValIndex : sideInfo.getEqualValIndex()){
9494
Object equalObj = row.getField(conValIndex);
9595
if(equalObj == null){
96-
out.collect(null);
96+
if(sideInfo.getJoinType() == JoinType.LEFT){
97+
Row data = fillData(row, null);
98+
out.collect(data);
99+
}
100+
return;
97101
}
98102
String columnName = sideInfo.getEqualFieldList().get(conValIndex);
99103
inputParams.put(columnName, equalObj.toString());

0 commit comments

Comments
 (0)