Skip to content

Commit 7f96408

Browse files
committed
[feat] add kingbase sink
[feat] add kingbase sink [feat] add kingbase sink
1 parent 52b4665 commit 7f96408

File tree

3 files changed

+84
-4
lines changed

3 files changed

+84
-4
lines changed

kingbase/kingbase-sink/pom.xml

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

12-
<artifactId>kingbase-sink</artifactId>
12+
<artifactId>sql.sink.kingbase</artifactId>
1313
<properties>
1414
<sql.sink.rdb.version>1.0-SNAPSHOT</sql.sink.rdb.version>
1515
</properties>
@@ -21,4 +21,66 @@
2121
<version>${sql.sink.rdb.version}</version>
2222
</dependency>
2323
</dependencies>
24+
<build>
25+
<plugins>
26+
<plugin>
27+
<groupId>org.apache.maven.plugins</groupId>
28+
<artifactId>maven-shade-plugin</artifactId>
29+
<version>3.2.1</version>
30+
<executions>
31+
<execution>
32+
<phase>package</phase>
33+
<goals>
34+
<goal>shade</goal>
35+
</goals>
36+
<configuration>
37+
<createDependencyReducedPom>false</createDependencyReducedPom>
38+
<artifactSet>
39+
<excludes>
40+
41+
</excludes>
42+
</artifactSet>
43+
<filters>
44+
<filter>
45+
<artifact>*:*</artifact>
46+
<excludes>
47+
<exclude>META-INF/*.SF</exclude>
48+
<exclude>META-INF/*.DSA</exclude>
49+
<exclude>META-INF/*.RSA</exclude>
50+
</excludes>
51+
</filter>
52+
</filters>
53+
</configuration>
54+
</execution>
55+
</executions>
56+
</plugin>
57+
58+
<plugin>
59+
<artifactId>maven-antrun-plugin</artifactId>
60+
<version>1.2</version>
61+
<executions>
62+
<execution>
63+
<id>copy-resources</id>
64+
<!-- here the phase you need -->
65+
<phase>package</phase>
66+
<goals>
67+
<goal>run</goal>
68+
</goals>
69+
<configuration>
70+
<tasks>
71+
<copy todir="${basedir}/../../sqlplugins/kingbasesink">
72+
<fileset dir="target/">
73+
<include name="${project.artifactId}-${project.version}.jar"/>
74+
</fileset>
75+
</copy>
76+
77+
<move file="${basedir}/../../sqlplugins/kingbasesink/${project.artifactId}-${project.version}.jar"
78+
tofile="${basedir}/../../sqlplugins/kingbasesink/${project.name}-${git.branch}.jar"/>
79+
</tasks>
80+
</configuration>
81+
</execution>
82+
</executions>
83+
</plugin>
84+
</plugins>
85+
</build>
2486
</project>

kingbase/kingbase-sink/src/main/java/com/dtstack/flink/sql/sink/kingbase/KingbaseDialect.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,15 @@ public Optional<String> buildDuplicateUpsertStatement(String tableName,
6161
String[] fieldNames,
6262
String[] uniqueKeyFields) {
6363
String updateClause = Arrays.stream(fieldNames)
64-
.map(f -> quoteIdentifier(f) + "=IFNULL(VALUES(" + quoteIdentifier(f) + ")," + quoteIdentifier(f) + ")")
64+
.map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
65+
.collect(Collectors.joining(", "));
66+
67+
String uniqueColumns = Arrays.stream(uniqueKeyFields)
68+
.map(this::quoteIdentifier)
6569
.collect(Collectors.joining(", "));
6670

6771
return Optional.of(getInsertIntoStatement("", tableName, fieldNames, null) +
68-
" ON CONFLICT (" + Arrays.toString(uniqueKeyFields) + ") DO UPDATE SET " + updateClause
72+
" ON CONFLICT (" + uniqueColumns + ") DO UPDATE SET " + updateClause
6973
);
7074
}
7175
}

kingbase/kingbase-sink/src/main/java/com/dtstack/flink/sql/sink/kingbase/table/KingbaseSinkParser.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,24 @@
1818

1919
package com.dtstack.flink.sql.sink.kingbase.table;
2020

21+
import com.dtstack.flink.sql.sink.rdb.table.RdbSinkParser;
22+
import com.dtstack.flink.sql.table.AbstractTableInfo;
23+
24+
import java.util.Map;
25+
2126
/**
2227
* Date: 2020/09/10
2328
* Company: www.dtstack.com
29+
*
2430
* @author tiezhu
2531
*/
26-
public class KingbaseSinkParser {
32+
public class KingbaseSinkParser extends RdbSinkParser {
33+
private static final String CURRENT_TYPE = "kingbase";
34+
35+
@Override
36+
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
37+
AbstractTableInfo kingbaseTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
38+
kingbaseTableInfo.setType(CURRENT_TYPE);
39+
return kingbaseTableInfo;
40+
}
2741
}

0 commit comments

Comments
 (0)