18 | 18 |
19 | 19 | package com.dtstack.flink.sql.sink.kafka; |
20 | 20 |
21 | | -import com.dtstack.flink.sql.sink.IStreamSinkGener; |
22 | 21 | import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo; |
23 | 22 | import com.dtstack.flink.sql.table.AbstractTargetTableInfo; |
24 | | -import org.apache.commons.lang3.StringUtils; |
25 | | -import org.apache.flink.api.common.typeinfo.TypeInformation; |
26 | | -import org.apache.flink.api.java.tuple.Tuple2; |
27 | | -import org.apache.flink.api.java.typeutils.RowTypeInfo; |
28 | | -import org.apache.flink.api.java.typeutils.TupleTypeInfo; |
29 | | -import org.apache.flink.streaming.api.datastream.DataStream; |
30 | | -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; |
31 | | -import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; |
32 | | -import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; |
33 | | -import org.apache.flink.table.api.TableSchema; |
34 | | -import org.apache.flink.table.runtime.types.CRow; |
35 | | -import org.apache.flink.table.runtime.types.CRowTypeInfo; |
36 | | -import org.apache.flink.table.sinks.RetractStreamTableSink; |
37 | | -import org.apache.flink.table.sinks.TableSink; |
38 | | -import org.apache.flink.table.utils.TableConnectorUtils; |
39 | | -import org.apache.flink.types.Row; |
40 | 23 |
41 | 24 | import java.util.Optional; |
42 | 25 | import java.util.Properties; |
46 | 29 | * @create: 2019-11-05 11:45 |
47 | 30 | * @description: |
48 | 31 | **/ |
49 | | -public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<KafkaSink> { |
50 | | - |
51 | | - protected String[] fieldNames; |
52 | | - |
53 | | - protected TypeInformation<?>[] fieldTypes; |
54 | | - |
55 | | - protected String topic; |
56 | | - |
57 | | - protected int parallelism; |
58 | | - |
59 | | - protected Properties properties; |
60 | | - |
61 | | - protected FlinkKafkaProducer<CRow> flinkKafkaProducer; |
62 | | - protected CRowTypeInfo typeInformation; |
63 | | - |
64 | | - |
65 | | - /** The schema of the table. */ |
66 | | - private TableSchema schema; |
67 | | - |
68 | | - /** Partitioner to select Kafka partition for each item. */ |
69 | | - protected Optional<FlinkKafkaPartitioner<CRow>> partitioner; |
70 | | - |
71 | | - private String[] partitionKeys; |
72 | | - |
| 32 | +public class KafkaSink extends AbstractKafkaSink { |
73 | 33 | @Override |
74 | 34 | public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) { |
75 | 35 | KafkaSinkTableInfo kafkaSinkTableInfo = (KafkaSinkTableInfo) targetTableInfo; |
76 | | - this.topic = kafkaSinkTableInfo.getTopic(); |
77 | | - |
78 | | - properties = new Properties(); |
79 | | - properties.setProperty("bootstrap.servers", kafkaSinkTableInfo.getBootstrapServers()); |
80 | 36 |
81 | | - for (String key : kafkaSinkTableInfo.getKafkaParamKeys()) { |
82 | | - properties.setProperty(key, kafkaSinkTableInfo.getKafkaParam(key)); |
83 | | - } |
| 37 | + Properties kafkaProperties = getKafkaProperties(kafkaSinkTableInfo); |
| 38 | + this.tableName = kafkaSinkTableInfo.getName(); |
| 39 | + this.topic = kafkaSinkTableInfo.getTopic(); |
84 | 40 | this.partitioner = Optional.of(new CustomerFlinkPartition<>()); |
85 | 41 | this.partitionKeys = getPartitionKeys(kafkaSinkTableInfo); |
86 | 42 | this.fieldNames = kafkaSinkTableInfo.getFields(); |
87 | | - TypeInformation[] types = new TypeInformation[kafkaSinkTableInfo.getFields().length]; |
88 | | - for (int i = 0; i < kafkaSinkTableInfo.getFieldClasses().length; i++) { |
89 | | - types[i] = TypeInformation.of(kafkaSinkTableInfo.getFieldClasses()[i]); |
90 | | - } |
91 | | - this.fieldTypes = types; |
92 | | - |
93 | | - TableSchema.Builder schemaBuilder = TableSchema.builder(); |
94 | | - for (int i=0;i<fieldNames.length;i++) { |
95 | | - schemaBuilder.field(fieldNames[i], fieldTypes[i]); |
96 | | - } |
97 | | - this.schema = schemaBuilder.build(); |
98 | | - |
99 | | - Integer parallelism = kafkaSinkTableInfo.getParallelism(); |
100 | | - if (parallelism != null) { |
101 | | - this.parallelism = parallelism; |
102 | | - } |
103 | | - |
104 | | - typeInformation = new CRowTypeInfo(new RowTypeInfo(fieldTypes, fieldNames)); |
105 | | - this.flinkKafkaProducer = (FlinkKafkaProducer<CRow>) new KafkaProducerFactory() |
106 | | - .createKafkaProducer(kafkaSinkTableInfo, typeInformation, properties, partitioner, partitionKeys); |
107 | | - return this; |
108 | | - } |
109 | | - |
110 | | - @Override |
111 | | - public TypeInformation<Row> getRecordType() { |
112 | | - return new RowTypeInfo(fieldTypes, fieldNames); |
113 | | - } |
114 | | - |
115 | | - @Override |
116 | | - public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) { |
117 | | - DataStream<CRow> mapDataStream = dataStream |
118 | | - .map((Tuple2<Boolean, Row> record) -> new CRow(record.f1, record.f0)) |
119 | | - .returns(typeInformation) |
120 | | - .setParallelism(parallelism); |
121 | | - |
122 | | - mapDataStream.addSink(flinkKafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames())); |
123 | | - } |
124 | | - |
125 | | - @Override |
126 | | - public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() { |
127 | | - return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames)); |
128 | | - } |
129 | | - |
130 | | - @Override |
131 | | - public String[] getFieldNames() { |
132 | | - return fieldNames; |
133 | | - } |
134 | | - |
135 | | - @Override |
136 | | - public TypeInformation<?>[] getFieldTypes() { |
137 | | - return fieldTypes; |
138 | | - } |
139 | | - |
140 | | - @Override |
141 | | - public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { |
142 | | - this.fieldNames = fieldNames; |
143 | | - this.fieldTypes = fieldTypes; |
| 43 | + this.fieldTypes = getTypeInformations(kafkaSinkTableInfo); |
| 44 | + this.schema = buildTableSchema(fieldNames, fieldTypes); |
| 45 | + this.parallelism = kafkaSinkTableInfo.getParallelism(); |
| 46 | + this.sinkOperatorName = SINK_OPERATOR_NAME_TPL.replace("${topic}", topic).replace("${table}", tableName); |
| 47 | + this.kafkaProducer = new KafkaProducerFactory().createKafkaProducer(kafkaSinkTableInfo, getRowTypeInfo(), kafkaProperties, partitioner, partitionKeys); |
144 | 48 | return this; |
145 | 49 | } |
146 | 50 |
147 | | - private String[] getPartitionKeys(KafkaSinkTableInfo kafkaSinkTableInfo){ |
148 | | - if(StringUtils.isNotBlank(kafkaSinkTableInfo.getPartitionKeys())){ |
149 | | - return StringUtils.split(kafkaSinkTableInfo.getPartitionKeys(), ','); |
150 | | - } |
151 | | - return null; |
152 | | - } |
153 | 51 | } |
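
The removed boilerplate is consolidated into the new `AbstractKafkaSink` base class. Below is a minimal sketch of the base-class contract this subclass now relies on, reconstructed from the fields and helpers the diff references; the helper bodies mirror the removed inline logic, but the visibility modifiers, the `SINK_OPERATOR_NAME_TPL` value, and the `kafkaProducer` field type are assumptions. The removed `RetractStreamTableSink` methods (`emitDataStream`, `getOutputType`, `getFieldNames`, `getFieldTypes`, `configure`) presumably moved into the base class as well and are omitted here.

```java
package com.dtstack.flink.sql.sink.kafka;

import com.dtstack.flink.sql.sink.IStreamSinkGener;
import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.table.runtime.types.CRowTypeInfo;

import java.util.Optional;
import java.util.Properties;

public abstract class AbstractKafkaSink implements IStreamSinkGener<AbstractKafkaSink> {

    // Assumed value: the diff only shows that the template contains
    // ${topic} and ${table} placeholders.
    public static final String SINK_OPERATOR_NAME_TPL = "kafkasink_${topic}_${table}";

    protected String tableName;
    protected String topic;
    protected String sinkOperatorName;
    protected String[] fieldNames;
    protected TypeInformation<?>[] fieldTypes;
    protected String[] partitionKeys;
    protected Integer parallelism;
    protected TableSchema schema;
    protected Optional<FlinkKafkaPartitioner<CRow>> partitioner;
    // Assumed supertype of whatever KafkaProducerFactory.createKafkaProducer returns.
    protected RichSinkFunction<CRow> kafkaProducer;

    // Mirrors the removed inline logic: bootstrap.servers plus every
    // user-supplied Kafka parameter.
    protected Properties getKafkaProperties(KafkaSinkTableInfo info) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", info.getBootstrapServers());
        for (String key : info.getKafkaParamKeys()) {
            props.setProperty(key, info.getKafkaParam(key));
        }
        return props;
    }

    // Mirrors the removed TypeInformation.of(...) loop over the field classes.
    protected TypeInformation<?>[] getTypeInformations(KafkaSinkTableInfo info) {
        TypeInformation<?>[] types = new TypeInformation[info.getFieldClasses().length];
        for (int i = 0; i < info.getFieldClasses().length; i++) {
            types[i] = TypeInformation.of(info.getFieldClasses()[i]);
        }
        return types;
    }

    // Mirrors the removed TableSchema.Builder loop.
    protected TableSchema buildTableSchema(String[] names, TypeInformation<?>[] types) {
        TableSchema.Builder builder = TableSchema.builder();
        for (int i = 0; i < names.length; i++) {
            builder.field(names[i], types[i]);
        }
        return builder.build();
    }

    // Preserves the old CRowTypeInfo wrapping that the producer factory received.
    protected CRowTypeInfo getRowTypeInfo() {
        return new CRowTypeInfo(new RowTypeInfo(fieldTypes, fieldNames));
    }

    // Mirrors the removed private helper: split the comma-separated
    // partition-key option, or return null when none is configured.
    protected String[] getPartitionKeys(KafkaSinkTableInfo info) {
        if (StringUtils.isNotBlank(info.getPartitionKeys())) {
            return StringUtils.split(info.getPartitionKeys(), ',');
        }
        return null;
    }
}
```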
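The diff also swaps the standard `FlinkFixedPartitioner` for a `CustomerFlinkPartition` driven by the configured `partitionKeys`. Its implementation is not part of this diff; the sketch below shows one plausible shape, assuming it routes records by the hash of the serialized key and falls back to fixed assignment when no key is present.

```java
package com.dtstack.flink.sql.sink.kafka;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

import java.util.Arrays;

// Hypothetical reconstruction: the class name comes from the diff, but the
// hashing strategy and fallback behavior here are assumptions.
public class CustomerFlinkPartition<T> extends FlinkKafkaPartitioner<T> {

    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null || key.length == 0) {
            // No partition key configured: behave like FlinkFixedPartitioner,
            // pinning each sink subtask to one partition.
            return partitions[parallelInstanceId % partitions.length];
        }
        // Same key -> same partition, so per-key ordering is preserved.
        return partitions[(Arrays.hashCode(key) & Integer.MAX_VALUE) % partitions.length];
    }
}
```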