Skip to content

Commit faf8438

Browse files
migrate connector DB specific sink plugins
1 parent a356d6b commit faf8438

File tree

34 files changed

+868
-205
lines changed

34 files changed

+868
-205
lines changed

aurora-mysql-plugin/src/main/java/io/cdap/plugin/auroradb/mysql/AuroraMysqlSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
@Plugin(type = BatchSink.PLUGIN_TYPE)
3434
@Name(AuroraMysqlConstants.PLUGIN_NAME)
3535
@Description("Writes records to a table of Aurora DB MySQL cluster. Each record will be written in a row in the table.")
36-
public class AuroraMysqlSink extends AbstractDBSink {
36+
public class AuroraMysqlSink extends AbstractDBSink<AuroraMysqlSink.AuroraMysqlSinkConfig> {
3737

3838
private final AuroraMysqlSinkConfig auroraMysqlSinkConfig;
3939

aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
@Name(AuroraPostgresConstants.PLUGIN_NAME)
4040
@Description("Writes records to a table of Aurora DB PostgreSQL cluster. " +
4141
"Each record will be written in a row in the table.")
42-
public class AuroraPostgresSink extends AbstractDBSink {
42+
public class AuroraPostgresSink extends AbstractDBSink<AuroraPostgresSink.AuroraPostgresSinkConfig> {
4343
private static final Character ESCAPE_CHAR = '"';
4444

4545
private final AuroraPostgresSinkConfig auroraPostgresSinkConfig;
@@ -78,7 +78,7 @@ public String getConnectionString() {
7878
}
7979

8080
@Override
81-
protected String getEscapedTableName() {
81+
public String getEscapedTableName() {
8282
return ESCAPE_CHAR + tableName + ESCAPE_CHAR;
8383
}
8484

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
@Name(CloudSQLMySQLConstants.PLUGIN_NAME)
3636
@Description(
3737
"Writes records to a CloudSQL MySQL table. Each record will be written in a row in the table.")
38-
public class CloudSQLMySQLSink extends AbstractDBSink {
38+
public class CloudSQLMySQLSink extends AbstractDBSink<CloudSQLMySQLSink.CloudSQLMySQLSinkConfig> {
3939

4040
private final CloudSQLMySQLSinkConfig cloudsqlMysqlSinkConfig;
4141

cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
@Name(CloudSQLPostgreSQLConstants.PLUGIN_NAME)
4646
@Description(
4747
"Writes records to a CloudSQL PostgreSQL table. Each record will be written in a row in the table")
48-
public class CloudSQLPostgreSQLSink extends AbstractDBSink {
48+
public class CloudSQLPostgreSQLSink extends AbstractDBSink<CloudSQLPostgreSQLSink.CloudSQLPostgreSQLSinkConfig> {
4949

5050
private static final Character ESCAPE_CHAR = '"';
5151

@@ -138,7 +138,7 @@ public String getTransactionIsolationLevel() {
138138
}
139139

140140
@Override
141-
protected String getEscapedTableName() {
141+
public String getEscapedTableName() {
142142
return ESCAPE_CHAR + tableName + ESCAPE_CHAR;
143143
}
144144

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright © 2021 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.db.batch.config;
18+
19+
import com.google.common.collect.Maps;
20+
import io.cdap.cdap.api.annotation.Description;
21+
import io.cdap.cdap.api.annotation.Macro;
22+
import io.cdap.cdap.api.annotation.Name;
23+
import io.cdap.cdap.api.plugin.PluginConfig;
24+
import io.cdap.plugin.common.Constants;
25+
import io.cdap.plugin.db.connector.AbstractDBSpecificConnectorConfig;
26+
27+
import java.util.Collections;
28+
import java.util.HashMap;
29+
import java.util.List;
30+
import java.util.Map;
31+
import javax.annotation.Nullable;
32+
33+
/**
34+
* Abstract Config for DB Specific Sink plugin
35+
*/
36+
public abstract class AbstractDBSpecificSinkConfig extends PluginConfig implements DatabaseSinkConfig {
37+
public static final String TABLE_NAME = "tableName";
38+
public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
39+
40+
@Name(Constants.Reference.REFERENCE_NAME)
41+
@Description(Constants.Reference.REFERENCE_NAME_DESCRIPTION)
42+
private String referenceName;
43+
44+
@Name(TABLE_NAME)
45+
@Description("Name of the database table to write to.")
46+
@Macro
47+
private String tableName;
48+
49+
@Override
50+
public String getTableName() {
51+
return tableName;
52+
}
53+
54+
/**
55+
* Adds escape characters (back quotes, double quotes, etc.) to the table name for
56+
* databases with case-sensitive identifiers.
57+
*
58+
* @return tableName with leading and trailing escape characters appended.
59+
* Default implementation returns unchanged table name string.
60+
*/
61+
@Override
62+
public String getEscapedTableName() {
63+
return tableName;
64+
}
65+
66+
@Override
67+
public boolean canConnect() {
68+
return !containsMacro(TABLE_NAME) && getConnection().canConnect();
69+
}
70+
71+
@Nullable
72+
@Override
73+
public String getTransactionIsolationLevel() {
74+
return null;
75+
}
76+
77+
@Override
78+
public List<String> getInitQueries() {
79+
return Collections.emptyList();
80+
}
81+
82+
protected abstract Map<String, String> getDBSpecificArguments();
83+
84+
protected abstract AbstractDBSpecificConnectorConfig getConnection();
85+
86+
@Override
87+
public String getConnectionString() {
88+
return getConnection().getConnectionString();
89+
}
90+
91+
@Override
92+
public Map<String, String> getConnectionArguments() {
93+
Map<String, String> arguments = new HashMap<>();
94+
arguments.putAll(Maps.fromProperties(getConnection().getConnectionArgumentsProperties()));
95+
arguments.putAll(getDBSpecificArguments());
96+
return arguments;
97+
}
98+
99+
@Override
100+
public String getJdbcPluginName() {
101+
return getConnection().getJdbcPluginName();
102+
}
103+
104+
@Override
105+
public String getUser() {
106+
return getConnection().getUser();
107+
}
108+
109+
@Override
110+
public String getPassword() {
111+
return getConnection().getPassword();
112+
}
113+
114+
@Override
115+
public String getReferenceName() {
116+
return referenceName;
117+
}
118+
}

database-commons/src/main/java/io/cdap/plugin/db/batch/config/AbstractDBSpecificSourceConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.cdap.cdap.etl.api.FailureCollector;
2727
import io.cdap.plugin.common.Constants;
2828
import io.cdap.plugin.db.batch.TransactionIsolationLevel;
29+
import io.cdap.plugin.db.batch.source.AbstractDBSource;
2930
import io.cdap.plugin.db.connector.AbstractDBSpecificConnectorConfig;
3031

3132
import java.io.IOException;
@@ -243,4 +244,9 @@ protected String cleanQuery(@Nullable String query) {
243244

244245
protected abstract AbstractDBSpecificConnectorConfig getConnection();
245246

247+
@Override
248+
public boolean canConnect() {
249+
return !containsMacro(AbstractDBSource.DBSourceConfig.IMPORT_QUERY) && getConnection().canConnect();
250+
}
251+
246252
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright © 2021 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.db.batch.config;
18+
19+
import java.util.List;
20+
21+
/**
22+
* Interface for DB Sink plugin config
23+
*/
24+
public interface DatabaseSinkConfig extends DatabaseConnectionConfig {
25+
26+
/**
27+
* @return the reference name of the sink stage
28+
*/
29+
String getReferenceName();
30+
31+
/**
32+
* @return the transaction isolation level
33+
*/
34+
String getTransactionIsolationLevel();
35+
36+
/**
37+
* @return the initial query to be run upon connecting to database
38+
*/
39+
List<String> getInitQueries();
40+
41+
/**
42+
* @return true if none of the connection parameters is macro, otherwise false
43+
*/
44+
boolean canConnect();
45+
46+
/**
47+
* @return the table name
48+
*/
49+
String getTableName();
50+
51+
/**
52+
* Adds escape characters (back quotes, double quotes, etc.) to the table name for
53+
* databases with case-sensitive identifiers.
54+
*
55+
* @return tableName with leading and trailing escape characters appended.
56+
* Default implementation returns unchanged table name string.
57+
*/
58+
String getEscapedTableName();
59+
60+
}

database-commons/src/main/java/io/cdap/plugin/db/batch/config/DatabaseSourceConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,9 @@ public interface DatabaseSourceConfig extends DatabaseConnectionConfig {
8080
* @return the initial query to be run upon connecting to database
8181
*/
8282
List<String> getInitQueries();
83+
84+
/**
85+
* @return true if none of the connection parameters is macro, otherwise false
86+
*/
87+
boolean canConnect();
8388
}

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import io.cdap.plugin.db.DBConfig;
4545
import io.cdap.plugin.db.DBRecord;
4646
import io.cdap.plugin.db.SchemaReader;
47+
import io.cdap.plugin.db.batch.config.DatabaseSinkConfig;
4748
import io.cdap.plugin.util.DBUtils;
4849
import io.cdap.plugin.util.DriverCleanup;
4950
import org.apache.hadoop.io.NullWritable;
@@ -69,19 +70,21 @@
6970

7071
/**
7172
* Sink that can be configured to export data to a database table.
73+
* @param <T> the DB Sink config
7274
*/
73-
public abstract class AbstractDBSink extends ReferenceBatchSink<StructuredRecord, DBRecord, NullWritable> {
75+
public abstract class AbstractDBSink<T extends PluginConfig & DatabaseSinkConfig>
76+
extends ReferenceBatchSink<StructuredRecord, DBRecord, NullWritable> {
7477
private static final Logger LOG = LoggerFactory.getLogger(AbstractDBSink.class);
7578

76-
private final DBSinkConfig dbSinkConfig;
79+
private final T dbSinkConfig;
7780
private Class<? extends Driver> driverClass;
7881
private DriverCleanup driverCleanup;
7982
protected List<String> columns;
8083
protected List<ColumnType> columnTypes;
8184
protected String dbColumns;
8285

83-
public AbstractDBSink(DBSinkConfig dbSinkConfig) {
84-
super(new ReferencePluginConfig(dbSinkConfig.referenceName));
86+
public AbstractDBSink(T dbSinkConfig) {
87+
super(new ReferencePluginConfig(dbSinkConfig.getReferenceName()));
8588
this.dbSinkConfig = dbSinkConfig;
8689
}
8790

@@ -98,9 +101,9 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
98101
if (Objects.nonNull(inputSchema)) {
99102
Class<? extends Driver> driverClass = DBUtils.getDriverClass(
100103
pipelineConfigurer, dbSinkConfig, ConnectionConfig.JDBC_PLUGIN_TYPE);
101-
if (driverClass != null && !dbSinkConfig.connectionParamsContainsMacro()) {
104+
if (driverClass != null && dbSinkConfig.canConnect()) {
102105
FailureCollector collector = configurer.getFailureCollector();
103-
validateSchema(collector, driverClass, dbSinkConfig.tableName, inputSchema);
106+
validateSchema(collector, driverClass, dbSinkConfig.getTableName(), inputSchema);
104107
}
105108
}
106109
}
@@ -110,7 +113,7 @@ public void prepareRun(BatchSinkContext context) {
110113
String connectionString = dbSinkConfig.getConnectionString();
111114

112115
LOG.debug("tableName = {}; pluginType = {}; pluginName = {}; connectionString = {};",
113-
dbSinkConfig.tableName,
116+
dbSinkConfig.getTableName(),
114117
ConnectionConfig.JDBC_PLUGIN_TYPE,
115118
dbSinkConfig.getJdbcPluginName(),
116119
connectionString);
@@ -123,7 +126,7 @@ public void prepareRun(BatchSinkContext context) {
123126
try {
124127
if (Objects.nonNull(outputSchema)) {
125128
FailureCollector collector = context.getFailureCollector();
126-
validateSchema(collector, driverClass, dbSinkConfig.tableName, outputSchema);
129+
validateSchema(collector, driverClass, dbSinkConfig.getTableName(), outputSchema);
127130
collector.getOrThrowException();
128131
} else {
129132
outputSchema = inferSchema(driverClass);
@@ -155,7 +158,7 @@ public void prepareRun(BatchSinkContext context) {
155158
configAccessor.setTransactionIsolationLevel(dbSinkConfig.getTransactionIsolationLevel());
156159
}
157160

158-
context.addOutput(Output.of(dbSinkConfig.referenceName, new SinkOutputFormatProvider(ETLDBOutputFormat.class,
161+
context.addOutput(Output.of(dbSinkConfig.getReferenceName(), new SinkOutputFormatProvider(ETLDBOutputFormat.class,
159162
configAccessor.getConfiguration())));
160163
}
161164

@@ -325,7 +328,7 @@ protected FieldsValidator getFieldsValidator() {
325328
}
326329

327330
private void emitLineage(BatchSinkContext context, List<Schema.Field> fields) {
328-
LineageRecorder lineageRecorder = new LineageRecorder(context, dbSinkConfig.referenceName);
331+
LineageRecorder lineageRecorder = new LineageRecorder(context, dbSinkConfig.getReferenceName());
329332

330333
if (!fields.isEmpty()) {
331334
lineageRecorder.recordWrite("Write", "Wrote to DB table.",
@@ -344,7 +347,7 @@ private void executeInitQueries(Connection connection, List<String> initQueries)
344347
/**
345348
* {@link PluginConfig} for {@link AbstractDBSink}
346349
*/
347-
public abstract static class DBSinkConfig extends DBConfig {
350+
public abstract static class DBSinkConfig extends DBConfig implements DatabaseSinkConfig {
348351
public static final String TABLE_NAME = "tableName";
349352
public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
350353

@@ -353,21 +356,25 @@ public abstract static class DBSinkConfig extends DBConfig {
353356
@Macro
354357
public String tableName;
355358

359+
public String getTableName() {
360+
return tableName;
361+
}
362+
356363
/**
357364
* Adds escape characters (back quotes, double quotes, etc.) to the table name for
358365
* databases with case-sensitive identifiers.
359366
*
360367
* @return tableName with leading and trailing escape characters appended.
361368
* Default implementation returns unchanged table name string.
362369
*/
363-
protected String getEscapedTableName() {
370+
public String getEscapedTableName() {
364371
return tableName;
365372
}
366373

367-
public boolean connectionParamsContainsMacro() {
368-
return (containsMacro(ConnectionConfig.HOST) || containsMacro(ConnectionConfig.PORT) ||
369-
containsMacro(ConnectionConfig.DATABASE) || containsMacro(TABLE_NAME) || containsMacro(USER) ||
370-
containsMacro(PASSWORD));
374+
public boolean canConnect() {
375+
return (!containsMacro(ConnectionConfig.HOST) && !containsMacro(ConnectionConfig.PORT) &&
376+
!containsMacro(ConnectionConfig.DATABASE) && !containsMacro(TABLE_NAME) && !containsMacro(USER) &&
377+
!containsMacro(PASSWORD));
371378
}
372379
}
373380
}

0 commit comments

Comments
 (0)