Skip to content

Commit 7242f30

Browse files
Merge pull request #394 from cloudsufi/develop-postgreSQL-operations
Update and Upsert functionality for PostgreSQL plugins.
2 parents 9864d63 + 0df5053 commit 7242f30

File tree

30 files changed

+880
-19
lines changed

30 files changed

+880
-19
lines changed

aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,19 @@
5050
"widget-type": "hidden",
5151
"label": "Schema Name",
5252
"name": "dbSchemaName"
53+
},
54+
{
55+
"widget-type": "hidden",
56+
"label": "Operation Name",
57+
"name": "operationName",
58+
"widget-attributes" : {
59+
"default": "insert"
60+
}
61+
},
62+
{
63+
"widget-type": "hidden",
64+
"label": "Table Key",
65+
"name": "relationTableKey"
5366
}
5467
]
5568
},

aurora-postgresql-plugin/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@
8383
<version>RELEASE</version>
8484
<scope>compile</scope>
8585
</dependency>
86+
<dependency>
87+
<groupId>io.cdap.plugin</groupId>
88+
<artifactId>postgresql-plugin</artifactId>
89+
<version>${project.version}</version>
90+
</dependency>
8691
</dependencies>
8792
<build>
8893
<plugins>
@@ -99,6 +104,7 @@
99104
<instructions>
100105
<_exportcontents>
101106
io.cdap.plugin.auroradb.postgres.*;
107+
io.cdap.plugin.postgres.*;
102108
io.cdap.plugin.db.source.*;
103109
io.cdap.plugin.db.sink.*;
104110
org.apache.commons.lang;

aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSink.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,17 @@
2020
import io.cdap.cdap.api.annotation.Description;
2121
import io.cdap.cdap.api.annotation.Name;
2222
import io.cdap.cdap.api.annotation.Plugin;
23+
import io.cdap.cdap.api.data.batch.Output;
24+
import io.cdap.cdap.api.data.format.StructuredRecord;
2325
import io.cdap.cdap.api.data.schema.Schema;
2426
import io.cdap.cdap.etl.api.batch.BatchSink;
27+
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
28+
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
29+
import io.cdap.plugin.db.DBRecord;
2530
import io.cdap.plugin.db.config.DBSpecificSinkConfig;
2631
import io.cdap.plugin.db.sink.AbstractDBSink;
32+
import io.cdap.plugin.postgres.PostgresDBRecord;
33+
import io.cdap.plugin.postgres.PostgresETLDBOutputFormat;
2734

2835
import java.util.ArrayList;
2936
import java.util.Collections;
@@ -48,6 +55,18 @@ public AuroraPostgresSink(AuroraPostgresSinkConfig auroraPostgresSinkConfig) {
4855
super(auroraPostgresSinkConfig);
4956
this.auroraPostgresSinkConfig = auroraPostgresSinkConfig;
5057
}
58+
@Override
59+
protected void addOutputContext(BatchSinkContext context) {
60+
context.addOutput(Output.of(auroraPostgresSinkConfig.getReferenceName(),
61+
new SinkOutputFormatProvider(PostgresETLDBOutputFormat.class,
62+
getConfiguration())));
63+
}
64+
65+
@Override
66+
protected DBRecord getDBRecord(StructuredRecord output) {
67+
return new PostgresDBRecord(output, columnTypes, auroraPostgresSinkConfig.getOperationName(),
68+
auroraPostgresSinkConfig.getRelationTableKey());
69+
}
5170

5271
@Override
5372
protected void setColumnsInfo(List<Schema.Field> fields) {

aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,35 @@
5050
"widget-type": "hidden",
5151
"label": "Schema Name",
5252
"name": "dbSchemaName"
53+
},
54+
{
55+
"widget-type": "radio-group",
56+
"label": "Operation Name",
57+
"name": "operationName",
58+
"widget-attributes": {
59+
"default": "insert",
60+
"layout": "inline",
61+
"options": [
62+
{
63+
"id": "insert",
64+
"label": "INSERT"
65+
},
66+
{
67+
"id": "update",
68+
"label": "UPDATE"
69+
},
70+
{
71+
"id": "upsert",
72+
"label": "UPSERT"
73+
}
74+
]
75+
}
76+
},
77+
{
78+
"name": "relationTableKey",
79+
"widget-type": "csv",
80+
"label": "Table Key",
81+
"widget-attributes": {}
5382
}
5483
]
5584
},

cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,19 @@
126126
"widget-type": "hidden",
127127
"label": "Schema Name",
128128
"name": "dbSchemaName"
129+
},
130+
{
131+
"widget-type": "hidden",
132+
"label": "Operation Name",
133+
"name": "operationName",
134+
"widget-attributes" : {
135+
"default": "insert"
136+
}
137+
},
138+
{
139+
"widget-type": "hidden",
140+
"label": "Table Key",
141+
"name": "relationTableKey"
129142
}
130143
]
131144
},

cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.cdap.cdap.api.annotation.MetadataProperty;
2525
import io.cdap.cdap.api.annotation.Name;
2626
import io.cdap.cdap.api.annotation.Plugin;
27+
import io.cdap.cdap.api.data.batch.Output;
2728
import io.cdap.cdap.api.data.format.StructuredRecord;
2829
import io.cdap.cdap.api.data.schema.Schema;
2930
import io.cdap.cdap.etl.api.FailureCollector;
@@ -34,12 +35,14 @@
3435
import io.cdap.plugin.common.Asset;
3536
import io.cdap.plugin.common.ConfigUtil;
3637
import io.cdap.plugin.common.LineageRecorder;
38+
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
3739
import io.cdap.plugin.db.DBRecord;
3840
import io.cdap.plugin.db.SchemaReader;
3941
import io.cdap.plugin.db.config.AbstractDBSpecificSinkConfig;
4042
import io.cdap.plugin.db.sink.AbstractDBSink;
4143
import io.cdap.plugin.db.sink.FieldsValidator;
4244
import io.cdap.plugin.postgres.PostgresDBRecord;
45+
import io.cdap.plugin.postgres.PostgresETLDBOutputFormat;
4346
import io.cdap.plugin.postgres.PostgresFieldsValidator;
4447
import io.cdap.plugin.postgres.PostgresSchemaReader;
4548
import io.cdap.plugin.util.CloudSQLUtil;
@@ -88,10 +91,17 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
8891
protected SchemaReader getSchemaReader() {
8992
return new PostgresSchemaReader();
9093
}
94+
@Override
95+
protected void addOutputContext(BatchSinkContext context) {
96+
context.addOutput(Output.of(cloudsqlPostgresqlSinkConfig.getReferenceName(),
97+
new SinkOutputFormatProvider(PostgresETLDBOutputFormat.class,
98+
getConfiguration())));
99+
}
91100

92101
@Override
93102
protected DBRecord getDBRecord(StructuredRecord output) {
94-
return new PostgresDBRecord(output, columnTypes);
103+
return new PostgresDBRecord(output, columnTypes, cloudsqlPostgresqlSinkConfig.getOperationName(),
104+
cloudsqlPostgresqlSinkConfig.getRelationTableKey());
95105
}
96106

97107
@Override

cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,35 @@
126126
"widget-type": "textbox",
127127
"label": "Schema Name",
128128
"name": "dbSchemaName"
129+
},
130+
{
131+
"widget-type": "radio-group",
132+
"label": "Operation Name",
133+
"name": "operationName",
134+
"widget-attributes": {
135+
"default": "insert",
136+
"layout": "inline",
137+
"options": [
138+
{
139+
"id": "insert",
140+
"label": "INSERT"
141+
},
142+
{
143+
"id": "update",
144+
"label": "UPDATE"
145+
},
146+
{
147+
"id": "upsert",
148+
"label": "UPSERT"
149+
}
150+
]
151+
}
152+
},
153+
{
154+
"name": "relationTableKey",
155+
"widget-type": "csv",
156+
"label": "Table Key",
157+
"widget-attributes": {}
129158
}
130159
]
131160
},

database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfigAccessor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public class ConnectionConfigAccessor {
3636
private static final String INIT_QUERIES = "io.cdap.plugin.db.init.queries";
3737
public static final String AUTO_COMMIT_ENABLED = "io.cdap.plugin.db.output.autocommit.enabled";
3838
public static final String FETCH_SIZE = "io.cdap.plugin.db.fetch.size";
39+
public static final String OPERATION_NAME = "io.cdap.plugin.db.operation.name";
40+
public static final String RELATION_TABLE_KEY = "io.cdap.plugin.db.relation.table.key";
3941

4042
private static final Gson GSON = new Gson();
4143
private static final Type STRING_MAP_TYPE = new TypeToken<Map<String, String>>() { }.getType();
@@ -106,6 +108,13 @@ public void setFetchSize(Integer fetchSize) {
106108
public Integer getFetchSize() {
107109
return configuration.getInt(FETCH_SIZE, 0);
108110
}
111+
public void setOperationName(Operation operationName) {
112+
configuration.set(OPERATION_NAME, operationName.toString());
113+
}
114+
115+
public void setRelationTableKey(String relationTableKey) {
116+
configuration.set(RELATION_TABLE_KEY, relationTableKey);
117+
}
109118

110119
public Configuration getConfiguration() {
111120
return configuration;

0 commit comments

Comments
 (0)