Skip to content

Commit 4a4f21c

Browse files
committed
rdb async connection pool size
1 parent a0e7d5b commit 4a4f21c

File tree

13 files changed

+96
-32
lines changed

13 files changed

+96
-32
lines changed

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public void open(Configuration parameters) throws Exception {
4848
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
4949
clickhouseClientConfig.put("url", rdbSideTableInfo.getUrl())
5050
.put("driver_class", CLICKHOUSE_DRIVER)
51-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
51+
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
5252
.put("user", rdbSideTableInfo.getUserName())
5353
.put("password", rdbSideTableInfo.getPassword())
5454
.put("provider_class", DT_PROVIDER_CLASS)
@@ -59,7 +59,7 @@ public void open(Configuration parameters) throws Exception {
5959
System.setProperty("vertx.disableFileCPResolving", "true");
6060
VertxOptions vo = new VertxOptions();
6161
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
62-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
62+
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
6363
vo.setFileResolverCachingEnabled(false);
6464
Vertx vertx = Vertx.vertx(vo);
6565
setRdbSQLClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));

core/src/main/java/com/dtstack/flink/sql/side/SideTableInfo.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public abstract class SideTableInfo extends TableInfo implements Serializable {
5555

5656
public static final String ASYNC_TIMEOUT_NUM_KEY = "asyncTimeoutNum";
5757

58+
public static final String ASYNC_REQ_POOL_KEY = "asyncPoolSize";
59+
5860
private String cacheType = "none";//None or LRU or ALL
5961

6062
private int cacheSize = 10000;
@@ -65,6 +67,11 @@ public abstract class SideTableInfo extends TableInfo implements Serializable {
6567

6668
private int asyncTimeout=10000;
6769

70+
/**
71+
* async operator req outside conn pool size, egg rdb conn pool size
72+
*/
73+
private int asyncPoolSize = 0;
74+
6875
private int asyncTimeoutNumLimit = Integer.MAX_VALUE;
6976

7077
private boolean partitionedJoin = false;
@@ -156,4 +163,27 @@ public void setAsyncTimeoutNumLimit(int asyncTimeoutNumLimit) {
156163
this.asyncTimeoutNumLimit = asyncTimeoutNumLimit;
157164
}
158165

166+
public int getAsyncPoolSize() {
167+
return asyncPoolSize;
168+
}
169+
170+
public void setAsyncPoolSize(int asyncPoolSize) {
171+
this.asyncPoolSize = asyncPoolSize;
172+
}
173+
174+
@Override
175+
public String toString() {
176+
return "Cache Info{" +
177+
"cacheType='" + cacheType + '\'' +
178+
", cacheSize=" + cacheSize +
179+
", cacheTimeout=" + cacheTimeout +
180+
", asyncCapacity=" + asyncCapacity +
181+
", asyncTimeout=" + asyncTimeout +
182+
", asyncPoolSize=" + asyncPoolSize +
183+
", asyncTimeoutNumLimit=" + asyncTimeoutNumLimit +
184+
", partitionedJoin=" + partitionedJoin +
185+
", cacheMode='" + cacheMode + '\'' +
186+
'}';
187+
}
188+
159189
}

core/src/main/java/com/dtstack/flink/sql/table/AbsSideTableParser.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.dtstack.flink.sql.enums.ECacheType;
2424
import com.dtstack.flink.sql.side.SideTableInfo;
2525
import com.dtstack.flink.sql.util.MathUtil;
26+
import org.apache.flink.util.Preconditions;
27+
2628
import java.util.Map;
2729
import java.util.regex.Matcher;
2830
import java.util.regex.Pattern;
@@ -114,6 +116,12 @@ protected void parseCacheProp(SideTableInfo sideTableInfo, Map<String, Object> p
114116
sideTableInfo.setAsyncTimeoutNumLimit(asyncTimeoutNum);
115117
}
116118
}
119+
120+
if (props.containsKey(SideTableInfo.ASYNC_REQ_POOL_KEY.toLowerCase())) {
121+
Integer asyncPoolSize = MathUtil.getIntegerVal(props.get(SideTableInfo.ASYNC_REQ_POOL_KEY.toLowerCase()));
122+
Preconditions.checkArgument(asyncPoolSize > 0 && asyncPoolSize <= 20, "asyncPoolSize size limit (0,20]");
123+
sideTableInfo.setAsyncPoolSize(asyncPoolSize);
124+
}
117125
}
118126
}
119127
}

db2/db2-side/db2-async-side/src/main/java/com/dtstack/flink/sql/side/db2/Db2AsyncReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void open(Configuration parameters) throws Exception {
5959
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
6060
db2lientConfig.put("url", rdbSideTableInfo.getUrl())
6161
.put("driver_class", DB2_DRIVER)
62-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
62+
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
6363
.put("user", rdbSideTableInfo.getUserName())
6464
.put("password", rdbSideTableInfo.getPassword())
6565
.put("provider_class", DT_PROVIDER_CLASS)
@@ -71,7 +71,7 @@ public void open(Configuration parameters) throws Exception {
7171

7272
VertxOptions vo = new VertxOptions();
7373
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
74-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
74+
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
7575
vo.setFileResolverCachingEnabled(false);
7676
Vertx vertx = Vertx.vertx(vo);
7777
setRdbSQLClient(JDBCClient.createNonShared(vertx, db2lientConfig));

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,33 +57,28 @@ public ImpalaAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldI
5757
@Override
5858
public void open(Configuration parameters) throws Exception {
5959
super.open(parameters);
60-
JsonObject impalaClientConfig = getClientConfig();
60+
ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo) sideInfo.getSideTableInfo();
61+
62+
JsonObject impalaClientConfig = new JsonObject();
63+
impalaClientConfig.put("url", getUrl())
64+
.put("driver_class", IMPALA_DRIVER)
65+
.put("max_pool_size", impalaSideTableInfo.getAsyncPoolSize())
66+
.put("provider_class", DT_PROVIDER_CLASS)
67+
.put("idle_connection_test_period", 300)
68+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN)
69+
.put("max_idle_time", 600);
6170

6271
System.setProperty("vertx.disableFileCPResolving", "true");
6372

6473
VertxOptions vo = new VertxOptions();
6574
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
66-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
75+
vo.setWorkerPoolSize(impalaSideTableInfo.getAsyncPoolSize());
6776
vo.setFileResolverCachingEnabled(false);
6877
Vertx vertx = Vertx.vertx(vo);
78+
6979
setRdbSQLClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
7080
}
7181

72-
public JsonObject getClientConfig() {
73-
JsonObject impalaClientConfig = new JsonObject();
74-
impalaClientConfig.put("url", getUrl())
75-
.put("driver_class", IMPALA_DRIVER)
76-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
77-
.put("provider_class", DT_PROVIDER_CLASS)
78-
.put("idle_connection_test_period", 300)
79-
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN)
80-
.put("max_idle_time", 600)
81-
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
82-
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
83-
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
84-
85-
return impalaClientConfig;
86-
}
8782

8883
public String getUrl() {
8984
ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo) sideInfo.getSideTableInfo();

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void open(Configuration parameters) throws Exception {
6161
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
6262
mysqlClientConfig.put("url", rdbSideTableInfo.getUrl())
6363
.put("driver_class", MYSQL_DRIVER)
64-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
64+
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
6565
.put("user", rdbSideTableInfo.getUserName())
6666
.put("password", rdbSideTableInfo.getPassword())
6767
.put("provider_class", DT_PROVIDER_CLASS)
@@ -73,7 +73,7 @@ public void open(Configuration parameters) throws Exception {
7373

7474
VertxOptions vo = new VertxOptions();
7575
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
76-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
76+
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
7777
vo.setFileResolverCachingEnabled(false);
7878
Vertx vertx = Vertx.vertx(vo);
7979
setRdbSQLClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public void open(Configuration parameters) throws Exception {
5353
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
5454
oracleClientConfig.put("url", rdbSideTableInfo.getUrl())
5555
.put("driver_class", ORACLE_DRIVER)
56-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
56+
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
5757
.put("user", rdbSideTableInfo.getUserName())
5858
.put("password", rdbSideTableInfo.getPassword())
5959
.put("provider_class", DT_PROVIDER_CLASS)
@@ -65,7 +65,7 @@ public void open(Configuration parameters) throws Exception {
6565

6666
VertxOptions vo = new VertxOptions();
6767
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
68-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
68+
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
6969
vo.setFileResolverCachingEnabled(false);
7070
Vertx vertx = Vertx.vertx(vo);
7171
setRdbSQLClient(JDBCClient.createNonShared(vertx, oracleClientConfig));

polardb/polardb-side/polardb-async-side/src/main/java/com/dtstack/flink/sql/side/polardb/PolardbAsyncReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public void open(Configuration parameters) throws Exception {
3333
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
3434
mysqlClientConfig.put("url", rdbSideTableInfo.getUrl())
3535
.put("driver_class", POLARDB_DRIVER)
36-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
36+
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
3737
.put("user", rdbSideTableInfo.getUserName())
3838
.put("password", rdbSideTableInfo.getPassword())
3939
.put("provider_class", DT_PROVIDER_CLASS)
@@ -45,7 +45,7 @@ public void open(Configuration parameters) throws Exception {
4545

4646
VertxOptions vo = new VertxOptions();
4747
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
48-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
48+
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
4949
vo.setFileResolverCachingEnabled(false);
5050
Vertx vertx = Vertx.vertx(vo);
5151
setRdbSQLClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));

postgresql/postgresql-side/postgresql-async-side/src/main/java/com/dtstack/flink/sql/side/postgresql/PostgresqlAsyncReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void open(Configuration parameters) throws Exception {
6060
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
6161
pgClientConfig.put("url", rdbSideTableInfo.getUrl())
6262
.put("driver_class", POSTGRESQL_DRIVER)
63-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
63+
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
6464
.put("user", rdbSideTableInfo.getUserName())
6565
.put("password", rdbSideTableInfo.getPassword())
6666
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
@@ -69,7 +69,7 @@ public void open(Configuration parameters) throws Exception {
6969

7070
VertxOptions vo = new VertxOptions();
7171
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
72-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
72+
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
7373
Vertx vertx = Vertx.vertx(vo);
7474
setRdbSQLClient(JDBCClient.createNonShared(vertx, pgClientConfig));
7575
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllReqRow.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.dtstack.flink.sql.side.rdb.all;
2020

2121
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.configuration.Configuration;
2223
import org.apache.flink.table.runtime.types.CRow;
2324
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
2425
import org.apache.flink.types.Row;
@@ -72,6 +73,13 @@ public RdbAllReqRow(SideInfo sideInfo) {
7273
super(sideInfo);
7374
}
7475

76+
@Override
77+
public void open(Configuration parameters) throws Exception {
78+
super.open(parameters);
79+
RdbSideTableInfo tableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
80+
LOG.info("rdb dim table config info: {} ", tableInfo.toString());
81+
}
82+
7583
@Override
7684
protected void initCache() throws SQLException {
7785
Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();

0 commit comments

Comments
 (0)