Commit 0e7b726

1 parent 6943cff commit 0e7b726

17 files changed: +553 -731 lines changed
Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.source.kafka;

import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
import com.dtstack.flink.sql.format.dtnest.DtNestRowDeserializationSchema;
import com.dtstack.flink.sql.format.FormatType;
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.types.Row;

import java.util.Properties;

/**
 * company: www.dtstack.com
 *
 * @author: toutian
 * create: 2019/12/24
 */
public abstract class AbstractKafkaConsumerFactory {

    protected abstract FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation, Properties props);

    protected DeserializationSchema<Row> createDeserializationSchema(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation) {
        DeserializationSchema<Row> deserializationSchema = null;
        if (FormatType.DT_NEST.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
            deserializationSchema = new DtNestRowDeserializationSchema(typeInformation, kafkaSourceTableInfo.getPhysicalFields(), kafkaSourceTableInfo.getFieldExtraInfoList());
        } else if (FormatType.JSON.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
            if (StringUtils.isBlank(kafkaSourceTableInfo.getSchemaString())) {
                throw new IllegalArgumentException("sourceDataType:" + FormatType.JSON.name() + " must set schemaString(JSON Schema)");
            }
            deserializationSchema = new JsonRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
        } else if (FormatType.CSV.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
            if (StringUtils.isBlank(kafkaSourceTableInfo.getFieldDelimiter())) {
                throw new IllegalArgumentException("sourceDataType:" + FormatType.CSV.name() + " must set fieldDelimiter");
            }
            final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(typeInformation);
            deserSchemaBuilder.setFieldDelimiter(kafkaSourceTableInfo.getFieldDelimiter().toCharArray()[0]);
            deserializationSchema = deserSchemaBuilder.build();
        } else if (FormatType.AVRO.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
            if (StringUtils.isBlank(kafkaSourceTableInfo.getSchemaString())) {
                throw new IllegalArgumentException("sourceDataType:" + FormatType.AVRO.name() + " must set schemaString");
            }
            deserializationSchema = new AvroRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
        }

        if (null == deserializationSchema) {
            throw new UnsupportedOperationException("FormatType:" + kafkaSourceTableInfo.getSourceDataType());
        }
        return deserializationSchema;
    }

    protected DeserializationMetricWrapper createDeserializationMetricWrapper(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation, Calculate calculate) {
        return new KafkaDeserializationMetricWrapper(typeInformation, createDeserializationSchema(kafkaSourceTableInfo, typeInformation), calculate);
    }

}
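For orientation, here is a minimal sketch of how a version-specific module might subclass this factory; it is not part of this commit. The getTopic() accessor on KafkaSourceTableInfo and the one-argument partitionLag(TopicPartition) on the client's SubscriptionState are assumptions, not confirmed by this diff.

package com.dtstack.flink.sql.source.kafka;

import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.types.Row;

import java.util.Properties;

public class KafkaConsumerFactory extends AbstractKafkaConsumerFactory {

    @Override
    protected FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo kafkaSourceTableInfo,
                                                                 TypeInformation<Row> typeInformation,
                                                                 Properties props) {
        // Lag lookup for this client version; partitionLag(TopicPartition) is an assumed signature.
        Calculate lagCalculate = (subscriptionState, topicPartition) -> subscriptionState.partitionLag(topicPartition);
        // getTopic() is assumed here; KafkaConsumer splits the comma-separated string itself.
        return new KafkaConsumer(kafkaSourceTableInfo.getTopic(),
                createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, lagCalculate),
                props);
    }
}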
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.source.kafka;
20+
21+
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
22+
import org.apache.kafka.common.TopicPartition;
23+
24+
/**
25+
* company: www.dtstack.com
26+
* @author: toutian
27+
* create: 2019/12/24
28+
*/
29+
@FunctionalInterface
30+
public interface Calculate {
31+
32+
Long calc(SubscriptionState subscriptionState, TopicPartition topicPartition);
33+
}
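The single abstract method exists so each version-specific connector module can pass its own lag lookup as a lambda, since the lag accessor on the Kafka client's internal SubscriptionState differs across kafka-clients versions. A hedged sketch (both partitionLag signatures and the IsolationLevel package are assumptions about the respective client versions):

class CalculateSamples {
    // Clients where partitionLag takes only the partition (assumed signature):
    static final Calculate LAG =
            (subscriptionState, topicPartition) -> subscriptionState.partitionLag(topicPartition);

    // Clients that additionally require an isolation level (assumed signature and package):
    static final Calculate TX_LAG = (subscriptionState, topicPartition) ->
            subscriptionState.partitionLag(topicPartition,
                    org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED);
}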

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerKafka011Consumer.java

Lines changed: 0 additions & 76 deletions
This file was deleted.
Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.source.kafka;

import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.types.Row;
import org.apache.flink.util.SerializedValue;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

/**
 * FlinkKafkaConsumer010 and FlinkKafkaConsumer011 both extend FlinkKafkaConsumer09,
 * and their constructors delegate directly to FlinkKafkaConsumer09 via super(...).
 *
 * Reason:
 * Date: 2018/10/19
 * Company: www.dtstack.com
 *
 * @author xuchao
 */
public class KafkaConsumer extends FlinkKafkaConsumer09<Row> {

    private static final long serialVersionUID = 4873757508981691375L;

    private DeserializationMetricWrapper deserializationMetricWrapper;

    public KafkaConsumer(String topic, DeserializationMetricWrapper deserializationMetricWrapper, Properties props) {
        super(Arrays.asList(topic.split(",")), deserializationMetricWrapper, props);
        this.deserializationMetricWrapper = deserializationMetricWrapper;
    }

    public KafkaConsumer(Pattern subscriptionPattern, DeserializationMetricWrapper deserializationMetricWrapper, Properties props) {
        super(subscriptionPattern, deserializationMetricWrapper, props);
        this.deserializationMetricWrapper = deserializationMetricWrapper;
    }

    @Override
    public void run(SourceFunction.SourceContext<Row> sourceContext) throws Exception {
        deserializationMetricWrapper.setRuntimeContext(getRuntimeContext());
        deserializationMetricWrapper.initMetric();
        super.run(sourceContext);
    }

    @Override
    protected AbstractFetcher<Row, ?> createFetcher(SourceFunction.SourceContext<Row> sourceContext,
                                                    Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
                                                    SerializedValue<AssignerWithPeriodicWatermarks<Row>> watermarksPeriodic,
                                                    SerializedValue<AssignerWithPunctuatedWatermarks<Row>> watermarksPunctuated,
                                                    StreamingRuntimeContext runtimeContext,
                                                    OffsetCommitMode offsetCommitMode,
                                                    MetricGroup consumerMetricGroup,
                                                    boolean useMetrics) throws Exception {
        AbstractFetcher<Row, ?> fetcher = super.createFetcher(sourceContext, assignedPartitionsWithInitialOffsets,
                watermarksPeriodic, watermarksPunctuated, runtimeContext, offsetCommitMode, consumerMetricGroup, useMetrics);
        ((KafkaDeserializationMetricWrapper) deserializationMetricWrapper).setFetcher(fetcher);
        return fetcher;
    }

}
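A hedged wiring sketch (broker address, group id, and topic names are hypothetical): the String constructor splits on commas, so one source can read several topics, while the Pattern constructor subscribes to every matching topic.

import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;
import java.util.regex.Pattern;

class KafkaConsumerWiring {
    static void wire(StreamExecutionEnvironment env, DeserializationMetricWrapper wrapper) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.setProperty("group.id", "demo-group");              // hypothetical group

        // "orders,payments" is split on ',' into a fixed topic list by the constructor above.
        env.addSource(new KafkaConsumer("orders,payments", wrapper, props)).print();

        // Alternatively, subscribe to every topic matching a regex.
        env.addSource(new KafkaConsumer(Pattern.compile("orders-.*"), wrapper, props)).print();
    }
}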

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaDeserializationMetricWrapper.java renamed to kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaDeserializationMetricWrapper.java

Lines changed: 13 additions & 5 deletions
@@ -19,9 +19,9 @@
 package com.dtstack.flink.sql.source.kafka;
 
 import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
-import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
@@ -45,7 +45,7 @@
  * add metric for source
  * <p>
  * company: www.dtstack.com
- * author: toutian
+ * @author: toutian
  * create: 2019/12/24
  */
 public class KafkaDeserializationMetricWrapper extends DeserializationMetricWrapper {
@@ -56,8 +56,11 @@ public class KafkaDeserializationMetricWrapper extends DeserializationMetricWrapper {
 
     private AtomicBoolean firstMsg = new AtomicBoolean(true);
 
-    public KafkaDeserializationMetricWrapper(TypeInformation<Row> typeInfo, DeserializationSchema<Row> deserializationSchema) {
+    private Calculate calculate;
+
+    public KafkaDeserializationMetricWrapper(TypeInformation<Row> typeInfo, DeserializationSchema<Row> deserializationSchema, Calculate calculate) {
         super(typeInfo, deserializationSchema);
+        this.calculate = calculate;
     }
 
     @Override
@@ -99,12 +102,17 @@ protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exception {
         //topic partitions lag
         SubscriptionState subscriptionState = (SubscriptionState) subscriptionStateField.get(kafkaConsumer);
         Set<TopicPartition> assignedPartitions = subscriptionState.assignedPartitions();
+
         for (TopicPartition topicPartition : assignedPartitions) {
             MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup(DT_TOPIC_GROUP, topicPartition.topic())
                     .addGroup(DT_PARTITION_GROUP, topicPartition.partition() + "");
-            metricGroup.gauge(DT_TOPIC_PARTITION_LAG_GAUGE, new KafkaTopicPartitionLagMetric(subscriptionState, topicPartition));
+            metricGroup.gauge(DT_TOPIC_PARTITION_LAG_GAUGE, new Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    return calculate.calc(subscriptionState, topicPartition);
+                }
+            });
         }
-
     }
 
     public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/metric/KafkaTopicPartitionLagMetric.java

Lines changed: 0 additions & 29 deletions
This file was deleted.
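Design note: deleting KafkaTopicPartitionLagMetric is the point of the refactor above. The base module no longer hard-codes a version-specific lag metric; it registers an inline Gauge that defers to the injected Calculate. Since org.apache.flink.metrics.Gauge is a single-method interface, the added anonymous class could equally be written as the lambda below, a drop-in equivalent using the same surrounding variables from registerPtMetric:

// Inside registerPtMetric, equivalent to the anonymous Gauge in the diff:
metricGroup.gauge(DT_TOPIC_PARTITION_LAG_GAUGE,
        (Gauge<Long>) () -> calculate.calc(subscriptionState, topicPartition));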
