Skip to content

Commit 6cde615

Browse files
committed
Merge remote-tracking branch 'origin/v1.5.0_dev' into v1.8.0_dev
2 parents 747cebd + 37c9487 commit 6cde615

File tree

4 files changed

+74
-20
lines changed

4 files changed

+74
-20
lines changed

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 56 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,36 @@ private void buildInsertSql(String tableName, List<String> fields) {
7373
this.sql = sqlTmp;
7474
}
7575

76+
/**
77+
* use MERGE INTO build oracle replace into sql
78+
* @param tableName
79+
* @param fieldNames create table contained column columns
80+
* @param realIndexes <key: indexName, value: index contains columns >
81+
* @param fullField real columns , query from db
82+
* @return
83+
*/
7684
@Override
7785
public String buildUpdateSql(String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
7886
tableName = quoteTable(tableName);
79-
return "MERGE INTO " + tableName + " T1 USING "
87+
StringBuilder sb = new StringBuilder();
88+
89+
sb.append("MERGE INTO " + tableName + " T1 USING "
8090
+ "(" + makeValues(fieldNames) + ") T2 ON ("
81-
+ updateKeySql(realIndexes) + ") WHEN MATCHED THEN UPDATE SET "
82-
+ getUpdateSql(fieldNames, fullField, "T1", "T2", keyColList(realIndexes)) + " WHEN NOT MATCHED THEN "
91+
+ updateKeySql(realIndexes) + ") ");
92+
93+
94+
String updateSql = getUpdateSql(fieldNames, fullField, "T1", "T2", keyColList(realIndexes));
95+
96+
if (StringUtils.isNotEmpty(updateSql)) {
97+
sb.append(" WHEN MATCHED THEN UPDATE SET ");
98+
sb.append(updateSql);
99+
}
100+
101+
sb.append(" WHEN NOT MATCHED THEN "
83102
+ "INSERT (" + quoteColumns(fieldNames) + ") VALUES ("
84-
+ quoteColumns(fieldNames, "T2") + ")";
103+
+ quoteColumns(fieldNames, "T2") + ")");
104+
105+
return sb.toString();
85106
}
86107

87108

@@ -98,9 +119,14 @@ public String quoteColumns(List<String> column, String table) {
98119
return StringUtils.join(list, ",");
99120
}
100121

101-
protected List<String> keyColList(Map<String, List<String>> updateKey) {
122+
/**
123+
* extract all distinct index column
124+
* @param realIndexes
125+
* @return
126+
*/
127+
protected List<String> keyColList(Map<String, List<String>> realIndexes) {
102128
List<String> keyCols = new ArrayList<>();
103-
for (Map.Entry<String, List<String>> entry : updateKey.entrySet()) {
129+
for (Map.Entry<String, List<String>> entry : realIndexes.entrySet()) {
104130
List<String> list = entry.getValue();
105131
for (String col : list) {
106132
if (!containsIgnoreCase(keyCols,col)) {
@@ -111,15 +137,25 @@ protected List<String> keyColList(Map<String, List<String>> updateKey) {
111137
return keyCols;
112138
}
113139

114-
public String getUpdateSql(List<String> column, List<String> fullColumn, String leftTable, String rightTable, List<String> keyCols) {
140+
/**
141+
* build update sql , such as UPDATE SET "T1".A="T2".A
142+
* @param updateColumn create table contained column columns
143+
* @param fullColumn real columns , query from db
144+
* @param leftTable alias
145+
* @param rightTable alias
146+
* @param indexCols index column
147+
* @return
148+
*/
149+
public String getUpdateSql(List<String> updateColumn, List<String> fullColumn, String leftTable, String rightTable, List<String> indexCols) {
115150
String prefixLeft = StringUtils.isBlank(leftTable) ? "" : quoteTable(leftTable) + ".";
116151
String prefixRight = StringUtils.isBlank(rightTable) ? "" : quoteTable(rightTable) + ".";
117152
List<String> list = new ArrayList<>();
118153
for (String col : fullColumn) {
119-
if (keyCols == null || keyCols.size() == 0 || containsIgnoreCase(keyCols,col)) {
154+
// filter index column
155+
if (indexCols == null || indexCols.size() == 0 || containsIgnoreCase(indexCols,col)) {
120156
continue;
121157
}
122-
if (fullColumn == null ||containsIgnoreCase(column,col)) {
158+
if (containsIgnoreCase(updateColumn,col)) {
123159
list.add(prefixLeft + col + "=" + prefixRight + col);
124160
} else {
125161
list.add(prefixLeft + col + "=null");
@@ -140,7 +176,11 @@ public String quoteTable(String table) {
140176
return sb.toString();
141177
}
142178

143-
179+
/**
180+
* build connect sql by index column, such as T1."A"=T2."A"
181+
* @param updateKey
182+
* @return
183+
*/
144184
public String updateKeySql(Map<String, List<String>> updateKey) {
145185
List<String> exprList = new ArrayList<>();
146186
for (Map.Entry<String, List<String>> entry : updateKey.entrySet()) {
@@ -153,7 +193,12 @@ public String updateKeySql(Map<String, List<String>> updateKey) {
153193
return StringUtils.join(exprList, " OR ");
154194
}
155195

156-
196+
/**
197+
* build select sql , such as (SELECT ? "A",? "B" FROM DUAL)
198+
*
199+
* @param column destination column
200+
* @return
201+
*/
157202
public String makeValues(List<String> column) {
158203
StringBuilder sb = new StringBuilder("SELECT ");
159204
for (int i = 0; i < column.size(); ++i) {

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/table/OracleSinkParser.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public class OracleSinkParser extends RdbSinkParser {
3434

3535
@Override
3636
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
37-
TableInfo sqlserverTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
38-
sqlserverTableInfo.setType(CURR_TYPE);
39-
return sqlserverTableInfo;
37+
TableInfo oracleTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
38+
oracleTableInfo.setType(CURR_TYPE);
39+
return oracleTableInfo;
4040
}
4141
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/ExtendOutputFormat.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,25 +66,26 @@ public void fillRealIndexes() throws SQLException {
6666

6767
while (rs.next()) {
6868
String indexName = rs.getString("INDEX_NAME");
69-
if (!map.containsKey(indexName)) {
69+
if (StringUtils.isNotBlank(indexName) && !map.containsKey(indexName)) {
7070
map.put(indexName, new ArrayList<>());
7171
}
7272
String column_name = rs.getString("COLUMN_NAME");
7373
if (StringUtils.isNotBlank(column_name)) {
74-
column_name = column_name.toUpperCase();
74+
map.get(indexName).add(column_name);
7575
}
76-
map.get(indexName).add(column_name);
7776
}
7877

7978
for (Map.Entry<String, List<String>> entry : map.entrySet()) {
8079
String k = entry.getKey();
8180
List<String> v = entry.getValue();
8281
if (v != null && v.size() != 0 && v.get(0) != null) {
83-
getRealIndexes().put(k, v);
82+
realIndexesAdd(k, v);
8483
}
8584
}
8685
}
8786

87+
88+
8889
/**
8990
* get db all column name
9091
*
@@ -95,7 +96,7 @@ public void fillFullColumns() throws SQLException {
9596
while (rs.next()) {
9697
String columnName = rs.getString("COLUMN_NAME");
9798
if (StringUtils.isNotBlank(columnName)) {
98-
getFullField().add(columnName.toUpperCase());
99+
fullFieldAdd(columnName);
99100
}
100101
}
101102
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,11 @@ public String getTableName() {
452452
return tableName;
453453
}
454454

455-
public Map<String, List<String>> getRealIndexes() {
455+
public void realIndexesAdd(String index, List<String> fieldes) {
456+
this.realIndexes.put(index, fieldes);
457+
}
458+
459+
public Map<String,List<String>> getRealIndexes() {
456460
return realIndexes;
457461
}
458462

@@ -464,4 +468,8 @@ public void setBatchWaitInterval(long batchWaitInterval) {
464468
public List<String> getFullField() {
465469
return fullField;
466470
}
471+
472+
public void fullFieldAdd(String colName) {
473+
this.fullField.add(colName);
474+
}
467475
}

0 commit comments

Comments
 (0)