Commit ec2b84e

kafka update mode

1 parent 14446f4 commit ec2b84e

File tree: 23 files changed (+1067, -86 lines)


core/src/main/java/com/dtstack/flink/sql/format/SerializationMetricWrapper.java

Lines changed: 6 additions & 5 deletions

@@ -24,6 +24,7 @@
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.types.Row;


@@ -34,11 +35,11 @@
  * author: toutian
  * create: 2019/12/24
  */
-public class SerializationMetricWrapper implements SerializationSchema<Row> {
+public class SerializationMetricWrapper implements SerializationSchema<CRow> {

     private static final long serialVersionUID = 1L;

-    private SerializationSchema<Row> serializationSchema;
+    private SerializationSchema<CRow> serializationSchema;

     private transient RuntimeContext runtimeContext;

@@ -47,7 +48,7 @@ public class SerializationMetricWrapper implements SerializationSchema<Row> {
     protected transient Meter dtNumRecordsOutRate;


-    public SerializationMetricWrapper(SerializationSchema<Row> serializationSchema) {
+    public SerializationMetricWrapper(SerializationSchema<CRow> serializationSchema) {
         this.serializationSchema = serializationSchema;
     }

@@ -57,7 +58,7 @@ public void initMetric() {
     }

     @Override
-    public byte[] serialize(Row element) {
+    public byte[] serialize(CRow element) {
         beforeSerialize();
         byte[] row = serializationSchema.serialize(element);
         afterSerialize();

@@ -79,7 +80,7 @@ public void setRuntimeContext(RuntimeContext runtimeContext) {
         this.runtimeContext = runtimeContext;
     }

-    public SerializationSchema<Row> getSerializationSchema() {
+    public SerializationSchema<CRow> getSerializationSchema() {
         return serializationSchema;
     }
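For orientation: CRow, from Flink's legacy flink-table-planner, is simply a Row paired with a change flag (true for an insert/update, false for a retraction), which is what a sink needs in order to honor an update mode. Below is a minimal sketch of how the rewrapped schema is driven; the lambda inner schema and the field values are illustrative, not part of the commit:

import java.nio.charset.StandardCharsets;

import com.dtstack.flink.sql.format.SerializationMetricWrapper;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;

public class CRowWrapperSketch {
    public static void main(String[] args) {
        // A CRow is a Row plus a change flag: true = accumulate, false = retract.
        CRow insert  = new CRow(Row.of("user_1", 42L), true);
        CRow retract = new CRow(Row.of("user_1", 42L), false);

        // Illustrative inner schema standing in for the JSON/CSV/Avro ones;
        // it only encodes the wrapped Row as UTF-8 text.
        SerializationSchema<CRow> inner =
                cRow -> cRow.row().toString().getBytes(StandardCharsets.UTF_8);

        // The wrapper delegates to the inner schema and counts records around
        // each call; in a running job, initMetric() is invoked before serialize().
        SerializationMetricWrapper wrapper = new SerializationMetricWrapper(inner);
        System.out.println(retract.change() + " -> "
                + new String(inner.serialize(retract), StandardCharsets.UTF_8));
    }
}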

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java

Lines changed: 14 additions & 17 deletions

@@ -19,15 +19,18 @@

 import com.dtstack.flink.sql.format.FormatType;
 import com.dtstack.flink.sql.format.SerializationMetricWrapper;
+import com.dtstack.flink.sql.sink.kafka.serialization.AvroCRowSerializationSchema;
+import com.dtstack.flink.sql.sink.kafka.serialization.CsvCRowSerializationSchema;
+import com.dtstack.flink.sql.sink.kafka.serialization.JsonCRowSerializationSchema;
 import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.avro.AvroRowSerializationSchema;
 import org.apache.flink.formats.csv.CsvRowSerializationSchema;
-import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.types.Row;

 import java.util.Optional;

@@ -51,42 +54,36 @@ public abstract class AbstractKafkaProducerFactory {
      * @param partitioner
      * @return
      */
-    public abstract RichSinkFunction<Row> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation, Properties properties, Optional<FlinkKafkaPartitioner<Row>> partitioner, String[] partitionKeys);
+    public abstract RichSinkFunction<CRow> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<CRow> typeInformation, Properties properties, Optional<FlinkKafkaPartitioner<CRow>> partitioner, String[] partitionKeys);

-    protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation) {
-        return new SerializationMetricWrapper(createSerializationSchema(kafkaSinkTableInfo, typeInformation));
+    protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<CRow> typeInformation) {
+        SerializationSchema<CRow> serializationSchema = createSerializationSchema(kafkaSinkTableInfo, typeInformation);
+        return new SerializationMetricWrapper(serializationSchema);
     }

-    private SerializationSchema<Row> createSerializationSchema(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation) {
-        SerializationSchema<Row> serializationSchema = null;
+    private SerializationSchema<CRow> createSerializationSchema(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<CRow> typeInformation) {
+        SerializationSchema<CRow> serializationSchema = null;
         if (FormatType.JSON.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {
-
             if (StringUtils.isNotBlank(kafkaSinkTableInfo.getSchemaString())) {
-                serializationSchema = new JsonRowSerializationSchema(kafkaSinkTableInfo.getSchemaString());
+                serializationSchema = new JsonCRowSerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode());
             } else if (typeInformation != null && typeInformation.getArity() != 0) {
-                serializationSchema = new JsonRowSerializationSchema(typeInformation);
+                serializationSchema = new JsonCRowSerializationSchema(typeInformation, kafkaSinkTableInfo.getUpdateMode());
             } else {
                 throw new IllegalArgumentException("sinkDataType:" + FormatType.JSON.name() + " must set schemaString (JSON Schema) or TypeInformation<Row>");
             }
-
         } else if (FormatType.CSV.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {
-
             if (StringUtils.isBlank(kafkaSinkTableInfo.getFieldDelimiter())) {
                 throw new IllegalArgumentException("sinkDataType:" + FormatType.CSV.name() + " must set fieldDelimiter");
             }

-            final CsvRowSerializationSchema.Builder serSchemaBuilder = new CsvRowSerializationSchema.Builder(typeInformation);
+            final CsvCRowSerializationSchema.Builder serSchemaBuilder = new CsvCRowSerializationSchema.Builder(typeInformation);
             serSchemaBuilder.setFieldDelimiter(kafkaSinkTableInfo.getFieldDelimiter().toCharArray()[0]);
             serializationSchema = serSchemaBuilder.build();
-
         } else if (FormatType.AVRO.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {
-
             if (StringUtils.isBlank(kafkaSinkTableInfo.getSchemaString())) {
                 throw new IllegalArgumentException("sinkDataType:" + FormatType.AVRO.name() + " must set schemaString");
             }
-
-            serializationSchema = new AvroRowSerializationSchema(kafkaSinkTableInfo.getSchemaString());
-
+            serializationSchema = new AvroCRowSerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode());
         }

         if (null == serializationSchema) {
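The bodies of the new AvroCRowSerializationSchema / CsvCRowSerializationSchema / JsonCRowSerializationSchema classes are not part of this excerpt; all the factory change guarantees is that each one receives kafkaSinkTableInfo.getUpdateMode() at construction time. A hypothetical stand-in (not the repo's class) sketching why a serializer wants that value, assuming the mode is a string such as "append" or "retract":

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;

// Hypothetical stand-in: one plausible way an update-mode-aware JSON schema
// can use the CRow change flag. Not the commit's implementation.
public class UpdateModeAwareJsonSchemaSketch implements SerializationSchema<CRow> {

    private final String updateMode;   // assumed: "append" or "retract", per table DDL
    private transient ObjectMapper mapper;

    public UpdateModeAwareJsonSchemaSketch(String updateMode) {
        this.updateMode = updateMode;
    }

    @Override
    public byte[] serialize(CRow element) {
        if (mapper == null) {
            mapper = new ObjectMapper();   // created lazily; ObjectMapper is not serializable
        }
        Row row = element.row();
        ObjectNode node = mapper.createObjectNode();
        for (int i = 0; i < row.getArity(); i++) {
            node.put("f" + i, String.valueOf(row.getField(i)));
        }
        // Only a retract-style mode needs to expose the change flag downstream.
        if ("retract".equalsIgnoreCase(updateMode)) {
            node.put("retract", !element.change());
        }
        return node.toString().getBytes();
    }
}

In append mode only accumulate messages are expected, so the flag can be dropped; a retract-style mode has to encode it so downstream consumers can undo previously emitted rows.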

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java

Lines changed: 20 additions & 15 deletions

@@ -2,18 +2,20 @@


 import com.dtstack.flink.sql.format.SerializationMetricWrapper;
+import com.dtstack.flink.sql.sink.kafka.serialization.JsonCRowSerializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.util.concurrent.atomic.AtomicLong;

-public class CustomerKeyedSerializationSchema implements KeyedSerializationSchema<Row> {
+public class CustomerKeyedSerializationSchema implements KeyedSerializationSchema<CRow> {

     private static final Logger LOG = LoggerFactory.getLogger(CustomerKeyedSerializationSchema.class);

@@ -30,38 +32,41 @@ public CustomerKeyedSerializationSchema(SerializationMetricWrapper serialization
         this.mapper = new ObjectMapper();
     }

-    public byte[] serializeKey(Row element) {
-        if(partitionKeys == null || partitionKeys.length <=0){
+    @Override
+    public byte[] serializeKey(CRow element) {
+        if (partitionKeys == null || partitionKeys.length <= 0) {
             return null;
-        }
-        SerializationSchema<Row> serializationSchema = serializationMetricWrapper.getSerializationSchema();
-        if(serializationSchema instanceof JsonRowSerializationSchema){
-            return serializeJsonKey((JsonRowSerializationSchema) serializationSchema, element);
+        }
+        SerializationSchema<CRow> serializationSchema = serializationMetricWrapper.getSerializationSchema();
+        if (serializationSchema instanceof JsonCRowSerializationSchema) {
+            return serializeJsonKey((JsonCRowSerializationSchema) serializationSchema, element);
         }
         return null;
     }

-    public byte[] serializeValue(Row element) {
+    @Override
+    public byte[] serializeValue(CRow element) {
         return this.serializationMetricWrapper.serialize(element);
     }

-    public String getTargetTopic(Row element) {
+    @Override
+    public String getTargetTopic(CRow element) {
         return null;
     }

-    private byte[] serializeJsonKey(JsonRowSerializationSchema jsonRowSerializationSchema, Row element) {
+    private byte[] serializeJsonKey(JsonCRowSerializationSchema jsonCRowSerializationSchema, CRow element) {
         try {
-            byte[] data = jsonRowSerializationSchema.serialize(element);
+            byte[] data = jsonCRowSerializationSchema.serialize(element);
             ObjectNode objectNode = mapper.readValue(data, ObjectNode.class);
             StringBuilder sb = new StringBuilder();
-            for(String key : partitionKeys){
-                if(objectNode.has(key)){
+            for (String key : partitionKeys) {
+                if (objectNode.has(key)) {
                     sb.append(objectNode.get(key.trim()));
                 }
             }
             return sb.toString().getBytes();
-        } catch (Exception e){
-            if(COUNTER.getAndIncrement() % 1000 == 0){
+        } catch (Exception e) {
+            if (COUNTER.getAndIncrement() % 1000 == 0) {
                 LOG.error("serializeJsonKey error", e);
             }
         }