Skip to content

Commit e4eeebb

Browse files
author
dapeng
committed
remove unused import
1 parent 3db8088 commit e4eeebb

File tree

1 file changed

+3
-5
lines changed
  • kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka

1 file changed

+3
-5
lines changed

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2828
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
2929
import org.apache.flink.streaming.api.datastream.DataStream;
30-
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
31-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
32-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
30+
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
3331
import org.apache.flink.table.api.TableSchema;
3432
import org.apache.flink.table.sinks.RetractStreamTableSink;
3533
import org.apache.flink.table.sinks.TableSink;
@@ -110,8 +108,8 @@ public TypeInformation<Row> getRecordType() {
110108
@Override
111109
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
112110

113-
FlinkKafkaProducer010<Row> kafkaProducer010 = (FlinkKafkaProducer010<Row>) new KafkaProducer010Factory().createKafkaProducer(kafka10SinkTableInfo, getOutputType().getTypeAt(1), properties,
114-
Optional.of(new CustomerFlinkPartition<>()), partitionKeys);
111+
RichSinkFunction<Row> kafkaProducer010 = new KafkaProducer010Factory().createKafkaProducer(kafka10SinkTableInfo, getOutputType().getTypeAt(1), properties,
112+
Optional.of(new CustomerFlinkPartition<Row>()), partitionKeys);
115113

116114
DataStream<Row> mapDataStream = dataStream.filter((Tuple2<Boolean, Row> record) -> record.f0)
117115
.map((Tuple2<Boolean, Row> record) -> record.f1)

0 commit comments

Comments
 (0)