Skip to content

Commit 67bf43b

Browse files
authored
Merge pull request #411 from cloudsufi/bugfix/CSM-1032
PLUGIN-1636 : PostgreSQL Sink - Case Sensitivity in Schema name
2 parents 82a8feb + 06b871f commit 67bf43b

File tree

8 files changed

+59
-5
lines changed

8 files changed

+59
-5
lines changed

aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ public String getEscapedTableName() {
101101
return ESCAPE_CHAR + tableName + ESCAPE_CHAR;
102102
}
103103

104+
@Override
105+
public String getEscapedDbSchemaName() {
106+
return ESCAPE_CHAR + dbSchemaName + ESCAPE_CHAR;
107+
}
108+
104109
@Override
105110
public Map<String, String> getDBSpecificArguments() {
106111
if (connectionTimeout != null) {

cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,11 @@ public String getTransactionIsolationLevel() {
183183
public String getEscapedTableName() {
184184
return ESCAPE_CHAR + getTableName() + ESCAPE_CHAR;
185185
}
186+
187+
@Override
188+
public String getEscapedDbSchemaName() {
189+
return ESCAPE_CHAR + getDBSchemaName() + ESCAPE_CHAR;
190+
}
186191

187192
@Override
188193
public Map<String, String> getDBSpecificArguments() {

database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSinkConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,13 @@ public abstract class AbstractDBSpecificSinkConfig extends PluginConfig implemen
5656
@Macro
5757
@Nullable
5858
private String dbSchemaName;
59+
5960
@Name(OPERATION_NAME)
6061
@Description("Operation for the query to perform. By default, the query performs INSERT operation")
6162
@Macro
6263
@Nullable
6364
protected String operationName;
65+
6466
@Name(RELATION_TABLE_KEY)
6567
@Macro
6668
@Nullable
@@ -89,6 +91,10 @@ public String getEscapedTableName() {
8991
return tableName;
9092
}
9193

94+
public String getEscapedDbSchemaName() {
95+
return dbSchemaName;
96+
}
97+
9298
@Override
9399
public boolean canConnect() {
94100
return !containsMacro(TABLE_NAME) && getConnection().canConnect();

database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSinkConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,15 @@ public interface DatabaseSinkConfig extends DatabaseConnectionConfig {
6565
*/
6666
String getEscapedTableName();
6767

68+
/**
69+
* Adds escape characters (back quotes, double quotes, etc.) to the database schema name for
70+
* databases with case-sensitive identifiers.
71+
*
72+
* @return dBSchemaName with leading and trailing escape characters appended.
73+
* Default implementation returns unchanged table name string.
74+
*/
75+
String getEscapedDbSchemaName();
76+
6877
/**
6978
* Validate the sink config
7079
*

database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public void prepareRun(BatchSinkContext context) {
204204
configAccessor.getConfiguration().set(DBConfiguration.DRIVER_CLASS_PROPERTY, driverClass.getName());
205205
configAccessor.getConfiguration().set(DBConfiguration.URL_PROPERTY, connectionString);
206206
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
207-
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
207+
: dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName();
208208
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, fullyQualifiedTableName);
209209
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, dbColumns);
210210
configAccessor.setOperationName(dbSinkConfig.getOperationName());
@@ -267,7 +267,7 @@ private Schema inferSchema(Class<? extends Driver> driverClass) {
267267
List<Schema.Field> inferredFields = new ArrayList<>();
268268
String dbSchemaName = dbSinkConfig.getDBSchemaName();
269269
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
270-
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
270+
: dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName();
271271
try {
272272
DBUtils.ensureJDBCDriverIsAvailable(driverClass, dbSinkConfig.getConnectionString(),
273273
dbSinkConfig.getJdbcPluginName());
@@ -318,7 +318,7 @@ private void setResultSetMetadata() throws Exception {
318318
String connectionString = dbSinkConfig.getConnectionString();
319319
String dbSchemaName = dbSinkConfig.getDBSchemaName();
320320
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
321-
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
321+
: dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName();
322322

323323
driverCleanup = DBUtils
324324
.ensureJDBCDriverIsAvailable(driverClass, connectionString, dbSinkConfig.getJdbcPluginName());
@@ -381,7 +381,7 @@ private void validateSchema(FailureCollector collector, Class<? extends Driver>
381381
Schema inputSchema, String dbSchemaName) {
382382
String connectionString = dbSinkConfig.getConnectionString();
383383
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
384-
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
384+
: dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName();
385385
try {
386386
DBUtils.ensureJDBCDriverIsAvailable(jdbcDriverClass, connectionString, dbSinkConfig.getJdbcPluginName());
387387
} catch (IllegalAccessException | InstantiationException | SQLException e) {
@@ -467,12 +467,14 @@ public abstract static class DBSinkConfig extends DBConfig implements DatabaseSi
467467
@Description("Name of the database schema of table.")
468468
@Macro
469469
@Nullable
470-
private String dbSchemaName;
470+
public String dbSchemaName;
471+
471472
@Name(OPERATION_NAME)
472473
@Description("Operation for the query to perform. By default, the query performs INSERT operation")
473474
@Macro
474475
@Nullable
475476
protected String operationName;
477+
476478
@Name(RELATION_TABLE_KEY)
477479
@Macro
478480
@Nullable
@@ -486,6 +488,7 @@ public String getTableName() {
486488
public String getDBSchemaName() {
487489
return dbSchemaName;
488490
}
491+
489492
@Override
490493
public Operation getOperationName() {
491494
return Strings.isNullOrEmpty(operationName) ? Operation.INSERT : Operation.valueOf(operationName.toUpperCase());
@@ -506,6 +509,17 @@ public String getEscapedTableName() {
506509
return tableName;
507510
}
508511

512+
/**
513+
* Adds escape characters (back quotes, double quotes, etc.) to the database schema name for
514+
* databases with case-sensitive identifiers.
515+
*
516+
* @return dbschemaName with leading and trailing escape characters appended.
517+
* Default implementation returns unchanged table name string.
518+
*/
519+
public String getEscapedDbSchemaName() {
520+
return dbSchemaName;
521+
}
522+
509523
public boolean canConnect() {
510524
return (!containsMacro(ConnectionConfig.HOST) && !containsMacro(ConnectionConfig.PORT) &&
511525
!containsMacro(ConnectionConfig.DATABASE) && !containsMacro(TABLE_NAME) && !containsMacro(USER) &&

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,11 @@ public String getEscapedTableName() {
127127
return ESCAPE_CHAR + getTableName() + ESCAPE_CHAR;
128128
}
129129

130+
@Override
131+
public String getEscapedDbSchemaName() {
132+
return ESCAPE_CHAR + getDBSchemaName() + ESCAPE_CHAR;
133+
}
134+
130135
@Override
131136
protected OracleConnectorConfig getConnection() {
132137
return connection;

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,11 @@ public String getEscapedTableName() {
155155
return ESCAPE_CHAR + getTableName() + ESCAPE_CHAR;
156156
}
157157

158+
@Override
159+
public String getEscapedDbSchemaName() {
160+
return ESCAPE_CHAR + getDBSchemaName() + ESCAPE_CHAR;
161+
}
162+
158163
@Override
159164
public Map<String, String> getDBSpecificArguments() {
160165
return ImmutableMap.of(PostgresConstants.CONNECTION_TIMEOUT, String.valueOf(connectionTimeout));

saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,5 +92,10 @@ public String getEscapedTableName() {
9292
return ESCAPE_CHAR + tableName + ESCAPE_CHAR;
9393
}
9494

95+
@Override
96+
public String getEscapedDbSchemaName() {
97+
return ESCAPE_CHAR + dbSchemaName + ESCAPE_CHAR;
98+
}
99+
95100
}
96101
}

0 commit comments

Comments
 (0)