Skip to content

Commit 925718e

Browse files
committed
oracle sink code opt
1 parent 1764f98 commit 925718e

File tree

3 files changed

+18
-9
lines changed

3 files changed

+18
-9
lines changed

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/table/OracleSinkParser.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public class OracleSinkParser extends RdbSinkParser {
3434

3535
@Override
3636
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
37-
TableInfo sqlserverTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
38-
sqlserverTableInfo.setType(CURR_TYPE);
39-
return sqlserverTableInfo;
37+
TableInfo oracleTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
38+
oracleTableInfo.setType(CURR_TYPE);
39+
return oracleTableInfo;
4040
}
4141
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/ExtendOutputFormat.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,25 +66,26 @@ public void fillRealIndexes() throws SQLException {
6666

6767
while (rs.next()) {
6868
String indexName = rs.getString("INDEX_NAME");
69-
if (!map.containsKey(indexName)) {
69+
if (StringUtils.isNotBlank(indexName) && !map.containsKey(indexName)) {
7070
map.put(indexName, new ArrayList<>());
7171
}
7272
String column_name = rs.getString("COLUMN_NAME");
7373
if (StringUtils.isNotBlank(column_name)) {
74-
column_name = column_name.toUpperCase();
74+
map.get(indexName).add(column_name);
7575
}
76-
map.get(indexName).add(column_name);
7776
}
7877

7978
for (Map.Entry<String, List<String>> entry : map.entrySet()) {
8079
String k = entry.getKey();
8180
List<String> v = entry.getValue();
8281
if (v != null && v.size() != 0 && v.get(0) != null) {
83-
getRealIndexes().put(k, v);
82+
realIndexesAdd(k, v);
8483
}
8584
}
8685
}
8786

87+
88+
8889
/**
8990
* get db all column name
9091
*
@@ -95,7 +96,7 @@ public void fillFullColumns() throws SQLException {
9596
while (rs.next()) {
9697
String columnName = rs.getString("COLUMN_NAME");
9798
if (StringUtils.isNotBlank(columnName)) {
98-
getFullField().add(columnName.toUpperCase());
99+
fullFieldAdd(columnName);
99100
}
100101
}
101102
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,11 @@ public String getTableName() {
452452
return tableName;
453453
}
454454

455-
public Map<String, List<String>> getRealIndexes() {
455+
public void realIndexesAdd(String index, List<String> fieldes) {
456+
this.realIndexes.put(index, fieldes);
457+
}
458+
459+
public Map<String,List<String>> getRealIndexes() {
456460
return realIndexes;
457461
}
458462

@@ -464,4 +468,8 @@ public void setBatchWaitInterval(long batchWaitInterval) {
464468
public List<String> getFullField() {
465469
return fullField;
466470
}
471+
472+
public void fullFieldAdd(String colName) {
473+
this.fullField.add(colName);
474+
}
467475
}

0 commit comments

Comments
 (0)