|
3 | 3 |
|
4 | 4 | import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; |
5 | 5 |
|
| 6 | +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; |
6 | 7 | import org.apache.flink.util.Preconditions; |
7 | 8 |
|
8 | | -public class CustomerFlinkPartition<T> extends FlinkFixedPartitioner<T> { |
9 | | - @Override |
| 9 | +public class CustomerFlinkPartition<T> extends FlinkKafkaPartitioner<T> { |
| 10 | + private static final long serialVersionUID = 1L; |
| 11 | + private int parallelInstanceId; |
| 12 | + |
| 13 | + public CustomerFlinkPartition() { |
| 14 | + |
| 15 | + } |
| 16 | + |
| 17 | + public void open(int parallelInstanceId, int parallelInstances) { |
| 18 | + Preconditions.checkArgument(parallelInstanceId >= 0, "Id of this subtask cannot be negative."); |
| 19 | + Preconditions.checkArgument(parallelInstances > 0, "Number of subtasks must be larger than 0."); |
| 20 | + this.parallelInstanceId = parallelInstanceId; |
| 21 | + } |
| 22 | + |
10 | 23 | public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) { |
11 | 24 | Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty."); |
12 | 25 | if(key == null){ |
13 | | - return super.partition(record, key, value, targetTopic, partitions); |
| 26 | + return partitions[this.parallelInstanceId % partitions.length]; |
14 | 27 | } |
15 | 28 | return partitions[key.hashCode() % partitions.length]; |
16 | 29 | } |
| 30 | + |
| 31 | + public boolean equals(Object o) { |
| 32 | + return this == o || o instanceof CustomerFlinkPartition; |
| 33 | + } |
| 34 | + |
| 35 | + public int hashCode() { |
| 36 | + return CustomerFlinkPartition.class.hashCode(); |
| 37 | + } |
17 | 38 | } |
0 commit comments