Commit 5880711

Update the cassandra-side documentation
1 parent 0304f32 commit 5880711

1 file changed

+115
-24
lines changed

docs/plugin/cassandraSide.md

Lines changed: 115 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,8 @@
11

22
## 1. Format:
3+
4+
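A table is marked as a dimension (side) table by including `PERIOD FOR SYSTEM_TIME` in its CREATE TABLE statement. The `keyInfo` in `PRIMARY KEY(keyInfo)` names the fields used to join against the source table,
5+
and the dimension-table JOIN condition must use exactly the `keyInfo` fields.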
36
```
47
CREATE TABLE tableName(
58
colName colType,
@@ -22,15 +25,15 @@
2225
```
2326

2427
# 2. Supported versions
25-
cassandra-3.6.x
28+
cassandra-3.x
2629

2730
## 3. Table structure definition
2831

2932
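|Parameter|Description|
3033
|----|---|
31-
| tableName | Table name registered in Flink (optional; defaults to the name of the corresponding HBase table) |
34+
| tableName | Table name registered in Flink (optional; defaults to the name of the corresponding Cassandra table) |
3235
| colName | Column name |
33-
| colType | Column type; see [types supported by colType](docs/colType.md) |
36+
| colType | Column type |
3437
| PERIOD FOR SYSTEM_TIME | Keyword indicating that the table being defined is a dimension table |
3538
| PRIMARY KEY(keyInfo) | Primary key of the dimension table; separate multiple columns with commas |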
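
The last row above allows a multi-column key. The following is a minimal sketch, assuming the JOIN must then list every key column (following the rule in section 1 that the JOIN condition matches `keyInfo`). The names `userSide`, `sourceTable`, `resultTable`, `user_level` and their columns are hypothetical and not part of this commit; the `--` comments are explanatory only.

```sql
-- Hypothetical side table keyed on two columns.
CREATE TABLE userSide(
    userId bigint,
    region varchar,
    level varchar,
    PRIMARY KEY(userId, region),
    PERIOD FOR SYSTEM_TIME
)WITH(
    type ='cassandra',
    address ='192.168.80.106:9042',
    database ='tiezhu',
    tableName ='user_level',
    cache ='NONE'
);

-- The join condition covers both columns declared in PRIMARY KEY.
insert into resultTable
select
    s.userId AS userId,
    s.region AS region,
    d.level AS level
from
    sourceTable s
join userSide d
    on s.userId = d.userId
    and s.region = d.region;
```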
3639

@@ -46,40 +49,128 @@
4649
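| database | Cassandra table name |||
4750
| cache | Dimension table cache strategy (NONE/LRU) ||NONE|
4851
| partitionedJoin | Whether to perform a keyBy on the configured key before the dimension-table join (this can reduce the amount of dimension-table data cached) ||false|
49-
| maxRequestsPerConnection | At most 64 concurrent requests allowed per connection ||NONE|
50-
| coreConnectionsPerHost | At least 2 connections to each machine in the Cassandra cluster ||NONE|
51-
| maxConnectionsPerHost | At most 6 connections to each machine in the Cassandra cluster ||NONE|
52-
| maxQueueSize | Cassandra queue size ||NONE|
53-
| readTimeoutMillis | Cassandra read timeout (ms) ||NONE|
54-
| connectTimeoutMillis | Cassandra connect timeout (ms) ||NONE|
55-
| poolTimeoutMillis | Cassandra thread pool timeout (ms) ||NONE|
52+
| maxRequestsPerConnection | At most 64 concurrent requests allowed per connection ||1|
53+
| coreConnectionsPerHost | Minimum number of connections to each machine in the Cassandra cluster ||8|
54+
| maxConnectionsPerHost | Maximum number of connections to each machine in the Cassandra cluster ||32768|
55+
| maxQueueSize | Cassandra queue size ||100000|
56+
| readTimeoutMillis | Cassandra read timeout (ms) ||60000|
57+
| connectTimeoutMillis | Cassandra connect timeout (ms) ||60000|
58+
| poolTimeoutMillis | Cassandra thread pool timeout (ms) ||60000|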
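
As a rough sketch of how the connection options in this table might be set on a side table, the fragment below assumes they are written as quoted strings in the WITH clause, like the other properties shown in this document. The values are simply the defaults from the `+` rows above, and the table definition is reused from the samples in this document; nothing here is a tuning recommendation.

```sql
CREATE TABLE sideTable(
    id bigint,
    message varchar,
    PRIMARY KEY(id),
    PERIOD FOR SYSTEM_TIME
)WITH(
    type ='cassandra',
    address ='192.168.80.106:9042, 192.168.80.107:9042',
    database ='tiezhu',
    tableName ='stu',
    cache ='LRU',
    -- Connection options; values are the documented defaults.
    maxRequestsPerConnection ='1',
    coreConnectionsPerHost ='8',
    maxConnectionsPerHost ='32768',
    maxQueueSize ='100000',
    -- Timeouts are in milliseconds.
    readTimeoutMillis ='60000',
    connectTimeoutMillis ='60000',
    poolTimeoutMillis ='60000'
);
```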
5659

5760
----------
5861
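> Cache strategy
59-
* NONE: no in-memory caching
60-
* LRU:
61-
* cacheSize: number of cached entries
62-
* cacheTTLMs: cache expiration time (ms)
62+
- NONE: no in-memory caching. Every stream record triggers one dimension-table lookup.
63+
- ALL: when the job starts, all data is loaded into memory in one pass and cached. Suitable when the dimension table is small.
64+
- LRU: while the job runs, dimension-table data is loaded through an asynchronous operator according to the join condition, and then cached.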
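
The removed lines of this hunk list two LRU tuning options, `cacheSize` and `cacheTTLMs`. The fragment below is a sketch that assumes both options are still accepted after this commit, which the diff itself does not confirm. The values `10000` and `60000` are arbitrary illustrations, not defaults.

```sql
CREATE TABLE sideTable(
    id bigint,
    message varchar,
    PRIMARY KEY(id),
    PERIOD FOR SYSTEM_TIME
)WITH(
    type ='cassandra',
    address ='192.168.80.106:9042, 192.168.80.107:9042',
    database ='tiezhu',
    tableName ='stu',
    cache ='LRU',
    -- Assumed options, taken from the removed lines above.
    cacheSize ='10000',    -- number of cached entries
    cacheTTLMs ='60000',   -- cache expiration time in ms
    parallelism ='1'
);
```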
6365

6466

65-
## 5. Example
67+
## 5. Dimension table definition examples
68+
69+
### ALL: full-load dimension table definition
6670
```
67-
create table sideTable(
68-
CHANNEL varchar,
69-
XCCOUNT int,
70-
PRIMARY KEY(channel),
71+
CREATE TABLE sideTable(
72+
id bigint,
73+
school varchar,
74+
home varchar,
75+
PRIMARY KEY(id),
7176
PERIOD FOR SYSTEM_TIME
72-
)WITH(
77+
)WITH(
78+
type='mysql',
79+
url='jdbc:mysql://172.16.8.109:3306/tiezhu',
80+
userName='dtstack',
81+
password='abc123',
82+
tableName='stressTest',
83+
cache='ALL',
84+
parallelism='1'
85+
);
86+
```
87+
### LRU: asynchronous dimension table definition
88+
```
89+
CREATE TABLE sideTable(
90+
id bigint,
91+
message varchar,
92+
PRIMARY KEY(id),
93+
PERIOD FOR SYSTEM_TIME
94+
)WITH(
7395
type ='cassandra',
74-
address ='172.21.32.1:9042,172.21.32.1:9042',
75-
database ='test',
76-
tableName ='sidetest',
96+
address ='192.168.80.106:9042, 192.168.80.107:9042',
97+
database ='tiezhu',
98+
tableName ='stu',
99+
userName='cassandra',
100+
password='cassandra',
77101
cache ='LRU',
78102
parallelism ='1',
79103
partitionedJoin='false'
80-
);
104+
);
105+
```
106+
## 6. Complete example
107+
```
108+
CREATE TABLE MyTable(
109+
id bigint,
110+
name varchar,
111+
address varchar
112+
)WITH(
113+
type = 'kafka10',
114+
bootstrapServers = '172.16.101.224:9092',
115+
zookeeperQuorm = '172.16.100.188:2181/kafka',
116+
offsetReset = 'latest',
117+
topic = 'tiezhu_test_in2',
118+
timezone = 'Asia/Shanghai',
119+
topicIsPattern = 'false',
120+
parallelism = '1'
121+
);
81122
123+
CREATE TABLE sideTable(
124+
id bigint,
125+
message varchar,
126+
PRIMARY KEY(id),
127+
PERIOD FOR SYSTEM_TIME
128+
)WITH(
129+
type ='cassandra',
130+
address ='192.168.80.106:9042, 192.168.80.107:9042',
131+
database ='tiezhu',
132+
tableName ='stu',
133+
userName='cassandra',
134+
password='cassandra',
135+
cache ='LRU',
136+
parallelism ='1',
137+
partitionedJoin='false'
138+
);
139+
140+
CREATE TABLE MyResult(
141+
id bigint,
142+
name varchar,
143+
address varchar,
144+
message varchar
145+
)WITH(
146+
type ='cassandra',
147+
address ='192.168.80.106:9042,192.168.80.107:9042',
148+
userName='cassandra',
149+
password='cassandra',
150+
database ='tiezhu',
151+
tableName ='stu_out',
152+
parallelism ='1'
153+
);
82154
155+
insert
156+
into
157+
MyResult
158+
select
159+
t1.id AS id,
160+
t1.name AS name,
161+
t1.address AS address,
162+
t2.message AS message
163+
from
164+
(
165+
select
166+
id,
167+
name,
168+
address
169+
from
170+
MyTable
171+
) t1
172+
join sideTable t2
173+
on t1.id = t2.id;
83174
```
84175

85176
