Skip to content

Commit b57f20f

Browse files
Merge pull request #414 from data-integrations/PLUGIN-1640
PLUGIN-1640: add support for using different ports in cloud sql proxy VM
2 parents 37121fa + 8f5e01d commit b57f20f

28 files changed

+295
-26
lines changed

cloudsql-mysql-plugin/docs/CloudSQLMySQL-action.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ Properties
2323
**Connection Name:** The CloudSQL instance to connect to in the format <PROJECT_ID>:\<REGION>:<INSTANCE_NAME>.
2424
Can be found in the instance overview page.
2525

26+
**Port:** Port that MySQL is running on.
27+
2628
**CloudSQL Instance Type:** Whether the CloudSQL instance to connect to is private or public. Defaults to 'Public'.
2729

2830
**Username:** User identity for connecting to the specified database.

cloudsql-mysql-plugin/docs/CloudSQLMySQL-batchsink.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ You also can use the macro function ${conn(connection-name)}.
3232
**Connection Name:** The CloudSQL instance to connect to in the format <PROJECT_ID>:\<REGION>:<INSTANCE_NAME>.
3333
Can be found in the instance overview page.
3434

35+
**Port:** Port that MySQL is running on.
36+
3537
**CloudSQL Instance Type:** Whether the CloudSQL instance to connect to is private or public. Defaults to 'Public'.
3638

3739
**Table Name:** Name of the table to export to. Table must exist prior to running the pipeline.

cloudsql-mysql-plugin/docs/CloudSQLMySQL-batchsource.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ You also can use the macro function ${conn(connection-name)}.
3131
**Connection Name:** The CloudSQL instance to connect to in the format <PROJECT_ID>:\<REGION>:<INSTANCE_NAME>.
3232
Can be found in the instance overview page.
3333

34+
**Port:** Port that MySQL is running on.
35+
3436
**CloudSQL Instance Type:** Whether the CloudSQL instance to connect to is private or public. Defaults to 'Public'.
3537

3638
**Import Query:** The SELECT query to use to import data from the specified table.

cloudsql-mysql-plugin/docs/CloudSQLMySQL-connector.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ Properties
1818
**Connection Name:** The CloudSQL instance to connect to in the format <PROJECT_ID>:\<REGION>:<INSTANCE_NAME>.
1919
Can be found in the instance overview page.
2020

21+
**Port:** Port that MySQL is running on.
22+
2123
**Database:** MySQL database name.
2224

2325
**Username:** User identity for connecting to the specified database. Required for databases that need

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLAction.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818

1919
import com.google.common.collect.ImmutableMap;
2020
import io.cdap.cdap.api.annotation.Description;
21+
import io.cdap.cdap.api.annotation.Macro;
2122
import io.cdap.cdap.api.annotation.Name;
2223
import io.cdap.cdap.api.annotation.Plugin;
2324
import io.cdap.cdap.etl.api.FailureCollector;
2425
import io.cdap.cdap.etl.api.PipelineConfigurer;
2526
import io.cdap.cdap.etl.api.action.Action;
27+
import io.cdap.plugin.db.ConnectionConfig;
2628
import io.cdap.plugin.db.action.AbstractDBAction;
2729
import io.cdap.plugin.db.action.QueryConfig;
2830
import io.cdap.plugin.util.CloudSQLUtil;
@@ -48,11 +50,13 @@ public CloudSQLMySQLAction(CloudSQLMySQLActionConfig cloudsqlMysqlActionConfig)
4850
@Override
4951
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
5052
FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
51-
52-
CloudSQLUtil.checkConnectionName(
53-
failureCollector,
54-
cloudsqlMysqlActionConfig.instanceType,
55-
cloudsqlMysqlActionConfig.connectionName);
53+
54+
if (cloudsqlMysqlActionConfig.canConnect()) {
55+
CloudSQLUtil.checkConnectionName(
56+
failureCollector,
57+
cloudsqlMysqlActionConfig.instanceType,
58+
cloudsqlMysqlActionConfig.connectionName);
59+
}
5660

5761
super.configurePipeline(pipelineConfigurer);
5862
}
@@ -69,10 +73,18 @@ public CloudSQLMySQLActionConfig() {
6973
"The CloudSQL instance to connect to. For a public instance, the connection string should be in the format "
7074
+ "<PROJECT_ID>:<REGION>:<INSTANCE_NAME> which can be found in the instance overview page. For a private "
7175
+ "instance, enter the internal IP address of the Compute Engine VM cloudsql proxy is running on.")
76+
@Macro
7277
public String connectionName;
7378

79+
@Name(ConnectionConfig.PORT)
80+
@Description("Database port number")
81+
@Macro
82+
@Nullable
83+
private Integer port;
84+
7485
@Name(DATABASE)
7586
@Description("Database name to connect to")
87+
@Macro
7688
public String database;
7789

7890
@Name(CloudSQLMySQLConstants.CONNECTION_TIMEOUT)
@@ -94,6 +106,7 @@ public String getConnectionString() {
94106
return String.format(
95107
CloudSQLMySQLConstants.PRIVATE_CLOUDSQL_MYSQL_CONNECTION_STRING_FORMAT,
96108
connectionName,
109+
getPort(),
97110
database);
98111
}
99112

@@ -103,10 +116,19 @@ public String getConnectionString() {
103116
connectionName);
104117
}
105118

119+
public int getPort() {
120+
return port == null ? 3306 : port;
121+
}
122+
106123
@Override
107124
public Map<String, String> getDBSpecificArguments() {
108125
return ImmutableMap.of(
109126
CloudSQLMySQLConstants.CONNECTION_TIMEOUT, String.valueOf(connectionTimeout));
110127
}
128+
129+
public boolean canConnect() {
130+
return !containsMacro(CloudSQLUtil.CONNECTION_NAME) && !containsMacro(ConnectionConfig.PORT) &&
131+
!containsMacro(DATABASE);
132+
}
111133
}
112134
}

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnectorConfig.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.cdap.plugin.cloudsql.mysql;
1818

1919
import io.cdap.cdap.api.annotation.Description;
20+
import io.cdap.cdap.api.annotation.Macro;
2021
import io.cdap.cdap.api.annotation.Name;
2122
import io.cdap.plugin.db.ConnectionConfig;
2223
import io.cdap.plugin.db.connector.AbstractDBConnectorConfig;
@@ -38,25 +39,35 @@ public class CloudSQLMySQLConnectorConfig extends AbstractDBConnectorConfig {
3839
"The CloudSQL instance to connect to. For a public instance, the connection string should be in the format "
3940
+ "<PROJECT_ID>:<REGION>:<INSTANCE_NAME> which can be found in the instance overview page. For a private "
4041
+ "instance, enter the internal IP address of the Compute Engine VM cloudsql proxy is running on.")
42+
@Macro
4143
private String connectionName;
4244

45+
@Name(ConnectionConfig.PORT)
46+
@Description("Database port number")
47+
@Macro
48+
@Nullable
49+
private Integer port;
50+
4351
@Name(ConnectionConfig.DATABASE)
4452
@Description("Database name to connect to")
53+
@Macro
4554
private String database;
4655

4756
@Name(CloudSQLUtil.INSTANCE_TYPE)
4857
@Description("Whether the CloudSQL instance to connect to is private or public.")
4958
private String instanceType;
5059

5160
public CloudSQLMySQLConnectorConfig(String user, String password, String jdbcPluginName, String connectionArguments,
52-
String instanceType, String connectionName, String database) {
61+
String instanceType, String connectionName, String database,
62+
@Nullable Integer port) {
5363
this.user = user;
5464
this.password = password;
5565
this.jdbcPluginName = jdbcPluginName;
5666
this.connectionArguments = connectionArguments;
5767
this.instanceType = instanceType;
5868
this.connectionName = connectionName;
5969
this.database = database;
70+
this.port = port;
6071
}
6172

6273
public String getDatabase() {
@@ -71,12 +82,17 @@ public String getConnectionName() {
7182
return connectionName;
7283
}
7384

85+
public int getPort() {
86+
return port == null ? 3306 : port;
87+
}
88+
7489
@Override
7590
public String getConnectionString() {
7691
if (CloudSQLUtil.PRIVATE_INSTANCE.equalsIgnoreCase(instanceType)) {
7792
return String.format(
7893
CloudSQLMySQLConstants.PRIVATE_CLOUDSQL_MYSQL_CONNECTION_STRING_FORMAT,
7994
connectionName,
95+
getPort(),
8096
database);
8197
}
8298

@@ -93,4 +109,10 @@ public Properties getConnectionArgumentsProperties() {
93109
properties.put(JDBC_PROPERTY_SOCKET_TIMEOUT_MILLIS, "20000");
94110
return properties;
95111
}
112+
113+
@Override
114+
public boolean canConnect() {
115+
return super.canConnect() && !containsMacro(CloudSQLUtil.CONNECTION_NAME) &&
116+
!containsMacro(ConnectionConfig.PORT) && !containsMacro(ConnectionConfig.DATABASE);
117+
}
96118
}

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConstants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,5 @@ private CloudSQLMySQLConstants() {
2626
public static final String CONNECTION_TIMEOUT = "connectionTimeout";
2727
public static final String PUBLIC_CLOUDSQL_MYSQL_CONNECTION_STRING_FORMAT =
2828
"jdbc:mysql:///%s?cloudSqlInstance=%s&socketFactory=com.google.cloud.sql.mysql.SocketFactory";
29-
public static final String PRIVATE_CLOUDSQL_MYSQL_CONNECTION_STRING_FORMAT = "jdbc:mysql://%s/%s";
29+
public static final String PRIVATE_CLOUDSQL_MYSQL_CONNECTION_STRING_FORMAT = "jdbc:mysql://%s:%s/%s";
3030
}

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
9191
host = connectionParams[2];
9292
location = connectionParams[1];
9393
}
94-
String fqn = DBUtils.constructFQN("mysql", host, 3306,
94+
String fqn = DBUtils.constructFQN("mysql", host,
95+
cloudsqlMysqlSinkConfig.getConnection().getPort(),
9596
cloudsqlMysqlSinkConfig.getConnection().getDatabase(),
9697
cloudsqlMysqlSinkConfig.getReferenceName());
9798
Asset.Builder assetBuilder = Asset.builder(cloudsqlMysqlSinkConfig.getReferenceName()).setFqn(fqn);

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ protected String createConnectionString() {
8686
return String.format(
8787
CloudSQLMySQLConstants.PRIVATE_CLOUDSQL_MYSQL_CONNECTION_STRING_FORMAT,
8888
cloudsqlMysqlSourceConfig.connection.getConnectionName(),
89+
cloudsqlMysqlSourceConfig.connection.getPort(),
8990
cloudsqlMysqlSourceConfig.connection.getDatabase());
9091
}
9192

@@ -108,7 +109,8 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
108109
host = connectionParams[2];
109110
location = connectionParams[1];
110111
}
111-
String fqn = DBUtils.constructFQN("mysql", host, 3306,
112+
String fqn = DBUtils.constructFQN("mysql", host,
113+
cloudsqlMysqlSourceConfig.getConnection().getPort(),
112114
cloudsqlMysqlSourceConfig.getConnection().getDatabase(),
113115
cloudsqlMysqlSourceConfig.getReferenceName());
114116
Asset.Builder assetBuilder = Asset.builder(cloudsqlMysqlSourceConfig.getReferenceName()).setFqn(fqn);

cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnectorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void test() throws IOException, ClassNotFoundException, InstantiationExce
5454
test(
5555
new CloudSQLMySQLConnector(
5656
new CloudSQLMySQLConnectorConfig(username, password, JDBC_PLUGIN_NAME, connectionArguments, instanceType,
57-
connectionName, database)
57+
connectionName, database, null)
5858
),
5959
JDBC_DRIVER_CLASS_NAME,
6060
CloudSQLMySQLConstants.PLUGIN_NAME

0 commit comments

Comments
 (0)