Skip to content

Commit 267cd3f

Browse files
committed
Merge branch '1.5_3.6.1_reconnect' into '1.5_v3.6.1'
[RDB连接超时检查及重连] See merge request !68
2 parents 2fa36be + 277da27 commit 267cd3f

File tree

8 files changed

+161
-20
lines changed

8 files changed

+161
-20
lines changed

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncReqRow.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,13 @@ public void open(Configuration parameters) throws Exception {
6363
.put("driver_class", MYSQL_DRIVER)
6464
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
6565
.put("user", rdbSideTableInfo.getUserName())
66-
.put("password", rdbSideTableInfo.getPassword());
66+
.put("password", rdbSideTableInfo.getPassword())
67+
.put("provider_class", DT_PROVIDER_CLASS)
68+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
69+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
70+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
71+
72+
6773

6874
VertxOptions vo = new VertxOptions();
6975
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,11 @@ public void open(Configuration parameters) throws Exception {
5555
.put("driver_class", ORACLE_DRIVER)
5656
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
5757
.put("user", rdbSideTableInfo.getUserName())
58-
.put("password", rdbSideTableInfo.getPassword());
58+
.put("password", rdbSideTableInfo.getPassword())
59+
.put("provider_class", DT_PROVIDER_CLASS)
60+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
61+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
62+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);;
5963

6064
VertxOptions vo = new VertxOptions();
6165
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@ public class RdbAsyncReqRow extends AsyncReqRow {
5757

5858
public final static int DEFAULT_MAX_DB_CONN_POOL_SIZE = DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE + DEFAULT_VERTX_WORKER_POOL_SIZE;
5959

60+
public final static int DEFAULT_IDLE_CONNECTION_TEST_PEROID = 60;
61+
62+
public final static boolean DEFAULT_TEST_CONNECTION_ON_CHECKIN = true;
63+
64+
public final static String DT_PROVIDER_CLASS = "com.dtstack.flink.sql.side.rdb.provider.DTC3P0DataSourceProvider";
65+
66+
public final static String PREFERRED_TEST_QUERY_SQL = "select 1 from dual";
67+
6068
private transient SQLClient rdbSQLClient;
6169

6270
public RdbAsyncReqRow(SideInfo sideInfo) {
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package com.dtstack.flink.sql.side.rdb.provider;
2+
3+
import com.mchange.v2.c3p0.ComboPooledDataSource;
4+
import io.vertx.core.json.JsonObject;
5+
import io.vertx.ext.jdbc.spi.impl.C3P0DataSourceProvider;
6+
7+
import javax.sql.DataSource;
8+
import java.beans.PropertyVetoException;
9+
import java.sql.SQLException;
10+
11+
public class DTC3P0DataSourceProvider extends C3P0DataSourceProvider {
12+
13+
@Override
14+
public DataSource getDataSource(JsonObject config) throws SQLException {
15+
String url = config.getString("url");
16+
if (url == null) throw new NullPointerException("url cannot be null");
17+
String driverClass = config.getString("driver_class");
18+
String user = config.getString("user");
19+
String password = config.getString("password");
20+
Integer maxPoolSize = config.getInteger("max_pool_size");
21+
Integer initialPoolSize = config.getInteger("initial_pool_size");
22+
Integer minPoolSize = config.getInteger("min_pool_size");
23+
Integer maxStatements = config.getInteger("max_statements");
24+
Integer maxStatementsPerConnection = config.getInteger("max_statements_per_connection");
25+
Integer maxIdleTime = config.getInteger("max_idle_time");
26+
Integer acquireRetryAttempts = config.getInteger("acquire_retry_attempts");
27+
Integer acquireRetryDelay = config.getInteger("acquire_retry_delay");
28+
Boolean breakAfterAcquireFailure = config.getBoolean("break_after_acquire_failure");
29+
30+
//add c3p0 params
31+
String preferredTestQuery = config.getString("preferred_test_query");
32+
Integer idleConnectionTestPeriod = config.getInteger("idle_connection_test_period");
33+
Boolean testConnectionOnCheckin = config.getBoolean("test_connection_on_checkin");
34+
35+
36+
37+
// If you want to configure any other C3P0 properties you can add a file c3p0.properties to the classpath
38+
ComboPooledDataSource cpds = new ComboPooledDataSource();
39+
cpds.setJdbcUrl(url);
40+
if (driverClass != null) {
41+
try {
42+
cpds.setDriverClass(driverClass);
43+
} catch (PropertyVetoException e) {
44+
throw new IllegalArgumentException(e);
45+
}
46+
}
47+
if (user != null) {
48+
cpds.setUser(user);
49+
}
50+
if (password != null) {
51+
cpds.setPassword(password);
52+
}
53+
if (maxPoolSize != null) {
54+
cpds.setMaxPoolSize(maxPoolSize);
55+
}
56+
if (minPoolSize != null) {
57+
cpds.setMinPoolSize(minPoolSize);
58+
}
59+
if (initialPoolSize != null) {
60+
cpds.setInitialPoolSize(initialPoolSize);
61+
}
62+
if (maxStatements != null) {
63+
cpds.setMaxStatements(maxStatements);
64+
}
65+
if (maxStatementsPerConnection != null) {
66+
cpds.setMaxStatementsPerConnection(maxStatementsPerConnection);
67+
}
68+
if (maxIdleTime != null) {
69+
cpds.setMaxIdleTime(maxIdleTime);
70+
}
71+
if(acquireRetryAttempts != null){
72+
cpds.setAcquireRetryAttempts(acquireRetryAttempts);
73+
}
74+
if(acquireRetryDelay != null){
75+
cpds.setAcquireRetryDelay(acquireRetryDelay);
76+
}
77+
if(breakAfterAcquireFailure != null){
78+
cpds.setBreakAfterAcquireFailure(breakAfterAcquireFailure);
79+
}
80+
81+
if (preferredTestQuery != null) {
82+
cpds.setPreferredTestQuery(preferredTestQuery);
83+
}
84+
85+
if (idleConnectionTestPeriod != null) {
86+
cpds.setIdleConnectionTestPeriod(idleConnectionTestPeriod);
87+
}
88+
89+
if (testConnectionOnCheckin != null) {
90+
cpds.setTestConnectionOnCheckin(testConnectionOnCheckin);
91+
}
92+
93+
return cpds;
94+
}
95+
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ public abstract class RdbSink implements RetractStreamTableSink<Row>, Serializab
6161

6262
protected String dbType;
6363

64-
protected int batchInterval = 1;
64+
protected int batchNum = 100;
65+
66+
protected long batchWaitInterval = 10000;
6567

6668
protected int[] sqlTypes;
6769

@@ -89,7 +91,8 @@ public RichSinkFunction createJdbcSinkFunc() {
8991
outputFormat.setUsername(userName);
9092
outputFormat.setPassword(password);
9193
outputFormat.setInsertQuery(sql);
92-
outputFormat.setBatchInterval(batchInterval);
94+
outputFormat.setBatchNum(batchNum);
95+
outputFormat.setBatchWaitInterval(batchWaitInterval);
9396
outputFormat.setTypesArray(sqlTypes);
9497
outputFormat.setTableName(tableName);
9598
outputFormat.setDbType(dbType);
@@ -112,7 +115,12 @@ public RdbSink genStreamSink(TargetTableInfo targetTableInfo) {
112115

113116
Integer tmpSqlBatchSize = rdbTableInfo.getBatchSize();
114117
if (tmpSqlBatchSize != null) {
115-
setBatchInterval(tmpSqlBatchSize);
118+
setBatchNum(tmpSqlBatchSize);
119+
}
120+
121+
Long batchWaitInterval = rdbTableInfo.getBatchWaitInterval();
122+
if (batchWaitInterval != null) {
123+
setBatchWaitInterval(batchWaitInterval);
116124
}
117125

118126
Integer tmpSinkParallelism = rdbTableInfo.getParallelism();
@@ -198,13 +206,12 @@ public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInform
198206
return this;
199207
}
200208

201-
/**
202-
* Set the default frequency submit updated every submission
203-
*
204-
* @param batchInterval
205-
*/
206-
public void setBatchInterval(int batchInterval) {
207-
this.batchInterval = batchInterval;
209+
public void setBatchNum(int batchNum) {
210+
this.batchNum = batchNum;
211+
}
212+
213+
public void setBatchWaitInterval(long batchWaitInterval) {
214+
this.batchWaitInterval = batchWaitInterval;
208215
}
209216

210217
@Override

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
6363
private long batchWaitInterval = 10000l;
6464

6565
// batchNum
66-
private int batchInterval = 5000;
66+
private int batchNum = 100;
6767
private String insertQuery;
6868
public int[] typesArray;
6969

@@ -98,7 +98,7 @@ public void configure(Configuration parameters) {
9898
@Override
9999
public void open(int taskNumber, int numTasks) throws IOException {
100100
try {
101-
LOG.info("PreparedStatement execute batch num is {}", batchInterval);
101+
LOG.info("PreparedStatement execute batch num is {}", batchNum);
102102
dbConn = establishConnection();
103103
initMetric();
104104
if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
@@ -110,7 +110,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
110110
throw new SQLException("Table " + tableName + " doesn't exist");
111111
}
112112

113-
if (batchWaitInterval > 0 && batchInterval > 1) {
113+
if (batchWaitInterval > 0 && batchNum > 1) {
114114
LOG.info("open batch wait interval scheduled, interval is {} ms", batchWaitInterval);
115115

116116
timerService = new ScheduledThreadPoolExecutor(1);
@@ -182,13 +182,13 @@ public void writeRecord(Tuple2 tuple2) {
182182
private void insertWrite(Row row) {
183183
checkConnectionOpen(dbConn);
184184
try {
185-
if (batchInterval == 1) {
185+
if (batchNum == 1) {
186186
writeSingleRecord(row);
187187
} else {
188188
updatePreparedStmt(row, upload);
189189
rows.add(row);
190190
upload.addBatch();
191-
if (rows.size() >= batchInterval) {
191+
if (rows.size() >= batchNum) {
192192
submitExecuteBatch();
193193
}
194194
}
@@ -234,6 +234,7 @@ private void checkConnectionOpen(Connection dbConn) {
234234
LOG.info("db connection reconnect..");
235235
dbConn= establishConnection();
236236
upload = dbConn.prepareStatement(insertQuery);
237+
this.dbConn = dbConn;
237238
}
238239
} catch (SQLException e) {
239240
LOG.error("check connection open failed..", e);
@@ -413,8 +414,12 @@ public void setDbSink(RdbSink dbSink) {
413414
this.dbSink = dbSink;
414415
}
415416

416-
public void setBatchInterval(int batchInterval) {
417-
this.batchInterval = batchInterval;
417+
public void setBatchNum(int batchNum) {
418+
this.batchNum = batchNum;
419+
}
420+
421+
public void setBatchWaitInterval(long batchWaitInterval) {
422+
this.batchWaitInterval = batchWaitInterval;
418423
}
419424

420425
public void setInsertQuery(String insertQuery) {

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class RdbTableInfo extends TargetTableInfo {
4141

4242
public static final String BUFFER_SIZE_KEY = "bufferSize";
4343

44+
public static final String BATCH_WAIT_INTERVAL_KEY = "batchWaitInterval";
45+
4446
public static final String FLUSH_INTERVALMS_KEY = "flushIntervalMs";
4547

4648
private String url;
@@ -53,6 +55,8 @@ public class RdbTableInfo extends TargetTableInfo {
5355

5456
private Integer batchSize;
5557

58+
private Long batchWaitInterval;
59+
5660
private String bufferSize;
5761

5862
private String flushIntervalMs;
@@ -113,6 +117,14 @@ public void setFlushIntervalMs(String flushIntervalMs) {
113117
this.flushIntervalMs = flushIntervalMs;
114118
}
115119

120+
public Long getBatchWaitInterval() {
121+
return batchWaitInterval;
122+
}
123+
124+
public void setBatchWaitInterval(Long batchWaitInterval) {
125+
this.batchWaitInterval = batchWaitInterval;
126+
}
127+
116128
@Override
117129
public boolean check() {
118130
Preconditions.checkNotNull(url, "rdb field of URL is required");

sqlserver/sqlserver-side/sqlserver-async-side/src/main/java/com/dtstack/flink/sql/side/sqlserver/SqlserverAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,11 @@ public void open(Configuration parameters) throws Exception {
5353
.put("driver_class", SQLSERVER_DRIVER)
5454
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
5555
.put("user", rdbSideTableInfo.getUserName())
56-
.put("password", rdbSideTableInfo.getPassword());
56+
.put("password", rdbSideTableInfo.getPassword())
57+
.put("provider_class", DT_PROVIDER_CLASS)
58+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
59+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
60+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);;
5761

5862
VertxOptions vo = new VertxOptions();
5963
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);

0 commit comments

Comments
 (0)