
Commit 3cfe939

Complete the kafka source
1 parent 0e7b726 commit 3cfe939

File tree

20 files changed: +268 -1019 lines

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserializationSchema.java

Lines changed: 0 additions & 127 deletions
This file was deleted.

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerKafkaConsumer.java renamed to kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer.java

Lines changed: 25 additions & 20 deletions
@@ -18,10 +18,11 @@

 package com.dtstack.flink.sql.source.kafka;

-import com.dtstack.flink.sql.format.dtnest.DtNestRowDeserializationSchema;
+import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
@@ -35,38 +36,42 @@
 import java.util.Properties;
 import java.util.regex.Pattern;

+
 /**
- * @author: chuixue
- * @create: 2019-11-05 10:58
- * @description:
- **/
-public class CustomerKafkaConsumer extends FlinkKafkaConsumer<Row> {
+ * Reason:
+ * Date: 2018/10/19
+ * Company: www.dtstack.com
+ *
+ * @author xuchao
+ */
+public class KafkaConsumer extends FlinkKafkaConsumer<Row> {

-    private static final long serialVersionUID = -2265366268827807739L;
+    private static final long serialVersionUID = 4873757508981691375L;

-    private CustomerJsonDeserializationSchema customerJsonDeserialization;
+    private DeserializationMetricWrapper deserializationMetricWrapper;

-    public CustomerKafkaConsumer(String topic, DtNestRowDeserializationSchema<Row> valueDeserializer, Properties props) {
-        super(Arrays.asList(topic.split(",")), valueDeserializer, props);
-        this.customerJsonDeserialization = (CustomerJsonDeserializationSchema) valueDeserializer;
+    public KafkaConsumer(String topic, DeserializationMetricWrapper deserializationMetricWrapper, Properties props) {
+        super(Arrays.asList(topic.split(",")), deserializationMetricWrapper, props);
+        this.deserializationMetricWrapper = deserializationMetricWrapper;
     }

-    public CustomerKafkaConsumer(Pattern subscriptionPattern, DtNestRowDeserializationSchema<Row> valueDeserializer, Properties props) {
-        super(subscriptionPattern, valueDeserializer, props);
-        this.customerJsonDeserialization = (CustomerJsonDeserializationSchema) valueDeserializer;
+    public KafkaConsumer(Pattern subscriptionPattern, DeserializationMetricWrapper deserializationMetricWrapper, Properties props) {
+        super(subscriptionPattern, deserializationMetricWrapper, props);
+        this.deserializationMetricWrapper = deserializationMetricWrapper;
     }

     @Override
-    public void run(SourceContext<Row> sourceContext) throws Exception {
-        customerJsonDeserialization.setRuntimeContext(getRuntimeContext());
-        customerJsonDeserialization.initMetric();
+    public void run(SourceFunction.SourceContext<Row> sourceContext) throws Exception {
+        deserializationMetricWrapper.setRuntimeContext(getRuntimeContext());
+        deserializationMetricWrapper.initMetric();
         super.run(sourceContext);
     }

     @Override
-    protected AbstractFetcher<Row, ?> createFetcher(SourceContext<Row> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<Row>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<Row>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {
+    protected AbstractFetcher<Row, ?> createFetcher(SourceFunction.SourceContext<Row> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<Row>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<Row>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {
         AbstractFetcher<Row, ?> fetcher = super.createFetcher(sourceContext, assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext, offsetCommitMode, consumerMetricGroup, useMetrics);
-        customerJsonDeserialization.setFetcher(fetcher);
+        ((KafkaDeserializationMetricWrapper) deserializationMetricWrapper).setFetcher(fetcher);
         return fetcher;
     }
-}
+
+}
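
Note on the change above: the renamed consumer now depends only on the generic DeserializationMetricWrapper instead of a concrete JSON schema. Below is a minimal sketch of that delegate-and-count pattern, assuming nothing beyond the calls visible in this diff (setRuntimeContext(), initMetric(), and Flink's standard DeserializationSchema contract); the real class in com.dtstack.flink.sql.format is not part of this commit and will differ.

import java.io.IOException;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.Counter;
import org.apache.flink.types.Row;

// Sketch only: the real DeserializationMetricWrapper lives in
// com.dtstack.flink.sql.format and is not shown in this commit.
public class MetricWrapperSketch implements DeserializationSchema<Row> {

    private final DeserializationSchema<Row> delegate;
    private transient RuntimeContext runtimeContext;
    private transient Counter inRecords;

    public MetricWrapperSketch(DeserializationSchema<Row> delegate) {
        this.delegate = delegate;
    }

    // Called from KafkaConsumer.run() before consumption starts.
    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    // Registers throughput metrics once the runtime context is available.
    public void initMetric() {
        inRecords = runtimeContext.getMetricGroup().counter("numInRecord"); // metric name is an assumption
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        inRecords.inc();                      // count every record seen
        return delegate.deserialize(message); // then delegate the actual parsing
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false; // a Kafka source is unbounded
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return delegate.getProducedType();
    }
}

Because the wrapper is the DeserializationSchema handed to super(...), every record flows through it, which is what lets KafkaConsumer.run() wire metrics in before consumption starts.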
kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumerFactory.java

Lines changed: 48 additions & 0 deletions
This file was added.

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.source.kafka;

import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.types.Row;
import org.apache.kafka.common.requests.IsolationLevel;

import java.util.Properties;
import java.util.regex.Pattern;

/**
 * company: www.dtstack.com
 * @author: toutian
 * create: 2019/12/24
 */
public class KafkaConsumerFactory extends AbstractKafkaConsumerFactory {

    @Override
    public FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation, Properties props) {
        KafkaConsumer kafkaSrc = null;
        if (kafkaSourceTableInfo.getTopicIsPattern()) {
            kafkaSrc = new KafkaConsumer(Pattern.compile(kafkaSourceTableInfo.getTopic()), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED)), props);
        } else {
            kafkaSrc = new KafkaConsumer(kafkaSourceTableInfo.getTopic(), createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED)), props);
        }
        return kafkaSrc;
    }

}
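
The two-argument lambda handed to createDeserializationMetricWrapper reads consumer lag from Kafka's internal SubscriptionState; with IsolationLevel.READ_UNCOMMITTED the lag is measured against the partition high watermark rather than the last stable offset. The lambda's target type is not shown in this commit; a functional interface shaped like the following would fit the call sites above (a guess at the shape, not the project's actual declaration):

import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;

// Hypothetical shape of the lag calculator expected by
// AbstractKafkaConsumerFactory.createDeserializationMetricWrapper;
// the real name and signature may differ.
@FunctionalInterface
interface PartitionLagGetter {
    Long getLag(SubscriptionState subscriptionState, TopicPartition tp);
}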

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 2 additions & 8 deletions
@@ -84,14 +84,8 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv

         TypeInformation<Row> typeInformation = new RowTypeInfo(types, kafkaSourceTableInfo.getFields());

-        FlinkKafkaConsumer<Row> kafkaSrc;
-        if (BooleanUtils.isTrue(kafkaSourceTableInfo.getTopicIsPattern())) {
-            kafkaSrc = new CustomerKafkaConsumer(Pattern.compile(topicName),
-                    new CustomerJsonDeserializationSchema(typeInformation, kafkaSourceTableInfo.getPhysicalFields(), kafkaSourceTableInfo.getFieldExtraInfoList()), props);
-        } else {
-            kafkaSrc = new CustomerKafkaConsumer(topicName,
-                    new CustomerJsonDeserializationSchema(typeInformation, kafkaSourceTableInfo.getPhysicalFields(), kafkaSourceTableInfo.getFieldExtraInfoList()), props);
-        }
+        FlinkKafkaConsumer<Row> kafkaSrc = (FlinkKafkaConsumer<Row>) new KafkaConsumerFactory().createKafkaTableSource(kafkaSourceTableInfo, typeInformation, props);
+

         //earliest,latest
         if ("earliest".equalsIgnoreCase(kafkaSourceTableInfo.getOffsetReset())) {

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/metric/KafkaTopicPartitionLagMetric.java

Lines changed: 0 additions & 46 deletions
This file was deleted.

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 0 additions & 60 deletions
This file was deleted.
