Commit baabfd4

extract abstractKafkaSource

1 parent d124305

9 files changed: +292 −391 lines changed

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 4 additions & 4 deletions

@@ -219,14 +219,14 @@ public static String addJdbcParam(String dbUrl, Map<String, String> addParams, b
         return preStr + "?" + sb.toString();
     }
 
-    public static boolean isJosn(String str){
+    public static boolean isJson(String str) {
         boolean flag = false;
-        if(StringUtils.isNotBlank(str)){
+        if (StringUtils.isNotBlank(str)) {
             try {
-                objectMapper.readValue(str,Map.class);
+                objectMapper.readValue(str, Map.class);
                 flag = true;
             } catch (Throwable e) {
-                flag=false;
+                flag = false;
             }
         }
         return flag;
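
As a quick sanity check on the rename, a minimal usage sketch (inputs are illustrative; isJson simply attempts objectMapper.readValue(str, Map.class) and reports success):

    DtStringUtil.isJson("{\"0\":12312,\"1\":12321}");  // true  -- parses as a JSON map
    DtStringUtil.isJson("latest");                     // false -- plain keyword, not JSON
    DtStringUtil.isJson("   ");                        // false -- blank input is rejected up front

This check matters downstream: the Kafka source uses it to tell a JSON per-partition offset spec apart from an auto.offset.reset keyword (see AbstractKafkaSource below).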
kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaSource.java

Lines changed: 117 additions & 0 deletions

@@ -0,0 +1,117 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.source.kafka;

import com.dtstack.flink.sql.source.IStreamSourceGener;
import com.dtstack.flink.sql.source.kafka.enums.EKafkaOffset;
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
import com.dtstack.flink.sql.util.DtStringUtil;
import com.dtstack.flink.sql.util.PluginUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * Date: 2020/3/20
 * Company: www.dtstack.com
 * @author maqi
 */
public abstract class AbstractKafkaSource implements IStreamSourceGener<Table> {

    private static final String SOURCE_OPERATOR_NAME_TPL = "${topic}_${table}";

    protected Properties getKafkaProperties(KafkaSourceTableInfo kafkaSourceTableInfo) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSourceTableInfo.getBootstrapServers());

        if (DtStringUtil.isJson(kafkaSourceTableInfo.getOffsetReset())) {
            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EKafkaOffset.NONE.name().toLowerCase());
        } else {
            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaSourceTableInfo.getOffsetReset());
        }

        if (StringUtils.isNotBlank(kafkaSourceTableInfo.getGroupId())) {
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceTableInfo.getGroupId());
        }

        for (String key : kafkaSourceTableInfo.getKafkaParamKeys()) {
            props.setProperty(key, kafkaSourceTableInfo.getKafkaParam(key));
        }
        return props;
    }

    protected String generateOperatorName(String tabName, String topicName) {
        return SOURCE_OPERATOR_NAME_TPL.replace("${topic}", topicName).replace("${table}", tabName);
    }

    protected TypeInformation<Row> getRowTypeInformation(KafkaSourceTableInfo kafkaSourceTableInfo) {
        Class<?>[] fieldClasses = kafkaSourceTableInfo.getFieldClasses();
        TypeInformation[] types = IntStream.range(0, fieldClasses.length)
                .mapToObj(i -> TypeInformation.of(fieldClasses[i]))
                .toArray(TypeInformation[]::new);

        return new RowTypeInfo(types, kafkaSourceTableInfo.getFields());
    }

    protected void setStartPosition(String offset, String topicName, FlinkKafkaConsumerBase<Row> kafkaSrc) {
        if (StringUtils.equalsIgnoreCase(offset, EKafkaOffset.EARLIEST.name())) {
            kafkaSrc.setStartFromEarliest();
        } else if (DtStringUtil.isJson(offset)) {
            Map<KafkaTopicPartition, Long> specificStartupOffsets = buildOffsetMap(offset, topicName);
            kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
        } else {
            kafkaSrc.setStartFromLatest();
        }
    }

    /**
     * Parse a JSON offset spec, e.g. {"0":12312,"1":12321,"2":12312}, into per-partition startup offsets.
     *
     * @param offsetJson JSON map of partition id to offset
     * @param topicName  topic the offsets apply to
     * @return specific startup offsets keyed by topic partition
     */
    protected Map<KafkaTopicPartition, Long> buildOffsetMap(String offsetJson, String topicName) {
        try {
            Properties properties = PluginUtil.jsonStrToObject(offsetJson, Properties.class);
            Map<String, Object> offsetMap = PluginUtil.objectToMap(properties);
            Map<KafkaTopicPartition, Long> specificStartupOffsets = offsetMap
                    .entrySet()
                    .stream()
                    .collect(Collectors.toMap(
                            (Map.Entry<String, Object> entry) -> new KafkaTopicPartition(topicName, Integer.valueOf(entry.getKey())),
                            (Map.Entry<String, Object> entry) -> Long.valueOf(entry.getValue().toString()))
                    );

            return specificStartupOffsets;
        } catch (Exception e) {
            throw new RuntimeException("unsupported offsetReset type: " + offsetJson, e);
        }
    }
}
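
To make the buildOffsetMap contract concrete, a self-contained sketch of the same transformation (topic name and offsets are hypothetical; Jackson is used directly here instead of the project's PluginUtil purely to keep the snippet standalone):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

    import java.util.Map;
    import java.util.stream.Collectors;

    public class OffsetMapSketch {
        public static void main(String[] args) throws Exception {
            // Same shape as the javadoc example: partition id -> startup offset
            String offsetJson = "{\"0\":12312,\"1\":12321,\"2\":12312}";

            Map<String, Object> raw = new ObjectMapper().readValue(offsetJson, Map.class);
            Map<KafkaTopicPartition, Long> offsets = raw.entrySet().stream()
                    .collect(Collectors.toMap(
                            e -> new KafkaTopicPartition("my_topic", Integer.valueOf(e.getKey())),
                            e -> Long.valueOf(e.getValue().toString())));

            // Three entries: my_topic partitions 0, 1 and 2 with their startup offsets --
            // the map handed to FlinkKafkaConsumerBase#setStartFromSpecificOffsets.
            System.out.println(offsets);
        }
    }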
kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/enums/EKafkaOffset.java

Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.source.kafka.enums;

/**
 * Date: 2020/3/20
 * Company: www.dtstack.com
 * @author maqi
 */
public enum EKafkaOffset {

    LATEST,
    EARLIEST,
    NONE
}
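
The lowercase enum names line up with Kafka's auto.offset.reset values; a short sketch of how AbstractKafkaSource above puts them to use (the final line mirrors its JSON branch):

    // EKafkaOffset.LATEST.name().toLowerCase()   -> "latest"
    // EKafkaOffset.EARLIEST.name().toLowerCase() -> "earliest"
    // EKafkaOffset.NONE.name().toLowerCase()     -> "none"
    //
    // When offsetReset is a JSON offset map, auto.offset.reset is pinned to "none",
    // presumably so Kafka's own reset policy never competes with the explicit
    // setStartFromSpecificOffsets call.
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EKafkaOffset.NONE.name().toLowerCase());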

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 15 additions & 14 deletions

@@ -19,11 +19,13 @@
 
 package com.dtstack.flink.sql.source.kafka.table;
 
+import com.dtstack.flink.sql.source.kafka.enums.EKafkaOffset;
 import com.dtstack.flink.sql.table.AbstractSourceParser;
 import com.dtstack.flink.sql.table.AbstractTableInfo;
 import com.dtstack.flink.sql.util.MathUtil;
 
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Reason:
@@ -37,28 +39,27 @@ public class KafkaSourceParser extends AbstractSourceParser {
     public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) throws Exception {
 
         KafkaSourceTableInfo kafkaSourceTableInfo = new KafkaSourceTableInfo();
-        kafkaSourceTableInfo.setName(tableName);
-        kafkaSourceTableInfo.setType(MathUtil.getString(props.get(KafkaSourceTableInfo.TYPE_KEY.toLowerCase())));
         parseFieldsInfo(fieldsInfo, kafkaSourceTableInfo);
 
+        kafkaSourceTableInfo.setName(tableName);
+        kafkaSourceTableInfo.setType(MathUtil.getString(props.get(KafkaSourceTableInfo.TYPE_KEY.toLowerCase())));
         kafkaSourceTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSourceTableInfo.PARALLELISM_KEY.toLowerCase())));
-        String bootstrapServer = MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase()));
-        if (bootstrapServer == null || "".equals(bootstrapServer.trim())) {
-            throw new Exception("BootstrapServers can not be empty!");
-        } else {
-            kafkaSourceTableInfo.setBootstrapServers(bootstrapServer);
-        }
+        kafkaSourceTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
         kafkaSourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
         kafkaSourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
-        kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
+        kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase(), EKafkaOffset.LATEST.name().toLowerCase())));
         kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase())));
         kafkaSourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
-        for (String key : props.keySet()) {
-            if (!key.isEmpty() && key.startsWith("kafka.")) {
-                kafkaSourceTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());
-            }
-        }
+
+        Map<String, String> kafkaParams = props.keySet().stream()
+                .filter(key -> !key.isEmpty() && key.startsWith("kafka."))
+                .collect(Collectors.toMap(
+                        key -> key.substring(6), key -> props.get(key).toString())
+                );
+
+        kafkaSourceTableInfo.addKafkaParam(kafkaParams);
         kafkaSourceTableInfo.check();
+
        return kafkaSourceTableInfo;
    }
 }
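
For reference, a hypothetical props map of the shape getTableInfo receives (keys are lowercased upstream; every value here is illustrative, including the kafka11 type), showing the kafka.-prefix stripping and the new "latest" default:

    import java.util.HashMap;
    import java.util.Map;

    public class KafkaPropsSketch {
        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<>();
            props.put("type", "kafka11");                    // TYPE_KEY, lowercased
            props.put("bootstrapservers", "localhost:9092"); // BOOTSTRAPSERVERS_KEY
            props.put("topic", "my_topic");                  // TOPIC_KEY
            props.put("kafka.max.poll.records", "500");      // "kafka." stripped -> max.poll.records
            // No "offsetreset" entry: getOrDefault(...) now falls back to
            // EKafkaOffset.LATEST.name().toLowerCase(), i.e. "latest".
            System.out.println(props);
        }
    }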
