Skip to content

Commit f42eac0

Browse files
author
dapeng
committed
fix 缩进和日志
1 parent e4eeebb commit f42eac0

File tree

2 files changed

+6
-5
lines changed

2 files changed

+6
-5
lines changed

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkPartition.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@ public void open(int parallelInstanceId, int parallelInstances) {
2323
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
2424
Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
2525
if(key == null){
26-
System.out.println("partitionKey=null");
2726
return partitions[this.parallelInstanceId % partitions.length];
2827
}
29-
System.out.println("partitionKey=" + new String(key));
3028
return partitions[Math.abs(new String(key).hashCode()) % partitions.length];
3129
}
3230

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,13 @@
88
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
99
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
1010
import org.apache.flink.types.Row;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
1113

1214
public class CustomerKeyedSerializationSchema implements KeyedSerializationSchema<Row> {
1315

16+
private static final Logger LOG = LoggerFactory.getLogger(CustomerKeyedSerializationSchema.class);
17+
1418
private static final long serialVersionUID = 1L;
1519
private final SerializationMetricWrapper serializationMetricWrapper;
1620
private String[] partitionKeys;
@@ -52,10 +56,9 @@ private byte[] serializeJsonKey(JsonRowSerializationSchema jsonRowSerializationS
5256
}
5357
}
5458
return sb.toString().getBytes();
55-
}catch (Exception e){
56-
59+
} catch (Exception e){
60+
LOG.error("serializeJsonKey error", e);
5761
}
5862
return null;
59-
6063
}
6164
}

0 commit comments

Comments
 (0)