Skip to content

Commit 4481c3a

Browse files
polardb
1 parent 812a110 commit 4481c3a

File tree

16 files changed

+728
-0
lines changed

16 files changed

+728
-0
lines changed

docs/polardbSide.md

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
## 1.格式:
2+
```
3+
CREATE TABLE tableName(
4+
colName cloType,
5+
...
6+
PRIMARY KEY(keyInfo),
7+
PERIOD FOR SYSTEM_TIME
8+
)WITH(
9+
type='polardb',
10+
url='jdbcUrl',
11+
userName='dbUserName',
12+
password='dbPwd',
13+
tableName='tableName',
14+
cache ='LRU',
15+
cacheSize ='10000',
16+
cacheTTLMs ='60000',
17+
parallelism ='1',
18+
partitionedJoin='false'
19+
);
20+
```
21+
22+
# 2.支持版本
23+
mysql-8.0.16
24+
25+
## 3.表结构定义
26+
27+
|参数名称|含义|
28+
|----|---|
29+
| tableName | polardb表名称|
30+
| colName | 列名称|
31+
| colType | 列类型 [colType支持的类型](colType.md)|
32+
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
33+
| PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
34+
35+
## 4.参数
36+
37+
|参数名称|含义|是否必填|默认值|
38+
|----|---|---|----|
39+
| type | 表明维表的类型 polardb |||
40+
| url | 连接polardb数据库 jdbcUrl |||
41+
| userName | ploardb连接用户名 |||
42+
| password | ploardb连接密码|||
43+
| tableName | ploardb表名称|||
44+
| cache | 维表缓存策略(NONE/LRU)||NONE|
45+
| partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)||false|
46+
47+
----------
48+
> 缓存策略
49+
* NONE: 不做内存缓存
50+
* LRU:
51+
* cacheSize: 缓存的条目数量
52+
* cacheTTLMs:缓存的过期时间(ms)
53+
* cacheMode: (unordered|ordered)异步加载是有序还是无序,默认有序。
54+
* asyncCapacity:异步请求容量,默认1000
55+
* asyncTimeout:异步请求超时时间,默认10000毫秒
56+
57+
## 5.样例
58+
```
59+
create table sideTable(
60+
channel varchar,
61+
xccount int,
62+
PRIMARY KEY(channel),
63+
PERIOD FOR SYSTEM_TIME
64+
)WITH(
65+
type='polardb',
66+
url='jdbc:mysql://xxx.xxx.xxx:3306/test?charset=utf8',
67+
userName='dtstack',
68+
password='abc123',
69+
tableName='sidetest',
70+
cache ='LRU',
71+
cacheSize ='10000',
72+
cacheTTLMs ='60000',
73+
cacheMode='unordered',
74+
asyncCapacity='1000',
75+
asyncTimeout='10000'
76+
parallelism ='1',
77+
partitionedJoin='false'
78+
);
79+
```

docs/polardbSink.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
## 1.格式:
2+
```
3+
CREATE TABLE tableName(
4+
colName colType,
5+
...
6+
colNameX colType
7+
)WITH(
8+
type ='polardb',
9+
url ='jdbcUrl',
10+
userName ='userName',
11+
password ='pwd',
12+
tableName ='tableName',
13+
parallelism ='parllNum'
14+
);
15+
16+
```
17+
18+
## 2.支持版本
19+
mysql-8.0.16
20+
21+
## 3.表结构定义
22+
23+
|参数名称|含义|
24+
|----|---|
25+
| tableName| polardb表名称|
26+
| colName | 列名称|
27+
| colType | 列类型 [colType支持的类型](colType.md)|
28+
29+
## 4.参数:
30+
31+
|参数名称|含义|是否必填|默认值|
32+
|----|----|----|----|
33+
|type |表名 输出表类型 polardb|||
34+
|url | 连接polardb数据库 jdbcUrl |||
35+
|userName | polardb连接用户名 |||
36+
| password | polardb连接密码|||
37+
| tableName | polardb表名称|||
38+
| parallelism | 并行度设置||1|
39+
40+
## 5.样例:
41+
```
42+
CREATE TABLE MyResult(
43+
channel VARCHAR,
44+
pv VARCHAR
45+
)WITH(
46+
type ='polardb',
47+
url ='jdbc:mysql://xxx.xxx.xxx:3306/test?charset=utf8',
48+
userName ='dtstack',
49+
password ='abc123',
50+
tableName ='pv2',
51+
parallelism ='1'
52+
);
53+
```
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.polardb</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>sql.side.all.polardb</artifactId>
13+
<name>polardb-all-side</name>
14+
15+
<packaging>jar</packaging>
16+
17+
<properties>
18+
<sql.side.polardb.core.version>1.0-SNAPSHOT</sql.side.polardb.core.version>
19+
</properties>
20+
21+
<dependencies>
22+
<dependency>
23+
<groupId>com.dtstack.flink</groupId>
24+
<artifactId>sql.side.polardb.core</artifactId>
25+
<version>${sql.side.polardb.core.version}</version>
26+
</dependency>
27+
</dependencies>
28+
29+
<build>
30+
<plugins>
31+
<plugin>
32+
<groupId>org.apache.maven.plugins</groupId>
33+
<artifactId>maven-shade-plugin</artifactId>
34+
<version>1.4</version>
35+
<executions>
36+
<execution>
37+
<phase>package</phase>
38+
<goals>
39+
<goal>shade</goal>
40+
</goals>
41+
<configuration>
42+
<artifactSet>
43+
<excludes>
44+
45+
</excludes>
46+
</artifactSet>
47+
<filters>
48+
<filter>
49+
<artifact>*:*</artifact>
50+
<excludes>
51+
<exclude>META-INF/*.SF</exclude>
52+
<exclude>META-INF/*.DSA</exclude>
53+
<exclude>META-INF/*.RSA</exclude>
54+
</excludes>
55+
</filter>
56+
</filters>
57+
</configuration>
58+
</execution>
59+
</executions>
60+
</plugin>
61+
62+
<plugin>
63+
<artifactId>maven-antrun-plugin</artifactId>
64+
<version>1.2</version>
65+
<executions>
66+
<execution>
67+
<id>copy-resources</id>
68+
<!-- here the phase you need -->
69+
<phase>package</phase>
70+
<goals>
71+
<goal>run</goal>
72+
</goals>
73+
<configuration>
74+
<tasks>
75+
<copy todir="${basedir}/../../../plugins/polardballside">
76+
<fileset dir="target/">
77+
<include name="${project.artifactId}-${project.version}.jar"/>
78+
</fileset>
79+
</copy>
80+
81+
<move file="${basedir}/../../../plugins/polardballside/${project.artifactId}-${project.version}.jar"
82+
tofile="${basedir}/../../../plugins/polardballside/${project.name}-${git.branch}.jar"/>
83+
</tasks>
84+
</configuration>
85+
</execution>
86+
</executions>
87+
</plugin>
88+
</plugins>
89+
</build>
90+
91+
92+
</project>
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.dtstack.flink.sql.side.polardb;
2+
3+
import com.dtstack.flink.sql.side.FieldInfo;
4+
import com.dtstack.flink.sql.side.JoinInfo;
5+
import com.dtstack.flink.sql.side.SideTableInfo;
6+
import com.dtstack.flink.sql.side.rdb.all.RdbAllReqRow;
7+
import com.dtstack.flink.sql.util.DtStringUtil;
8+
import com.google.common.collect.Maps;
9+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
import java.sql.Connection;
14+
import java.sql.DriverManager;
15+
import java.util.List;
16+
import java.util.Map;
17+
18+
public class PolardbAllReqRow extends RdbAllReqRow {
19+
20+
private static final long serialVersionUID = 2098635140857937717L;
21+
22+
private static final Logger LOG = LoggerFactory.getLogger(PolardbAllReqRow.class);
23+
24+
private static final String POLARDB_DRIVER = "com.mysql.cj.jdbc.Driver";
25+
26+
public PolardbAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
27+
super(new PolardbAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
28+
}
29+
30+
@Override
31+
public Connection getConn(String dbURL, String userName, String password) {
32+
try {
33+
Class.forName(POLARDB_DRIVER);
34+
//add param useCursorFetch=true
35+
Map<String, String> addParams = Maps.newHashMap();
36+
addParams.put("useCursorFetch", "true");
37+
String targetDbUrl = DtStringUtil.addJdbcParam(dbURL, addParams, true);
38+
return DriverManager.getConnection(targetDbUrl, userName, password);
39+
} catch (Exception e) {
40+
LOG.error("", e);
41+
throw new RuntimeException("", e);
42+
}
43+
}
44+
45+
@Override
46+
public int getFetchSize() {
47+
return Integer.MIN_VALUE;
48+
}
49+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.dtstack.flink.sql.side.polardb;
2+
3+
import com.dtstack.flink.sql.side.FieldInfo;
4+
import com.dtstack.flink.sql.side.JoinInfo;
5+
import com.dtstack.flink.sql.side.SideTableInfo;
6+
import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
7+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
8+
9+
import java.util.List;
10+
11+
public class PolardbAllSideInfo extends RdbAllSideInfo {
12+
public PolardbAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
13+
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
14+
}
15+
}
16+
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.polardb</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>sql.side.async.polardb</artifactId>
13+
<name>polardb-async-side</name>
14+
<packaging>jar</packaging>
15+
16+
<properties>
17+
<sql.side.polardb.core.version>1.0-SNAPSHOT</sql.side.polardb.core.version>
18+
</properties>
19+
20+
<dependencies>
21+
<dependency>
22+
<groupId>com.dtstack.flink</groupId>
23+
<artifactId>sql.side.polardb.core</artifactId>
24+
<version>${sql.side.polardb.core.version}</version>
25+
</dependency>
26+
</dependencies>
27+
28+
<build>
29+
<plugins>
30+
<plugin>
31+
<groupId>org.apache.maven.plugins</groupId>
32+
<artifactId>maven-shade-plugin</artifactId>
33+
<version>1.4</version>
34+
<executions>
35+
<execution>
36+
<phase>package</phase>
37+
<goals>
38+
<goal>shade</goal>
39+
</goals>
40+
<configuration>
41+
<artifactSet>
42+
<excludes>
43+
44+
</excludes>
45+
</artifactSet>
46+
<filters>
47+
<filter>
48+
<artifact>*:*</artifact>
49+
<excludes>
50+
<exclude>META-INF/*.SF</exclude>
51+
<exclude>META-INF/*.DSA</exclude>
52+
<exclude>META-INF/*.RSA</exclude>
53+
</excludes>
54+
</filter>
55+
</filters>
56+
</configuration>
57+
</execution>
58+
</executions>
59+
</plugin>
60+
61+
<plugin>
62+
<artifactId>maven-antrun-plugin</artifactId>
63+
<version>1.2</version>
64+
<executions>
65+
<execution>
66+
<id>copy-resources</id>
67+
<!-- here the phase you need -->
68+
<phase>package</phase>
69+
<goals>
70+
<goal>run</goal>
71+
</goals>
72+
<configuration>
73+
<tasks>
74+
<copy todir="${basedir}/../../../plugins/polardbasyncside">
75+
<fileset dir="target/">
76+
<include name="${project.artifactId}-${project.version}.jar"/>
77+
</fileset>
78+
</copy>
79+
80+
<move file="${basedir}/../../../plugins/polardbasyncside/${project.artifactId}-${project.version}.jar"
81+
tofile="${basedir}/../../../plugins/polardbasyncside/${project.name}-${git.branch}.jar"/>
82+
</tasks>
83+
</configuration>
84+
</execution>
85+
</executions>
86+
</plugin>
87+
</plugins>
88+
</build>
89+
90+
91+
</project>

0 commit comments

Comments
 (0)