Skip to content

Commit 7044b48

Browse files
committed
oracle merge into midify
1 parent 409723b commit 7044b48

File tree

1 file changed

+39
-7
lines changed
  • oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle

1 file changed

+39
-7
lines changed

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,14 @@ private void buildInsertSql(String tableName, List<String> fields) {
7373
this.sql = sqlTmp;
7474
}
7575

76+
/**
77+
* use MERGE INTO build oracle replace into sql
78+
* @param tableName
79+
* @param fieldNames create table contained column columns
80+
* @param realIndexes <key: indexName, value: index contains columns >
81+
* @param fullField real columns , query from db
82+
* @return
83+
*/
7684
@Override
7785
public String buildUpdateSql(String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
7886
tableName = quoteTable(tableName);
@@ -111,9 +119,14 @@ public String quoteColumns(List<String> column, String table) {
111119
return StringUtils.join(list, ",");
112120
}
113121

114-
protected List<String> keyColList(Map<String, List<String>> updateKey) {
122+
/**
123+
* extract all distinct index column
124+
* @param realIndexes
125+
* @return
126+
*/
127+
protected List<String> keyColList(Map<String, List<String>> realIndexes) {
115128
List<String> keyCols = new ArrayList<>();
116-
for (Map.Entry<String, List<String>> entry : updateKey.entrySet()) {
129+
for (Map.Entry<String, List<String>> entry : realIndexes.entrySet()) {
117130
List<String> list = entry.getValue();
118131
for (String col : list) {
119132
if (!containsIgnoreCase(keyCols,col)) {
@@ -124,15 +137,25 @@ protected List<String> keyColList(Map<String, List<String>> updateKey) {
124137
return keyCols;
125138
}
126139

127-
public String getUpdateSql(List<String> column, List<String> fullColumn, String leftTable, String rightTable, List<String> keyCols) {
140+
/**
141+
* build update sql , such as UPDATE SET "T1".A="T2".A
142+
* @param updateColumn create table contained column columns
143+
* @param fullColumn real columns , query from db
144+
* @param leftTable alias
145+
* @param rightTable alias
146+
* @param indexCols index column
147+
* @return
148+
*/
149+
public String getUpdateSql(List<String> updateColumn, List<String> fullColumn, String leftTable, String rightTable, List<String> indexCols) {
128150
String prefixLeft = StringUtils.isBlank(leftTable) ? "" : quoteTable(leftTable) + ".";
129151
String prefixRight = StringUtils.isBlank(rightTable) ? "" : quoteTable(rightTable) + ".";
130152
List<String> list = new ArrayList<>();
131153
for (String col : fullColumn) {
132-
if (keyCols == null || keyCols.size() == 0 || containsIgnoreCase(keyCols,col)) {
154+
// filter index column
155+
if (indexCols == null || indexCols.size() == 0 || containsIgnoreCase(indexCols,col)) {
133156
continue;
134157
}
135-
if (fullColumn == null ||containsIgnoreCase(column,col)) {
158+
if (containsIgnoreCase(updateColumn,col)) {
136159
list.add(prefixLeft + col + "=" + prefixRight + col);
137160
} else {
138161
list.add(prefixLeft + col + "=null");
@@ -153,7 +176,11 @@ public String quoteTable(String table) {
153176
return sb.toString();
154177
}
155178

156-
179+
/**
180+
* build connect sql by index column, such as T1."A"=T2."A"
181+
* @param updateKey
182+
* @return
183+
*/
157184
public String updateKeySql(Map<String, List<String>> updateKey) {
158185
List<String> exprList = new ArrayList<>();
159186
for (Map.Entry<String, List<String>> entry : updateKey.entrySet()) {
@@ -166,7 +193,12 @@ public String updateKeySql(Map<String, List<String>> updateKey) {
166193
return StringUtils.join(exprList, " OR ");
167194
}
168195

169-
196+
/**
197+
* build select sql , such as (SELECT ? "A",? "B" FROM DUAL)
198+
*
199+
* @param column destination column
200+
* @return
201+
*/
170202
public String makeValues(List<String> column) {
171203
StringBuilder sb = new StringBuilder("SELECT ");
172204
for (int i = 0; i < column.size(); ++i) {

0 commit comments

Comments
 (0)