Skip to content

Commit 83571b4

Browse files
committed
postgresql-side clickhouse-sise
1 parent 93844cf commit 83571b4

File tree

2 files changed

+239
-93
lines changed

2 files changed

+239
-93
lines changed

docs/plugin/clickhouseSide.md

Lines changed: 120 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
21
## 1.格式:
3-
```
2+
3+
通过建表语句中的` PERIOD FOR SYSTEM_TIME`将表标识为维表,其中`PRIMARY KEY(keyInfo)`中的keyInfo,表示用来和源表进行关联的字段,
4+
维表JOIN的条件必须与`keyInfo`字段一致。
5+
6+
```sql
47
CREATE TABLE tableName(
58
colName cloType,
69
...
@@ -21,65 +24,132 @@
2124
```
2225

2326
# 2.支持版本
27+
2428
19.14.x、19.15.x、19.16.x
25-
29+
2630
## 3.表结构定义
27-
28-
|参数名称|含义|
29-
|----|---|
30-
| tableName | clickhouse表名称|
31-
| colName | 列名称|
32-
| colType | 列类型 [colType支持的类型](docs/colType.md)|
33-
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
34-
| PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
35-
36-
## 4.参数
37-
38-
|参数名称|含义|是否必填|默认值|
39-
|----|---|---|----|
40-
| type | 表明维表的类型 clickhouse |||
41-
| url | 连接clickhouse数据库 jdbcUrl |||
42-
| userName | clickhouse连接用户名 |||
43-
| password | clickhouse连接密码|||
44-
| tableName | clickhouse表名称|||
45-
| tableName | clickhouse 的表名称|||
46-
| cache | 维表缓存策略(NONE/LRU)||NONE|
47-
| partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)||false|
48-
49-
----------
50-
> 缓存策略
51-
* NONE: 不做内存缓存
52-
* LRU:
53-
* cacheSize: 缓存的条目数量
54-
* cacheTTLMs:缓存的过期时间(ms)
55-
* cacheMode: (unordered|ordered)异步加载是有序还是无序,默认有序。
56-
* asyncCapacity:异步请求容量,默认1000
57-
* asyncTimeout:异步请求超时时间,默认10000毫秒
58-
59-
## 5.样例
31+
32+
[维表参数信息](docs/plugin/sideParams.md)
33+
34+
clinkhose独有的参数信息
35+
36+
| 参数名称 | 含义 | 是否必填 | 默认值 |
37+
| -------- | --------------------- | -------- | ------ |
38+
| type | 维表类型, clinkhouse || |
39+
| url | 连接数据库 jdbcUrl || |
40+
| userName | 连接用户名 || |
41+
| password | 连接密码 || |
42+
43+
## 4.样例
44+
45+
------
46+
47+
## ALL全量维表定义
48+
49+
```sql
50+
// 定义全量维表
51+
CREATE TABLE sideTable(
52+
id INT,
53+
name VARCHAR,
54+
PRIMARY KEY(id),
55+
PERIOD FOR SYSTEM_TIME
56+
)WITH(
57+
type ='clickhouse',
58+
url ='jdbc:clickhouse://172.16.8.104:3306/zftest',
59+
userName ='dtstack',
60+
password ='abc123',
61+
tableName ='all_test_clinkhousesql',
62+
cache ='ALL',
63+
cacheTTLMs ='60000',
64+
parallelism ='1'
65+
);
6066
```
61-
create table sideTable(
62-
channel varchar,
63-
xccount int,
64-
PRIMARY KEY(channel),
67+
68+
### LRU异步维表定义
69+
70+
```
71+
CREATE TABLE sideTable(
72+
id INT,
73+
name VARCHAR,
74+
PRIMARY KEY(id) ,
6575
PERIOD FOR SYSTEM_TIME
6676
)WITH(
67-
type='clickhouse',
68-
url='jdbc:clickhouse://172.16.8.104:3306/test?charset=utf8',
69-
userName='dtstack',
70-
password='abc123',
71-
tableName='sidetest',
77+
type ='clickhousesql',
78+
url ='jdbc:clickhousesql://172.16.8.104:3306/zftest',
79+
userName ='dtstack',
80+
password ='abc123',
81+
tableName ='lru_test_clickhousesql',
82+
partitionedJoin ='false',
7283
cache ='LRU',
7384
cacheSize ='10000',
7485
cacheTTLMs ='60000',
75-
cacheMode='unordered',
76-
asyncCapacity='1000',
77-
asyncTimeout='10000'
86+
asyncPoolSize ='3',
87+
parallelism ='1'
88+
);
89+
```
90+
91+
### ClickHouseSQL异步维表关联
92+
93+
```sql
94+
CREATE TABLE MyTable(
95+
id int,
96+
name varchar
97+
)WITH(
98+
type ='kafka11',
99+
bootstrapServers ='172.16.8.107:9092',
100+
zookeeperQuorum ='172.16.8.107:2181/kafka',
101+
offsetReset ='latest',
102+
topic ='cannan_zftest01',
103+
timezone='Asia/Shanghai',
104+
enableKeyPartitions ='false',
105+
topicIsPattern ='false',
106+
parallelism ='1'
107+
);
108+
109+
CREATE TABLE MyResult(
110+
id INT,
111+
name VARCHAR
112+
)WITH(
113+
type ='clickhousesql',
114+
url ='jdbc:clickhousesql://172.16.8.104:3306/zftest',
115+
userName ='dtstack',
116+
password ='abc123',
117+
tableName ='test_clickhouse_zf',
118+
updateMode ='append',
78119
parallelism ='1',
79-
partitionedJoin='false'
120+
batchSize ='100',
121+
batchWaitInterval ='1000'
80122
);
81123

124+
CREATE TABLE sideTable(
125+
id INT,
126+
name VARCHAR,
127+
PRIMARY KEY(id) ,
128+
PERIOD FOR SYSTEM_TIME
129+
)WITH(
130+
type ='clickhousesql',
131+
url ='jdbc:clickhousesql://172.16.8.104:3306/zftest',
132+
userName ='dtstack',
133+
password ='abc123',
134+
tableName ='test_clickhouse_10',
135+
partitionedJoin ='false',
136+
cache ='LRU',
137+
cacheSize ='10000',
138+
cacheTTLMs ='60000',
139+
asyncPoolSize ='3',
140+
parallelism ='1'
141+
);
82142

143+
insert
144+
into
145+
MyResult
146+
select
147+
m.id,
148+
s.name
149+
from
150+
MyTable m
151+
join
152+
sideTable s
153+
on m.id=s.id;
83154
```
84155

85-

docs/plugin/postgresqlSide.md

Lines changed: 119 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
21
## 1.格式:
2+
3+
通过建表语句中的` PERIOD FOR SYSTEM_TIME`将表标识为维表,其中`PRIMARY KEY(keyInfo)`中的keyInfo,表示用来和源表进行关联的字段,
4+
维表JOIN的条件必须与`keyInfo`字段一致。
5+
36
```
47
CREATE TABLE tableName(
58
colName cloType,
@@ -21,59 +24,132 @@
2124
```
2225

2326
# 2.支持版本
27+
2428
postgresql-8.2+
25-
29+
2630
## 3.表结构定义
27-
28-
|参数名称|含义|
29-
|----|---|
30-
| tableName | 注册到flink的表名称|
31-
| colName | 列名称|
32-
| colType | 列类型 [colType支持的类型](docs/colType.md)|
33-
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
34-
| PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
35-
36-
## 4.参数
37-
38-
|参数名称|含义|是否必填|默认值|
39-
|----|---|---|----|
40-
| type | 表明维表的类型[postgresql] |||
41-
| url | 连接postgresql数据库 jdbcUrl |||
42-
| userName | postgresql连接用户名 |||
43-
| password | postgresql连接密码|||
44-
| tableName | postgresql表名称|||
45-
| cache | 维表缓存策略(NONE/LRU/ALL)||NONE|
46-
| partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)||false|
47-
48-
----------
49-
> 缓存策略
50-
* NONE: 不做内存缓存
51-
* LRU:
52-
* cacheSize: 缓存的条目数量
53-
* cacheTTLMs:缓存的过期时间(ms)
54-
55-
56-
## 5.样例
31+
32+
[维表参数信息](docs/plugin/sideParams.md)
33+
34+
postgresql独有的参数配置
35+
36+
| 参数名称 | 含义 | 是否必填 | 默认值 |
37+
| -------- | ------------------ | -------- | ------ |
38+
| type | 维表类型, mysql || |
39+
| url | 连接数据库 jdbcUrl || |
40+
| userName | 连接用户名 || |
41+
| password | 连接密码 || |
42+
43+
## 4.样例
44+
45+
### ALL全量维表定义
46+
47+
```sql
48+
// 定义全量维表
49+
CREATE TABLE sideTable(
50+
id INT,
51+
name VARCHAR,
52+
PRIMARY KEY(id) ,
53+
PERIOD FOR SYSTEM_TIME
54+
)WITH(
55+
type ='postgresql',
56+
url ='jdbc:postgresql://172.16.10.194:5432/zftest',
57+
userName ='dtstack',
58+
password ='abc123',
59+
tableName ='all_test_postgresql',
60+
cache ='ALL',
61+
cacheTTLMs ='60000',
62+
parallelism ='1'
63+
);
5764
```
58-
create table sideTable(
59-
channel varchar,
60-
xccount int,
61-
PRIMARY KEY(channel),
65+
66+
### LRU异步维表定义
67+
68+
```sql
69+
CREATE TABLE sideTable(
70+
id INT,
71+
name VARCHAR,
72+
PRIMARY KEY(id) ,
6273
PERIOD FOR SYSTEM_TIME
6374
)WITH(
64-
type='postgresql',
65-
url='jdbc:postgresql://localhost:9001/test?sslmode=disable',
66-
userName='dtstack',
67-
password='abc123',
68-
tableName='sidetest',
75+
type ='postgresql',
76+
url ='jdbc:postgresql://172.16.10.194:5432/zftest',
77+
userName ='dtstack',
78+
password ='abc123',
79+
tableName ='lru_test_postgresql',
80+
partitionedJoin ='false',
6981
cache ='LRU',
7082
cacheSize ='10000',
7183
cacheTTLMs ='60000',
84+
asyncPoolSize ='3',
85+
parallelism ='1'
86+
);
87+
88+
```
89+
90+
### PostgreSQL异步维表关联
91+
92+
```sql
93+
CREATE TABLE MyTable(
94+
id int,
95+
name varchar
96+
)WITH(
97+
type ='kafka11',
98+
bootstrapServers ='172.16.8.107:9092',
99+
zookeeperQuorum ='172.16.8.107:2181/kafka',
100+
offsetReset ='latest',
101+
topic ='cannan_zftest01',
102+
timezone='Asia/Shanghai',
103+
enableKeyPartitions ='false',
104+
topicIsPattern ='false',
105+
parallelism ='1'
106+
);
107+
108+
CREATE TABLE MyResult(
109+
id INT,
110+
name VARCHAR
111+
)WITH(
112+
type ='postgresql',
113+
url ='jdbc:postgresql://172.16.10.194:5432/zftest',
114+
userName ='dtstack',
115+
password ='abc123',
116+
tableName ='test_postgresql_zf',
117+
updateMode ='append',
72118
parallelism ='1',
73-
partitionedJoin='false'
119+
batchSize ='100',
120+
batchWaitInterval ='1000'
74121
);
75122

123+
CREATE TABLE sideTable(
124+
id INT,
125+
name VARCHAR,
126+
PRIMARY KEY(id) ,
127+
PERIOD FOR SYSTEM_TIME
128+
)WITH(
129+
type ='postgresql',
130+
url ='jdbc:postgresql://172.16.10.194:5432/zftest',
131+
userName ='dtstack',
132+
password ='abc123',
133+
tableName ='test_postgresql_10',
134+
partitionedJoin ='false',
135+
cache ='LRU',
136+
cacheSize ='10000',
137+
cacheTTLMs ='60000',
138+
asyncPoolSize ='3',
139+
parallelism ='1'
140+
);
76141

77-
```
142+
insert
143+
into
144+
MyResult
145+
select
146+
m.id,
147+
s.name
148+
from
149+
MyTable m
150+
join
151+
sideTable s
152+
on m.id=s.id;
78153

154+
```
79155

0 commit comments

Comments
 (0)