Skip to content

Commit c64518e

Browse files
author
dapeng
committed
out test
1 parent f2fef32 commit c64518e

File tree

2 files changed

+4
-1
lines changed

2 files changed

+4
-1
lines changed

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkPartition.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ public int partition(T record, byte[] key, byte[] value, String targetTopic, int
1414
if(key == null){
1515
return super.partition(record, key, value, targetTopic, partitions);
1616
}
17-
return partitions[key.hashCode() % partitions.length];
17+
System.out.println("hashcode=" + key.hashCode());
18+
Random random = new Random();
19+
return partitions[random.nextInt() % partitions.length];
1820
}
1921
}

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ private byte[] serializeJsonKey(JsonRowSerializationSchema jsonRowSerializationS
5151
sb.append(objectNode.get(key));
5252
}
5353
}
54+
System.out.println("serialKey=" + sb.toString());
5455
return sb.toString().getBytes();
5556
}catch (Exception e){
5657

0 commit comments

Comments
 (0)