Skip to content

Commit 34b10de

Browse files
use db specific way instead of generic way to get table schema for db specific plugin (#190) (#193)
use db specific way instead of generic way to get table schema for db specific plugin
1 parent 9ec07e4 commit 34b10de

File tree

9 files changed

+55
-41
lines changed

9 files changed

+55
-41
lines changed

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,13 @@ public StructuredRecord transform(LongWritable longWritable, DBRecord dbRecord)
7272
}
7373

7474
@Override
75-
protected String getTableQuery(DBConnectorPath path) {
76-
return String.format("SELECT * FROM `%s`.`%s`", path.getDatabase(), path.getTable());
75+
protected String getTableQuery(String database, String schema, String table) {
76+
return String.format("SELECT * FROM `%s`.`%s`", database, table);
7777
}
7878

7979
@Override
80-
protected String getTableQuery(DBConnectorPath path, int limit) {
81-
return String.format("SELECT * FROM `%s`.`%s` LIMIT %d", path.getDatabase(), path.getTable(), limit);
80+
protected String getTableQuery(String database, String schema, String table, int limit) {
81+
return String.format("SELECT * FROM `%s`.`%s` LIMIT %d", database, table, limit);
8282
}
8383

8484
@Override
@@ -95,7 +95,9 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
9595
return;
9696
}
9797

98-
properties.put(CloudSQLMySQLSource.CloudSQLMySQLSourceConfig.IMPORT_QUERY, getTableQuery(path));
98+
properties.put(CloudSQLMySQLSource.CloudSQLMySQLSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(),
99+
path.getSchema(),
100+
path.getTable()));
99101
properties.put(CloudSQLMySQLSource.CloudSQLMySQLSourceConfig.NUM_SPLITS, "1");
100102
properties.put(ConnectionConfig.DATABASE, path.getDatabase());
101103
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));

cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConnector.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,13 @@ protected SchemaReader getSchemaReader() {
7878
}
7979

8080
@Override
81-
protected String getTableQuery(DBConnectorPath path) {
82-
return String.format("SELECT * FROM \"%s\".\"%s\"", path.getSchema(), path.getTable());
81+
protected String getTableQuery(String database, String schema, String table) {
82+
return String.format("SELECT * FROM \"%s\".\"%s\"", schema, table);
8383
}
8484

8585
@Override
86-
protected String getTableQuery(DBConnectorPath path, int limit) {
87-
return String.format("SELECT * FROM \"%s\".\"%s\" LIMIT %d", path.getSchema(), path.getTable(), limit);
86+
protected String getTableQuery(String database, String schema, String table, int limit) {
87+
return String.format("SELECT * FROM \"%s\".\"%s\" LIMIT %d", schema, table, limit);
8888
}
8989

9090
@Override
@@ -101,7 +101,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
101101
return;
102102
}
103103

104-
properties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.IMPORT_QUERY, getTableQuery(path));
104+
properties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.IMPORT_QUERY,
105+
getTableQuery(path.getDatabase(), path.getSchema(), path.getTable()));
105106
properties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.NUM_SPLITS, "1");
106107
properties.put(ConnectionConfig.DATABASE, path.getDatabase());
107108
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));

database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnector.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public InputFormatProvider getInputFormatProvider(ConnectorContext context, Samp
8787
DBConfiguration.configureDB(connectionConfigAccessor.getConfiguration(), driverClass.getName(),
8888
getConnectionString(path.getDatabase()), config.getUser(), config.getPassword());
8989
}
90-
String tableQuery = getTableQuery(path, request.getLimit());
90+
String tableQuery = getTableQuery(path.getDatabase(), path.getSchema(), path.getTable(), request.getLimit());
9191
DataDrivenETLDBInputFormat.setInput(connectionConfigAccessor.getConfiguration(), getDBRecordType(),
9292
tableQuery, null, false);
9393
connectionConfigAccessor.setConnectionArguments(Maps.fromProperties(config.getConnectionArgumentsProperties()));
@@ -115,16 +115,16 @@ protected String getConnectionString(String database) {
115115
return config.getConnectionString();
116116
}
117117

118-
protected String getTableQuery(DBConnectorPath path) {
119-
return path.getSchema() == null ? String.format("SELECT * FROM \"%s\".\"%s\"", path.getDatabase(), path.getTable())
120-
: String.format("SELECT * FROM \"%s\".\"%s\".\"%s\"", path.getDatabase(), path.getSchema(), path.getTable());
118+
protected String getTableQuery(String database, String schema, String table) {
119+
return schema == null ? String.format("SELECT * FROM \"%s\".\"%s\"", database, table)
120+
: String.format("SELECT * FROM \"%s\".\"%s\".\"%s\"", database, schema, table);
121121
}
122122

123-
protected String getTableQuery(DBConnectorPath path, int limit) {
124-
return path.getSchema() == null ?
125-
String.format("SELECT * FROM \"%s\".\"%s\" LIMIT %d", path.getDatabase(), path.getTable(), limit) :
123+
protected String getTableQuery(String database, String schema, String table, int limit) {
124+
return schema == null ?
125+
String.format("SELECT * FROM \"%s\".\"%s\" LIMIT %d", database, table, limit) :
126126
String.format(
127-
"SELECT * FROM \"%s\".\"%s\".\"%s\" LIMIT %d", path.getDatabase(), path.getSchema(), path.getTable(), limit);
127+
"SELECT * FROM \"%s\".\"%s\".\"%s\" LIMIT %d", database, schema, table, limit);
128128
}
129129

130130
protected Schema loadTableSchema(Connection connection, String query) throws SQLException {
@@ -143,4 +143,11 @@ protected void setConnectionProperties(Map<String, String> properties) {
143143
properties.put(ConnectionConfig.PASSWORD, rawProperties.get(ConnectionConfig.PASSWORD));
144144
properties.put(ConnectionConfig.CONNECTION_ARGUMENTS, rawProperties.get(ConnectionConfig.CONNECTION_ARGUMENTS));
145145
}
146+
147+
@Override
148+
protected Schema getTableSchema(Connection connection, String database,
149+
String schema, String table) throws SQLException {
150+
151+
return loadTableSchema(getConnection(), getTableQuery(database, schema, table));
152+
}
146153
}

database-commons/src/test/java/io/cdap/plugin/db/connector/DBSpecificConnectorBaseTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,7 @@ protected void testGenerateSpec(AbstractDBSpecificConnector connector, String pl
265265
Map<String, String> properties = pluginSpec.getProperties();
266266
Assert.assertNull(properties.get(NAME_USE_CONNECTION));
267267
Assert.assertNull(properties.get(NAME_CONNECTION));
268-
Assert.assertEquals(connector.getTableQuery(connector.getDBConnectorPath(schema == null ? database + "/" + table :
269-
database + "/" + schema + "/" + table)),
270-
properties.get(IMPORT_QUERY));
268+
Assert.assertEquals(connector.getTableQuery(database, schema, table), properties.get(IMPORT_QUERY));
271269
properties.put("1", properties.get(NUM_SPLITS));
272270
}
273271

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerConnector.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
7878
return;
7979
}
8080

81-
properties.put(SqlServerSource.SqlServerSourceConfig.IMPORT_QUERY, getTableQuery(path));
81+
properties.put(SqlServerSource.SqlServerSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(),
82+
path.getSchema(),
83+
path.getTable()));
8284
properties.put(SqlServerSource.SqlServerSourceConfig.NUM_SPLITS, "1");
8385
properties.put(SqlServerSource.SqlServerSourceConfig.DATABASE, path.getDatabase());
8486
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
@@ -96,9 +98,9 @@ public StructuredRecord transform(LongWritable longWritable, SqlServerSourceDBRe
9698
}
9799

98100
@Override
99-
protected String getTableQuery(DBConnectorPath path, int limit) {
101+
protected String getTableQuery(String database, String schema, String table, int limit) {
100102
return String.format(
101-
"SELECT TOP(%d) * FROM \"%s\".\"%s\".\"%s\"", limit, path.getDatabase(), path.getSchema(), path.getTable());
103+
"SELECT TOP(%d) * FROM \"%s\".\"%s\".\"%s\"", limit, database, schema, table);
102104
}
103105

104106
@Override

mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConnector.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,21 +77,22 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
7777
return;
7878
}
7979

80-
properties.put(MysqlSource.MysqlSourceConfig.IMPORT_QUERY, getTableQuery(path));
80+
properties.put(MysqlSource.MysqlSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(), path.getSchema(),
81+
path.getTable()));
8182
properties.put(MysqlSource.MysqlSourceConfig.NUM_SPLITS, "1");
8283
properties.put(MysqlSource.MysqlSourceConfig.DATABASE, path.getDatabase());
8384
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
8485
properties.put(MysqlSink.MysqlSinkConfig.TABLE_NAME, table);
8586
}
8687

8788
@Override
88-
protected String getTableQuery(DBConnectorPath path) {
89-
return String.format("SELECT * FROM `%s`.`%s`", path.getDatabase(), path.getTable());
89+
protected String getTableQuery(String database, String schema, String table) {
90+
return String.format("SELECT * FROM `%s`.`%s`", database, table);
9091
}
9192

9293
@Override
93-
protected String getTableQuery(DBConnectorPath path, int limit) {
94-
return String.format("SELECT * FROM `%s`.`%s` LIMIT %d", path.getDatabase(), path.getTable(), limit);
94+
protected String getTableQuery(String database, String schema, String table, int limit) {
95+
return String.format("SELECT * FROM `%s`.`%s` LIMIT %d", database, table, limit);
9596
}
9697

9798
@Override

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
8383
return;
8484
}
8585

86-
properties.put(OracleSource.OracleSourceConfig.IMPORT_QUERY, getTableQuery(path));
86+
properties.put(OracleSource.OracleSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(), path.getSchema(),
87+
path.getTable()));
8788
properties.put(OracleSource.OracleSourceConfig.NUM_SPLITS, "1");
8889
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
8990
properties.put(OracleSink.OracleSinkConfig.TABLE_NAME, table);
@@ -127,13 +128,13 @@ protected String getConnectionString(@Nullable String database) {
127128
}
128129

129130
@Override
130-
protected String getTableQuery(DBConnectorPath path) {
131-
return String.format("SELECT * from \"%s\".\"%s\"", path.getSchema(), path.getTable());
131+
protected String getTableQuery(String database, String schema, String table) {
132+
return String.format("SELECT * from \"%s\".\"%s\"", schema, table);
132133
}
133134

134135
@Override
135-
protected String getTableQuery(DBConnectorPath path, int limit) {
136-
return String.format("SELECT * FROM \"%s\".\"%s\" WHERE ROWNUM <= %d", path.getSchema(), path.getTable(), limit);
136+
protected String getTableQuery(String database, String schema, String table, int limit) {
137+
return String.format("SELECT * FROM \"%s\".\"%s\" WHERE ROWNUM <= %d", schema, table, limit);
137138
}
138139

139140
@Override

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,8 @@
366366
<version>1.1.0</version>
367367
<configuration>
368368
<cdapArtifacts>
369-
<parent>system:cdap-data-pipeline[6.5.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
370-
<parent>system:cdap-data-streams[6.5.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
369+
<parent>system:cdap-data-pipeline[6.5.0,7.0.0-SNAPSHOT)</parent>
370+
<parent>system:cdap-data-streams[6.5.0,7.0.0-SNAPSHOT)</parent>
371371
</cdapArtifacts>
372372
</configuration>
373373
<executions>

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
8484
return;
8585
}
8686

87-
properties.put(PostgresSource.PostgresSourceConfig.IMPORT_QUERY, getTableQuery(path));
87+
properties.put(PostgresSource.PostgresSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(),
88+
path.getSchema(), path.getTable()));
8889
properties.put(PostgresSource.PostgresSourceConfig.NUM_SPLITS, "1");
8990
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
9091
properties.put(PostgresSink.PostgresSinkConfig.TABLE_NAME, table);
@@ -108,13 +109,14 @@ public StructuredRecord transform(LongWritable longWritable, PostgresDBRecord re
108109
return record.getRecord();
109110
}
110111

111-
protected String getTableQuery(DBConnectorPath path) {
112-
return String.format("SELECT * FROM \"%s\".\"%s\"", path.getSchema(), path.getTable());
112+
@Override
113+
protected String getTableQuery(String database, String schema, String table) {
114+
return String.format("SELECT * FROM \"%s\".\"%s\"", schema, table);
113115
}
114116

115117
@Override
116-
protected String getTableQuery(DBConnectorPath path, int limit) {
117-
return String.format("SELECT * FROM \"%s\".\"%s\" LIMIT %d", path.getSchema(), path.getTable(), limit);
118+
protected String getTableQuery(String database, String schema, String table, int limit) {
119+
return String.format("SELECT * FROM \"%s\".\"%s\" LIMIT %d", schema, table, limit);
118120
}
119121

120122
}

0 commit comments

Comments
 (0)