Skip to content

Commit e0af118

Browse files
WTZ468071157WTZ468071157
authored andcommitted
oceanbase-sink插件
1 parent d124305 commit e0af118

File tree

7 files changed

+256
-6
lines changed

7 files changed

+256
-6
lines changed

core/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,6 @@
116116
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
117117
<version>${flink.version}</version>
118118
</dependency>
119-
120-
<dependency>
121-
<groupId>junit</groupId>
122-
<artifactId>junit</artifactId>
123-
<version>4.12</version>
124-
</dependency>
125119
<dependency>
126120
<groupId>com.aiweiergou</groupId>
127121
<artifactId>tools-logger</artifactId>

oceanbase/oceanbase-sink/pom.xml

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.oceanbase</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>sql.sink.oceanbase</artifactId>
13+
<packaging>jar</packaging>
14+
<name>oceanbase-sink</name>
15+
16+
<properties>
17+
<sql.sink.rdb.version>1.0-SNAPSHOT</sql.sink.rdb.version>
18+
</properties>
19+
20+
<dependencies>
21+
<dependency>
22+
<groupId>com.dtstack.flink</groupId>
23+
<artifactId>sql.sink.rdb</artifactId>
24+
<version>${sql.sink.rdb.version}</version>
25+
</dependency>
26+
</dependencies>
27+
28+
<build>
29+
<plugins>
30+
<plugin>
31+
<groupId>org.apache.maven.plugins</groupId>
32+
<artifactId>maven-shade-plugin</artifactId>
33+
<version>1.4</version>
34+
<executions>
35+
<execution>
36+
<phase>package</phase>
37+
<goals>
38+
<goal>shade</goal>
39+
</goals>
40+
<configuration>
41+
<artifactSet>
42+
<excludes>
43+
44+
</excludes>
45+
</artifactSet>
46+
<filters>
47+
<filter>
48+
<artifact>*:*</artifact>
49+
<excludes>
50+
<exclude>META-INF/*.SF</exclude>
51+
<exclude>META-INF/*.DSA</exclude>
52+
<exclude>META-INF/*.RSA</exclude>
53+
</excludes>
54+
</filter>
55+
</filters>
56+
</configuration>
57+
</execution>
58+
</executions>
59+
</plugin>
60+
61+
<plugin>
62+
<artifactId>maven-antrun-plugin</artifactId>
63+
<version>1.2</version>
64+
<executions>
65+
<execution>
66+
<id>copy-resources</id>
67+
<!-- here the phase you need -->
68+
<phase>package</phase>
69+
<goals>
70+
<goal>run</goal>
71+
</goals>
72+
<configuration>
73+
<tasks>
74+
<copy todir="${basedir}/../../plugins/oceanbasesink">
75+
<fileset dir="target/">
76+
<include name="${project.artifactId}-${project.version}.jar"/>
77+
</fileset>
78+
</copy>
79+
80+
<move file="${basedir}/../../plugins/oceanbasesink/${project.artifactId}-${project.version}.jar"
81+
tofile="${basedir}/../../plugins/oceanbasesink/${project.name}-${git.branch}.jar"/>
82+
</tasks>
83+
</configuration>
84+
</execution>
85+
</executions>
86+
</plugin>
87+
</plugins>
88+
</build>
89+
90+
</project>
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.dtstack.flink.sql.sink.ocean;
2+
3+
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
4+
5+
import java.util.Arrays;
6+
import java.util.Optional;
7+
import java.util.stream.Collectors;
8+
9+
/**
10+
* @author : tiezhu
11+
* @date : 2020/3/24
12+
*/
13+
public class OceanbaseDialect implements JDBCDialect {
14+
@Override
15+
public boolean canHandle(String url) {
16+
return url.startsWith("jdbc:mysql:");
17+
}
18+
19+
@Override
20+
public Optional<String> defaultDriverName() {
21+
return Optional.of("com.mysql.jdbc.Driver");
22+
}
23+
24+
@Override
25+
public String quoteIdentifier(String identifier) {
26+
return "`" + identifier + "`";
27+
}
28+
29+
@Override
30+
public Optional<String> getUpsertStatement(String schema,
31+
String tableName,
32+
String[] fieldNames,
33+
String[] uniqueKeyFields,
34+
boolean allReplace) {
35+
return allReplace ?
36+
buildReplaceIntoStatement(tableName, fieldNames) :
37+
buildDuplicateUpsertStatement(tableName, fieldNames);
38+
}
39+
40+
private Optional<String> buildDuplicateUpsertStatement(String tableName, String[] fieldsName) {
41+
String updateClause = Arrays.stream(fieldsName).map(f -> quoteIdentifier(f)
42+
+ "IFNULL(VALUES(" + quoteIdentifier(f) + ")," + quoteIdentifier(f) + ")")
43+
.collect(Collectors.joining(","));
44+
return Optional.of(getInsertIntoStatement("", tableName, fieldsName, null) +
45+
" ON DUPLICATE KEY UPDATE " + updateClause
46+
);
47+
}
48+
49+
private Optional<String> buildReplaceIntoStatement(String tableName, String[] fieldsNames) {
50+
String columns = Arrays.stream(fieldsNames)
51+
.map(this::quoteIdentifier)
52+
.collect(Collectors.joining(","));
53+
String placeholders = Arrays.stream(fieldsNames)
54+
.map(f -> "?")
55+
.collect(Collectors.joining(","));
56+
return Optional.of("REPLACE INTO " + quoteIdentifier(tableName) +
57+
"(" + columns + ") VALUES (" + placeholders + ")");
58+
}
59+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.dtstack.flink.sql.sink.ocean;
2+
3+
import com.dtstack.flink.sql.sink.IStreamSinkGener;
4+
import com.dtstack.flink.sql.sink.rdb.AbstractRdbSink;
5+
import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
6+
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
7+
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
8+
9+
/**
10+
* @author : tiezhu
11+
* @date : 2020/3/24
12+
*/
13+
public class OceanbaseSink extends AbstractRdbSink implements IStreamSinkGener<AbstractRdbSink> {
14+
15+
private static final String OCEANBESE_DRIVER = "com.mysql.jdbc.Driver";
16+
17+
public OceanbaseSink() {
18+
super(new OceanbaseDialect());
19+
}
20+
21+
@Override
22+
public JDBCUpsertOutputFormat getOutputFormat() {
23+
JDBCOptions oceanbaseOptions = JDBCOptions.builder()
24+
.setDbUrl(dbUrl)
25+
.setDialect(jdbcDialect)
26+
.setUsername(userName)
27+
.setPassword(password)
28+
.setTableName(tableName)
29+
.build();
30+
31+
return JDBCUpsertOutputFormat.builder()
32+
.setOptions(oceanbaseOptions)
33+
.setFieldNames(fieldNames)
34+
.setFlushMaxSize(batchNum)
35+
.setFlushIntervalMills(batchWaitInterval)
36+
.setFieldTypes(sqlTypes)
37+
.setKeyFields(primaryKeys)
38+
.setAllReplace(allReplace)
39+
.setUpdateMode(updateMode)
40+
.build();
41+
}
42+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.dtstack.flink.sql.sink.ocean.table;
2+
3+
import com.dtstack.flink.sql.sink.rdb.table.RdbSinkParser;
4+
import com.dtstack.flink.sql.table.AbstractTableInfo;
5+
6+
import java.util.Map;
7+
8+
/**
9+
* @author : tiezhu
10+
* @date : 2020/3/24
11+
*/
12+
public class OceanbaseSinkParser extends RdbSinkParser {
13+
14+
private static final String CURRENT_TYPE = "oceanbase";
15+
16+
@Override
17+
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
18+
19+
AbstractTableInfo oceanbaseTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
20+
21+
oceanbaseTableInfo.setType(CURRENT_TYPE);
22+
23+
return oceanbaseTableInfo;
24+
}
25+
}

oceanbase/pom.xml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>flink.sql</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>sql.oceanbase</artifactId>
13+
<packaging>pom</packaging>
14+
<modules>
15+
<module>oceanbase-sink</module>
16+
</modules>
17+
18+
<properties>
19+
<mysql.connector.version>5.1.40</mysql.connector.version>
20+
<sql.core.version>1.0-SNAPSHOT</sql.core.version>
21+
</properties>
22+
23+
<dependencies>
24+
<dependency>
25+
<groupId>com.dtstack.flink</groupId>
26+
<artifactId>sql.core</artifactId>
27+
<version>${sql.core.version}</version>
28+
<scope>provided</scope>
29+
</dependency>
30+
31+
<dependency>
32+
<groupId>mysql</groupId>
33+
<artifactId>mysql-connector-java</artifactId>
34+
<version>${mysql.connector.version}</version>
35+
</dependency>
36+
</dependencies>
37+
38+
39+
</project>

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
<module>impala</module>
3535
<module>db2</module>
3636
<module>polardb</module>
37+
<module>oceanbase</module>
3738

3839
</modules>
3940

0 commit comments

Comments
 (0)