
Commit ff67b88 ("last commit")

Author: dapeng
Parent: 0628f06

3 files changed: +2 −17 lines

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkPartition.java

Lines changed: 1 addition & 3 deletions

```diff
@@ -9,11 +9,9 @@
 public class CustomerFlinkPartition<T> extends FlinkFixedPartitioner<T> {
     @Override
     public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
-        System.out.println("key="+new String(key));
         Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
         if(key == null){
-            Random random = new Random();
-            return partitions[random.nextInt(1000) % partitions.length];
+            return super.partition(record, key, value, targetTopic, partitions);
         }
         return partitions[key.hashCode() % partitions.length];
     }
```
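
The null-key fallback now delegates to FlinkFixedPartitioner, while non-null keys are still routed via `key.hashCode() % partitions.length`. Two caveats worth keeping in mind: `hashCode()` on a `byte[]` is identity-based, so two equal keys can hash differently, and a negative hash yields a negative array index. Below is a minimal sketch of a content-hashing, sign-safe variant, assuming the same Flink connector base class; the class name ContentHashPartitioner is illustrative, not part of this commit.

```java
// Minimal sketch of a content-hashing, sign-safe variant of the partitioner.
// Assumes Flink's FlinkFixedPartitioner as in the commit; ContentHashPartitioner
// is an illustrative name, not part of this repository.
import java.util.Arrays;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.util.Preconditions;

public class ContentHashPartitioner<T> extends FlinkFixedPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(partitions != null && partitions.length > 0,
                "Partitions of the target topic is empty.");
        if (key == null) {
            // No key: fall back to the fixed, subtask-based assignment.
            return super.partition(record, key, value, targetTopic, partitions);
        }
        // Arrays.hashCode hashes the key's bytes (byte[].hashCode() is identity-based);
        // masking the sign bit keeps the modulo result non-negative.
        int hash = Arrays.hashCode(key) & Integer.MAX_VALUE;
        return partitions[hash % partitions.length];
    }
}
```

Masking with `Integer.MAX_VALUE` rather than calling `Math.abs` avoids the `Integer.MIN_VALUE` edge case, where `Math.abs` returns a negative number.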

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java

Lines changed: 0 additions & 3 deletions

```diff
@@ -28,7 +28,6 @@ public byte[] serializeKey(Row element) {
         }
         SerializationSchema<Row> serializationSchema = serializationMetricWrapper.getSerializationSchema();
         if(serializationSchema instanceof JsonRowSerializationSchema){
-            System.out.println("serial start, element = " + element);
             return serializeJsonKey((JsonRowSerializationSchema) serializationSchema, element);
         }
         return null;
@@ -46,11 +45,9 @@ private byte[] serializeJsonKey(JsonRowSerializationSchema jsonRowSerializationS
         try {
             byte[] data = jsonRowSerializationSchema.serialize(element);
             ObjectNode objectNode = mapper.readValue(data, ObjectNode.class);
-            System.out.println("objectNode = " + objectNode);
             StringBuilder sb = new StringBuilder();
             for(String key : partitionKeys){
                 if(objectNode.has(key)){
-                    System.out.println("key = " + key+ ", value = " + objectNode.get(key));
                     sb.append(objectNode.get(key));
                 }
             }
```
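
For context around the deleted logging: serializeJsonKey serializes the Row to JSON bytes, re-parses them into a Jackson ObjectNode, and concatenates the values of the configured partition-key fields to form the message key. A self-contained sketch of that flow with plain Jackson follows; JsonKeyBuilder and buildKey are illustrative names, and only the loop body mirrors the logic shown in the diff above.

```java
// Self-contained sketch of the key-building flow using plain Jackson.
// Assumes the row has already been serialized to JSON bytes.
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class JsonKeyBuilder {

    private static final ObjectMapper mapper = new ObjectMapper();

    /** Concatenates the values of the partition-key fields present in the JSON row. */
    public static byte[] buildKey(byte[] jsonRow, String[] partitionKeys) throws IOException {
        ObjectNode objectNode = mapper.readValue(jsonRow, ObjectNode.class);
        StringBuilder sb = new StringBuilder();
        for (String key : partitionKeys) {
            if (objectNode.has(key)) {
                // Appending the JsonNode uses its JSON rendering, so string
                // values keep their quotes, matching the original code.
                sb.append(objectNode.get(key));
            }
        }
        // An empty builder means none of the configured fields were present.
        return sb.length() == 0 ? null : sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        byte[] row = "{\"id\":7,\"name\":\"a\"}".getBytes(StandardCharsets.UTF_8);
        byte[] key = buildKey(row, new String[]{"id", "name"});
        System.out.println(new String(key, StandardCharsets.UTF_8)); // prints 7"a"
    }
}
```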

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 1 addition & 11 deletions

```diff
@@ -82,7 +82,7 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
             properties.setProperty(key, kafka10SinkTableInfo.getKafkaParam(key));
         }

-        this.partitioner = Optional.of(getFlinkPartitioner(kafka10SinkTableInfo));
+        this.partitioner = Optional.of(new CustomerFlinkPartition<>());
         this.partitionKeys = getPartitionKeys(kafka10SinkTableInfo);
         this.fieldNames = kafka10SinkTableInfo.getFields();
         TypeInformation[] types = new TypeInformation[kafka10SinkTableInfo.getFields().length];
@@ -142,17 +142,7 @@ public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInform
         this.fieldTypes = fieldTypes;
         return this;
     }
-
-    private FlinkKafkaPartitioner getFlinkPartitioner(KafkaSinkTableInfo kafkaSinkTableInfo){
-        System.out.println("enablePartition =" + kafkaSinkTableInfo.getEnableKeyPartition());
-        if("true".equalsIgnoreCase(kafkaSinkTableInfo.getEnableKeyPartition())){
-            return new CustomerFlinkPartition<>();
-        }
-        return new FlinkFixedPartitioner<>();
-    }
-
     private String[] getPartitionKeys(KafkaSinkTableInfo kafkaSinkTableInfo){
-        System.out.println("partitionKeys =" + kafkaSinkTableInfo.getPartitionKeys());
         if(StringUtils.isNotBlank(kafkaSinkTableInfo.getPartitionKeys())){
             return StringUtils.split(kafkaSinkTableInfo.getPartitionKeys(), ',');
         }
```
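
With the getFlinkPartitioner helper removed, the sink now always installs CustomerFlinkPartition, and the surviving getPartitionKeys helper simply splits the comma-separated partitionKeys table property. A runnable sketch of that splitting behavior follows, assuming Commons Lang 3 (the repository may use an older commons-lang package); PartitionKeysDemo and its main harness are illustrative.

```java
// Runnable sketch of getPartitionKeys' splitting behavior, assuming
// Commons Lang 3's StringUtils as used in the diff above.
import java.util.Arrays;

import org.apache.commons.lang3.StringUtils;

public class PartitionKeysDemo {

    // Mirrors the retained helper: null for a blank property, otherwise the
    // comma-separated field names (split drops empty tokens such as "a,,b").
    static String[] getPartitionKeys(String partitionKeys) {
        if (StringUtils.isNotBlank(partitionKeys)) {
            return StringUtils.split(partitionKeys, ',');
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(getPartitionKeys("id,name"))); // [id, name]
        System.out.println(getPartitionKeys("  ")); // null: no keyed partitioning
    }
}
```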
