Skip to content

Commit f2fef32

Browse files
author
dapeng
committed
fix last one
1 parent f0687cb commit f2fef32

File tree

4 files changed

+4
-5
lines changed

4 files changed

+4
-5
lines changed

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkPartition.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,8 @@ public class CustomerFlinkPartition<T> extends FlinkFixedPartitioner<T> {
1111
@Override
1212
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
1313
Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
14-
System.out.println("key = " + key+ ", value = " + value+ ", targetTopic = " + targetTopic + ", partitions = " + Arrays.toString(partitions));
1514
if(key == null){
16-
super.partition(record, key, value, targetTopic, partitions);
15+
return super.partition(record, key, value, targetTopic, partitions);
1716
}
1817
return partitions[key.hashCode() % partitions.length];
1918
}

kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
7777
for (String key : kafkaSinkTableInfo.getKafkaParamKeys()) {
7878
properties.setProperty(key, kafkaSinkTableInfo.getKafkaParam(key));
7979
}
80-
this.partitioner = Optional.of(getFlinkPartitioner(kafkaSinkTableInfo));
80+
this.partitioner = Optional.of(new CustomerFlinkPartition<>());
8181
this.partitionKeys = getPartitionKeys(kafkaSinkTableInfo);
8282
this.fieldNames = kafkaSinkTableInfo.getFields();
8383
TypeInformation[] types = new TypeInformation[kafkaSinkTableInfo.getFields().length];

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
8181
properties.setProperty(key, kafka09SinkTableInfo.getKafkaParam(key));
8282
}
8383

84-
this.partitioner = Optional.of(getFlinkPartitioner(kafka09SinkTableInfo));
84+
this.partitioner = Optional.of(new CustomerFlinkPartition<>());
8585
this.partitionKeys = getPartitionKeys(kafka09SinkTableInfo);
8686
this.fieldNames = kafka09SinkTableInfo.getFields();
8787
TypeInformation[] types = new TypeInformation[kafka09SinkTableInfo.getFields().length];

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
8282
for (String key : kafka11SinkTableInfo.getKafkaParamKeys()) {
8383
properties.setProperty(key, kafka11SinkTableInfo.getKafkaParam(key));
8484
}
85-
this.partitioner = Optional.of(getFlinkPartitioner(kafka11SinkTableInfo));
85+
this.partitioner = Optional.of(new CustomerFlinkPartition<>());
8686
this.partitionKeys = getPartitionKeys(kafka11SinkTableInfo);
8787
this.fieldNames = kafka11SinkTableInfo.getFields();
8888
TypeInformation[] types = new TypeInformation[kafka11SinkTableInfo.getFields().length];

0 commit comments

Comments
 (0)