Skip to content

Commit 6754700

Browse files
author
dapeng
committed
fix 类型判断bug
1 parent c895ba2 commit 6754700

File tree

1 file changed

+9
-7
lines changed

1 file changed

+9
-7
lines changed

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaDeserializationMetricWrapper.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,7 @@ protected void beforeDeserialize() throws IOException {
7676
}
7777

7878
protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exception {
79-
Field consumerThreadField = null;
80-
if(fetcher.getClass().getDeclaredField("consumerThread") != null){
81-
consumerThreadField = fetcher.getClass().getDeclaredField("consumerThread");
82-
} else {
83-
consumerThreadField = fetcher.getClass().getSuperclass().getDeclaredField("consumerThread");
84-
}
85-
79+
Field consumerThreadField = getConsumerThreadField(fetcher);
8680
consumerThreadField.setAccessible(true);
8781
KafkaConsumerThread consumerThread = (KafkaConsumerThread) consumerThreadField.get(fetcher);
8882

@@ -123,4 +117,12 @@ public Long getValue() {
123117
public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
124118
this.fetcher = fetcher;
125119
}
120+
121+
private Field getConsumerThreadField(AbstractFetcher fetcher) throws NoSuchFieldException {
122+
try {
123+
return fetcher.getClass().getDeclaredField("consumerThread");
124+
} catch (Exception e) {
125+
return fetcher.getClass().getSuperclass().getDeclaredField("consumerThread");
126+
}
127+
}
126128
}

0 commit comments

Comments
 (0)