Skip to content

Commit b8ee11f

Browse files
committed
oracle schema
1 parent fd20156 commit b8ee11f

File tree

4 files changed

+32
-26
lines changed

4 files changed

+32
-26
lines changed

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -253,19 +253,8 @@ public static String firstUpperCase(String str) {
253253
return str.substring(0, 1).toUpperCase() + str.substring(1);
254254
}
255255

256-
public static String addQuoteForTableName(String table) {
257-
String[] parts = table.split("\\.");
258-
StringBuilder sb = new StringBuilder();
259-
for (int i = 0; i < parts.length; ++i) {
260-
if (i != 0) {
261-
sb.append(".");
262-
}
263-
sb.append(DtStringUtil.addQuoteForColumn(parts[i]));
264-
}
265-
return sb.toString();
266-
}
267256

268-
public static String addQuoteForColumn(String column) {
257+
public static String addQuoteForStr(String column) {
269258
return getStartQuote() + column + getEndQuote();
270259
}
271260

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@ public void buildSql(String tableName, List<String> fields) {
5757

5858
private void buildInsertSql(String tableName, List<String> fields) {
5959

60-
tableName = DtStringUtil.addQuoteForTableName(tableName);
60+
tableName = DtStringUtil.addQuoteForStr(tableName);
6161
String sqlTmp = "insert into " + tableName + " (${fields}) values (${placeholder})";
6262

6363
List<String> adaptFields = Lists.newArrayList();
64-
fields.forEach(field -> adaptFields.add(DtStringUtil.addQuoteForColumn(field)));
64+
fields.forEach(field -> adaptFields.add(DtStringUtil.addQuoteForStr(field)));
6565

6666
String fieldsStr = StringUtils.join(adaptFields, ",");
6767
String placeholder = "";
@@ -84,7 +84,7 @@ private void buildInsertSql(String tableName, List<String> fields) {
8484
*/
8585
@Override
8686
public String buildUpdateSql(String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
87-
tableName = DtStringUtil.addQuoteForTableName(tableName);
87+
tableName = DtStringUtil.addQuoteForStr(tableName);
8888
StringBuilder sb = new StringBuilder();
8989

9090
sb.append("MERGE INTO " + tableName + " T1 USING "
@@ -112,10 +112,10 @@ public String quoteColumns(List<String> column) {
112112
}
113113

114114
public String quoteColumns(List<String> column, String table) {
115-
String prefix = StringUtils.isBlank(table) ? "" : DtStringUtil.addQuoteForTableName(table) + ".";
115+
String prefix = StringUtils.isBlank(table) ? "" : DtStringUtil.addQuoteForStr(table) + ".";
116116
List<String> list = new ArrayList<>();
117117
for (String col : column) {
118-
list.add(prefix + DtStringUtil.addQuoteForColumn(col));
118+
list.add(prefix + DtStringUtil.addQuoteForStr(col));
119119
}
120120
return StringUtils.join(list, ",");
121121
}
@@ -148,18 +148,18 @@ protected List<String> keyColList(Map<String, List<String>> realIndexes) {
148148
* @return
149149
*/
150150
public String getUpdateSql(List<String> updateColumn, List<String> fullColumn, String leftTable, String rightTable, List<String> indexCols) {
151-
String prefixLeft = StringUtils.isBlank(leftTable) ? "" : DtStringUtil.addQuoteForTableName(leftTable) + ".";
152-
String prefixRight = StringUtils.isBlank(rightTable) ? "" : DtStringUtil.addQuoteForTableName(rightTable) + ".";
151+
String prefixLeft = StringUtils.isBlank(leftTable) ? "" : DtStringUtil.addQuoteForStr(leftTable) + ".";
152+
String prefixRight = StringUtils.isBlank(rightTable) ? "" : DtStringUtil.addQuoteForStr(rightTable) + ".";
153153
List<String> list = new ArrayList<>();
154154
for (String col : fullColumn) {
155155
// filter index column
156156
if (indexCols == null || indexCols.size() == 0 || containsIgnoreCase(indexCols,col)) {
157157
continue;
158158
}
159159
if (containsIgnoreCase(updateColumn,col)) {
160-
list.add(prefixLeft + DtStringUtil.addQuoteForColumn(col) + "=" + prefixRight + DtStringUtil.addQuoteForColumn(col));
160+
list.add(prefixLeft + DtStringUtil.addQuoteForStr(col) + "=" + prefixRight + DtStringUtil.addQuoteForStr(col));
161161
} else {
162-
list.add(prefixLeft + DtStringUtil.addQuoteForColumn(col) + "=null");
162+
list.add(prefixLeft + DtStringUtil.addQuoteForStr(col) + "=null");
163163
}
164164
}
165165
return StringUtils.join(list, ",");
@@ -176,7 +176,7 @@ public String updateKeySql(Map<String, List<String>> updateKey) {
176176
for (Map.Entry<String, List<String>> entry : updateKey.entrySet()) {
177177
List<String> colList = new ArrayList<>();
178178
for (String col : entry.getValue()) {
179-
colList.add("T1." + DtStringUtil.addQuoteForColumn(col) + "=T2." + DtStringUtil.addQuoteForColumn(col));
179+
colList.add("T1." + DtStringUtil.addQuoteForStr(col) + "=T2." + DtStringUtil.addQuoteForStr(col));
180180
}
181181
exprList.add(StringUtils.join(colList, " AND "));
182182
}
@@ -195,7 +195,7 @@ public String makeValues(List<String> column) {
195195
if (i != 0) {
196196
sb.append(",");
197197
}
198-
sb.append("? " + DtStringUtil.addQuoteForColumn(column.get(i)));
198+
sb.append("? " + DtStringUtil.addQuoteForStr(column.get(i)));
199199
}
200200
sb.append(" FROM DUAL");
201201
return sb.toString();

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/ExtendOutputFormat.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ public boolean isReplaceInsertQuery() throws SQLException {
6363
*/
6464
public void fillRealIndexes() throws SQLException {
6565
Map<String, List<String>> map = Maps.newHashMap();
66-
ResultSet rs = getDbConn().getMetaData().getIndexInfo(null, null, DtStringUtil.addQuoteForTableName(getTableName()), true, false);
66+
67+
ResultSet rs = getDbConn().getMetaData().getIndexInfo(null, getSchema().toUpperCase(), DtStringUtil.addQuoteForStr(getTableName()), true, false);
6768

6869
while (rs.next()) {
6970
String indexName = rs.getString("INDEX_NAME");
@@ -94,7 +95,7 @@ public void fillRealIndexes() throws SQLException {
9495
*/
9596
public void fillFullColumns() throws SQLException {
9697
// table name not quote
97-
ResultSet rs = getDbConn().getMetaData().getColumns(null, null, getTableName(), null);
98+
ResultSet rs = getDbConn().getMetaData().getColumns(null, getSchema().toUpperCase(), getTableName(), null);
9899
while (rs.next()) {
99100
String columnName = rs.getString("COLUMN_NAME");
100101
if (StringUtils.isNotBlank(columnName)) {
@@ -111,4 +112,8 @@ public boolean containsIgnoreCase(List<String> l, String s) {
111112
}
112113
return false;
113114
}
115+
116+
public boolean existTabname() throws SQLException {
117+
return getDbConn().getMetaData().getTables(null, getSchema().toUpperCase(), getTableName(), null).next();
118+
}
114119
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
106106
dbConn = establishConnection();
107107
initMetric();
108108

109-
if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
109+
if (existTabname()) {
110110
if (isReplaceInsertQuery()) {
111111
insertQuery = dbSink.buildUpdateSql(tableName, Arrays.asList(dbSink.getFieldNames()), realIndexes, fullField);
112112
}
@@ -400,6 +400,18 @@ public void setUsername(String username) {
400400
this.username = username;
401401
}
402402

403+
/**
404+
* username as default schema
405+
* @return
406+
*/
407+
public String getSchema() {
408+
return username;
409+
}
410+
411+
public boolean existTabname() throws SQLException {
412+
return dbConn.getMetaData().getTables(null, null, tableName, null).next();
413+
}
414+
403415
public void setPassword(String password) {
404416
this.password = password;
405417
}

0 commit comments

Comments
 (0)