Skip to content

Commit d6f72c9

Browse files
committed
mongo doc
1 parent 2e4721b commit d6f72c9

File tree

2 files changed

+170
-47
lines changed

2 files changed

+170
-47
lines changed

docs/plugin/mongoSide.md

Lines changed: 106 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -26,53 +26,123 @@
2626

2727
## 3.表结构定义
2828

29-
|参数名称|含义|
30-
|----|---|
31-
| tableName | 注册到flink的表名称(可选填;不填默认和hbase对应的表名称相同)|
32-
| colName | 列名称|
33-
| colType | 列类型 [colType支持的类型](docs/colType.md)|
34-
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
35-
| PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
29+
[通用维表参数信息](docs/plugin/sideParams.md)
3630

37-
## 4.参数
38-
39-
|参数名称|含义|是否必填|默认值|
40-
|----|---|---|----|
41-
| type |表明 输出表类型 mongo|||
42-
| address | 连接mongo数据库 jdbcUrl |||
43-
| userName | mongo连接用户名|||
44-
| password | mongo连接密码|||
45-
| tableName | mongo表名称|||
46-
| database | mongo表名称|||
47-
| cache | 维表缓存策略(NONE/LRU)||NONE|
48-
| partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)||false|
49-
50-
----------
51-
> 缓存策略
52-
* NONE: 不做内存缓存
53-
* LRU:
54-
* cacheSize: 缓存的条目数量
55-
* cacheTTLMs:缓存的过期时间(ms)
56-
31+
32+
mongo相关参数配置:
33+
34+
|参数名称|含义|是否必填|默认值|
35+
|----|---|---|----|
36+
| type |表明 输出表类型 mongo|||
37+
| address | 连接mongo数据库 jdbcUrl |||
38+
| userName | mongo连接用户名|||
39+
| password | mongo连接密码|||
40+
| tableName | mongo表名称|||
41+
| database | mongo表名称|||
42+
43+
## 4.样例
5744

58-
## 5.样例
45+
46+
### 全量维表结构
47+
48+
```aidl
49+
CREATE TABLE source2(
50+
id int,
51+
address VARCHAR,
52+
PERIOD FOR SYSTEM_TIME
53+
)WITH(
54+
type ='mongo',
55+
address ='172.16.8.193:27017',
56+
database ='dtstack',
57+
tableName ='userInfo',
58+
cache ='ALL',
59+
parallelism ='1',
60+
partitionedJoin='false'
61+
);
5962
```
60-
create table sideTable(
61-
CHANNEL varchar,
62-
XCCOUNT int,
63-
PRIMARY KEY(channel),
63+
64+
### 异步维表结构
65+
66+
```aidl
67+
CREATE TABLE source2(
68+
id int,
69+
address VARCHAR,
6470
PERIOD FOR SYSTEM_TIME
65-
)WITH(
71+
)WITH(
6672
type ='mongo',
67-
address ='172.21.32.1:27017,172.21.32.1:27017',
68-
database ='test',
69-
tableName ='sidetest',
73+
address ='172.16.8.193:27017',
74+
database ='dtstack',
75+
tableName ='userInfo',
7076
cache ='LRU',
7177
parallelism ='1',
7278
partitionedJoin='false'
79+
);
80+
81+
```
82+
83+
### 异步维表关联样例
84+
85+
```
86+
87+
CREATE TABLE source1 (
88+
id int,
89+
name VARCHAR
90+
)WITH(
91+
type ='kafka11',
92+
bootstrapServers ='172.16.8.107:9092',
93+
zookeeperQuorum ='172.16.8.107:2181/kafka',
94+
offsetReset ='latest',
95+
topic ='mqTest03',
96+
timezone='Asia/Shanghai',
97+
topicIsPattern ='false'
7398
);
7499
75100
101+
CREATE TABLE source2(
102+
id int,
103+
address VARCHAR,
104+
PERIOD FOR SYSTEM_TIME
105+
)WITH(
106+
type ='mongo',
107+
address ='172.16.8.193:27017',
108+
database ='dtstack',
109+
tableName ='userInfo',
110+
cache ='ALL',
111+
parallelism ='1',
112+
partitionedJoin='false'
113+
);
114+
115+
116+
CREATE TABLE MyResult(
117+
id int,
118+
name VARCHAR,
119+
address VARCHAR,
120+
primary key (id)
121+
)WITH(
122+
type='console'
123+
);
124+
125+
insert into MyResult
126+
select
127+
s1.id,
128+
s1.name,
129+
s2.address
130+
from
131+
source1 s1
132+
left join
133+
source2 s2
134+
on
135+
s1.id = s2.id
136+
137+
76138
```
77139

78140

141+
维表数据:{"id": 1001,"address":"hz""}
142+
143+
源表数据:{"name":"maqi","id":1001}
144+
145+
146+
输出结果: (1001,maqi,hz)
147+
148+

docs/plugin/mongoSink.md

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,70 @@ CREATE TABLE tableName(
4040
|parallelism | 并行度设置||1|
4141

4242
## 5.样例:
43+
44+
双流join并插入mongo
45+
4346
```
47+
48+
CREATE TABLE source1 (
49+
id int,
50+
name VARCHAR
51+
)WITH(
52+
type ='kafka11',
53+
bootstrapServers ='172.16.8.107:9092',
54+
zookeeperQuorum ='172.16.8.107:2181/kafka',
55+
offsetReset ='latest',
56+
topic ='mqTest03',
57+
timezone='Asia/Shanghai',
58+
topicIsPattern ='false'
59+
);
60+
61+
62+
63+
CREATE TABLE source2(
64+
id int,
65+
address VARCHAR
66+
)WITH(
67+
type ='kafka11',
68+
bootstrapServers ='172.16.8.107:9092',
69+
zookeeperQuorum ='172.16.8.107:2181/kafka',
70+
offsetReset ='latest',
71+
topic ='mqTest04',
72+
timezone='Asia/Shanghai',
73+
topicIsPattern ='false'
74+
);
75+
76+
4477
CREATE TABLE MyResult(
45-
channel VARCHAR,
46-
pv VARCHAR
47-
)WITH(
78+
id int,
79+
name VARCHAR,
80+
address VARCHAR,
81+
primary key (id)
82+
)WITH(
4883
type ='mongo',
49-
address ='172.21.32.1:27017,172.21.32.1:27017',
50-
userName ='dtstack',
51-
password ='abc123',
52-
database ='test',
53-
tableName ='pv',
54-
parallelism ='1'
55-
)
56-
```
84+
address ='172.16.8.193:27017',
85+
database ='dtstack',
86+
tableName ='userInfo'
87+
);
88+
89+
insert into MyResult
90+
select
91+
s1.id,
92+
s1.name,
93+
s2.address
94+
from
95+
source1 s1
96+
left join
97+
source2 s2
98+
on
99+
s1.id = s2.id
100+
101+
102+
```
103+
104+
105+
数据结果:
106+
107+
向Topic mqTest03 发送数据 {"name":"maqi","id":1001} 插入 (1001,"maqi",null)
108+
109+
向Topic mqTest04 发送数据 {"address":"hz","id":1001} 插入 (1001,"maqi","hz")

0 commit comments

Comments
 (0)