
Commit 55715c1

Merge branch 'feat_1.8_codereview' into v1.8.0_dev
2 parents 7676cc7 + c2b4d8a commit 55715c1

File tree

5 files changed: 186 additions, 455 deletions
  • kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaSink.java

Lines changed: 148 additions & 0 deletions

@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.sink.kafka;
+
+import com.dtstack.flink.sql.sink.IStreamSinkGener;
+import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.runtime.types.CRowTypeInfo;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.IntStream;
+
+/**
+ * Date: 2020/4/1
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public abstract class AbstractKafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener {
+
+    public static final String SINK_OPERATOR_NAME_TPL = "${topic}_${table}";
+
+    protected String[] fieldNames;
+    protected TypeInformation<?>[] fieldTypes;
+
+    protected String[] partitionKeys;
+    protected String sinkOperatorName;
+    protected Properties properties;
+    protected int parallelism;
+    protected String topic;
+    protected String tableName;
+
+    protected TableSchema schema;
+    protected SinkFunction<CRow> kafkaProducer;
+
+    protected Optional<FlinkKafkaPartitioner<CRow>> partitioner;
+
+    protected Properties getKafkaProperties(KafkaSinkTableInfo kafkaSinkTableInfo) {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkTableInfo.getBootstrapServers());
+
+        for (String key : kafkaSinkTableInfo.getKafkaParamKeys()) {
+            props.setProperty(key, kafkaSinkTableInfo.getKafkaParam(key));
+        }
+        return props;
+    }
+
+    protected TypeInformation[] getTypeInformations(KafkaSinkTableInfo kafkaSinkTableInfo) {
+        Class<?>[] fieldClasses = kafkaSinkTableInfo.getFieldClasses();
+        TypeInformation[] types = IntStream.range(0, fieldClasses.length)
+                .mapToObj(i -> TypeInformation.of(fieldClasses[i]))
+                .toArray(TypeInformation[]::new);
+        return types;
+    }
+
+    protected TableSchema buildTableSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+        Preconditions.checkArgument(fieldNames.length == fieldTypes.length, "fieldNames length must equal fieldTypes length!");
+
+        TableSchema.Builder builder = TableSchema.builder();
+        IntStream.range(0, fieldTypes.length)
+                .forEach(i -> builder.field(fieldNames[i], fieldTypes[i]));
+
+        return builder.build();
+    }
+
+    @Override
+    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        DataStream<CRow> mapDataStream = dataStream
+                .map((Tuple2<Boolean, Row> record) -> new CRow(record.f1, record.f0))
+                .returns(getRowTypeInfo())
+                .setParallelism(parallelism);

+        mapDataStream.addSink(kafkaProducer).name(sinkOperatorName);
+    }
+
+    public CRowTypeInfo getRowTypeInfo() {
+        return new CRowTypeInfo(new RowTypeInfo(fieldTypes, fieldNames));
+    }
+
+    protected String[] getPartitionKeys(KafkaSinkTableInfo kafkaSinkTableInfo) {
+        if (StringUtils.isNotBlank(kafkaSinkTableInfo.getPartitionKeys())) {
+            return StringUtils.split(kafkaSinkTableInfo.getPartitionKeys(), ',');
+        }
+        return null;
+    }
+
+    @Override
+    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
+    }
+
+    @Override
+    public String[] getFieldNames() {
+        return fieldNames;
+    }
+
+    @Override
+    public TypeInformation<?>[] getFieldTypes() {
+        return fieldTypes;
+    }
+
+    @Override
+    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+        this.fieldNames = fieldNames;
+        this.fieldTypes = fieldTypes;
+        return this;
+    }
+
+    @Override
+    public TypeInformation<Row> getRecordType() {
+        return new RowTypeInfo(fieldTypes, fieldNames);
+    }
+}
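
The new base class pulls the sink wiring that was previously duplicated across the kafka, kafka09, kafka10, and kafka11 sink modules (all of which appear in the file tree above) into one place. One detail worth noting: getKafkaProperties reads the bootstrap-servers key from ConsumerConfig even though the properties feed a producer; the underlying key string, "bootstrap.servers", is identical for producers and consumers, so this works, though ProducerConfig.BOOTSTRAP_SERVERS_CONFIG would state the intent more directly. Below is a minimal sketch of how a versioned subclass might plug into the template, modeled on the KafkaSink changes shown next; Kafka09Sink and Kafka09ProducerFactory are assumed names for illustration, not code from this commit:

// Hedged sketch, not part of this commit: a hypothetical Kafka 0.9 sink
// reusing the AbstractKafkaSink template. Only the producer factory is
// version-specific; the rest is inherited helper logic.
package com.dtstack.flink.sql.sink.kafka;

import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;

import java.util.Optional;
import java.util.Properties;

public class Kafka09Sink extends AbstractKafkaSink {
    @Override
    public Kafka09Sink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
        KafkaSinkTableInfo sinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;

        Properties kafkaProperties = getKafkaProperties(sinkTableInfo);   // inherited helper
        this.tableName = sinkTableInfo.getName();
        this.topic = sinkTableInfo.getTopic();
        this.partitioner = Optional.of(new CustomerFlinkPartition<>());
        this.partitionKeys = getPartitionKeys(sinkTableInfo);             // inherited helper
        this.fieldNames = sinkTableInfo.getFields();
        this.fieldTypes = getTypeInformations(sinkTableInfo);             // inherited helper
        this.schema = buildTableSchema(fieldNames, fieldTypes);           // inherited helper
        this.parallelism = sinkTableInfo.getParallelism();
        this.sinkOperatorName = SINK_OPERATOR_NAME_TPL
                .replace("${topic}", topic)
                .replace("${table}", tableName);

        // Kafka09ProducerFactory is an assumed, version-specific factory name.
        this.kafkaProducer = new Kafka09ProducerFactory()
                .createKafkaProducer(sinkTableInfo, getRowTypeInfo(), kafkaProperties, partitioner, partitionKeys);
        return this;
    }
}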

kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 9 additions & 111 deletions
@@ -18,25 +18,8 @@
 
 package com.dtstack.flink.sql.sink.kafka;
 
-import com.dtstack.flink.sql.sink.IStreamSinkGener;
 import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
 import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.types.CRowTypeInfo;
-import org.apache.flink.table.sinks.RetractStreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.utils.TableConnectorUtils;
-import org.apache.flink.types.Row;
 
 import java.util.Optional;
 import java.util.Properties;

@@ -46,108 +29,23 @@
  * @create: 2019-11-05 11:45
  * @description:
  **/
-public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
-
-    protected String[] fieldNames;
-
-    protected TypeInformation<?>[] fieldTypes;
-
-    protected String topic;
-
-    protected int parallelism;
-
-    protected Properties properties;
-
-    protected FlinkKafkaProducer<CRow> flinkKafkaProducer;
-    protected CRowTypeInfo typeInformation;
-
-    /** The schema of the table. */
-    private TableSchema schema;
-
-    /** Partitioner to select Kafka partition for each item. */
-    protected Optional<FlinkKafkaPartitioner<CRow>> partitioner;
-
-    private String[] partitionKeys;
-
+public class KafkaSink extends AbstractKafkaSink {
     @Override
     public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
         KafkaSinkTableInfo kafkaSinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
-        this.topic = kafkaSinkTableInfo.getTopic();
-
-        properties = new Properties();
-        properties.setProperty("bootstrap.servers", kafkaSinkTableInfo.getBootstrapServers());
 
-        for (String key : kafkaSinkTableInfo.getKafkaParamKeys()) {
-            properties.setProperty(key, kafkaSinkTableInfo.getKafkaParam(key));
-        }
+        Properties kafkaProperties = getKafkaProperties(kafkaSinkTableInfo);
+        this.tableName = kafkaSinkTableInfo.getName();
+        this.topic = kafkaSinkTableInfo.getTopic();
         this.partitioner = Optional.of(new CustomerFlinkPartition<>());
         this.partitionKeys = getPartitionKeys(kafkaSinkTableInfo);
         this.fieldNames = kafkaSinkTableInfo.getFields();
-        TypeInformation[] types = new TypeInformation[kafkaSinkTableInfo.getFields().length];
-        for (int i = 0; i < kafkaSinkTableInfo.getFieldClasses().length; i++) {
-            types[i] = TypeInformation.of(kafkaSinkTableInfo.getFieldClasses()[i]);
-        }
-        this.fieldTypes = types;
-
-        TableSchema.Builder schemaBuilder = TableSchema.builder();
-        for (int i = 0; i < fieldNames.length; i++) {
-            schemaBuilder.field(fieldNames[i], fieldTypes[i]);
-        }
-        this.schema = schemaBuilder.build();
-
-        Integer parallelism = kafkaSinkTableInfo.getParallelism();
-        if (parallelism != null) {
-            this.parallelism = parallelism;
-        }
-
-        typeInformation = new CRowTypeInfo(new RowTypeInfo(fieldTypes, fieldNames));
-        this.flinkKafkaProducer = (FlinkKafkaProducer<CRow>) new KafkaProducerFactory()
-                .createKafkaProducer(kafkaSinkTableInfo, typeInformation, properties, partitioner, partitionKeys);
-        return this;
-    }
-
-    @Override
-    public TypeInformation<Row> getRecordType() {
-        return new RowTypeInfo(fieldTypes, fieldNames);
-    }
-
-    @Override
-    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-        DataStream<CRow> mapDataStream = dataStream
-                .map((Tuple2<Boolean, Row> record) -> new CRow(record.f1, record.f0))
-                .returns(typeInformation)
-                .setParallelism(parallelism);
-
-        mapDataStream.addSink(flinkKafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
-    }
-
-    @Override
-    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
-        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
-    }
-
-    @Override
-    public String[] getFieldNames() {
-        return fieldNames;
-    }
-
-    @Override
-    public TypeInformation<?>[] getFieldTypes() {
-        return fieldTypes;
-    }
-
-    @Override
-    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-        this.fieldNames = fieldNames;
-        this.fieldTypes = fieldTypes;
+        this.fieldTypes = getTypeInformations(kafkaSinkTableInfo);
+        this.schema = buildTableSchema(fieldNames, fieldTypes);
+        this.parallelism = kafkaSinkTableInfo.getParallelism();
+        this.sinkOperatorName = SINK_OPERATOR_NAME_TPL.replace("${topic}", topic).replace("${table}", tableName);
+        this.kafkaProducer = new KafkaProducerFactory().createKafkaProducer(kafkaSinkTableInfo, getRowTypeInfo(), kafkaProperties, partitioner, partitionKeys);
         return this;
     }
 
-    private String[] getPartitionKeys(KafkaSinkTableInfo kafkaSinkTableInfo){
-        if(StringUtils.isNotBlank(kafkaSinkTableInfo.getPartitionKeys())){
-            return StringUtils.split(kafkaSinkTableInfo.getPartitionKeys(), ',');
-        }
-        return null;
-    }
 }
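
Net effect: KafkaSink shrinks to a thin genStreamSink override, and the sink operator name changes from TableConnectorUtils.generateRuntimeName(...) to the explicit "${topic}_${table}" template. The following is a hedged usage sketch of how the refactored sink might be driven at runtime; the tableEnv, table, and targetTableInfo values are assumed to be supplied by the surrounding framework and are not part of this commit:

// Hedged sketch, not part of this commit: exercising the refactored sink.
import com.dtstack.flink.sql.sink.kafka.KafkaSink;
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class KafkaSinkUsageSketch {
    public static void writeToKafka(StreamTableEnvironment tableEnv, Table table,
                                    AbstractTargetTableInfo targetTableInfo) {
        // genStreamSink builds the producer and caches the operator name.
        KafkaSink sink = new KafkaSink().genStreamSink(targetTableInfo);

        // The retract stream carries (change-flag, row) pairs; emitDataStream,
        // now inherited from AbstractKafkaSink, wraps each pair in a CRow and
        // adds the Kafka producer as a sink operator named "<topic>_<table>".
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        sink.emitDataStream(retractStream);
    }
}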
