3434import java .util .Properties ;
3535
/**
 * Abstract factory for concrete Kafka producers,
 * including construction of the shared serialization tooling.
 * company: www.dtstack.com
 *
 * @author: toutian
 * create: 2019/12/26
 */
4243public abstract class AbstractKafkaProducerFactory {
4344
    /**
     * Creates the concrete version-specific Kafka producer sink,
     * e.g. a KafkaProducer010 for the 0.10 connector.
     *
     * @param kafkaSinkTableInfo sink table configuration (topic, data type, schema, ...)
     * @param typeInformation    type information of the rows written to Kafka
     * @param properties         Kafka client properties passed to the producer
     * @param partitioner        optional custom partitioner; empty means the connector's default partitioning
     * @return the producer sink function to attach to the stream
     */
    public abstract RichSinkFunction<Row> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation, Properties properties, Optional<FlinkKafkaPartitioner<Row>> partitioner);
4555
4656 protected SerializationMetricWrapper createSerializationMetricWrapper (KafkaSinkTableInfo kafkaSinkTableInfo , TypeInformation <Row > typeInformation ) {
@@ -50,30 +60,39 @@ protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkT
5060 private SerializationSchema <Row > createSerializationSchema (KafkaSinkTableInfo kafkaSinkTableInfo , TypeInformation <Row > typeInformation ) {
5161 SerializationSchema <Row > serializationSchema = null ;
5262 if (FormatType .JSON .name ().equalsIgnoreCase (kafkaSinkTableInfo .getSinkDataType ())) {
63+
5364 if (StringUtils .isNotBlank (kafkaSinkTableInfo .getSchemaString ())) {
5465 serializationSchema = new JsonRowSerializationSchema (kafkaSinkTableInfo .getSchemaString ());
5566 } else if (typeInformation != null && typeInformation .getArity () != 0 ) {
5667 serializationSchema = new JsonRowSerializationSchema (typeInformation );
5768 } else {
5869 throw new IllegalArgumentException ("sinkDataType:" + FormatType .JSON .name () + " must set schemaString(JSON Schema)or TypeInformation<Row>" );
5970 }
71+
6072 } else if (FormatType .CSV .name ().equalsIgnoreCase (kafkaSinkTableInfo .getSinkDataType ())) {
73+
6174 if (StringUtils .isBlank (kafkaSinkTableInfo .getFieldDelimiter ())) {
6275 throw new IllegalArgumentException ("sinkDataType:" + FormatType .CSV .name () + " must set fieldDelimiter" );
6376 }
77+
6478 final CsvRowSerializationSchema .Builder serSchemaBuilder = new CsvRowSerializationSchema .Builder (typeInformation );
6579 serSchemaBuilder .setFieldDelimiter (kafkaSinkTableInfo .getFieldDelimiter ().toCharArray ()[0 ]);
6680 serializationSchema = serSchemaBuilder .build ();
81+
6782 } else if (FormatType .AVRO .name ().equalsIgnoreCase (kafkaSinkTableInfo .getSinkDataType ())) {
83+
6884 if (StringUtils .isBlank (kafkaSinkTableInfo .getSchemaString ())) {
6985 throw new IllegalArgumentException ("sinkDataType:" + FormatType .AVRO .name () + " must set schemaString" );
7086 }
87+
7188 serializationSchema = new AvroRowSerializationSchema (kafkaSinkTableInfo .getSchemaString ());
89+
7290 }
7391
7492 if (null == serializationSchema ) {
7593 throw new UnsupportedOperationException ("FormatType:" + kafkaSinkTableInfo .getSinkDataType ());
7694 }
95+
7796 return serializationSchema ;
7897 }
7998
0 commit comments