Skip to content

Commit 2411ac6

Browse files
committed
Merge branch 'v1.8.0_dev_bugfix_kafka' into 'v1.8.0_dev'
V1.8.0 dev bugfix kafka 流计算设置kafka自定义参数无效 See merge request !138
2 parents 12406b9 + ea91fbc commit 2411ac6

File tree

14 files changed

+170
-34
lines changed

14 files changed

+170
-34
lines changed

core/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@
139139
<goal>shade</goal>
140140
</goals>
141141
<configuration>
142+
<createDependencyReducedPom>false</createDependencyReducedPom>
142143
<transformers>
143144
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
144145
<mainClass>com.dtstack.flink.sql.Main</mainClass>

docs/kafkaSource.md

Lines changed: 64 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ CREATE TABLE tableName(
99
WATERMARK FOR colName AS withOffset( colName , delayTime )
1010
)WITH(
1111
type ='kafka09',
12-
kafka.bootstrap.servers ='ip:port,ip:port...',
13-
kafka.zookeeper.quorum ='ip:port,ip:port/zkparent',
14-
kafka.auto.offset.reset ='latest',
15-
kafka.topic ='topicName',
12+
bootstrapServers ='ip:port,ip:port...',
13+
zookeeperQuorum ='ip:port,ip:port/zkparent',
14+
offsetReset ='latest',
15+
topic ='topicName',
16+
groupId='test',
1617
parallelism ='parallelNum',
1718
--timezone='America/Los_Angeles',
1819
timezone='Asia/Shanghai',
@@ -39,16 +40,45 @@ CREATE TABLE tableName(
3940
|参数名称|含义|是否必填|默认值|
4041
|----|---|---|---|
4142
|type | kafka09 ||kafka08、kafka09、kafka10、kafka11、kafka(对应kafka1.0及以上版本)|
42-
|kafka.group.id | 需要读取的 groupId 名称|||
43-
|kafka.bootstrap.servers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
44-
|kafka.zookeeper.quorum | kafka zk地址信息(多个之间用逗号分隔)|||
45-
|kafka.topic | 需要读取的 topic 名称|||
46-
|patterntopic | topic是否是正则表达式格式(true&#124;false) |否| false
47-
|kafka.auto.offset.reset | 读取的topic 的offset初始位置[latest&#124;earliest&#124;指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no":offset_value})]||latest|
43+
|groupId | 需要读取的 groupId 名称|||
44+
|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
45+
|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|||
46+
|topic | 需要读取的 topic 名称|||
47+
|topicIsPattern | topic是否是正则表达式格式(true&#124;false) |否| false|
48+
|offsetReset | 读取的topic 的offset初始位置[latest&#124;earliest&#124;指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no":offset_value})]||latest|
4849
|parallelism | 并行度设置||1|
4950
|sourcedatatype | 数据类型||json|
5051
|timezone|时区设置[timezone支持的参数](timeZone.md)|否|'Asia/Shanghai'|
5152
**kafka相关参数可以自定义,使用kafka.开头即可。**
53+
```
54+
kafka.consumer.id
55+
kafka.socket.timeout.ms
56+
kafka.fetch.message.max.bytes
57+
kafka.num.consumer.fetchers
58+
kafka.auto.commit.enable
59+
kafka.auto.commit.interval.ms
60+
kafka.queued.max.message.chunks
61+
kafka.rebalance.max.retries
62+
kafka.fetch.min.bytes
63+
kafka.fetch.wait.max.ms
64+
kafka.rebalance.backoff.ms
65+
kafka.refresh.leader.backoff.ms
66+
kafka.consumer.timeout.ms
67+
kafka.exclude.internal.topics
68+
kafka.partition.assignment.strategy
69+
kafka.client.id
70+
kafka.zookeeper.session.timeout.ms
71+
kafka.zookeeper.connection.timeout.ms
72+
kafka.zookeeper.sync.time.ms
73+
kafka.offsets.storage
74+
kafka.offsets.channel.backoff.ms
75+
kafka.offsets.channel.socket.timeout.ms
76+
kafka.offsets.commit.max.retries
77+
kafka.dual.commit.enabled
78+
kafka.partition.assignment.strategy
79+
kafka.socket.receive.buffer.bytes
80+
kafka.fetch.min.bytes
81+
```
5282

5383
## 5.样例:
5484
```
@@ -60,12 +90,12 @@ CREATE TABLE MyTable(
6090
CHARACTER_LENGTH(channel) AS timeLeng
6191
)WITH(
6292
type ='kafka09',
63-
kafka.bootstrap.servers ='172.16.8.198:9092',
64-
kafka.zookeeper.quorum ='172.16.8.198:2181/kafka',
65-
kafka.auto.offset.reset ='latest',
66-
kafka.topic ='nbTest1,nbTest2,nbTest3',
67-
--kafka.topic ='mqTest.*',
68-
--patterntopic='true'
93+
bootstrapServers ='172.16.8.198:9092',
94+
zookeeperQuorum ='172.16.8.198:2181/kafka',
95+
offsetReset ='latest',
96+
topic ='nbTest1,nbTest2,nbTest3',
97+
--topic ='mqTest.*',
98+
--topicIsPattern='true'
6999
parallelism ='1',
70100
sourcedatatype ='json' --可不设置
71101
);
@@ -146,10 +176,10 @@ CREATE TABLE MyTable(
146176
|参数名称|含义|是否必填|默认值|
147177
|----|---|---|---|
148178
|type | kafka09 |||
149-
|kafka.bootstrap.servers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
150-
|kafka.zookeeper.quorum | kafka zk地址信息(多个之间用逗号分隔)|||
151-
|kafka.topic | 需要读取的 topic 名称|||
152-
|kafka.auto.offset.reset | 读取的topic 的offset初始位置[latest&#124;earliest]||latest|
179+
|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
180+
|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|||
181+
|topic | 需要读取的 topic 名称|||
182+
|offsetReset | 读取的topic 的offset初始位置[latest&#124;earliest]||latest|
153183
|parallelism | 并行度设置 ||1|
154184
|sourcedatatype | 数据类型||csv|
155185
|fielddelimiter | 字段分隔符|||
@@ -166,12 +196,12 @@ CREATE TABLE MyTable(
166196
CHARACTER_LENGTH(channel) AS timeLeng
167197
)WITH(
168198
type ='kafka09',
169-
kafka.bootstrap.servers ='172.16.8.198:9092',
170-
kafka.zookeeper.quorum ='172.16.8.198:2181/kafka',
171-
kafka.auto.offset.reset ='latest',
172-
kafka.topic ='nbTest1',
173-
--kafka.topic ='mqTest.*',
174-
--kafka.topicIsPattern='true'
199+
bootstrapServers ='172.16.8.198:9092',
200+
zookeeperQuorum ='172.16.8.198:2181/kafka',
201+
offsetReset ='latest',
202+
topic ='nbTest1',
203+
--topic ='mqTest.*',
204+
--topicIsPattern='true'
175205
parallelism ='1',
176206
sourcedatatype ='csv',
177207
fielddelimiter ='\|',
@@ -192,10 +222,10 @@ create table kafka_stream(
192222
_offset BIGINT,
193223
) with (
194224
type ='kafka09',
195-
kafka.bootstrap.servers ='172.16.8.198:9092',
196-
kafka.zookeeper.quorum ='172.16.8.198:2181/kafka',
197-
kafka.auto.offset.reset ='latest',
198-
kafka.topic ='nbTest1',
225+
bootstrapServers ='172.16.8.198:9092',
226+
zookeeperQuorum ='172.16.8.198:2181/kafka',
227+
offsetReset ='latest',
228+
topic ='nbTest1',
199229
parallelism ='1',
200230
sourcedatatype='text'
201231
@@ -205,10 +235,10 @@ create table kafka_stream(
205235
|参数名称|含义|是否必填|默认值|
206236
|----|---|---|---|
207237
|type | kafka09 |||
208-
|kafka.bootstrap.servers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
209-
|kafka.zookeeper.quorum | kafka zk地址信息(多个之间用逗号分隔)|||
210-
|kafka.topic | 需要读取的 topic 名称|||
211-
|kafka.auto.offset.reset | 读取的topic 的offset初始位置[latest&#124;earliest]||latest|
238+
|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
239+
|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|||
240+
|topic | 需要读取的 topic 名称|||
241+
|offsetReset | 读取的topic 的offset初始位置[latest&#124;earliest]||latest|
212242
|parallelism | 并行度设置||1|
213243
|sourcedatatype | 数据类型||text|
214244
**kafka相关参数可以自定义,使用kafka.开头即可。**

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6464
String topicName = kafkaSourceTableInfo.getTopic();
6565

6666
Properties props = new Properties();
67+
for (String key : kafkaSourceTableInfo.getKafkaParamKeys()) {
68+
props.setProperty(key, kafkaSourceTableInfo.getKafkaParam(key));
69+
}
6770
props.setProperty("bootstrap.servers", kafkaSourceTableInfo.getBootstrapServers());
6871
if (DtStringUtil.isJosn(kafkaSourceTableInfo.getOffsetReset())) {
6972
props.setProperty("auto.offset.reset", "none");

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
9191
kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
9292
kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase())));
9393
kafkaSourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
94+
for (String key : props.keySet()) {
95+
if (!key.isEmpty() && key.startsWith("kafka.")) {
96+
kafkaSourceTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());
97+
}
98+
}
9499
kafkaSourceTableInfo.check();
95100
return kafkaSourceTableInfo;
96101
}

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
import com.dtstack.flink.sql.table.SourceTableInfo;
2222
import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
2323

24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.Set;
27+
2428
/**
2529
* @author: chuixue
2630
* @create: 2019-11-05 11:09
@@ -113,6 +117,20 @@ public void setOffset(String offset) {
113117
this.offset = offset;
114118
}
115119

120+
public Map<String, String> kafkaParam = new HashMap<>();
121+
122+
public void addKafkaParam(String key, String value) {
123+
kafkaParam.put(key, value);
124+
}
125+
126+
public String getKafkaParam(String key) {
127+
return kafkaParam.get(key);
128+
}
129+
130+
public Set<String> getKafkaParamKeys() {
131+
return kafkaParam.keySet();
132+
}
133+
116134
@Override
117135
public boolean check() {
118136
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6767
String topicName = kafka09SourceTableInfo.getTopic();
6868

6969
Properties props = new Properties();
70+
for (String key : kafka09SourceTableInfo.getKafkaParamKeys()) {
71+
props.setProperty(key, kafka09SourceTableInfo.getKafkaParam(key));
72+
}
7073
props.setProperty("bootstrap.servers", kafka09SourceTableInfo.getBootstrapServers());
7174
if (DtStringUtil.isJosn(kafka09SourceTableInfo.getOffsetReset())){
7275
props.setProperty("auto.offset.reset", "none");

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
9090
kafka09SourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
9191
kafka09SourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase())));
9292
kafka09SourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
93+
for (String key : props.keySet()) {
94+
if (!key.isEmpty() && key.startsWith("kafka.")) {
95+
kafka09SourceTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());
96+
}
97+
}
9398
kafka09SourceTableInfo.check();
9499
return kafka09SourceTableInfo;
95100
}

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
import com.dtstack.flink.sql.table.SourceTableInfo;
2323
import com.google.common.base.Preconditions;
2424

25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.Set;
28+
2529
/**
2630
* Reason:
2731
* Date: 2018/6/22
@@ -117,6 +121,20 @@ public void setOffset(String offset) {
117121
this.offset = offset;
118122
}
119123

124+
public Map<String, String> kafkaParam = new HashMap<>();
125+
126+
public void addKafkaParam(String key, String value) {
127+
kafkaParam.put(key, value);
128+
}
129+
130+
public String getKafkaParam(String key) {
131+
return kafkaParam.get(key);
132+
}
133+
134+
public Set<String> getKafkaParamKeys() {
135+
return kafkaParam.keySet();
136+
}
137+
120138
@Override
121139
public boolean check() {
122140
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6868
String topicName = kafka010SourceTableInfo.getTopic();
6969

7070
Properties props = new Properties();
71+
for (String key : kafka010SourceTableInfo.getKafkaParamKeys()) {
72+
props.setProperty(key, kafka010SourceTableInfo.getKafkaParam(key));
73+
}
7174
props.setProperty("bootstrap.servers", kafka010SourceTableInfo.getBootstrapServers());
7275
if (DtStringUtil.isJosn(kafka010SourceTableInfo.getOffsetReset())){
7376
props.setProperty("auto.offset.reset", "none");

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
9696
kafka10SourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
9797
kafka10SourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase())));
9898
kafka10SourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
99+
for (String key : props.keySet()) {
100+
if (!key.isEmpty() && key.startsWith("kafka.")) {
101+
kafka10SourceTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());
102+
}
103+
}
99104
kafka10SourceTableInfo.check();
100105
return kafka10SourceTableInfo;
101106
}

0 commit comments

Comments
 (0)