Skip to content

Commit 409723b

Browse files
committed
modify oracle mergeinto sql
1 parent 925718e commit 409723b

File tree

1 file changed

+17
-4
lines changed
  • oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle

1 file changed

+17
-4
lines changed

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,25 @@ private void buildInsertSql(String tableName, List<String> fields) {
7676
@Override
7777
public String buildUpdateSql(String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
7878
tableName = quoteTable(tableName);
79-
return "MERGE INTO " + tableName + " T1 USING "
79+
StringBuilder sb = new StringBuilder();
80+
81+
sb.append("MERGE INTO " + tableName + " T1 USING "
8082
+ "(" + makeValues(fieldNames) + ") T2 ON ("
81-
+ updateKeySql(realIndexes) + ") WHEN MATCHED THEN UPDATE SET "
82-
+ getUpdateSql(fieldNames, fullField, "T1", "T2", keyColList(realIndexes)) + " WHEN NOT MATCHED THEN "
83+
+ updateKeySql(realIndexes) + ") ");
84+
85+
86+
String updateSql = getUpdateSql(fieldNames, fullField, "T1", "T2", keyColList(realIndexes));
87+
88+
if (StringUtils.isNotEmpty(updateSql)) {
89+
sb.append(" WHEN MATCHED THEN UPDATE SET ");
90+
sb.append(updateSql);
91+
}
92+
93+
sb.append(" WHEN NOT MATCHED THEN "
8394
+ "INSERT (" + quoteColumns(fieldNames) + ") VALUES ("
84-
+ quoteColumns(fieldNames, "T2") + ")";
95+
+ quoteColumns(fieldNames, "T2") + ")");
96+
97+
return sb.toString();
8598
}
8699

87100

0 commit comments

Comments
 (0)