Skip to content

Commit 36961f1

Browse files
Aryan-Verma and Aryan-Verma authored
Oracle refactor work (#424)
Co-authored-by: Aryan-Verma <aryanverma@google.com>
1 parent ed6499d commit 36961f1

File tree

7 files changed: +100 -37 lines changed

database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -61,29 +61,29 @@ public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implem
6161
"For example, 'SELECT * FROM table WHERE $CONDITIONS'. The '$CONDITIONS' string" +
6262
"will be replaced by 'splitBy' field limits specified by the bounding query.")
6363
@Macro
64-
private String importQuery;
64+
protected String importQuery;
6565

6666
@Nullable
6767
@Name(BOUNDING_QUERY)
6868
@Description("Bounding Query should return the min and max of the " +
6969
"values of the 'splitBy' field. For example, 'SELECT MIN(id),MAX(id) FROM table'. " +
7070
"This is required unless numSplits is set to one.")
7171
@Macro
72-
private String boundingQuery;
72+
protected String boundingQuery;
7373

7474
@Nullable
7575
@Name(SPLIT_BY)
7676
@Description("Field Name which will be used to generate splits. This is required unless numSplits is set to one.")
7777
@Macro
78-
private String splitBy;
78+
protected String splitBy;
7979

8080
@Nullable
8181
@Name(NUM_SPLITS)
8282
@Description("The number of splits to generate. If set to one, the boundingQuery is not needed, " +
8383
"and no $CONDITIONS string needs to be specified in the importQuery. If not specified, the " +
8484
"execution framework will pick a value.")
8585
@Macro
86-
private Integer numSplits;
86+
protected Integer numSplits;
8787

8888
@Nullable
8989
@Name(SCHEMA)
@@ -97,7 +97,7 @@ public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implem
9797
@Macro
9898
@Description("The number of rows to fetch at a time per split. Larger fetch size can result in faster import, " +
9999
"with the tradeoff of higher memory usage.")
100-
private Integer fetchSize;
100+
protected Integer fetchSize;
101101

102102
public String getImportQuery() {
103103
return cleanQuery(importQuery);

database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java

Lines changed: 63 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -133,10 +133,8 @@ public Schema getSchema(Class<? extends Driver> driverClass) throws IllegalAcces
133133
try {
134134

135135
driverCleanup = loadPluginClassAndGetDriver(driverClass);
136-
try (Connection connection = getConnection()) {
137-
executeInitQueries(connection, sourceConfig.getInitQueries());
138-
String query = sourceConfig.getImportQuery();
139-
return loadSchemaFromDB(connection, query);
136+
try {
137+
return getSchema();
140138
} finally {
141139
driverCleanup.destroy();
142140
}
@@ -146,6 +144,23 @@ public Schema getSchema(Class<? extends Driver> driverClass) throws IllegalAcces
146144
}
147145
}
148146

147+
/**
148+
* Returns the schema of the importQuery from the database using the SourceConfig details.
149+
*
150+
* @return Schema instance
151+
* @throws SQLException In case of
152+
* 1. Error while creating connection to the database.
153+
* 2. Error while running any init queries.
154+
* 3. Error while running the import query.
155+
*/
156+
public Schema getSchema() throws SQLException {
157+
try (Connection connection = getConnection()) {
158+
executeInitQueries(connection, sourceConfig.getInitQueries());
159+
String query = sourceConfig.getImportQuery();
160+
return loadSchemaFromDB(connection, query);
161+
}
162+
}
163+
149164
private Schema loadSchemaFromDB(Connection connection, String query) throws SQLException {
150165
Statement statement = connection.createStatement();
151166
statement.setMaxRows(1);
@@ -229,22 +244,56 @@ public void prepareRun(BatchSourceContext context) throws Exception {
229244
sourceConfig.validate(collector);
230245
collector.getOrThrowException();
231246

232-
String connectionString = sourceConfig.getConnectionString();
233-
234247
LOG.debug("pluginType = {}; pluginName = {}; connectionString = {}; importQuery = {}; " +
235248
"boundingQuery = {};",
236-
ConnectionConfig.JDBC_PLUGIN_TYPE, sourceConfig.getJdbcPluginName(),
237-
connectionString,
238-
sourceConfig.getImportQuery(), sourceConfig.getBoundingQuery());
239-
ConnectionConfigAccessor connectionConfigAccessor = new ConnectionConfigAccessor();
249+
ConnectionConfig.JDBC_PLUGIN_TYPE,
250+
sourceConfig.getJdbcPluginName(),
251+
sourceConfig.getConnectionString(),
252+
sourceConfig.getImportQuery(),
253+
sourceConfig.getBoundingQuery());
240254

241255
// Load the plugin class to make sure it is available.
242256
Class<? extends Driver> driverClass = context.loadPluginClass(getJDBCPluginId());
257+
Schema schemaFromDB = loadSchemaFromDB(driverClass);
258+
259+
ConnectionConfigAccessor connectionConfigAccessor = getConnectionConfigAccessor(
260+
driverClass.getName(),
261+
schemaFromDB,
262+
collector);
263+
264+
LineageRecorder lineageRecorder = getLineageRecorder(context);
265+
Schema schema = sourceConfig.getSchema() == null ? schemaFromDB : sourceConfig.getSchema();
266+
lineageRecorder.createExternalDataset(schema);
267+
if (schema != null && schema.getFields() != null) {
268+
lineageRecorder.recordRead("Read", "Read from database plugin",
269+
schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
270+
}
271+
context.setInput(Input.of(sourceConfig.getReferenceName(), new SourceInputFormatProvider(
272+
DataDrivenETLDBInputFormat.class, connectionConfigAccessor.getConfiguration())));
273+
}
274+
275+
/**
276+
* Returns the ConnectionConfigAccessor object containing the Configuration object for the SourceConfig
277+
* and Schema. The configuration is later used by the InputFormat object for split calculation, reader creation.
278+
*
279+
* @param driverClassName Class name of the driver in use
280+
* @param schemaFromDB Schema object
281+
* @param collector Failure Collector object
282+
* @return ConnectionConfigAccessor instance
283+
* @throws IOException
284+
*/
285+
public ConnectionConfigAccessor getConnectionConfigAccessor (String driverClassName,
286+
Schema schemaFromDB,
287+
FailureCollector collector) throws IOException {
288+
ConnectionConfigAccessor connectionConfigAccessor = new ConnectionConfigAccessor();
289+
243290
if (sourceConfig.getUser() == null && sourceConfig.getPassword() == null) {
244-
DBConfiguration.configureDB(connectionConfigAccessor.getConfiguration(), driverClass.getName(), connectionString);
291+
DBConfiguration.configureDB(connectionConfigAccessor.getConfiguration(),
292+
driverClassName, sourceConfig.getConnectionString());
245293
} else {
246-
DBConfiguration.configureDB(connectionConfigAccessor.getConfiguration(), driverClass.getName(), connectionString,
247-
sourceConfig.getUser(), sourceConfig.getPassword());
294+
DBConfiguration.configureDB(connectionConfigAccessor.getConfiguration(),
295+
driverClassName, sourceConfig.getConnectionString(),
296+
sourceConfig.getUser(), sourceConfig.getPassword());
248297
}
249298

250299
if (sourceConfig.getFetchSize() != null) {
@@ -255,7 +304,6 @@ public void prepareRun(BatchSourceContext context) throws Exception {
255304
sourceConfig.getImportQuery(), sourceConfig.getBoundingQuery(),
256305
false);
257306

258-
259307
if (sourceConfig.getTransactionIsolationLevel() != null) {
260308
connectionConfigAccessor.setTransactionIsolationLevel(sourceConfig.getTransactionIsolationLevel());
261309
}
@@ -273,7 +321,6 @@ public void prepareRun(BatchSourceContext context) throws Exception {
273321
connectionConfigAccessor.getConfiguration().setInt(MRJobConfig.NUM_MAPS, sourceConfig.getNumSplits());
274322
}
275323

276-
Schema schemaFromDB = loadSchemaFromDB(driverClass);
277324
if (sourceConfig.getSchema() != null) {
278325
sourceConfig.validateSchema(schemaFromDB, collector);
279326
collector.getOrThrowException();
@@ -283,15 +330,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
283330
connectionConfigAccessor.setSchema(schemaStr);
284331
}
285332

286-
LineageRecorder lineageRecorder = getLineageRecorder(context);
287-
Schema schema = sourceConfig.getSchema() == null ? schemaFromDB : sourceConfig.getSchema();
288-
lineageRecorder.createExternalDataset(schema);
289-
if (schema != null && schema.getFields() != null) {
290-
lineageRecorder.recordRead("Read", "Read from database plugin",
291-
schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
292-
}
293-
context.setInput(Input.of(sourceConfig.getReferenceName(), new SourceInputFormatProvider(
294-
DataDrivenETLDBInputFormat.class, connectionConfigAccessor.getConfiguration())));
333+
return connectionConfigAccessor;
295334
}
296335

297336
protected LineageRecorder getLineageRecorder(BatchSourceContext context) {

database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -129,7 +129,7 @@ public Connection createConnection() {
129129
}
130130

131131
@Override
132-
protected RecordReader createDBRecordReader(DBInputSplit split, Configuration conf) throws IOException {
132+
public RecordReader createDBRecordReader(DBInputSplit split, Configuration conf) throws IOException {
133133
final RecordReader dbRecordReader = super.createDBRecordReader(split, conf);
134134
return new RecordReader() {
135135
@Override

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -37,12 +37,18 @@ public class OracleConnectorConfig extends AbstractDBSpecificConnectorConfig {
3737
private static final String ROLE_NORMAL = "normal";
3838

3939
public OracleConnectorConfig(String host, int port, String user, String password, String jdbcPluginName,
40-
String connectionArguments) {
41-
this(host, port, user, password, jdbcPluginName, connectionArguments, null);
40+
String connectionArguments, String database) {
41+
this(host, port, user, password, jdbcPluginName, connectionArguments, null, database);
4242
}
4343

4444
public OracleConnectorConfig(String host, int port, String user, String password, String jdbcPluginName,
45-
String connectionArguments, String connectionType) {
45+
String connectionArguments, String connectionType, String database) {
46+
this(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, null);
47+
}
48+
49+
public OracleConnectorConfig(String host, int port, String user, String password, String jdbcPluginName,
50+
String connectionArguments, String connectionType, String database,
51+
String role) {
4652

4753
this.host = host;
4854
this.port = port;
@@ -51,6 +57,8 @@ public OracleConnectorConfig(String host, int port, String user, String password
5157
this.jdbcPluginName = jdbcPluginName;
5258
this.connectionArguments = connectionArguments;
5359
this.connectionType = connectionType;
60+
this.database = database;
61+
this.role = role;
5462
}
5563

5664
@Override

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java

Lines changed: 17 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -113,6 +113,22 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig {
113113
@Nullable
114114
private Integer defaultRowPrefetch;
115115

116+
public OracleSourceConfig(String host, int port, String user, String password, String jdbcPluginName,
117+
String connectionArguments, String connectionType, String database, String role,
118+
int defaultBatchValue, int defaultRowPrefetch,
119+
String importQuery, Integer numSplits, int fetchSize,
120+
String boundingQuery, String splitBy) {
121+
this.connection = new OracleConnectorConfig(host, port, user, password, jdbcPluginName, connectionArguments,
122+
connectionType, database, role);
123+
this.defaultBatchValue = defaultBatchValue;
124+
this.defaultRowPrefetch = defaultRowPrefetch;
125+
this.fetchSize = fetchSize;
126+
this.importQuery = importQuery;
127+
this.numSplits = numSplits;
128+
this.boundingQuery = boundingQuery;
129+
this.splitBy = splitBy;
130+
}
131+
116132
@Override
117133
public String getConnectionString() {
118134
if (OracleConstants.TNS_CONNECTION_TYPE.equals(connection.getConnectionType())) {
@@ -140,7 +156,7 @@ protected Map<String, String> getDBSpecificArguments() {
140156
}
141157

142158
@Override
143-
protected OracleConnectorConfig getConnection() {
159+
public OracleConnectorConfig getConnection() {
144160
return connection;
145161
}
146162

oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleConnectorTest.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ public class OracleConnectorTest extends DBSpecificConnectorBaseTest {
2929
@Test
3030
public void test() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
3131
test(new OracleConnector(
32-
new OracleConnectorConfig(host, port, username, password, JDBC_PLUGIN_NAME, connectionArguments)),
32+
new OracleConnectorConfig(host, port, username, password, JDBC_PLUGIN_NAME, connectionArguments, database)),
3333
JDBC_DRIVER_CLASS_NAME, OracleConstants.PLUGIN_NAME);
3434

3535
test(new OracleConnector(

oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleFailedConnectionTest.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -28,10 +28,10 @@ public class OracleFailedConnectionTest extends DBSpecificFailedConnectionTest {
2828
public void test() throws ClassNotFoundException, IOException {
2929

3030
OracleConnector connector = new OracleConnector(
31-
new OracleConnectorConfig("localhost", 1521, "username", "password", "jdbc", ""));
31+
new OracleConnectorConfig("localhost", 1521, "username", "password", "jdbc", "", "database"));
3232

3333
super.test(JDBC_DRIVER_CLASS_NAME, connector, "Failed to create connection to database via connection string:" +
34-
" jdbc:oracle:thin:@localhost:1521:null and arguments: " +
34+
" jdbc:oracle:thin:@localhost:1521:database and arguments: " +
3535
"{user=username, oracle.jdbc.timezoneAsRegion=false, " +
3636
"internal_logon=normal}. Error: ConnectException: Connection " +
3737
"refused.");

0 commit comments

Comments
 (0)