File tree Expand file tree Collapse file tree 4 files changed +4
-4
lines changed
kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka Expand file tree Collapse file tree 4 files changed +4
-4
lines changed Original file line number Diff line number Diff line change @@ -105,7 +105,7 @@ public TypeInformation<Row> getRecordType() {
105105 @ Override
106106 public void emitDataStream (DataStream <Tuple2 <Boolean , Row >> dataStream ) {
107107 DataStream <Row > mapDataStream = dataStream .map ((Tuple2 <Boolean , Row > record ) -> record .f1 ).returns (getOutputType ().getTypeAt (1 )).setParallelism (parallelism );
108- mapDataStream .addSink (flinkKafkaProducer ).name (TableConnectorUtils .generateRuntimeName (FlinkKafkaProducer . class , getFieldNames ()));
108+ mapDataStream .addSink (flinkKafkaProducer ).name (TableConnectorUtils .generateRuntimeName (this . getClass () , getFieldNames ()));
109109 }
110110
111111 @ Override
Original file line number Diff line number Diff line change @@ -109,7 +109,7 @@ public TypeInformation<Row> getRecordType() {
109109 @ Override
110110 public void emitDataStream (DataStream <Tuple2 <Boolean , Row >> dataStream ) {
111111 DataStream <Row > mapDataStream = dataStream .map ((Tuple2 <Boolean , Row > record ) -> record .f1 ).returns (getOutputType ().getTypeAt (1 )).setParallelism (parallelism );
112- mapDataStream .addSink (kafkaProducer09 ).name (TableConnectorUtils .generateRuntimeName (FlinkKafkaProducer09 . class , getFieldNames ()));
112+ mapDataStream .addSink (kafkaProducer09 ).name (TableConnectorUtils .generateRuntimeName (this . getClass () , getFieldNames ()));
113113 }
114114
115115 @ Override
Original file line number Diff line number Diff line change @@ -112,7 +112,7 @@ public TypeInformation<Row> getRecordType() {
112112 @ Override
113113 public void emitDataStream (DataStream <Tuple2 <Boolean , Row >> dataStream ) {
114114 DataStream <Row > mapDataStream = dataStream .map ((Tuple2 <Boolean , Row > record ) -> record .f1 ).returns (getOutputType ().getTypeAt (1 )).setParallelism (parallelism );
115- mapDataStream .addSink (kafkaProducer010 ).name (TableConnectorUtils .generateRuntimeName (FlinkKafkaProducer010 . class , getFieldNames ()));
115+ mapDataStream .addSink (kafkaProducer010 ).name (TableConnectorUtils .generateRuntimeName (this . getClass () , getFieldNames ()));
116116 }
117117
118118 @ Override
Original file line number Diff line number Diff line change @@ -111,7 +111,7 @@ public TypeInformation<Row> getRecordType() {
111111 @ Override
112112 public void emitDataStream (DataStream <Tuple2 <Boolean , Row >> dataStream ) {
113113 DataStream <Row > mapDataStream = dataStream .map ((Tuple2 <Boolean , Row > record ) -> record .f1 ).returns (getOutputType ().getTypeAt (1 )).setParallelism (parallelism );
114- mapDataStream .addSink (kafkaProducer011 ).name (TableConnectorUtils .generateRuntimeName (FlinkKafkaProducer011 . class , getFieldNames ()));
114+ mapDataStream .addSink (kafkaProducer011 ).name (TableConnectorUtils .generateRuntimeName (this . getClass () , getFieldNames ()));
115115 }
116116
117117 @ Override
You can’t perform that action at this time.
0 commit comments