Skip to content

Commit ccd5ede

Browse files
committed
添加kafka-sink文档
1 parent cdc91af commit ccd5ede

File tree

2 files changed

+168
-16
lines changed

2 files changed

+168
-16
lines changed

docs/plugin/kafkaSink.md

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
## 1.格式:
2+
```
3+
CREATE TABLE tableName(
4+
colName colType,
5+
...
6+
colNameX colType,
7+
[primary key (colName)]
8+
)WITH(
9+
type ='kafka09',
10+
bootstrapServers ='ip:port,ip:port...',
11+
zookeeperQuorum ='ip:port,ip:port/zkparent',
12+
offsetReset ='latest',
13+
topic ='topicName',
14+
groupId='test',
15+
parallelism ='parllNum',
16+
timezone='Asia/Shanghai',
17+
sourcedatatype ='json' #可不设置
18+
);
19+
```
20+
21+
## 2.支持版本
22+
kafka09,kafka10,kafka11及以上版本
23+
24+
## 3.表结构定义
25+
26+
|参数名称|含义|
27+
|----|---|
28+
| tableName| 结果表名称|
29+
| colName | 列名称|
30+
| colType | 列类型 [colType支持的类型](../colType.md)|
31+
32+
## 4.参数:
33+
34+
|参数名称|含义|是否必填|默认值|
35+
|----|----|----|----|
36+
|type |表名的输出表类型[kafka09\|kafka10\|kafka11\|kafka]|是||
37+
|groupId | 需要读取的 groupId 名称|||
38+
|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
39+
|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|||
40+
|topic | 需要写入的 topic 名称|是||
41+
|topicIsPattern | topic是否是正则表达式格式(true\|false) |否| false|
42+
|offsetReset | 读取的topic 的offset初始位置[latest\|earliest\|指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no":offset_value})]|否|latest|
43+
|parallelism | 并行度设置||1|
44+
|sourcedatatype | 数据类型||json|
45+
|timezone|时区设置[timezone支持的参数](../timeZone.md)|否|'Asia/Shanghai'|
46+
**kafka相关参数可以自定义,使用kafka.开头即可。**
47+
```
48+
kafka.consumer.id
49+
kafka.socket.timeout.ms
50+
kafka.fetch.message.max.bytes
51+
kafka.num.consumer.fetchers
52+
kafka.auto.commit.enable
53+
kafka.auto.commit.interval.ms
54+
kafka.queued.max.message.chunks
55+
kafka.rebalance.max.retries
56+
kafka.fetch.min.bytes
57+
kafka.fetch.wait.max.ms
58+
kafka.rebalance.backoff.ms
59+
kafka.refresh.leader.backoff.ms
60+
kafka.consumer.timeout.ms
61+
kafka.exclude.internal.topics
62+
kafka.partition.assignment.strategy
63+
kafka.client.id
64+
kafka.zookeeper.session.timeout.ms
65+
kafka.zookeeper.connection.timeout.ms
66+
kafka.zookeeper.sync.time.ms
67+
kafka.offsets.storage
68+
kafka.offsets.channel.backoff.ms
69+
kafka.offsets.channel.socket.timeout.ms
70+
kafka.offsets.commit.max.retries
71+
kafka.dual.commit.enabled
72+
kafka.partition.assignment.strategy
73+
kafka.socket.receive.buffer.bytes
74+
kafka.fetch.min.bytes
75+
```
76+
77+
## 5.完整样例:
78+
```
79+
CREATE TABLE MyTable(
80+
id bigint,
81+
name varchar,
82+
address varchar
83+
)WITH(
84+
type = 'kafka10',
85+
bootstrapServers = '172.16.101.224:9092',
86+
zookeeperQuorum = '172.16.100.188:2181/kafka',
87+
offsetReset = 'latest',
88+
topic = 'tiezhu_test_in2',
89+
groupId = 'flink_sql',
90+
timezone = 'Asia/Shanghai',
91+
topicIsPattern = 'false',
92+
parallelism = '1'
93+
);
94+
95+
CREATE TABLE sideTable(
96+
id bigint,
97+
school varchar,
98+
home varchar,
99+
PRIMARY KEY(id),
100+
PERIOD FOR SYSTEM_TIME
101+
)WITH(
102+
type='mysql',
103+
url='jdbc:mysql://172.16.8.109:3306/tiezhu',
104+
userName='dtstack',
105+
password='abc123',
106+
tableName='stressTest',
107+
cache='ALL',
108+
parallelism='1'
109+
);
110+
111+
CREATE TABLE MyResult(
112+
id bigint,
113+
name varchar,
114+
address varchar,
115+
home varchar,
116+
school varchar
117+
)WITH(
118+
type = 'kafka10',
119+
bootstrapServers = '172.16.101.224:9092',
120+
zookeeperQuorum = '172.16.100.188:2181/kafka',
121+
offsetReset = 'latest',
122+
topic = 'tiezhu_test_out2',
123+
parallelism = '1'
124+
);
125+
126+
insert
127+
into
128+
MyResult
129+
select
130+
t1.id AS id,
131+
t1.name AS name,
132+
t1.address AS address,
133+
t2.school AS school,
134+
t2.home AS home
135+
from(
136+
select
137+
id,
138+
name,
139+
address
140+
from
141+
MyTable
142+
) t1
143+
join sideTable t2
144+
on t1.id = t2.id;
145+
```
146+
147+
## 6.结果表数据示例:
148+
```
149+
[root@node002 bin]# ./kafka-console-consumer.sh --bootstrap-server 172.16.101.224:9092 --topic tiezhu_test_out2
150+
{"id":122,"name":"tiezhu122","address":"hangzhou122","home":"ganzhou122","school":" ecjtu122"}
151+
{"id":123,"name":"tiezhu123","address":"hangzhou123","home":"ganzhou123","school":" ecjtu123"}
152+
```

pom.xml

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,24 @@
1616
<module>kafka11</module>
1717
<module>kafka</module>
1818
<module>mysql</module>
19-
<module>hbase</module>
20-
<module>elasticsearch5</module>
21-
<module>mongo</module>
22-
<module>redis5</module>
19+
<!-- <module>hbase</module>-->
20+
<!-- <module>elasticsearch5</module>-->
21+
<!-- <module>mongo</module>-->
22+
<!-- <module>redis5</module>-->
2323
<module>launcher</module>
2424
<module>rdb</module>
25-
<module>sqlserver</module>
26-
<module>oracle</module>
27-
<module>cassandra</module>
28-
<module>kudu</module>
29-
<module>postgresql</module>
30-
<module>serversocket</module>
31-
<module>console</module>
32-
<module>clickhouse</module>
33-
<module>impala</module>
34-
<module>db2</module>
35-
<module>polardb</module>
36-
<module>elasticsearch6</module>
25+
<!-- <module>sqlserver</module>-->
26+
<!-- <module>oracle</module>-->
27+
<!-- <module>cassandra</module>-->
28+
<!-- <module>kudu</module>-->
29+
<!-- <module>postgresql</module>-->
30+
<!-- <module>serversocket</module>-->
31+
<!-- <module>console</module>-->
32+
<!-- <module>clickhouse</module>-->
33+
<!-- <module>impala</module>-->
34+
<!-- <module>db2</module>-->
35+
<!-- <module>polardb</module>-->
36+
<!-- <module>elasticsearch6</module>-->
3737

3838
</modules>
3939

0 commit comments

Comments
 (0)