Skip to content

Commit e0b29ca

Browse files
author
dapeng
committed
fix blank and hashcode question
1 parent c3d8ed4 commit e0b29ca

File tree

2 files changed

+3
-2
lines changed

2 files changed

+3
-2
lines changed

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkPartition.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@ public void open(int parallelInstanceId, int parallelInstances) {
2222

2323
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
2424
Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
25+
System.out.println("partitionKey=" + new String(key));
2526
if(key == null){
2627
return partitions[this.parallelInstanceId % partitions.length];
2728
}
28-
return partitions[key.hashCode() % partitions.length];
29+
return partitions[new String(key).hashCode() % partitions.length];
2930
}
3031

3132
public boolean equals(Object o) {

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ private byte[] serializeJsonKey(JsonRowSerializationSchema jsonRowSerializationS
4848
StringBuilder sb = new StringBuilder();
4949
for(String key : partitionKeys){
5050
if(objectNode.has(key)){
51-
sb.append(objectNode.get(key));
51+
sb.append(objectNode.get(key.trim()));
5252
}
5353
}
5454
return sb.toString().getBytes();

0 commit comments

Comments
 (0)