Skip to content

Commit 5f546c8

Browse files
committed
Merge branch 'feat_1.8_keyPartition_v2' into 'feat_keyPartition_mergeTest'
Feat 1.8 key partition v2. See merge request !259
2 parents b3bc749 + a41d72e commit 5f546c8

File tree

8 files changed

+39
-5
lines changed

8 files changed

+39
-5
lines changed

core/src/main/java/com/dtstack/flink/sql/format/SerializationMetricWrapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
*/
3737
public class SerializationMetricWrapper implements SerializationSchema<Row> {
3838

39+
private static final long serialVersionUID = 1L;
40+
3941
private SerializationSchema<Row> serializationSchema;
4042

4143
private transient RuntimeContext runtimeContext;

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkPartition.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package com.dtstack.flink.sql.sink.kafka;
22

33

4-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
5-
64
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
75
import org.apache.flink.util.Preconditions;
86

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313

14+
import java.util.concurrent.atomic.AtomicLong;
15+
1416
public class CustomerKeyedSerializationSchema implements KeyedSerializationSchema<Row> {
1517

1618
private static final Logger LOG = LoggerFactory.getLogger(CustomerKeyedSerializationSchema.class);
1719

20+
private static final AtomicLong COUNTER = new AtomicLong(0L);
21+
1822
private static final long serialVersionUID = 1L;
1923
private final SerializationMetricWrapper serializationMetricWrapper;
2024
private String[] partitionKeys;
@@ -57,7 +61,9 @@ private byte[] serializeJsonKey(JsonRowSerializationSchema jsonRowSerializationS
5761
}
5862
return sb.toString().getBytes();
5963
} catch (Exception e){
60-
LOG.error("serializeJsonKey error", e);
64+
if(COUNTER.getAndIncrement() % 1000 == 0){
65+
LOG.error("serializeJsonKey error", e);
66+
}
6167
}
6268
return null;
6369
}

kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
2525
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
2626
import org.apache.flink.types.Row;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2729

2830
import java.util.Optional;
2931
import java.util.Properties;
@@ -37,6 +39,10 @@
3739
*/
3840
public class KafkaProducer extends FlinkKafkaProducer<Row> {
3941

42+
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);
43+
44+
private static final long serialVersionUID = 1L;
45+
4046
private SerializationMetricWrapper serializationMetricWrapper;
4147

4248
public KafkaProducer(String topicId, SerializationSchema<Row> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<Row>> customPartitioner, String[] parititonKeys) {
@@ -46,6 +52,7 @@ public KafkaProducer(String topicId, SerializationSchema<Row> serializationSchem
4652

4753
@Override
4854
public void open(Configuration configuration) throws Exception {
55+
LOG.warn("---open KafkaProducer--");
4956
RuntimeContext runtimeContext = getRuntimeContext();
5057
serializationMetricWrapper.setRuntimeContext(runtimeContext);
5158
serializationMetricWrapper.initMetric();

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
2525
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
2626
import org.apache.flink.types.Row;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2729

2830
import java.util.Optional;
2931
import java.util.Properties;
@@ -37,6 +39,10 @@
3739
*/
3840
public class KafkaProducer09 extends FlinkKafkaProducer09<Row> {
3941

42+
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer09.class);
43+
44+
private static final long serialVersionUID = 1L;
45+
4046
private SerializationMetricWrapper serializationMetricWrapper;
4147

4248
public KafkaProducer09(String topicId, SerializationSchema<Row> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<Row>> customPartitioner,String[] partitionKeys) {
@@ -46,6 +52,7 @@ public KafkaProducer09(String topicId, SerializationSchema<Row> serializationSch
4652

4753
@Override
4854
public void open(Configuration configuration) {
55+
LOG.info("----KafkaProducer09 open ---");
4956
RuntimeContext runtimeContext = getRuntimeContext();
5057
serializationMetricWrapper.setRuntimeContext(runtimeContext);
5158
serializationMetricWrapper.initMetric();

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer010.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
2525
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
2626
import org.apache.flink.types.Row;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2729

2830
import java.util.Optional;
2931
import java.util.Properties;
@@ -37,6 +39,10 @@
3739
*/
3840
public class KafkaProducer010 extends FlinkKafkaProducer010<Row> {
3941

42+
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer010.class);
43+
44+
private static final long serialVersionUID = 1L;
45+
4046
private SerializationMetricWrapper serializationMetricWrapper;
4147

4248
public KafkaProducer010(String topicId, SerializationSchema<Row> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<Row>> customPartitioner, String[] partitionKeys) {
@@ -46,6 +52,7 @@ public KafkaProducer010(String topicId, SerializationSchema<Row> serializationSc
4652

4753
@Override
4854
public void open(Configuration configuration) {
55+
LOG.info("----open KafkaProducer010 --");
4956
RuntimeContext runtimeContext = getRuntimeContext();
5057
serializationMetricWrapper.setRuntimeContext(runtimeContext);
5158
serializationMetricWrapper.initMetric();

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ public TypeInformation<Row> getRecordType() {
108108
@Override
109109
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
110110

111-
RichSinkFunction<Row> kafkaProducer010 = new KafkaProducer010Factory().createKafkaProducer(kafka10SinkTableInfo, getOutputType().getTypeAt(1), properties,
112-
Optional.of(new CustomerFlinkPartition<Row>()), partitionKeys);
111+
RichSinkFunction<Row> kafkaProducer010 = new KafkaProducer010Factory().createKafkaProducer(kafka10SinkTableInfo, getOutputType().getTypeAt(1), properties,
112+
Optional.of(new CustomerFlinkPartition<>()), partitionKeys);
113113

114114
DataStream<Row> mapDataStream = dataStream.filter((Tuple2<Boolean, Row> record) -> record.f0)
115115
.map((Tuple2<Boolean, Row> record) -> record.f1)

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer011.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
2626
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
2727
import org.apache.flink.types.Row;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2830

2931
import java.util.Optional;
3032
import java.util.Properties;
@@ -38,6 +40,10 @@
3840
*/
3941
public class KafkaProducer011 extends FlinkKafkaProducer011<Row> {
4042

43+
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer011.class);
44+
45+
private static final long serialVersionUID = 1L;
46+
4147
private SerializationMetricWrapper serializationMetricWrapper;
4248

4349
public KafkaProducer011(String topicId, SerializationSchema<Row> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<Row>> customPartitioner, String[] partitionKeys) {
@@ -47,6 +53,7 @@ public KafkaProducer011(String topicId, SerializationSchema<Row> serializationSc
4753

4854
@Override
4955
public void open(Configuration configuration) throws Exception {
56+
LOG.info("--KafkaProducer011 open--");
5057
RuntimeContext runtimeContext = getRuntimeContext();
5158
serializationMetricWrapper.setRuntimeContext(runtimeContext);
5259
serializationMetricWrapper.initMetric();

0 commit comments

Comments (0)