Skip to content

Commit c895ba2

Browse files
author
dapeng
committed
修复kafka-source 作为数据源错误,type='kakfa'
1 parent 738a565 commit c895ba2

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaDeserializationMetricWrapper.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,13 @@ protected void beforeDeserialize() throws IOException {
7676
}
7777

7878
protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exception {
79+
Field consumerThreadField = null;
80+
if(fetcher.getClass().getDeclaredField("consumerThread") != null){
81+
consumerThreadField = fetcher.getClass().getDeclaredField("consumerThread");
82+
} else {
83+
consumerThreadField = fetcher.getClass().getSuperclass().getDeclaredField("consumerThread");
84+
}
7985

80-
Field consumerThreadField = fetcher.getClass().getSuperclass().getDeclaredField("consumerThread");
8186
consumerThreadField.setAccessible(true);
8287
KafkaConsumerThread consumerThread = (KafkaConsumerThread) consumerThreadField.get(fetcher);
8388

0 commit comments

Comments
 (0)