Skip to content

Commit cb25fb3

Browse files
committed
rdb schema
1 parent b8ee11f commit cb25fb3

File tree

14 files changed

+98
-36
lines changed

14 files changed

+98
-36
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ private SqlNode replaceNodeInfo(SqlNode groupNode, HashBasedTable<String, String
361361
if(groupNode.getKind() == IDENTIFIER){
362362
SqlIdentifier sqlIdentifier = (SqlIdentifier) groupNode;
363363
String mappingFieldName = mappingTable.get(sqlIdentifier.getComponent(0).getSimple(), sqlIdentifier.getComponent(1).getSimple());
364+
364365
sqlIdentifier = sqlIdentifier.setName(0, tableAlias);
365366
return sqlIdentifier.setName(1, mappingFieldName);
366367
}else if(groupNode instanceof SqlBasicCall){

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,15 @@ public static String firstUpperCase(String str) {
253253
return str.substring(0, 1).toUpperCase() + str.substring(1);
254254
}
255255

256+
public static String getTableFullPath(String schema, String tableName) {
257+
if (StringUtils.isEmpty(schema)){
258+
return addQuoteForStr(tableName);
259+
}
260+
String schemaAndTabName = addQuoteForStr(schema) + "." + addQuoteForStr(tableName);
261+
return schemaAndTabName;
262+
}
263+
264+
256265

257266
public static String addQuoteForStr(String column) {
258267
return getStartQuote() + column + getEndQuote();

mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/MysqlSink.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2424
import com.dtstack.flink.sql.sink.rdb.RdbSink;
2525
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
26+
import com.dtstack.flink.sql.util.DtStringUtil;
27+
2628
import java.util.List;
2729
import java.util.Map;
2830

@@ -46,17 +48,17 @@ public RetractJDBCOutputFormat getOutputFormat() {
4648
}
4749

4850
@Override
49-
public void buildSql(String tableName, List<String> fields) {
51+
public void buildSql(String scheam, String tableName, List<String> fields) {
5052
buildInsertSql(tableName, fields);
5153
}
5254

5355
@Override
54-
public String buildUpdateSql(String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
56+
public String buildUpdateSql(String schema, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
5557
return null;
5658
}
5759

5860
private void buildInsertSql(String tableName, List<String> fields) {
59-
String sqlTmp = "replace into " + tableName + " (${fields}) values (${placeholder})";
61+
String sqlTmp = "replace into " + tableName + " (${fields}) values (${placeholder})";
6062
String fieldsStr = "";
6163
String placeholder = "";
6264

oracle/oracle-side/oracle-all-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAllSideInfo.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.dtstack.flink.sql.side.SideTableInfo;
2323
import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
2424
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
25+
import com.dtstack.flink.sql.util.DtStringUtil;
2526
import org.apache.commons.lang3.StringUtils;
2627
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2728

@@ -40,15 +41,11 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
4041
sqlCondition = "select ${selectField} from ${tableName} ";
4142

4243

43-
sqlCondition = sqlCondition.replace("${tableName}", dealFiled(rdbSideTableInfo.getTableName())).replace("${selectField}", dealLowerSelectFiled(sideSelectFields));
44+
sqlCondition = sqlCondition.replace("${tableName}", DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName())).replace("${selectField}", dealLowerSelectFiled(sideSelectFields));
4445
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
4546
}
4647

4748

48-
private String dealFiled(String field) {
49-
return "\"" + field + "\"";
50-
}
51-
5249
private String dealLowerSelectFiled(String fieldsStr) {
5350
StringBuilder sb = new StringBuilder();
5451
String[] fields = fieldsStr.split(",");

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.side.SideTableInfo;
2424
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
2525
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26+
import com.dtstack.flink.sql.util.DtStringUtil;
2627
import com.dtstack.flink.sql.util.ParseUtils;
2728
import org.apache.calcite.sql.SqlKind;
2829
import org.apache.calcite.sql.SqlNode;
@@ -68,10 +69,12 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
6869
}
6970
}
7071

71-
sqlCondition = sqlCondition.replace("${tableName}", dealLowerFiled(rdbSideTableInfo.getTableName())).replace("${selectField}", dealLowerSelectFiled(sideSelectFields));
72+
sqlCondition = sqlCondition.replace("${tableName}", DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName())).replace("${selectField}", dealLowerSelectFiled(sideSelectFields));
7273
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
7374
}
7475

76+
77+
7578
private String dealLowerFiled(String field) {
7679
return "\"" + field + "\"";
7780
}

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,14 @@ public RetractJDBCOutputFormat getOutputFormat() {
5151
}
5252

5353
@Override
54-
public void buildSql(String tableName, List<String> fields) {
55-
buildInsertSql(tableName, fields);
54+
public void buildSql(String scheam, String tableName, List<String> fields) {
55+
buildInsertSql(scheam, tableName, fields);
5656
}
5757

58-
private void buildInsertSql(String tableName, List<String> fields) {
58+
private void buildInsertSql(String scheam, String tableName, List<String> fields) {
59+
60+
tableName = DtStringUtil.getTableFullPath(scheam,tableName);
5961

60-
tableName = DtStringUtil.addQuoteForStr(tableName);
6162
String sqlTmp = "insert into " + tableName + " (${fields}) values (${placeholder})";
6263

6364
List<String> adaptFields = Lists.newArrayList();
@@ -83,8 +84,9 @@ private void buildInsertSql(String tableName, List<String> fields) {
8384
* @return
8485
*/
8586
@Override
86-
public String buildUpdateSql(String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
87-
tableName = DtStringUtil.addQuoteForStr(tableName);
87+
public String buildUpdateSql(String scheam, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
88+
tableName = DtStringUtil.getTableFullPath(scheam, tableName);
89+
8890
StringBuilder sb = new StringBuilder();
8991

9092
sb.append("MERGE INTO " + tableName + " T1 USING "

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4747
rdbTableInfo.setTableName(MathUtil.getString(props.get(RdbSideTableInfo.TABLE_NAME_KEY.toLowerCase())));
4848
rdbTableInfo.setUserName(MathUtil.getString(props.get(RdbSideTableInfo.USER_NAME_KEY.toLowerCase())));
4949
rdbTableInfo.setPassword(MathUtil.getString(props.get(RdbSideTableInfo.PASSWORD_KEY.toLowerCase())));
50+
51+
rdbTableInfo.setSchema(MathUtil.getString(props.get(RdbSideTableInfo.SCHEMA_KEY.toLowerCase())));
52+
5053
rdbTableInfo.check();
5154
return rdbTableInfo;
5255
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class RdbSideTableInfo extends SideTableInfo {
3939

4040
public static final String PASSWORD_KEY = "password";
4141

42+
public static final String SCHEMA_KEY = "schema";
43+
4244
@Override
4345
public boolean check() {
4446
Preconditions.checkNotNull(url, "rdb of URL is required");
@@ -56,6 +58,16 @@ public boolean check() {
5658

5759
private String password;
5860

61+
private String schema;
62+
63+
public String getSchema() {
64+
return schema;
65+
}
66+
67+
public void setSchema(String schema) {
68+
this.schema = schema;
69+
}
70+
5971
public String getUrl() {
6072
return url;
6173
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ public abstract class RdbSink implements RetractStreamTableSink<Row>, Serializab
8181

8282
private int parallelism = -1;
8383

84+
private String schema;
85+
8486
public RichSinkFunction createJdbcSinkFunc() {
8587
if (driverName == null || dbURL == null || userName == null
8688
|| password == null || sqlTypes == null || tableName == null) {
@@ -98,6 +100,7 @@ public RichSinkFunction createJdbcSinkFunc() {
98100
outputFormat.setTypesArray(sqlTypes);
99101
outputFormat.setTableName(tableName);
100102
outputFormat.setDbType(dbType);
103+
outputFormat.setSchema(schema);
101104
outputFormat.setDbSink(this);
102105

103106
outputFormat.verifyField();
@@ -142,8 +145,9 @@ public RdbSink genStreamSink(TargetTableInfo targetTableInfo) {
142145
this.registerTabName = tmpRegisterName;
143146
this.primaryKeys = rdbTableInfo.getPrimaryKeys();
144147
this.dbType = rdbTableInfo.getType();
148+
this.schema = rdbTableInfo.getSchema();
145149

146-
buildSql(tableName, fields);
150+
buildSql(schema, tableName, fields);
147151
buildSqlTypes(fieldTypeArray);
148152
return this;
149153
}
@@ -254,7 +258,7 @@ public void setDbType(String dbType) {
254258
* @param tableName
255259
* @param fields
256260
*/
257-
public abstract void buildSql(String tableName, List<String> fields);
261+
public abstract void buildSql(String schema, String tableName, List<String> fields);
258262

259263
/**
260264
* sqlserver and oracle maybe implement
@@ -264,7 +268,7 @@ public void setDbType(String dbType) {
264268
* @param realIndexes
265269
* @return
266270
*/
267-
public abstract String buildUpdateSql(String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField);
271+
public abstract String buildUpdateSql(String schema, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField);
268272

269273
public abstract String getDriverName();
270274

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/ExtendOutputFormat.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public boolean isReplaceInsertQuery() throws SQLException {
6464
public void fillRealIndexes() throws SQLException {
6565
Map<String, List<String>> map = Maps.newHashMap();
6666

67-
ResultSet rs = getDbConn().getMetaData().getIndexInfo(null, getSchema().toUpperCase(), DtStringUtil.addQuoteForStr(getTableName()), true, false);
67+
ResultSet rs = getDbConn().getMetaData().getIndexInfo(null, getSchema(), DtStringUtil.addQuoteForStr(getTableName()), true, false);
6868

6969
while (rs.next()) {
7070
String indexName = rs.getString("INDEX_NAME");
@@ -95,7 +95,7 @@ public void fillRealIndexes() throws SQLException {
9595
*/
9696
public void fillFullColumns() throws SQLException {
9797
// table name not quote
98-
ResultSet rs = getDbConn().getMetaData().getColumns(null, getSchema().toUpperCase(), getTableName(), null);
98+
ResultSet rs = getDbConn().getMetaData().getColumns(null, getSchema(), getTableName(), null);
9999
while (rs.next()) {
100100
String columnName = rs.getString("COLUMN_NAME");
101101
if (StringUtils.isNotBlank(columnName)) {
@@ -113,7 +113,4 @@ public boolean containsIgnoreCase(List<String> l, String s) {
113113
return false;
114114
}
115115

116-
public boolean existTabname() throws SQLException {
117-
return getDbConn().getMetaData().getTables(null, getSchema().toUpperCase(), getTableName(), null).next();
118-
}
119116
}

0 commit comments

Comments
 (0)