Skip to content

Commit ea131b6

Browse files
author
dapeng
committed
so
1 parent c64518e commit ea131b6

File tree

3 files changed

+2
-5
lines changed

3 files changed

+2
-5
lines changed

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkPartition.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ public int partition(T record, byte[] key, byte[] value, String targetTopic, int
1414
if(key == null){
1515
return super.partition(record, key, value, targetTopic, partitions);
1616
}
17-
System.out.println("hashcode=" + key.hashCode());
18-
Random random = new Random();
19-
return partitions[random.nextInt() % partitions.length];
17+
return partitions[key.hashCode() % partitions.length];
2018
}
2119
}

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ private byte[] serializeJsonKey(JsonRowSerializationSchema jsonRowSerializationS
5151
sb.append(objectNode.get(key));
5252
}
5353
}
54-
System.out.println("serialKey=" + sb.toString());
5554
return sb.toString().getBytes();
5655
}catch (Exception e){
5756

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer010.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class KafkaProducer010 extends FlinkKafkaProducer010<Row> {
4040
private SerializationMetricWrapper serializationMetricWrapper;
4141

4242
public KafkaProducer010(String topicId, SerializationSchema<Row> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<Row>> customPartitioner, String[] partitionKeys) {
43-
super(topicId, new CustomerKeyedSerializationSchema((SerializationMetricWrapper)serializationSchema, partitionKeys), producerConfig, customPartitioner.orElse(null));
43+
super(topicId, new CustomerKeyedSerializationSchema((SerializationMetricWrapper)serializationSchema, partitionKeys), producerConfig, customPartitioner.get());
4444
this.serializationMetricWrapper = (SerializationMetricWrapper) serializationSchema;
4545
}
4646

0 commit comments

Comments
 (0)