Skip to content

Commit 6271d99

Browse files
Adding CloudSQLPostgreSQL connector
1 parent 2f33820 commit 6271d99

File tree

8 files changed

+686
-171
lines changed

8 files changed

+686
-171
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
# CloudSQLPostgreSQL Connection
2+
3+
4+
Description
5+
-----------
6+
Use this connection to access data in a CloudSQLPostgreSQL database using JDBC.
7+
8+
Properties
9+
----------
10+
**Name:** Name of the connection. Connection names must be unique in a namespace.
11+
12+
**Description:** Description of the connection.
13+
14+
**JDBC Driver name:** Select the JDBC driver to use.
15+
16+
**CloudSQL Instance Type:** Whether the CloudSQL instance to connect to is private or public. Defaults to 'Public'.
17+
18+
**Connection Name:** The CloudSQL instance to connect to in the format <PROJECT_ID>:\<REGION>:<INSTANCE_NAME>.
19+
Can be found in the instance overview page.
20+
21+
**Database:** CloudSQL PostgreSQL database name.
22+
23+
**Username:** User identity for connecting to the specified database. Required for databases that need
24+
authentication. Optional for databases that do not require authentication.
25+
26+
**Password:** Password to use to connect to the specified database.
27+
28+
**Connection Arguments:** A list of arbitrary string tag/value pairs as connection arguments. These arguments
29+
will be passed to the JDBC driver, as connection arguments, for JDBC drivers that may need additional configurations.
30+
This is a semicolon-separated list of key-value pairs, where each pair is separated by a equals '=' and specifies
31+
the key and value for the argument. For example, 'key1=value1;key2=value' specifies that the connection will be
32+
given arguments 'key1' mapped to 'value1' and the argument 'key2' mapped to 'value2'.
33+
34+
Path of the connection
35+
----------------------
36+
To browse, get a sample from, or get the specification for this connection through
37+
[Pipeline Microservices](https://cdap.atlassian.net/wiki/spaces/DOCS/pages/975929350/Pipeline+Microservices), the `path`
38+
property is required in the request body. It can be in the following form :
39+
40+
1. `/{schema}/{table}`
41+
This path indicates a table. A table is the only one that can be sampled. Browse on this path to return the specified table.
42+
43+
2. `/{schema}`
44+
This path indicates a schema. A schema cannot be sampled. Browse on this path to get all the tables under this schema.
45+
46+
3. `/`
47+
This path indicates the root. A root cannot be sampled. Browse on this path to get all the schemas visible through this connection.
48+
49+
Examples
50+
--------
51+
**Connecting to a public CloudSQL PostgreSQL instance**
52+
53+
Suppose you want to read data from CloudSQL PostgreSQL database named "prod", as "postgres" user with "postgres"
54+
password (Get the latest version of the CloudSQL socket factory jar with driver and dependencies
55+
[here](https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/releases)), then configure plugin with:
56+
57+
58+
```
59+
Name: "connection1"
60+
Driver Name: "cloudsql-postgresql"
61+
Database: "prod"
62+
Connection Name: [PROJECT_ID]:[REGION]:[INSTANCE_NAME]
63+
CloudSQL Instance Type: "Public"
64+
Username: "postgres"
65+
Password: "postgres"
66+
```
67+
68+
**Connecting to a private CloudSQL PostgreSQL instance**
69+
70+
If you want to connect to a private CloudSQL PostgreSQL instance, create a Compute Engine VM that runs the CloudSQL Proxy
71+
docker image using the following command
72+
73+
```
74+
# Set the environment variables
75+
export PROJECT=[project_id]
76+
export REGION=[vm-region]
77+
export ZONE=`gcloud compute zones list --filter="name=${REGION}" --limit
78+
1 --uri --project=${PROJECT}| sed 's/.*\///'`
79+
export SUBNET=[vpc-subnet-name]
80+
export NAME=[gce-vm-name]
81+
export POSTGRESQL_CONN=[postgresql-instance-connection-name]
82+
83+
# Create a Compute Engine VM
84+
gcloud beta compute --project=${PROJECT_ID} instances create ${INSTANCE_NAME}
85+
--zone=${ZONE} --machine-type=g1-small --subnet=${SUBNE} --no-address
86+
--metadata=startup-script="docker run -d -p 0.0.0.0:3306:3306
87+
gcr.io/cloudsql-docker/gce-proxy:1.16 /cloud_sql_proxy
88+
-instances=${POSTGRESQL_CONNECTION_NAME}=tcp:0.0.0.0:3306" --maintenance-policy=MIGRATE
89+
--scopes=https://www.googleapis.com/auth/cloud-platform
90+
--image=cos-69-10895-385-0 --image-project=cos-cloud
91+
```
92+
93+
Optionally, you can promote the internal IP address of the VM running the Proxy image to a static IP using
94+
95+
```
96+
# Get the VM internal IP
97+
export IP=`gcloud compute instances describe ${NAME} --zone ${ZONE} |
98+
grep "networkIP" | awk '{print $2}'`
99+
100+
# Promote the VM internal IP to static IP
101+
gcloud compute addresses create postgresql-proxy --addresses ${IP} --region
102+
${REGION} --subnet ${SUBNET}
103+
104+
# Note down the IP to be used in MySQL or PostgreSQL JDBC
105+
# connection string
106+
echo Proxy IP: ${IP}
107+
108+
echo "JDBC Connection strings:"
109+
echo "jdbc:postgresql://${IP}:5432/{PostgreSQL_DB_NAME}"
110+
echo "jdbc:mysql://${IP}:3306/{MySQL_DB_NAME}"
111+
```
112+
113+
Get the latest version of the CloudSQL socket factory jar with driver and dependencies from
114+
[here](https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/releases), then configure plugin with:
115+
116+
```
117+
Name: "connection1"
118+
Driver Name: "cloudsql-postgresql"
119+
Database: "prod"
120+
Connection Name: <proxy-ip> (obtained from commands above)
121+
CloudSQL Instance Type: "Private"
122+
Username: "postgres"
123+
Password: "postgres"
124+
```
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright © 2020 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.cloudsql.postgres;
18+
19+
import io.cdap.cdap.api.annotation.Category;
20+
import io.cdap.cdap.api.annotation.Description;
21+
import io.cdap.cdap.api.annotation.Name;
22+
import io.cdap.cdap.api.annotation.Plugin;
23+
import io.cdap.cdap.api.data.format.StructuredRecord;
24+
import io.cdap.cdap.etl.api.batch.BatchSink;
25+
import io.cdap.cdap.etl.api.batch.BatchSource;
26+
import io.cdap.cdap.etl.api.connector.Connector;
27+
import io.cdap.cdap.etl.api.connector.ConnectorSpec;
28+
import io.cdap.cdap.etl.api.connector.ConnectorSpecRequest;
29+
import io.cdap.cdap.etl.api.connector.PluginSpec;
30+
import io.cdap.plugin.common.Constants;
31+
import io.cdap.plugin.common.ReferenceNames;
32+
import io.cdap.plugin.common.db.DBConnectorPath;
33+
import io.cdap.plugin.db.ConnectionConfig;
34+
import io.cdap.plugin.db.SchemaReader;
35+
import io.cdap.plugin.db.connector.AbstractDBSpecificConnector;
36+
import io.cdap.plugin.postgres.PostgresDBRecord;
37+
import io.cdap.plugin.postgres.PostgresSchemaReader;
38+
import org.apache.hadoop.io.LongWritable;
39+
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
40+
41+
import java.util.HashMap;
42+
import java.util.Map;
43+
44+
/**
45+
* A CloudSQL PostgreSQL Database Connector that connects to CloudSQL PostgreSQL database via JDBC.
46+
*/
47+
@Plugin(type = Connector.PLUGIN_TYPE)
48+
@Name(CloudSQLPostgreSQLConnector.NAME)
49+
@Description("Connection to access data in CloudSQL PostgreSQL Server databases using JDBC.")
50+
@Category("Database")
51+
public class CloudSQLPostgreSQLConnector extends AbstractDBSpecificConnector<PostgresDBRecord> {
52+
public static final String NAME = CloudSQLPostgreSQLConstants.PLUGIN_NAME;
53+
private final CloudSQLPostgreSQLConnectorConfig config;
54+
55+
public CloudSQLPostgreSQLConnector(CloudSQLPostgreSQLConnectorConfig config) {
56+
super(config);
57+
this.config = config;
58+
}
59+
60+
@Override
61+
public boolean supportSchema() {
62+
return true;
63+
}
64+
65+
@Override
66+
protected Class<? extends DBWritable> getDBRecordType() {
67+
return PostgresDBRecord.class;
68+
}
69+
70+
@Override
71+
public StructuredRecord transform(LongWritable longWritable, PostgresDBRecord postgresDBRecord) {
72+
return postgresDBRecord.getRecord();
73+
}
74+
75+
@Override
76+
protected SchemaReader getSchemaReader() {
77+
return new PostgresSchemaReader();
78+
}
79+
80+
@Override
81+
protected String getTableQuery(DBConnectorPath path) {
82+
return String.format("SELECT * FROM \"%s\".\"%s\"", path.getSchema(), path.getTable());
83+
}
84+
85+
@Override
86+
protected String getTableQuery(DBConnectorPath path, int limit) {
87+
return String.format("SELECT * FROM \"%s\".\"%s\" LIMIT %d", path.getSchema(), path.getTable(), limit);
88+
}
89+
90+
@Override
91+
protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath path,
92+
ConnectorSpec.Builder builder) {
93+
Map<String, String> properties = new HashMap<>();
94+
setConnectionProperties(properties);
95+
builder
96+
.addRelatedPlugin(new PluginSpec(CloudSQLPostgreSQLConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, properties))
97+
.addRelatedPlugin(new PluginSpec(CloudSQLPostgreSQLConstants.PLUGIN_NAME, BatchSink.PLUGIN_TYPE, properties));
98+
99+
String table = path.getTable();
100+
if (table == null) {
101+
return;
102+
}
103+
104+
properties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.IMPORT_QUERY, getTableQuery(path));
105+
properties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.NUM_SPLITS, "1");
106+
properties.put(ConnectionConfig.DATABASE, path.getDatabase());
107+
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
108+
properties.put(CloudSQLPostgreSQLSink.CloudSQLPostgreSQLSinkConfig.TABLE_NAME, table);
109+
}
110+
111+
@Override
112+
protected void setConnectionProperties(Map<String, String> properties) {
113+
Map<String, String> rawProperties = config.getRawProperties().getProperties();
114+
properties.put(ConnectionConfig.JDBC_PLUGIN_NAME, rawProperties.get(ConnectionConfig.JDBC_PLUGIN_NAME));
115+
properties.put(ConnectionConfig.USER, rawProperties.get(ConnectionConfig.USER));
116+
properties.put(ConnectionConfig.PASSWORD, rawProperties.get(ConnectionConfig.PASSWORD));
117+
properties.put(CloudSQLPostgreSQLConstants.CONNECTION_NAME,
118+
rawProperties.get(CloudSQLPostgreSQLConstants.CONNECTION_NAME));
119+
properties.put(CloudSQLPostgreSQLConstants.INSTANCE_TYPE,
120+
rawProperties.get(CloudSQLPostgreSQLConstants.INSTANCE_TYPE));
121+
}
122+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright © 2020 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.cloudsql.postgres;
18+
19+
import io.cdap.cdap.api.annotation.Description;
20+
import io.cdap.cdap.api.annotation.Name;
21+
import io.cdap.plugin.db.ConnectionConfig;
22+
import io.cdap.plugin.db.connector.AbstractDBConnectorConfig;
23+
24+
import javax.annotation.Nullable;
25+
26+
/**
27+
* Configuration for CloudSQL PostgreSQL connector
28+
*/
29+
public class CloudSQLPostgreSQLConnectorConfig extends AbstractDBConnectorConfig {
30+
31+
@Name(CloudSQLPostgreSQLConstants.CONNECTION_NAME)
32+
@Description(
33+
"The CloudSQL instance to connect to. For a public instance, the connection string should be in the format "
34+
+ "<PROJECT_ID>:<REGION>:<INSTANCE_NAME> which can be found in the instance overview page. For a private "
35+
+ "instance, enter the internal IP address of the Compute Engine VM cloudsql proxy is running on.")
36+
private String connectionName;
37+
38+
@Name(ConnectionConfig.DATABASE)
39+
@Description("Database name to connect to")
40+
private String database;
41+
42+
@Name(CloudSQLPostgreSQLConstants.INSTANCE_TYPE)
43+
@Description("Whether the CloudSQL instance to connect to is private or public.")
44+
@Nullable
45+
private String instanceType;
46+
47+
public String getDatabase() {
48+
return database;
49+
}
50+
51+
public String getInstanceType() {
52+
return instanceType;
53+
}
54+
55+
public String getConnectionName() {
56+
return connectionName;
57+
}
58+
59+
@Override
60+
public String getConnectionString() {
61+
if (CloudSQLPostgreSQLConstants.PRIVATE_INSTANCE.equalsIgnoreCase(instanceType)) {
62+
return String.format(
63+
CloudSQLPostgreSQLConstants.PRIVATE_CLOUDSQL_POSTGRES_CONNECTION_STRING_FORMAT,
64+
connectionName,
65+
database);
66+
}
67+
68+
return String.format(
69+
CloudSQLPostgreSQLConstants.PUBLIC_CLOUDSQL_POSTGRES_CONNECTION_STRING_FORMAT,
70+
database,
71+
connectionName);
72+
}
73+
}

0 commit comments

Comments
 (0)