@@ -61,7 +61,7 @@ public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
6161
6262 protected int parallelism ;
6363
64- protected FlinkKafkaProducer010 < Row > kafkaProducer010 ;
64+ protected KafkaSinkTableInfo kafka10SinkTableInfo ;
6565
6666 /** The schema of the table. */
6767 private TableSchema schema ;
@@ -70,7 +70,7 @@ public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
7070
7171 @ Override
7272 public KafkaSink genStreamSink (TargetTableInfo targetTableInfo ) {
73- KafkaSinkTableInfo kafka10SinkTableInfo = (KafkaSinkTableInfo ) targetTableInfo ;
73+ this . kafka10SinkTableInfo = (KafkaSinkTableInfo ) targetTableInfo ;
7474 this .topic = kafka10SinkTableInfo .getTopic ();
7575
7676 properties = new Properties ();
@@ -99,8 +99,6 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
9999 if (parallelism != null ) {
100100 this .parallelism = parallelism ;
101101 }
102- this .kafkaProducer010 = (FlinkKafkaProducer010 <Row >) new KafkaProducer010Factory ().createKafkaProducer (kafka10SinkTableInfo , getOutputType ().getTypeAt (1 ), properties ,
103- Optional .of (new CustomerFlinkPartition <>()), partitionKeys );
104102 return this ;
105103 }
106104
@@ -111,6 +109,10 @@ public TypeInformation<Row> getRecordType() {
111109
112110 @ Override
113111 public void emitDataStream (DataStream <Tuple2 <Boolean , Row >> dataStream ) {
112+
113+ FlinkKafkaProducer010 <Row > kafkaProducer010 = (FlinkKafkaProducer010 <Row >) new KafkaProducer010Factory ().createKafkaProducer (kafka10SinkTableInfo , getOutputType ().getTypeAt (1 ), properties ,
114+ Optional .of (new CustomerFlinkPartition <>()), partitionKeys );
115+
114116 DataStream <Row > mapDataStream = dataStream .filter ((Tuple2 <Boolean , Row > record ) -> record .f0 )
115117 .map ((Tuple2 <Boolean , Row > record ) -> record .f1 )
116118 .returns (getOutputType ().getTypeAt (1 ))
0 commit comments