Skip to content

Commit f243567

Browse files
committed
left join null nosql
1 parent 9468eb4 commit f243567

File tree

8 files changed

+26
-8
lines changed

8 files changed

+26
-8
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,11 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
134134
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
135135
Object equalObj = value.getField(conValIndex);
136136
if (equalObj == null) {
137-
out.collect(null);
137+
if(sideInfo.getJoinType() == JoinType.LEFT){
138+
Row data = fillData(value, null);
139+
out.collect(data);
140+
}
141+
return;
138142
}
139143

140144
inputParams.add(equalObj);

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
170170
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
171171
Object equalObj = input.getField(conValIndex);
172172
if (equalObj == null) {
173-
resultFuture.complete(null);
173+
dealMissKey(input, resultFuture);
174+
return;
174175
}
175176
inputParams.add(equalObj);
176177
stringBuffer.append(sideInfo.getEqualFieldList().get(i))

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.dtstack.flink.sql.side.*;
2424
import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
25+
import org.apache.calcite.sql.JoinType;
2526
import org.apache.commons.collections.map.HashedMap;
2627
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2728
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
@@ -119,7 +120,11 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
119120
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
120121
Object equalObj = value.getField(conValIndex);
121122
if(equalObj == null){
122-
out.collect(null);
123+
if(sideInfo.getJoinType() == JoinType.LEFT){
124+
Row data = fillData(value, null);
125+
out.collect(data);
126+
}
127+
return;
123128
}
124129
refData.put(sideInfo.getEqualFieldList().get(i), equalObj);
125130
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
128128
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
129129
Object equalObj = input.getField(conValIndex);
130130
if(equalObj == null){
131-
resultFuture.complete(null);
131+
dealMissKey(input, resultFuture);
132132
return;
133133
}
134134

mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,11 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
132132
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
133133
Object equalObj = value.getField(conValIndex);
134134
if (equalObj == null) {
135-
out.collect(null);
135+
if(sideInfo.getJoinType() == JoinType.LEFT){
136+
Row data = fillData(value, null);
137+
out.collect(data);
138+
}
139+
return;
136140
}
137141

138142
inputParams.add(equalObj);

mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
129129
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
130130
Object equalObj = input.getField(conValIndex);
131131
if (equalObj == null) {
132-
resultFuture.complete(null);
132+
dealMissKey(input, resultFuture);
133133
return;
134134
}
135135
basicDBObject.put(sideInfo.getEqualFieldList().get(i), equalObj);

redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,11 @@ public void flatMap(Row row, Collector<Row> out) throws Exception {
9595
for(Integer conValIndex : sideInfo.getEqualValIndex()){
9696
Object equalObj = row.getField(conValIndex);
9797
if(equalObj == null){
98-
out.collect(null);
98+
if(sideInfo.getJoinType() == JoinType.LEFT){
99+
Row data = fillData(row, null);
100+
out.collect(data);
101+
}
102+
return;
99103
}
100104
String columnName = sideInfo.getEqualFieldList().get(conValIndex);
101105
inputParams.put(columnName, (String) equalObj);

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
124124
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
125125
Object equalObj = input.getField(conValIndex);
126126
if(equalObj == null){
127-
resultFuture.complete(null);
127+
dealMissKey(input, resultFuture);
128128
return;
129129
}
130130

0 commit comments

Comments
 (0)