Skip to content

Commit 49af672

Browse files
committed
Merge branch 'feat_1.8_modifyDoc' of http://gitlab.prod.dtstack.cn/dt-insight-engine/flinkStreamSQL into feat_1.8_modifyDoc
� Conflicts: � docs/plugin/db2Side.md
2 parents eb7502a + ccd5ede commit 49af672

File tree

12 files changed

+728
-99
lines changed

12 files changed

+728
-99
lines changed

docs/plugin/cassandraSide.md

Lines changed: 115 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11

22
## 1.格式:
3+
4+
通过建表语句中的` PERIOD FOR SYSTEM_TIME`将表标识为维表,其中`PRIMARY KEY(keyInfo)`中的keyInfo,表示用来和源表进行关联的字段,
5+
维表JOIN的条件必须与`keyInfo`字段一致。
36
```
47
CREATE TABLE tableName(
58
colName cloType,
@@ -22,15 +25,15 @@
2225
```
2326

2427
# 2.支持版本
25-
cassandra-3.6.x
28+
cassandra-3.x
2629

2730
## 3.表结构定义
2831

2932
|参数名称|含义|
3033
|----|---|
31-
| tableName | 注册到flink的表名称(可选填;不填默认和hbase对应的表名称相同)|
34+
| tableName | 注册到flink的表名称(可选填;不填默认和cassandra对应的表名称相同)|
3235
| colName | 列名称|
33-
| colType | 列类型 [colType支持的类型](docs/colType.md)|
36+
| colType | 列类型|
3437
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
3538
| PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
3639

@@ -46,40 +49,128 @@
4649
| database | cassandra表名称|||
4750
| cache | 维表缓存策略(NONE/LRU)||NONE|
4851
| partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)||false|
49-
| maxRequestsPerConnection | 每个连接最多允许64个并发请求||NONE|
50-
| coreConnectionsPerHost | 和Cassandra集群里的每个机器都至少有2个连接||NONE|
51-
| maxConnectionsPerHost | 和Cassandra集群里的每个机器都最多有6个连接||NONE|
52-
| maxQueueSize | Cassandra队列大小||NONE|
53-
| readTimeoutMillis | Cassandra读超时||NONE|
54-
| connectTimeoutMillis | Cassandra连接超时||NONE|
55-
| poolTimeoutMillis | Cassandra线程池超时||NONE|
52+
| maxRequestsPerConnection | 每个连接允许的并发请求数||1|
53+
| coreConnectionsPerHost | 每台主机连接的核心数||8|
54+
| maxConnectionsPerHost | Cassandra集群里的每个机器都最多连接数||32768|
55+
| maxQueueSize | Cassandra队列大小||100000|
56+
| readTimeoutMillis | Cassandra读超时||60000|
57+
| connectTimeoutMillis | Cassandra连接超时||60000|
58+
| poolTimeoutMillis | Cassandra线程池超时||60000|
5659

5760
----------
5861
> 缓存策略
59-
* NONE: 不做内存缓存
60-
* LRU:
61-
* cacheSize: 缓存的条目数量
62-
* cacheTTLMs:缓存的过期时间(ms)
62+
- NONE:不做内存缓存。每条流数据触发一次维表查询操作。
63+
- ALL: 任务启动时,一次性加载所有数据到内存,并进行缓存。适用于维表数据量较小的情况。
64+
- LRU: 任务执行时,根据维表关联条件使用异步算子加载维表数据,并进行缓存。
6365

6466

65-
## 5.样例
67+
## 5.维表定义样例
68+
69+
### ALL全量维表定义
6670
```
67-
create table sideTable(
68-
CHANNEL varchar,
69-
XCCOUNT int,
70-
PRIMARY KEY(channel),
71+
CREATE TABLE sideTable(
72+
id bigint,
73+
school varchar,
74+
home varchar,
75+
PRIMARY KEY(id),
7176
PERIOD FOR SYSTEM_TIME
72-
)WITH(
77+
)WITH(
78+
type='mysql',
79+
url='jdbc:mysql://172.16.8.109:3306/tiezhu',
80+
userName='dtstack',
81+
password='abc123',
82+
tableName='stressTest',
83+
cache='ALL',
84+
parallelism='1'
85+
);
86+
```
87+
### LRU异步维表定义
88+
```
89+
CREATE TABLE sideTable(
90+
id bigint,
91+
message varchar,
92+
PRIMARY KEY(id),
93+
PERIOD FOR SYSTEM_TIME
94+
)WITH(
7395
type ='cassandra',
74-
address ='172.21.32.1:9042,172.21.32.1:9042',
75-
database ='test',
76-
tableName ='sidetest',
96+
address ='192.168.80.106:9042, 192.168.80.107:9042',
97+
database ='tiezhu',
98+
tableName ='stu',
99+
userName='cassandra',
100+
password='cassandra',
77101
cache ='LRU',
78102
parallelism ='1',
79103
partitionedJoin='false'
80-
);
104+
);
105+
```
106+
## 6.完整样例
107+
```
108+
CREATE TABLE MyTable(
109+
id bigint,
110+
name varchar,
111+
address varchar
112+
)WITH(
113+
type = 'kafka10',
114+
bootstrapServers = '172.16.101.224:9092',
115+
zookeeperQuorm = '172.16.100.188:2181/kafka',
116+
offsetReset = 'latest',
117+
topic = 'tiezhu_test_in2',
118+
timezone = 'Asia/Shanghai',
119+
topicIsPattern = 'false',
120+
parallelism = '1'
121+
);
81122
123+
CREATE TABLE sideTable(
124+
id bigint,
125+
message varchar,
126+
PRIMARY KEY(id),
127+
PERIOD FOR SYSTEM_TIME
128+
)WITH(
129+
type ='cassandra',
130+
address ='192.168.80.106:9042, 192.168.80.107:9042',
131+
database ='tiezhu',
132+
tableName ='stu',
133+
userName='cassandra',
134+
password='cassandra',
135+
cache ='LRU',
136+
parallelism ='1',
137+
partitionedJoin='false'
138+
);
139+
140+
CREATE TABLE MyResult(
141+
id bigint,
142+
name varchar,
143+
address varchar,
144+
message varchar
145+
)WITH(
146+
type ='cassandra',
147+
address ='192.168.80.106:9042,192.168.80.107:9042',
148+
userName='cassandra',
149+
password='cassandra',
150+
database ='tiezhu',
151+
tableName ='stu_out',
152+
parallelism ='1'
153+
);
82154
155+
insert
156+
into
157+
MyResult
158+
select
159+
t1.id AS id,
160+
t1.name AS name,
161+
t1.address AS address,
162+
t2.message AS message
163+
from
164+
(
165+
select
166+
id,
167+
name,
168+
address
169+
from
170+
MyTable
171+
) t1
172+
join sideTable t2
173+
on t1.id = t2.id;
83174
```
84175

85176

docs/plugin/cassandraSink.md

Lines changed: 106 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ CREATE TABLE tableName(
1717
```
1818

1919
## 2.支持版本
20-
cassandra-3.6.x
20+
cassandra-3.x
2121

2222
## 3.表结构定义
2323

@@ -27,7 +27,7 @@ CREATE TABLE tableName(
2727
| colName | 列名称|
2828
| colType | 列类型 [colType支持的类型](docs/colType.md)|
2929

30-
## 4.参数
30+
## 4.参数
3131

3232
|参数名称|含义|是否必填|默认值|
3333
|----|----|----|----|
@@ -38,26 +38,113 @@ CREATE TABLE tableName(
3838
|tableName | cassandra表名称|||
3939
|database | cassandra表名称|||
4040
|parallelism | 并行度设置||1|
41-
|maxRequestsPerConnection | 每个连接最多允许64个并发请求||NONE|
42-
|coreConnectionsPerHost | 和Cassandra集群里的每个机器都至少有2个连接||NONE|
43-
|maxConnectionsPerHost | 和Cassandra集群里的每个机器都最多有6个连接||NONE|
44-
|maxQueueSize | Cassandra队列大小||NONE|
45-
|readTimeoutMillis | Cassandra读超时||NONE|
46-
|connectTimeoutMillis | Cassandra连接超时||NONE|
47-
|poolTimeoutMillis | Cassandra线程池超时||NONE|
41+
| maxRequestsPerConnection | 每个连接允许的并发请求数||1|
42+
| coreConnectionsPerHost | 每台主机连接的核心数||8|
43+
| maxConnectionsPerHost | Cassandra集群里的每个机器都最多连接数||32768|
44+
| maxQueueSize | Cassandra队列大小||100000|
45+
| readTimeoutMillis | Cassandra读超时||60000|
46+
| connectTimeoutMillis | Cassandra连接超时||60000|
47+
| poolTimeoutMillis | Cassandra线程池超时||60000|
4848

49-
## 5.样例
49+
## 5.完整样例
5050
```
51+
CREATE TABLE MyTable(
52+
id bigint,
53+
name varchar,
54+
address varchar
55+
)WITH(
56+
type = 'kafka10',
57+
bootstrapServers = '172.16.101.224:9092',
58+
zookeeperQuorm = '172.16.100.188:2181/kafka',
59+
offsetReset = 'latest',
60+
topic = 'tiezhu_test_in2',
61+
timezone = 'Asia/Shanghai',
62+
topicIsPattern = 'false',
63+
parallelism = '1'
64+
);
65+
66+
CREATE TABLE sideTable(
67+
id bigint,
68+
message varchar,
69+
PRIMARY KEY(id),
70+
PERIOD FOR SYSTEM_TIME
71+
)WITH(
72+
type ='cassandra',
73+
address ='192.168.80.106:9042, 192.168.80.107:9042',
74+
database ='tiezhu',
75+
tableName ='stu',
76+
userName='cassandra',
77+
password='cassandra',
78+
cache ='LRU',
79+
parallelism ='1',
80+
partitionedJoin='false'
81+
);
82+
5183
CREATE TABLE MyResult(
52-
channel VARCHAR,
53-
pv VARCHAR
84+
id bigint,
85+
name varchar,
86+
address varchar,
87+
message varchar
5488
)WITH(
5589
type ='cassandra',
56-
address ='172.21.32.1:9042,172.21.32.1:9042',
57-
userName ='dtstack',
58-
password ='abc123',
59-
database ='test',
60-
tableName ='pv',
90+
address ='192.168.80.106:9042,192.168.80.107:9042',
91+
userName='cassandra',
92+
password='cassandra',
93+
database ='tiezhu',
94+
tableName ='stu_out',
6195
parallelism ='1'
62-
)
63-
```
96+
);
97+
98+
insert
99+
into
100+
MyResult
101+
select
102+
t1.id AS id,
103+
t1.name AS name,
104+
t1.address AS address,
105+
t2.message AS message
106+
from
107+
(
108+
select
109+
id,
110+
name,
111+
address
112+
from
113+
MyTable
114+
) t1
115+
join sideTable t2
116+
on t1.id = t2.id;
117+
```
118+
### 6.结果表数据展示
119+
```
120+
cqlsh:tiezhu> desc stu_out
121+
122+
CREATE TABLE tiezhu.stu_out (
123+
id int PRIMARY KEY,
124+
address text,
125+
message text,
126+
name text
127+
) WITH bloom_filter_fp_chance = 0.01
128+
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
129+
AND comment = ''
130+
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
131+
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
132+
AND crc_check_chance = 1.0
133+
AND dclocal_read_repair_chance = 0.1
134+
AND default_time_to_live = 0
135+
AND gc_grace_seconds = 864000
136+
AND max_index_interval = 2048
137+
AND memtable_flush_period_in_ms = 0
138+
AND min_index_interval = 128
139+
AND read_repair_chance = 0.0
140+
AND speculative_retry = '99PERCENTILE';
141+
142+
143+
cqlsh:tiezhu> select * from stu_out limit 1;
144+
145+
id | address | message | name
146+
----+------------+------------------+----------
147+
23 | hangzhou23 | flinkStreamSql23 | tiezhu23
148+
149+
(1 rows)
150+
```

0 commit comments

Comments
 (0)