Skip to content

Commit 3856dc4

Browse files
author
yinxi
committed
Merge branch '1.8_test_3.1.0x_mergeElasticsearch6-side' into '1.8_test_3.10.x'
添加elasticsearch6-side中缺的sendOutputRow()方法 See merge request !240
2 parents 4bf4615 + 3bcb8a2 commit 3856dc4

File tree

1 file changed

+12
-0
lines changed

1 file changed

+12
-0
lines changed

core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2424
import org.apache.flink.configuration.Configuration;
2525
import org.apache.flink.table.runtime.types.CRow;
26+
import org.apache.flink.types.Row;
27+
import org.apache.flink.util.Collector;
2628

2729
import com.dtstack.flink.sql.factory.DTThreadFactory;
30+
import org.apache.calcite.sql.JoinType;
2831

2932
import java.sql.SQLException;
3033
import java.util.concurrent.Executors;
@@ -65,4 +68,13 @@ public void open(Configuration parameters) throws Exception {
6568
es.scheduleAtFixedRate(() -> reloadCache(), sideTableInfo.getCacheTimeout(), sideTableInfo.getCacheTimeout(), TimeUnit.MILLISECONDS);
6669
}
6770

71+
protected void sendOutputRow(CRow value, Object sideInput, Collector<CRow> out){
72+
if(sideInput == null && sideInfo.getJoinType() != JoinType.LEFT){
73+
return;
74+
}
75+
76+
Row row = fillData(value.row(), sideInput);
77+
out.collect(new CRow(row, value.change()));
78+
}
79+
6880
}

0 commit comments

Comments
 (0)