Skip to content

Commit 67fc733

Browse files
committed
fix bug 29911
1 parent 56df6ec commit 67fc733

File tree

5 files changed

+9
-0
lines changed
  • kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka

5 files changed

+9
-0
lines changed

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.sink.kafka;
2020

21+
import com.dtstack.flink.sql.enums.EUpdateMode;
2122
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2223
import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
2324
import org.apache.commons.lang3.StringUtils;
@@ -60,6 +61,7 @@ public abstract class AbstractKafkaSink implements RetractStreamTableSink<Row>,
6061
protected int parallelism;
6162
protected String topic;
6263
protected String tableName;
64+
protected String updateMode;
6365

6466
protected TableSchema schema;
6567
protected SinkFunction<Tuple2<Boolean,Row>> kafkaProducer011;
@@ -107,6 +109,9 @@ protected String[] getPartitionKeys(KafkaSinkTableInfo kafkaSinkTableInfo) {
107109

108110
@Override
109111
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
112+
if (updateMode.equalsIgnoreCase(EUpdateMode.APPEND.name())) {
113+
dataStream = dataStream.filter((Tuple2<Boolean, Row> record) -> record.f0);
114+
}
110115
DataStreamSink<Tuple2<Boolean, Row>> dataStreamSink = dataStream.addSink(kafkaProducer011).name(sinkOperatorName);
111116
if (parallelism > 0) {
112117
dataStreamSink.setParallelism(parallelism);

kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
3636

3737
Properties kafkaProperties = getKafkaProperties(kafkaSinkTableInfo);
3838
this.tableName = kafkaSinkTableInfo.getName();
39+
this.updateMode = kafkaSinkTableInfo.getUpdateMode();
3940
this.topic = kafkaSinkTableInfo.getTopic();
4041
this.partitioner = Optional.of(new CustomerFlinkPartition<>());
4142
this.partitionKeys = getPartitionKeys(kafkaSinkTableInfo);

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
3838

3939
Properties kafkaProperties = getKafkaProperties(kafka09SinkTableInfo);
4040
this.tableName = kafka09SinkTableInfo.getName();
41+
this.updateMode = kafka09SinkTableInfo.getUpdateMode();
4142
this.topic = kafka09SinkTableInfo.getTopic();
4243
this.partitioner = Optional.of(new CustomerFlinkPartition<>());
4344
this.partitionKeys = getPartitionKeys(kafka09SinkTableInfo);

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
4242

4343
Properties kafkaProperties = getKafkaProperties(kafka10SinkTableInfo);
4444
this.tableName = kafka10SinkTableInfo.getName();
45+
this.updateMode = kafka10SinkTableInfo.getUpdateMode();
4546
this.topic = kafka10SinkTableInfo.getTopic();
4647
this.partitioner = Optional.of(new CustomerFlinkPartition<>());
4748
this.partitionKeys = getPartitionKeys(kafka10SinkTableInfo);

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
4141

4242
Properties kafkaProperties = getKafkaProperties(kafka11SinkTableInfo);
4343
this.tableName = kafka11SinkTableInfo.getName();
44+
this.updateMode = kafka11SinkTableInfo.getUpdateMode();
4445
this.topic = kafka11SinkTableInfo.getTopic();
4546
this.partitioner = Optional.of(new CustomerFlinkPartition<>());
4647
this.partitionKeys = getPartitionKeys(kafka11SinkTableInfo);

0 commit comments

Comments
 (0)