Skip to content

Commit d822d4c

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_3.10.x_23728' into 1.8_release_3.10.x
2 parents d910719 + 6754700 commit d822d4c

File tree

1 file changed

+9
-2
lines changed

1 file changed

+9
-2
lines changed

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaDeserializationMetricWrapper.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,7 @@ protected void beforeDeserialize() throws IOException {
7676
}
7777

7878
protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exception {
79-
80-
Field consumerThreadField = fetcher.getClass().getSuperclass().getDeclaredField("consumerThread");
79+
Field consumerThreadField = getConsumerThreadField(fetcher);
8180
consumerThreadField.setAccessible(true);
8281
KafkaConsumerThread consumerThread = (KafkaConsumerThread) consumerThreadField.get(fetcher);
8382

@@ -118,4 +117,12 @@ public Long getValue() {
118117
public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
119118
this.fetcher = fetcher;
120119
}
120+
121+
private Field getConsumerThreadField(AbstractFetcher fetcher) throws NoSuchFieldException {
122+
try {
123+
return fetcher.getClass().getDeclaredField("consumerThread");
124+
} catch (Exception e) {
125+
return fetcher.getClass().getSuperclass().getDeclaredField("consumerThread");
126+
}
127+
}
121128
}

0 commit comments

Comments
 (0)