Skip to content

Commit 6726e6f

Browse files
Merge pull request #181 from data-integrations/connection-management
Adding CloudSqlMySql and CloudSQLPostgreSQL connector
2 parents 46b0fd0 + 298badf commit 6726e6f

27 files changed

+1793
-453
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
# CloudSQLMySQL Connection
2+
3+
4+
Description
5+
-----------
6+
Use this connection to access data in a CloudSQLMySQL database using JDBC.
7+
8+
Properties
9+
----------
10+
**Name:** Name of the connection. Connection names must be unique in a namespace.
11+
12+
**Description:** Description of the connection.
13+
14+
**JDBC Driver name:** Select the JDBC driver to use.
15+
16+
**CloudSQL Instance Type:** Whether the CloudSQL instance to connect to is private or public. Defaults to 'Public'.
17+
18+
**Connection Name:** The CloudSQL instance to connect to in the format <PROJECT_ID>:\<REGION>:<INSTANCE_NAME>.
19+
Can be found in the instance overview page.
20+
21+
**Database:** MySQL database name.
22+
23+
**Username:** User identity for connecting to the specified database. Required for databases that need
24+
authentication. Optional for databases that do not require authentication.
25+
26+
**Password:** Password to use to connect to the specified database.
27+
28+
**Connection Arguments:** A list of arbitrary string tag/value pairs as connection arguments. These arguments
29+
will be passed to the JDBC driver, as connection arguments, for JDBC drivers that may need additional configurations.
30+
This is a semicolon-separated list of key-value pairs, where each pair is separated by a equals '=' and specifies
31+
the key and value for the argument. For example, 'key1=value1;key2=value' specifies that the connection will be
32+
given arguments 'key1' mapped to 'value1' and the argument 'key2' mapped to 'value2'.
33+
34+
Path of the connection
35+
----------------------
36+
To browse, get a sample from, or get the specification for this connection through
37+
[Pipeline Microservices](https://cdap.atlassian.net/wiki/spaces/DOCS/pages/975929350/Pipeline+Microservices), the `path`
38+
property is required in the request body. It can be in the following form :
39+
40+
1. `/{database}/{table}`
41+
This path indicates a table. A table is the only one that can be sampled. Browse on this path to return the specified table.
42+
43+
2. `/{database}`
44+
This path indicates a database. A database cannot be sampled. Browse on this path to get all the tables under this database.
45+
46+
3. `/`
47+
This path indicates the root. A root cannot be sampled. Browse on this path to get all the databases visible through this connection.
48+
49+
Examples
50+
--------
51+
**Connecting to a public CloudSQL MySQL instance**
52+
53+
Suppose you want to read data from CloudSQL MySQL database named "prod", as "root" user with "root" password (Get the
54+
latest version of the CloudSQL socket factory jar with driver and dependencies
55+
[here](https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/releases)), then configure plugin with:
56+
57+
58+
```
59+
Reference Name: "connection1"
60+
Driver Name: "cloudsql-mysql"
61+
Database: "prod"
62+
CloudSQL Instance Type: "Public"
63+
Connection Name: [PROJECT_ID]:[REGION]:[INSTANCE_NAME]
64+
Username: "root"
65+
Password: "root"
66+
```
67+
68+
**Connecting to a private CloudSQL MySQL instance**
69+
70+
If you want to connect to a private CloudSQL MySQL instance, create a Compute Engine VM that runs the CloudSQL Proxy
71+
docker image using the following command
72+
73+
```
74+
# Set the environment variables
75+
export PROJECT=[project_id]
76+
export REGION=[vm-region]
77+
export ZONE=`gcloud compute zones list --filter="name=${REGION}" --limit
78+
1 --uri --project=${PROJECT}| sed 's/.*\///'`
79+
export SUBNET=[vpc-subnet-name]
80+
export NAME=[gce-vm-name]
81+
export MYSQL_CONN=[mysql-instance-connection-name]
82+
83+
# Create a Compute Engine VM
84+
gcloud beta compute --project=${PROJECT_ID} instances create ${INSTANCE_NAME}
85+
--zone=${ZONE} --machine-type=g1-small --subnet=${SUBNE} --no-address
86+
--metadata=startup-script="docker run -d -p 0.0.0.0:3306:3306
87+
gcr.io/cloudsql-docker/gce-proxy:1.16 /cloud_sql_proxy
88+
-instances=${MYSQL_CONNECTION_NAME}=tcp:0.0.0.0:3306" --maintenance-policy=MIGRATE
89+
--scopes=https://www.googleapis.com/auth/cloud-platform
90+
--image=cos-69-10895-385-0 --image-project=cos-cloud
91+
```
92+
93+
Optionally, you can promote the internal IP address of the VM running the Proxy image to a static IP using
94+
95+
```
96+
# Get the VM internal IP
97+
export IP=`gcloud compute instances describe ${NAME} --zone ${ZONE} |
98+
grep "networkIP" | awk '{print $2}'`
99+
100+
# Promote the VM internal IP to static IP
101+
gcloud compute addresses create mysql-proxy --addresses ${IP} --region
102+
${REGION} --subnet ${SUBNET}
103+
104+
105+
# Note down the IP to be used in MySQL or PostgreSQL JDBC
106+
# connection string
107+
echo Proxy IP: ${IP}
108+
109+
echo "JDBC Connection strings:"
110+
echo "jdbc:postgresql://${IP}:5432/{PostgreSQL_DB_NAME}"
111+
echo "jdbc:mysql://${IP}:3306/{MySQL_DB_NAME}"
112+
```
113+
114+
Get the latest version of the CloudSQL socket factory jar with driver and dependencies from
115+
[here](https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/releases), then configure plugin with:
116+
117+
```
118+
Reference Name: "connection1"
119+
Driver Name: "cloudsql-mysql"
120+
Database: "prod"
121+
CloudSQL Instance Type: "Private"
122+
Connection Name: <proxy-ip> (obtained from commands above)
123+
Username: "root"
124+
Password: "root"
125+
```

cloudsql-mysql-plugin/pom.xml

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,55 @@
4343
<groupId>io.cdap.plugin</groupId>
4444
<artifactId>hydrator-common</artifactId>
4545
</dependency>
46+
47+
<!-- test dependencies -->
48+
<dependency>
49+
<groupId>io.cdap.plugin</groupId>
50+
<artifactId>database-commons</artifactId>
51+
<version>${project.version}</version>
52+
<type>test-jar</type>
53+
<scope>test</scope>
54+
</dependency>
4655
<dependency>
4756
<groupId>io.cdap.cdap</groupId>
4857
<artifactId>hydrator-test</artifactId>
58+
</dependency>
59+
<dependency>
60+
<groupId>io.cdap.cdap</groupId>
61+
<artifactId>cdap-data-pipeline2_2.11</artifactId>
62+
<scope>test</scope>
63+
</dependency>
64+
<dependency>
65+
<groupId>junit</groupId>
66+
<artifactId>junit</artifactId>
67+
</dependency>
68+
<dependency>
69+
<groupId>io.cdap.cdap</groupId>
70+
<artifactId>cdap-api</artifactId>
71+
<scope>provided</scope>
72+
</dependency>
73+
<dependency>
74+
<groupId>org.mockito</groupId>
75+
<artifactId>mockito-core</artifactId>
76+
</dependency>
77+
<dependency>
78+
<groupId>org.jetbrains</groupId>
79+
<artifactId>annotations</artifactId>
80+
<version>RELEASE</version>
81+
<scope>compile</scope>
82+
</dependency>
83+
<!-- https://mvnrepository.com/artifact/com.google.cloud.sql/mysql-socket-factory-connector-j-8 -->
84+
<dependency>
85+
<groupId>com.google.cloud.sql</groupId>
86+
<artifactId>mysql-socket-factory-connector-j-8</artifactId>
87+
<version>1.0.10</version>
88+
<scope>test</scope>
89+
</dependency>
90+
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
91+
<dependency>
92+
<groupId>mysql</groupId>
93+
<artifactId>mysql-connector-java</artifactId>
94+
<version>8.0.11</version>
4995
<scope>test</scope>
5096
</dependency>
5197
</dependencies>
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright © 2021 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.cloudsql.mysql;
18+
19+
import io.cdap.cdap.api.annotation.Category;
20+
import io.cdap.cdap.api.annotation.Description;
21+
import io.cdap.cdap.api.annotation.Name;
22+
import io.cdap.cdap.api.annotation.Plugin;
23+
import io.cdap.cdap.api.data.format.StructuredRecord;
24+
import io.cdap.cdap.etl.api.batch.BatchSink;
25+
import io.cdap.cdap.etl.api.batch.BatchSource;
26+
import io.cdap.cdap.etl.api.connector.Connector;
27+
import io.cdap.cdap.etl.api.connector.ConnectorSpec;
28+
import io.cdap.cdap.etl.api.connector.ConnectorSpecRequest;
29+
import io.cdap.cdap.etl.api.connector.PluginSpec;
30+
import io.cdap.plugin.common.Constants;
31+
import io.cdap.plugin.common.ReferenceNames;
32+
import io.cdap.plugin.common.db.DBConnectorPath;
33+
import io.cdap.plugin.db.ConnectionConfig;
34+
import io.cdap.plugin.db.DBRecord;
35+
import io.cdap.plugin.db.connector.AbstractDBSpecificConnector;
36+
import org.apache.hadoop.io.LongWritable;
37+
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
38+
39+
import java.util.HashMap;
40+
import java.util.Map;
41+
42+
/**
43+
* A CLoudSQL MySQL Server Database Connector that connects to CloudSQL MySQL Server database via JDBC
44+
*/
45+
@Plugin(type = Connector.PLUGIN_TYPE)
46+
@Name(CloudSQLMySQLConnector.NAME)
47+
@Description("Connection to access data in CloudSQL MySQL Server databases using JDBC.")
48+
@Category("Database")
49+
public class CloudSQLMySQLConnector extends AbstractDBSpecificConnector<DBRecord> {
50+
51+
public static final String NAME = CloudSQLMySQLConstants.PLUGIN_NAME;
52+
private final CloudSQLMySQLConnectorConfig config;
53+
54+
public CloudSQLMySQLConnector(CloudSQLMySQLConnectorConfig config) {
55+
super(config);
56+
this.config = config;
57+
}
58+
59+
@Override
60+
public boolean supportSchema() {
61+
return false;
62+
}
63+
64+
@Override
65+
protected Class<? extends DBWritable> getDBRecordType() {
66+
return DBRecord.class;
67+
}
68+
69+
@Override
70+
public StructuredRecord transform(LongWritable longWritable, DBRecord dbRecord) {
71+
return dbRecord.getRecord();
72+
}
73+
74+
@Override
75+
protected String getTableQuery(DBConnectorPath path) {
76+
return String.format("SELECT * FROM `%s`.`%s`", path.getDatabase(), path.getTable());
77+
}
78+
79+
@Override
80+
protected String getTableQuery(DBConnectorPath path, int limit) {
81+
return String.format("SELECT * FROM `%s`.`%s` LIMIT %d", path.getDatabase(), path.getTable(), limit);
82+
}
83+
84+
@Override
85+
protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath path,
86+
ConnectorSpec.Builder builder) {
87+
Map<String, String> properties = new HashMap<>();
88+
setConnectionProperties(properties);
89+
builder
90+
.addRelatedPlugin(new PluginSpec(CloudSQLMySQLConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, properties))
91+
.addRelatedPlugin(new PluginSpec(CloudSQLMySQLConstants.PLUGIN_NAME, BatchSink.PLUGIN_TYPE, properties));
92+
93+
String table = path.getTable();
94+
if (table == null) {
95+
return;
96+
}
97+
98+
properties.put(CloudSQLMySQLSource.CloudSQLMySQLSourceConfig.IMPORT_QUERY, getTableQuery(path));
99+
properties.put(CloudSQLMySQLSource.CloudSQLMySQLSourceConfig.NUM_SPLITS, "1");
100+
properties.put(ConnectionConfig.DATABASE, path.getDatabase());
101+
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
102+
properties.put(CloudSQLMySQLSink.CloudSQLMySQLSinkConfig.TABLE_NAME, table);
103+
}
104+
105+
@Override
106+
protected void setConnectionProperties(Map<String, String> properties) {
107+
Map<String, String> rawProperties = config.getRawProperties().getProperties();
108+
properties.put(ConnectionConfig.JDBC_PLUGIN_NAME, rawProperties.get(ConnectionConfig.JDBC_PLUGIN_NAME));
109+
properties.put(ConnectionConfig.USER, rawProperties.get(ConnectionConfig.USER));
110+
properties.put(ConnectionConfig.PASSWORD, rawProperties.get(ConnectionConfig.PASSWORD));
111+
properties.put(CloudSQLMySQLConstants.CONNECTION_NAME, rawProperties.get(CloudSQLMySQLConstants.CONNECTION_NAME));
112+
properties.put(CloudSQLMySQLConstants.INSTANCE_TYPE, rawProperties.get(CloudSQLMySQLConstants.INSTANCE_TYPE));
113+
}
114+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright © 2021 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.cloudsql.mysql;
18+
19+
import io.cdap.cdap.api.annotation.Description;
20+
import io.cdap.cdap.api.annotation.Name;
21+
import io.cdap.plugin.db.ConnectionConfig;
22+
import io.cdap.plugin.db.connector.AbstractDBConnectorConfig;
23+
24+
import java.util.Properties;
25+
import javax.annotation.Nullable;
26+
27+
/**
28+
* Configuration for CloudSQL MySQL connector
29+
*/
30+
public class CloudSQLMySQLConnectorConfig extends AbstractDBConnectorConfig {
31+
32+
private static final String JDBC_PROPERTY_CONNECT_TIMEOUT_MILLIS = "connectTimeout";
33+
private static final String JDBC_PROPERTY_SOCKET_TIMEOUT_MILLIS = "socketTimeout";
34+
35+
@Name(CloudSQLMySQLConstants.CONNECTION_NAME)
36+
@Description(
37+
"The CloudSQL instance to connect to. For a public instance, the connection string should be in the format "
38+
+ "<PROJECT_ID>:<REGION>:<INSTANCE_NAME> which can be found in the instance overview page. For a private "
39+
+ "instance, enter the internal IP address of the Compute Engine VM cloudsql proxy is running on.")
40+
private String connectionName;
41+
42+
@Name(ConnectionConfig.DATABASE)
43+
@Description("Database name to connect to")
44+
private String database;
45+
46+
@Name(CloudSQLMySQLConstants.INSTANCE_TYPE)
47+
@Description("Whether the CloudSQL instance to connect to is private or public.")
48+
@Nullable
49+
private String instanceType;
50+
51+
public CloudSQLMySQLConnectorConfig(String user, String password, String jdbcPluginName, String connectionArguments,
52+
String instanceType, String connectionName, String database) {
53+
this.user = user;
54+
this.password = password;
55+
this.jdbcPluginName = jdbcPluginName;
56+
this.connectionArguments = connectionArguments;
57+
this.instanceType = instanceType;
58+
this.connectionName = connectionName;
59+
this.database = database;
60+
}
61+
62+
public String getDatabase() {
63+
return database;
64+
}
65+
66+
public String getInstanceType() {
67+
return instanceType;
68+
}
69+
70+
public String getConnectionName() {
71+
return connectionName;
72+
}
73+
74+
@Override
75+
public String getConnectionString() {
76+
if (CloudSQLMySQLConstants.PRIVATE_INSTANCE.equalsIgnoreCase(instanceType)) {
77+
return String.format(
78+
CloudSQLMySQLConstants.PRIVATE_CLOUDSQL_MYSQL_CONNECTION_STRING_FORMAT,
79+
connectionName,
80+
database);
81+
}
82+
83+
return String.format(
84+
CloudSQLMySQLConstants.PUBLIC_CLOUDSQL_MYSQL_CONNECTION_STRING_FORMAT,
85+
database,
86+
connectionName);
87+
}
88+
89+
@Override
90+
public Properties getConnectionArgumentsProperties() {
91+
Properties properties = super.getConnectionArgumentsProperties();
92+
properties.put(JDBC_PROPERTY_CONNECT_TIMEOUT_MILLIS, "20000");
93+
properties.put(JDBC_PROPERTY_SOCKET_TIMEOUT_MILLIS, "20000");
94+
return properties;
95+
}
96+
}

0 commit comments

Comments
 (0)