Skip to content

Commit 4e61d02

Browse files
committed
Merge branch '1.8.0_oracle_schema' into 'v1.8.0_dev'
oracle schema See merge request !82
2 parents fd20156 + cb25fb3 commit 4e61d02

File tree

14 files changed

+115
-47
lines changed

14 files changed

+115
-47
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ private SqlNode replaceNodeInfo(SqlNode groupNode, HashBasedTable<String, String
361361
if(groupNode.getKind() == IDENTIFIER){
362362
SqlIdentifier sqlIdentifier = (SqlIdentifier) groupNode;
363363
String mappingFieldName = mappingTable.get(sqlIdentifier.getComponent(0).getSimple(), sqlIdentifier.getComponent(1).getSimple());
364+
364365
sqlIdentifier = sqlIdentifier.setName(0, tableAlias);
365366
return sqlIdentifier.setName(1, mappingFieldName);
366367
}else if(groupNode instanceof SqlBasicCall){

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -253,19 +253,17 @@ public static String firstUpperCase(String str) {
253253
return str.substring(0, 1).toUpperCase() + str.substring(1);
254254
}
255255

256-
public static String addQuoteForTableName(String table) {
257-
String[] parts = table.split("\\.");
258-
StringBuilder sb = new StringBuilder();
259-
for (int i = 0; i < parts.length; ++i) {
260-
if (i != 0) {
261-
sb.append(".");
262-
}
263-
sb.append(DtStringUtil.addQuoteForColumn(parts[i]));
256+
public static String getTableFullPath(String schema, String tableName) {
257+
if (StringUtils.isEmpty(schema)){
258+
return addQuoteForStr(tableName);
264259
}
265-
return sb.toString();
260+
String schemaAndTabName = addQuoteForStr(schema) + "." + addQuoteForStr(tableName);
261+
return schemaAndTabName;
266262
}
267263

268-
public static String addQuoteForColumn(String column) {
264+
265+
266+
public static String addQuoteForStr(String column) {
269267
return getStartQuote() + column + getEndQuote();
270268
}
271269

mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/MysqlSink.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2424
import com.dtstack.flink.sql.sink.rdb.RdbSink;
2525
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
26+
import com.dtstack.flink.sql.util.DtStringUtil;
27+
2628
import java.util.List;
2729
import java.util.Map;
2830

@@ -46,17 +48,17 @@ public RetractJDBCOutputFormat getOutputFormat() {
4648
}
4749

4850
@Override
49-
public void buildSql(String tableName, List<String> fields) {
51+
public void buildSql(String scheam, String tableName, List<String> fields) {
5052
buildInsertSql(tableName, fields);
5153
}
5254

5355
@Override
54-
public String buildUpdateSql(String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
56+
public String buildUpdateSql(String schema, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
5557
return null;
5658
}
5759

5860
private void buildInsertSql(String tableName, List<String> fields) {
59-
String sqlTmp = "replace into " + tableName + " (${fields}) values (${placeholder})";
61+
String sqlTmp = "replace into " + tableName + " (${fields}) values (${placeholder})";
6062
String fieldsStr = "";
6163
String placeholder = "";
6264

oracle/oracle-side/oracle-all-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAllSideInfo.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.dtstack.flink.sql.side.SideTableInfo;
2323
import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
2424
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
25+
import com.dtstack.flink.sql.util.DtStringUtil;
2526
import org.apache.commons.lang3.StringUtils;
2627
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2728

@@ -40,15 +41,11 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
4041
sqlCondition = "select ${selectField} from ${tableName} ";
4142

4243

43-
sqlCondition = sqlCondition.replace("${tableName}", dealFiled(rdbSideTableInfo.getTableName())).replace("${selectField}", dealLowerSelectFiled(sideSelectFields));
44+
sqlCondition = sqlCondition.replace("${tableName}", DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName())).replace("${selectField}", dealLowerSelectFiled(sideSelectFields));
4445
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
4546
}
4647

4748

48-
private String dealFiled(String field) {
49-
return "\"" + field + "\"";
50-
}
51-
5249
private String dealLowerSelectFiled(String fieldsStr) {
5350
StringBuilder sb = new StringBuilder();
5451
String[] fields = fieldsStr.split(",");

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.side.SideTableInfo;
2424
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
2525
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26+
import com.dtstack.flink.sql.util.DtStringUtil;
2627
import com.dtstack.flink.sql.util.ParseUtils;
2728
import org.apache.calcite.sql.SqlKind;
2829
import org.apache.calcite.sql.SqlNode;
@@ -68,10 +69,12 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
6869
}
6970
}
7071

71-
sqlCondition = sqlCondition.replace("${tableName}", dealLowerFiled(rdbSideTableInfo.getTableName())).replace("${selectField}", dealLowerSelectFiled(sideSelectFields));
72+
sqlCondition = sqlCondition.replace("${tableName}", DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName())).replace("${selectField}", dealLowerSelectFiled(sideSelectFields));
7273
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
7374
}
7475

76+
77+
7578
private String dealLowerFiled(String field) {
7679
return "\"" + field + "\"";
7780
}

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,18 @@ public RetractJDBCOutputFormat getOutputFormat() {
5151
}
5252

5353
@Override
54-
public void buildSql(String tableName, List<String> fields) {
55-
buildInsertSql(tableName, fields);
54+
public void buildSql(String scheam, String tableName, List<String> fields) {
55+
buildInsertSql(scheam, tableName, fields);
5656
}
5757

58-
private void buildInsertSql(String tableName, List<String> fields) {
58+
private void buildInsertSql(String scheam, String tableName, List<String> fields) {
59+
60+
tableName = DtStringUtil.getTableFullPath(scheam,tableName);
5961

60-
tableName = DtStringUtil.addQuoteForTableName(tableName);
6162
String sqlTmp = "insert into " + tableName + " (${fields}) values (${placeholder})";
6263

6364
List<String> adaptFields = Lists.newArrayList();
64-
fields.forEach(field -> adaptFields.add(DtStringUtil.addQuoteForColumn(field)));
65+
fields.forEach(field -> adaptFields.add(DtStringUtil.addQuoteForStr(field)));
6566

6667
String fieldsStr = StringUtils.join(adaptFields, ",");
6768
String placeholder = "";
@@ -83,8 +84,9 @@ private void buildInsertSql(String tableName, List<String> fields) {
8384
* @return
8485
*/
8586
@Override
86-
public String buildUpdateSql(String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
87-
tableName = DtStringUtil.addQuoteForTableName(tableName);
87+
public String buildUpdateSql(String scheam, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
88+
tableName = DtStringUtil.getTableFullPath(scheam, tableName);
89+
8890
StringBuilder sb = new StringBuilder();
8991

9092
sb.append("MERGE INTO " + tableName + " T1 USING "
@@ -112,10 +114,10 @@ public String quoteColumns(List<String> column) {
112114
}
113115

114116
public String quoteColumns(List<String> column, String table) {
115-
String prefix = StringUtils.isBlank(table) ? "" : DtStringUtil.addQuoteForTableName(table) + ".";
117+
String prefix = StringUtils.isBlank(table) ? "" : DtStringUtil.addQuoteForStr(table) + ".";
116118
List<String> list = new ArrayList<>();
117119
for (String col : column) {
118-
list.add(prefix + DtStringUtil.addQuoteForColumn(col));
120+
list.add(prefix + DtStringUtil.addQuoteForStr(col));
119121
}
120122
return StringUtils.join(list, ",");
121123
}
@@ -148,18 +150,18 @@ protected List<String> keyColList(Map<String, List<String>> realIndexes) {
148150
* @return
149151
*/
150152
public String getUpdateSql(List<String> updateColumn, List<String> fullColumn, String leftTable, String rightTable, List<String> indexCols) {
151-
String prefixLeft = StringUtils.isBlank(leftTable) ? "" : DtStringUtil.addQuoteForTableName(leftTable) + ".";
152-
String prefixRight = StringUtils.isBlank(rightTable) ? "" : DtStringUtil.addQuoteForTableName(rightTable) + ".";
153+
String prefixLeft = StringUtils.isBlank(leftTable) ? "" : DtStringUtil.addQuoteForStr(leftTable) + ".";
154+
String prefixRight = StringUtils.isBlank(rightTable) ? "" : DtStringUtil.addQuoteForStr(rightTable) + ".";
153155
List<String> list = new ArrayList<>();
154156
for (String col : fullColumn) {
155157
// filter index column
156158
if (indexCols == null || indexCols.size() == 0 || containsIgnoreCase(indexCols,col)) {
157159
continue;
158160
}
159161
if (containsIgnoreCase(updateColumn,col)) {
160-
list.add(prefixLeft + DtStringUtil.addQuoteForColumn(col) + "=" + prefixRight + DtStringUtil.addQuoteForColumn(col));
162+
list.add(prefixLeft + DtStringUtil.addQuoteForStr(col) + "=" + prefixRight + DtStringUtil.addQuoteForStr(col));
161163
} else {
162-
list.add(prefixLeft + DtStringUtil.addQuoteForColumn(col) + "=null");
164+
list.add(prefixLeft + DtStringUtil.addQuoteForStr(col) + "=null");
163165
}
164166
}
165167
return StringUtils.join(list, ",");
@@ -176,7 +178,7 @@ public String updateKeySql(Map<String, List<String>> updateKey) {
176178
for (Map.Entry<String, List<String>> entry : updateKey.entrySet()) {
177179
List<String> colList = new ArrayList<>();
178180
for (String col : entry.getValue()) {
179-
colList.add("T1." + DtStringUtil.addQuoteForColumn(col) + "=T2." + DtStringUtil.addQuoteForColumn(col));
181+
colList.add("T1." + DtStringUtil.addQuoteForStr(col) + "=T2." + DtStringUtil.addQuoteForStr(col));
180182
}
181183
exprList.add(StringUtils.join(colList, " AND "));
182184
}
@@ -195,7 +197,7 @@ public String makeValues(List<String> column) {
195197
if (i != 0) {
196198
sb.append(",");
197199
}
198-
sb.append("? " + DtStringUtil.addQuoteForColumn(column.get(i)));
200+
sb.append("? " + DtStringUtil.addQuoteForStr(column.get(i)));
199201
}
200202
sb.append(" FROM DUAL");
201203
return sb.toString();

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4747
rdbTableInfo.setTableName(MathUtil.getString(props.get(RdbSideTableInfo.TABLE_NAME_KEY.toLowerCase())));
4848
rdbTableInfo.setUserName(MathUtil.getString(props.get(RdbSideTableInfo.USER_NAME_KEY.toLowerCase())));
4949
rdbTableInfo.setPassword(MathUtil.getString(props.get(RdbSideTableInfo.PASSWORD_KEY.toLowerCase())));
50+
51+
rdbTableInfo.setSchema(MathUtil.getString(props.get(RdbSideTableInfo.SCHEMA_KEY.toLowerCase())));
52+
5053
rdbTableInfo.check();
5154
return rdbTableInfo;
5255
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class RdbSideTableInfo extends SideTableInfo {
3939

4040
public static final String PASSWORD_KEY = "password";
4141

42+
public static final String SCHEMA_KEY = "schema";
43+
4244
@Override
4345
public boolean check() {
4446
Preconditions.checkNotNull(url, "rdb of URL is required");
@@ -56,6 +58,16 @@ public boolean check() {
5658

5759
private String password;
5860

61+
private String schema;
62+
63+
public String getSchema() {
64+
return schema;
65+
}
66+
67+
public void setSchema(String schema) {
68+
this.schema = schema;
69+
}
70+
5971
public String getUrl() {
6072
return url;
6173
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ public abstract class RdbSink implements RetractStreamTableSink<Row>, Serializab
8181

8282
private int parallelism = -1;
8383

84+
private String schema;
85+
8486
public RichSinkFunction createJdbcSinkFunc() {
8587
if (driverName == null || dbURL == null || userName == null
8688
|| password == null || sqlTypes == null || tableName == null) {
@@ -98,6 +100,7 @@ public RichSinkFunction createJdbcSinkFunc() {
98100
outputFormat.setTypesArray(sqlTypes);
99101
outputFormat.setTableName(tableName);
100102
outputFormat.setDbType(dbType);
103+
outputFormat.setSchema(schema);
101104
outputFormat.setDbSink(this);
102105

103106
outputFormat.verifyField();
@@ -142,8 +145,9 @@ public RdbSink genStreamSink(TargetTableInfo targetTableInfo) {
142145
this.registerTabName = tmpRegisterName;
143146
this.primaryKeys = rdbTableInfo.getPrimaryKeys();
144147
this.dbType = rdbTableInfo.getType();
148+
this.schema = rdbTableInfo.getSchema();
145149

146-
buildSql(tableName, fields);
150+
buildSql(schema, tableName, fields);
147151
buildSqlTypes(fieldTypeArray);
148152
return this;
149153
}
@@ -254,7 +258,7 @@ public void setDbType(String dbType) {
254258
* @param tableName
255259
* @param fields
256260
*/
257-
public abstract void buildSql(String tableName, List<String> fields);
261+
public abstract void buildSql(String schema, String tableName, List<String> fields);
258262

259263
/**
260264
* sqlserver and oracle maybe implement
@@ -264,7 +268,7 @@ public void setDbType(String dbType) {
264268
* @param realIndexes
265269
* @return
266270
*/
267-
public abstract String buildUpdateSql(String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField);
271+
public abstract String buildUpdateSql(String schema, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField);
268272

269273
public abstract String getDriverName();
270274

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/ExtendOutputFormat.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ public boolean isReplaceInsertQuery() throws SQLException {
6363
*/
6464
public void fillRealIndexes() throws SQLException {
6565
Map<String, List<String>> map = Maps.newHashMap();
66-
ResultSet rs = getDbConn().getMetaData().getIndexInfo(null, null, DtStringUtil.addQuoteForTableName(getTableName()), true, false);
66+
67+
ResultSet rs = getDbConn().getMetaData().getIndexInfo(null, getSchema(), DtStringUtil.addQuoteForStr(getTableName()), true, false);
6768

6869
while (rs.next()) {
6970
String indexName = rs.getString("INDEX_NAME");
@@ -94,7 +95,7 @@ public void fillRealIndexes() throws SQLException {
9495
*/
9596
public void fillFullColumns() throws SQLException {
9697
// table name not quote
97-
ResultSet rs = getDbConn().getMetaData().getColumns(null, null, getTableName(), null);
98+
ResultSet rs = getDbConn().getMetaData().getColumns(null, getSchema(), getTableName(), null);
9899
while (rs.next()) {
99100
String columnName = rs.getString("COLUMN_NAME");
100101
if (StringUtils.isNotBlank(columnName)) {
@@ -111,4 +112,5 @@ public boolean containsIgnoreCase(List<String> l, String s) {
111112
}
112113
return false;
113114
}
115+
114116
}

0 commit comments

Comments
 (0)