Commit a9794d9

Merge remote-tracking branch 'origin/v1.9.0_dev' into v1.10.0_dev
2 parents: cd047a6 + 648ce78

File tree: 6 files changed, +559 −450 lines
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.kafka;

import com.dtstack.flink.sql.sink.IStreamSinkGener;
import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Optional;
import java.util.Properties;
import java.util.stream.IntStream;

import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;

/**
 * Date: 2020/3/30
 * Company: www.dtstack.com
 *
 * @author maqi
 */
public abstract class AbstractKafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener {

    // Display-name template for the sink operator; ${topic} and ${table} are placeholders.
    public static final String SINK_OPERATOR_NAME_TPL = "${topic}_${table}";

    protected String[] fieldNames;
    protected TypeInformation<?>[] fieldTypes;

    protected String[] partitionKeys;
    protected String sinkOperatorName;
    protected Properties properties;
    protected int parallelism;
    protected String topic;
    protected String tableName;

    protected TableSchema schema;
    protected SinkFunction<Tuple2<Boolean, Row>> kafkaProducer011;

    protected Optional<FlinkKafkaPartitioner<Tuple2<Boolean, Row>>> partitioner;

    /**
     * Builds the producer configuration. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
     * resolves to the key "bootstrap.servers", which the producer uses as well.
     */
    protected Properties getKafkaProperties(KafkaSinkTableInfo kafkaSinkTableInfo) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkTableInfo.getBootstrapServers());

        for (String key : kafkaSinkTableInfo.getKafkaParamKeys()) {
            props.setProperty(key, kafkaSinkTableInfo.getKafkaParam(key));
        }
        return props;
    }

    protected TypeInformation[] getTypeInformations(KafkaSinkTableInfo kafkaSinkTableInfo) {
        Class<?>[] fieldClasses = kafkaSinkTableInfo.getFieldClasses();
        return IntStream.range(0, fieldClasses.length)
                .mapToObj(i -> TypeInformation.of(fieldClasses[i]))
                .toArray(TypeInformation[]::new);
    }

    protected TableSchema buildTableSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        Preconditions.checkArgument(fieldNames.length == fieldTypes.length, "fieldNames length must equal fieldTypes length!");

        DataType[] dataTypes = IntStream.range(0, fieldTypes.length)
                .mapToObj(i -> fromLegacyInfoToDataType(fieldTypes[i]))
                .toArray(DataType[]::new);

        return TableSchema.builder()
                .fields(fieldNames, dataTypes)
                .build();
    }

    protected String[] getPartitionKeys(KafkaSinkTableInfo kafkaSinkTableInfo) {
        if (StringUtils.isNotBlank(kafkaSinkTableInfo.getPartitionKeys())) {
            return StringUtils.split(kafkaSinkTableInfo.getPartitionKeys(), ',');
        }
        return null;
    }

    @Override
    public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        DataStreamSink<Tuple2<Boolean, Row>> dataStreamSink = dataStream.addSink(kafkaProducer011).name(sinkOperatorName);
        // Only override the environment's default parallelism when one was configured explicitly.
        if (parallelism > 0) {
            dataStreamSink.setParallelism(parallelism);
        }
        return dataStreamSink;
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        consumeDataStream(dataStream);
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        return this;
    }

    @Override
    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
        return new TupleTypeInfo<>(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }
}
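
The base class leaves field population and producer construction to version-specific subclasses. Below is a minimal sketch of how such a subclass could wire itself up against a Kafka 0.11 producer. The class name SampleKafkaSink, the init entry point, the KafkaSinkTableInfo accessors getTopic/getName/getFields, and the toString-based serialization schema are illustrative assumptions; only the abstract base class is part of this commit.

package com.dtstack.flink.sql.sink.kafka;

import java.nio.charset.StandardCharsets;
import java.util.Optional;

import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.types.Row;

// Hypothetical concrete sink; not part of this commit.
public class SampleKafkaSink extends AbstractKafkaSink {

    public SampleKafkaSink init(KafkaSinkTableInfo sinkTableInfo) {
        this.topic = sinkTableInfo.getTopic();            // assumed accessor
        this.tableName = sinkTableInfo.getName();         // assumed accessor
        this.fieldNames = sinkTableInfo.getFields();      // assumed accessor
        this.fieldTypes = getTypeInformations(sinkTableInfo);
        this.schema = buildTableSchema(fieldNames, fieldTypes);
        this.partitionKeys = getPartitionKeys(sinkTableInfo);
        this.properties = getKafkaProperties(sinkTableInfo);
        this.partitioner = Optional.empty();

        // Expand the operator-name template defined on the base class.
        this.sinkOperatorName = SINK_OPERATOR_NAME_TPL
                .replace("${topic}", topic)
                .replace("${table}", tableName);

        this.kafkaProducer011 = new FlinkKafkaProducer011<>(topic, new RowToStringSchema(), properties, partitioner);
        return this;
    }

    /** Placeholder serialization: value-only, UTF-8 toString of the Row. */
    private static class RowToStringSchema implements KeyedSerializationSchema<Tuple2<Boolean, Row>> {
        @Override
        public byte[] serializeKey(Tuple2<Boolean, Row> element) {
            return null; // no message key; leave partitioning to the producer
        }

        @Override
        public byte[] serializeValue(Tuple2<Boolean, Row> element) {
            return element.f1.toString().getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String getTargetTopic(Tuple2<Boolean, Row> element) {
            return null; // fall back to the producer's default topic
        }
    }
}

Whatever the concrete serialization, the Boolean in Tuple2<Boolean, Row> is the retract flag of a RetractStreamTableSink (true marks an insert, false a retraction), which is why the producer is typed over the flagged tuple rather than over Row alone.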
