Commit 8295b7e

Author: dapeng (committed)
Merge remote-tracking branch 'origin/hotfix_1.8_3.10.x_25344' into 1.8_zy_3.10.x
2 parents: cc228de + 1a361e1 · commit 8295b7e

8 files changed: +129 −5 lines changed


core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 29 additions & 0 deletions
@@ -261,14 +261,43 @@ public static String firstUpperCase(String str) {
     }
 
     public static String getTableFullPath(String schema, String tableName) {
+        String[] tableInfoSplit = StringUtils.split(tableName, ".");
+        // the table name already carries the schema
+        if(tableInfoSplit.length == 2){
+            schema = tableInfoSplit[0];
+            tableName = tableInfoSplit[1];
+        }
+
+        // strip a leading and a trailing double quote
+        schema = rmStrQuote(schema);
+        tableName = rmStrQuote(tableName);
+
         if (StringUtils.isEmpty(schema)){
             return addQuoteForStr(tableName);
         }
+
         String schemaAndTabName = addQuoteForStr(schema) + "." + addQuoteForStr(tableName);
         return schemaAndTabName;
     }
 
+    /**
+     * strip a leading and a trailing double quote
+     */
+    public static String rmStrQuote(String str){
+        if(StringUtils.isEmpty(str)){
+            return str;
+        }
+
+        if(str.startsWith("\"")){
+            str = str.substring(1);
+        }
+
+        if(str.endsWith("\"")){
+            str = str.substring(0, str.length()-1);
+        }
 
+        return str;
+    }
 
     public static String addQuoteForStr(String column) {
         return getStartQuote() + column + getEndQuote();
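
For context, a rough usage sketch of the new helpers. The schema and table names below are made up, and the exact quote characters depend on what getStartQuote()/getEndQuote() return in this fork:

// Illustrative only; assumes addQuoteForStr() wraps identifiers in double quotes.
DtStringUtil.getTableFullPath(null, "my_table");                 // -> "my_table"
DtStringUtil.getTableFullPath("my_schema", "my_table");          // -> "my_schema"."my_table"
DtStringUtil.getTableFullPath(null, "my_schema.my_table");       // schema is parsed out of the dotted table name
DtStringUtil.getTableFullPath("\"my_schema\"", "\"my_table\"");  // existing quotes are stripped first, then re-added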

db2/db2-side/db2-all-side/src/main/java/com/dtstack/flink/sql/side/db2/Db2AllSideInfo.java

Lines changed: 7 additions & 0 deletions
@@ -22,6 +22,8 @@
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.SideTableInfo;
 import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
+import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
+import com.dtstack.flink.sql.util.DtStringUtil;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 
 import java.util.List;
@@ -38,4 +40,9 @@ public class Db2AllSideInfo extends RdbAllSideInfo {
     public Db2AllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
         super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
     }
+
+    @Override
+    public String getTableName(RdbSideTableInfo rdbSideTableInfo) {
+        return DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName());
+    }
 }

db2/db2-side/db2-async-side/src/main/java/com/dtstack/flink/sql/side/db2/Db2AsyncReqRow.java

Lines changed: 2 additions & 5 deletions
@@ -29,8 +29,6 @@
 import io.vertx.ext.jdbc.JDBCClient;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
@@ -43,8 +41,7 @@
  */
 
 public class Db2AsyncReqRow extends RdbAsyncReqRow {
-
-    private static final Logger LOG = LoggerFactory.getLogger(Db2AsyncReqRow.class);
+    private final static String DB2_PREFERRED_TEST_QUERY_SQL = "select 1 from sysibm.dual";
 
     private final static String DB2_DRIVER = "com.ibm.db2.jcc.DB2Driver";
 
@@ -63,7 +60,7 @@ public void open(Configuration parameters) throws Exception {
                 .put("user", rdbSideTableInfo.getUserName())
                 .put("password", rdbSideTableInfo.getPassword())
                 .put("provider_class", DT_PROVIDER_CLASS)
-                .put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
+                .put("preferred_test_query", DB2_PREFERRED_TEST_QUERY_SQL)
                 .put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
                 .put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);

db2/db2-side/db2-async-side/src/main/java/com/dtstack/flink/sql/side/db2/Db2AsyncSideInfo.java

Lines changed: 8 additions & 0 deletions
@@ -22,6 +22,8 @@
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.SideTableInfo;
 import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
+import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
+import com.dtstack.flink.sql.util.DtStringUtil;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 
 import java.util.List;
@@ -38,4 +40,10 @@ public class Db2AsyncSideInfo extends RdbAsyncSideInfo {
     public Db2AsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
         super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
     }
+
+    @Override
+    public String getTableName(RdbSideTableInfo rdbSideTableInfo) {
+        return DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName());
+    }
+
 }

db2/db2-sink/src/main/java/com/dtstack/flink/sql/sink/db/DbDialect.java

Lines changed: 80 additions & 0 deletions
@@ -19,8 +19,13 @@
 package com.dtstack.flink.sql.sink.db;
 
 import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
+import com.dtstack.flink.sql.util.DtStringUtil;
+import org.apache.commons.lang3.StringUtils;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * Date: 2020/1/19
@@ -43,4 +48,79 @@ public String quoteIdentifier(String identifier) {
         return identifier;
     }
 
+    @Override
+    public Optional<String> getUpsertStatement(String schema, String tableName, String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
+        tableName = DtStringUtil.getTableFullPath(schema, tableName);
+        StringBuilder sb = new StringBuilder();
+        sb.append("MERGE INTO " + tableName + " T1 USING "
+                + "(" + buildValuesStatement(fieldNames) + ") T2 ("
+                + buildFiledNameStatement(fieldNames) +
+                ") ON ("
+                + buildConnectionConditions(uniqueKeyFields) + ") ");
+
+        String updateSql = buildUpdateConnection(fieldNames, uniqueKeyFields, allReplace);
+
+        if (StringUtils.isNotEmpty(updateSql)) {
+            sb.append(" WHEN MATCHED THEN UPDATE SET ");
+            sb.append(updateSql);
+        }
+
+        sb.append(" WHEN NOT MATCHED THEN "
+                + "INSERT (" + Arrays.stream(fieldNames).map(this::quoteIdentifier).collect(Collectors.joining(",")) + ") VALUES ("
+                + Arrays.stream(fieldNames).map(col -> "T2." + quoteIdentifier(col)).collect(Collectors.joining(",")) + ")");
+        return Optional.of(sb.toString());
+    }
+
+    /**
+     * build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A")
+     * @param fieldNames
+     * @param uniqueKeyFields
+     * @param allReplace
+     * @return
+     */
+    private String buildUpdateConnection(String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
+        List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
+        return Arrays.stream(fieldNames)
+                .filter(col -> !uniqueKeyList.contains(col))
+                .map(col -> buildConnectString(allReplace, col))
+                .collect(Collectors.joining(","));
+    }
+
+    private String buildConnectString(boolean allReplace, String col) {
+        return allReplace ? quoteIdentifier("T1") + "." + quoteIdentifier(col) + " = " + quoteIdentifier("T2") + "." + quoteIdentifier(col) :
+                quoteIdentifier("T1") + "." + quoteIdentifier(col) + " =NVL(" + quoteIdentifier("T2") + "." + quoteIdentifier(col) + ","
+                        + quoteIdentifier("T1") + "." + quoteIdentifier(col) + ")";
+    }
+
+
+    private String buildConnectionConditions(String[] uniqueKeyFields) {
+        return Arrays.stream(uniqueKeyFields).map(col -> "T1." + quoteIdentifier(col) + "=T2." + quoteIdentifier(col)).collect(Collectors.joining(","));
+    }
+
+    /**
+     * build sql part e.g: VALUES('1001','zs','sss')
+     *
+     * @param column destination column
+     * @return
+     */
+    public String buildValuesStatement(String[] column) {
+        StringBuilder sb = new StringBuilder("VALUES(");
+        String collect = Arrays.stream(column)
+                .map(col -> " ? ")
+                .collect(Collectors.joining(", "));
+
+        return sb.append(collect).append(")").toString();
+    }
+
+    /**
+     * build sql part e.g: id, name, address
+     * @param column
+     * @return
+     */
+    public String buildFiledNameStatement(String[] column) {
+        return Arrays.stream(column)
+                .collect(Collectors.joining(", "));
+    }
+
+
 }
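
To make the new getUpsertStatement concrete, here is a rough sketch of what it produces for a hypothetical sink table. The construction of the dialect, the schema/table/column names, and the exact quoting of the table path are illustrative assumptions, not taken from the commit:

// Illustrative only: "my_schema", "result_tb" and the columns are made-up names.
DbDialect dialect = new DbDialect();
String merge = dialect.getUpsertStatement(
        "my_schema", "result_tb",
        new String[]{"id", "name"},   // all sink fields
        new String[]{"id"},           // unique key fields
        false).get();                 // allReplace = false -> NVL semantics for non-key columns

// With quoteIdentifier() returning identifiers unchanged, the statement comes out roughly as:
// MERGE INTO <quoted my_schema.result_tb> T1 USING (VALUES( ? ,  ? )) T2 (id, name) ON (T1.id=T2.id)
//  WHEN MATCHED THEN UPDATE SET T1.name =NVL(T2.name,T1.name)
//  WHEN NOT MATCHED THEN INSERT (id,name) VALUES (T2.id,T2.name)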

db2/db2-sink/src/main/java/com/dtstack/flink/sql/sink/db/DbSink.java

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ public JDBCUpsertOutputFormat getOutputFormat() {
                 .setDialect(jdbcDialect)
                 .setUsername(userName)
                 .setPassword(password)
+                .setSchema(schema)
                 .setTableName(tableName)
                 .build();

kafka09/kafka09-sink/pom.xml

Lines changed: 1 addition & 0 deletions
@@ -37,6 +37,7 @@
             <goal>shade</goal>
           </goals>
           <configuration>
+            <createDependencyReducedPom>false</createDependencyReducedPom>
             <artifactSet>
               <excludes>
                 <exclude>org.slf4j</exclude>

kafka09/kafka09-source/pom.xml

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@
             <goal>shade</goal>
           </goals>
           <configuration>
+            <createDependencyReducedPom>false</createDependencyReducedPom>
             <artifactSet>
               <excludes>
                 <exclude>org.slf4j</exclude>
