@@ -19,16 +19,17 @@
 
 import com.dtstack.flink.sql.format.FormatType;
 import com.dtstack.flink.sql.format.SerializationMetricWrapper;
-import com.dtstack.flink.sql.sink.kafka.serialization.AvroCRowSerializationSchema;
-import com.dtstack.flink.sql.sink.kafka.serialization.CsvCRowSerializationSchema;
-import com.dtstack.flink.sql.sink.kafka.serialization.JsonCRowSerializationSchema;
+import com.dtstack.flink.sql.sink.kafka.serialization.AvroTuple2SerializationSchema;
+import com.dtstack.flink.sql.sink.kafka.serialization.CsvTupleSerializationSchema;
+import com.dtstack.flink.sql.sink.kafka.serialization.JsonTupleSerializationSchema;
 import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.types.Row;
 
 import java.util.Optional;
 import java.util.Properties;
@@ -51,28 +52,29 @@ public abstract class AbstractKafkaProducerFactory {
      * @param partitioner
      * @return
      */
-    public abstract RichSinkFunction<CRow> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<CRow> typeInformation, Properties properties, Optional<FlinkKafkaPartitioner<CRow>> partitioner, String[] partitionKeys);
+    public abstract RichSinkFunction<Tuple2<Boolean, Row>> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Tuple2<Boolean, Row>> typeInformation,
+                                                                               Properties properties, Optional<FlinkKafkaPartitioner<Tuple2<Boolean, Row>>> partitioner, String[] partitionKeys);
 
-    protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<CRow> typeInformation) {
-        SerializationSchema<CRow> serializationSchema = createSerializationSchema(kafkaSinkTableInfo, typeInformation);
+    protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Tuple2<Boolean, Row>> typeInformation) {
+        SerializationSchema<Tuple2<Boolean, Row>> serializationSchema = createSerializationSchema(kafkaSinkTableInfo, typeInformation);
         return new SerializationMetricWrapper(serializationSchema);
     }
 
-    private SerializationSchema<CRow> createSerializationSchema(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<CRow> typeInformation) {
-        SerializationSchema<CRow> serializationSchema = null;
+    private SerializationSchema<Tuple2<Boolean, Row>> createSerializationSchema(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Tuple2<Boolean, Row>> typeInformation) {
+        SerializationSchema<Tuple2<Boolean, Row>> serializationSchema = null;
         if (FormatType.JSON.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {
             if (StringUtils.isNotBlank(kafkaSinkTableInfo.getSchemaString())) {
-                serializationSchema = new JsonCRowSerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode());
+                serializationSchema = new JsonTupleSerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode());
             } else if (typeInformation != null && typeInformation.getArity() != 0) {
-                serializationSchema = new JsonCRowSerializationSchema(typeInformation, kafkaSinkTableInfo.getUpdateMode());
+                serializationSchema = new JsonTupleSerializationSchema(typeInformation, kafkaSinkTableInfo.getUpdateMode());
             } else {
                 throw new IllegalArgumentException("sinkDataType:" + FormatType.JSON.name() + " must set schemaString(JSON Schema) or TypeInformation<Row>");
             }
         } else if (FormatType.CSV.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {
             if (StringUtils.isBlank(kafkaSinkTableInfo.getFieldDelimiter())) {
                 throw new IllegalArgumentException("sinkDataType:" + FormatType.CSV.name() + " must set fieldDelimiter");
             }
-            final CsvCRowSerializationSchema.Builder serSchemaBuilder = new CsvCRowSerializationSchema.Builder(typeInformation);
+            final CsvTupleSerializationSchema.Builder serSchemaBuilder = new CsvTupleSerializationSchema.Builder(typeInformation);
             serSchemaBuilder.setFieldDelimiter(kafkaSinkTableInfo.getFieldDelimiter().toCharArray()[0]);
             serSchemaBuilder.setUpdateMode(kafkaSinkTableInfo.getUpdateMode());
 
@@ -81,7 +83,7 @@ private SerializationSchema<CRow> createSerializationSchema(KafkaSinkTableInfo k
             if (StringUtils.isBlank(kafkaSinkTableInfo.getSchemaString())) {
                 throw new IllegalArgumentException("sinkDataType:" + FormatType.AVRO.name() + " must set schemaString");
             }
-            serializationSchema = new AvroCRowSerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode());
+            serializationSchema = new AvroTuple2SerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode());
         }
 
         if (null == serializationSchema) {
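
Reviewer note: the removed import shows CRow comes from Flink's internal org.apache.flink.table.runtime.types package, while Tuple2<Boolean, Row> is the public shape the Table API emits for retract streams, so the sink side no longer depends on internal classes. Below is a minimal, self-contained sketch (not part of this commit) of where such a stream originates and where a producer built by createKafkaProducer(...) would plug in; it assumes a Flink 1.9-era Table API, and the table name, fields, and query are illustrative.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register a tiny in-memory table so the example runs standalone.
        DataStream<Tuple2<String, Integer>> orders = env.fromElements(
                Tuple2.of("alice", 10), Tuple2.of("bob", 7), Tuple2.of("alice", 3));
        tableEnv.registerDataStream("orders", orders, "user_name, amount");

        // An aggregating query produces an updating (retract) result.
        Table result = tableEnv.sqlQuery(
                "SELECT user_name, COUNT(*) AS cnt FROM orders GROUP BY user_name");

        // The Table API hands updates back as Tuple2<Boolean, Row>: the Boolean is
        // the retract flag, (true, newRow) on insert/update and (false, oldRow)
        // when a previously emitted result is withdrawn. This is the exact type
        // the new RichSinkFunction<Tuple2<Boolean, Row>> consumes.
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(result, Row.class);

        // Here a real job would attach the factory-built Kafka producer instead:
        // retractStream.addSink(kafkaProducer);
        retractStream.print();

        env.execute("retract stream sketch");
    }
}

For "alice", the printed stream would show an add of the first count, then a retract of it followed by an add of the updated count, which is the update pattern the serialization schemas above have to handle according to kafkaSinkTableInfo.getUpdateMode().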