Skip to content

Commit 3db8088

Browse files
author
dapeng
committed
log fix and abs hashcode
1 parent e0b29ca commit 3db8088

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkPartition.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ public void open(int parallelInstanceId, int parallelInstances) {
2222

2323
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
2424
Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
25-
System.out.println("partitionKey=" + new String(key));
2625
if(key == null){
26+
System.out.println("partitionKey=null");
2727
return partitions[this.parallelInstanceId % partitions.length];
2828
}
29-
return partitions[new String(key).hashCode() % partitions.length];
29+
System.out.println("partitionKey=" + new String(key));
30+
return partitions[Math.abs(new String(key).hashCode()) % partitions.length];
3031
}
3132

3233
public boolean equals(Object o) {

0 commit comments

Comments
 (0)