Skip to content

Commit 2b71680

Browse files
committed
[fix-30263] fix oceanbase sink error
1 parent 56df6ec commit 2b71680

File tree

3 files changed

+16
-13
lines changed

3 files changed

+16
-13
lines changed

oceanbase/oceanbase-sink/src/main/java/com/dtstack/flink/sql/sink/ocean/OceanbaseDialect.java renamed to oceanbase/oceanbase-sink/src/main/java/com/dtstack/flink/sql/sink/oceanbase/OceanbaseDialect.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,27 +50,30 @@ public Optional<String> getUpsertStatement(String schema,
5050
String[] uniqueKeyFields,
5151
boolean allReplace) {
5252
return allReplace ?
53-
buildReplaceIntoStatement(tableName, fieldNames) :
54-
buildDuplicateUpsertStatement(tableName, fieldNames);
53+
buildReplaceIntoStatement(tableName, fieldNames) : buildDuplicateUpsertStatement(tableName, fieldNames);
5554
}
5655

57-
private Optional<String> buildDuplicateUpsertStatement(String tableName, String[] fieldsName) {
58-
String updateClause = Arrays.stream(fieldsName).map(f -> quoteIdentifier(f)
59-
+ "IFNULL(VALUES(" + quoteIdentifier(f) + ")," + quoteIdentifier(f) + ")")
60-
.collect(Collectors.joining(","));
61-
return Optional.of(getInsertIntoStatement("", tableName, fieldsName, null) +
56+
public Optional<String> buildDuplicateUpsertStatement(String tableName, String[] fieldNames) {
57+
String updateClause = Arrays
58+
.stream(fieldNames)
59+
.map(f -> quoteIdentifier(f) + "=IFNULL(VALUES(" + quoteIdentifier(f) + ")," + quoteIdentifier(f) + ")")
60+
.collect(Collectors.joining(", "));
61+
62+
return Optional.of(getInsertIntoStatement("", tableName, fieldNames, null) +
6263
" ON DUPLICATE KEY UPDATE " + updateClause
6364
);
6465
}
6566

66-
private Optional<String> buildReplaceIntoStatement(String tableName, String[] fieldsNames) {
67-
String columns = Arrays.stream(fieldsNames)
67+
public Optional<String> buildReplaceIntoStatement(String tableName, String[] fieldNames) {
68+
String columns = Arrays
69+
.stream(fieldNames)
6870
.map(this::quoteIdentifier)
69-
.collect(Collectors.joining(","));
70-
String placeholders = Arrays.stream(fieldsNames)
71+
.collect(Collectors.joining(", "));
72+
String placeholders = Arrays
73+
.stream(fieldNames)
7174
.map(f -> "?")
72-
.collect(Collectors.joining(","));
75+
.collect(Collectors.joining(", "));
7376
return Optional.of("REPLACE INTO " + quoteIdentifier(tableName) +
74-
"(" + columns + ") VALUES (" + placeholders + ")");
77+
"(" + columns + ")" + " VALUES (" + placeholders + ")");
7578
}
7679
}

oceanbase/oceanbase-sink/src/main/java/com/dtstack/flink/sql/sink/ocean/OceanbaseSink.java renamed to oceanbase/oceanbase-sink/src/main/java/com/dtstack/flink/sql/sink/oceanbase/OceanbaseSink.java

File renamed without changes.

oceanbase/oceanbase-sink/src/main/java/com/dtstack/flink/sql/sink/ocean/table/OceanbaseSinkParser.java renamed to oceanbase/oceanbase-sink/src/main/java/com/dtstack/flink/sql/sink/oceanbase/table/OceanbaseSinkParser.java

File renamed without changes.

0 commit comments

Comments
 (0)