Skip to content

Commit c40ac17

Browse files
author
dapeng
committed
日志记录
1 parent 13efd08 commit c40ac17

File tree

1 file changed

+7
-1
lines changed

1 file changed

+7
-1
lines changed

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313

14+
import java.util.concurrent.atomic.AtomicLong;
15+
1416
public class CustomerKeyedSerializationSchema implements KeyedSerializationSchema<Row> {
1517

1618
private static final Logger LOG = LoggerFactory.getLogger(CustomerKeyedSerializationSchema.class);
1719

20+
private static final AtomicLong counter = new AtomicLong(0L);
21+
1822
private static final long serialVersionUID = 1L;
1923
private final SerializationMetricWrapper serializationMetricWrapper;
2024
private String[] partitionKeys;
@@ -57,7 +61,9 @@ private byte[] serializeJsonKey(JsonRowSerializationSchema jsonRowSerializationS
5761
}
5862
return sb.toString().getBytes();
5963
} catch (Exception e){
60-
LOG.error("serializeJsonKey error", e);
64+
if(counter.getAndIncrement() % 1000 == 0){
65+
LOG.error("serializeJsonKey error", e);
66+
}
6167
}
6268
return null;
6369
}

0 commit comments

Comments
 (0)