Skip to content

Commit 7c622a6

Browse files
committed
Merge branch 'feat_1.8_connectPoolSize' into 'v1.8.0_dev'
连接池上限 See merge request dt-insight-engine/flinkStreamSQL!2
2 parents b78a005 + 229afce commit 7c622a6

File tree

15 files changed

+104
-49
lines changed

15 files changed

+104
-49
lines changed

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@ public void open(Configuration parameters) throws Exception {
4848
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
4949
clickhouseClientConfig.put("url", rdbSideTableInfo.getUrl())
5050
.put("driver_class", CLICKHOUSE_DRIVER)
51-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
51+
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
5252
.put("user", rdbSideTableInfo.getUserName())
5353
.put("password", rdbSideTableInfo.getPassword())
5454
.put("provider_class", DT_PROVIDER_CLASS);
5555
System.setProperty("vertx.disableFileCPResolving", "true");
5656
VertxOptions vo = new VertxOptions();
57-
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
58-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
57+
vo.setEventLoopPoolSize(rdbSideTableInfo.getAsyncPoolSize());
58+
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
5959
vo.setFileResolverCachingEnabled(false);
6060
Vertx vertx = Vertx.vertx(vo);
6161
setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements
5555

5656
public static final String ASYNC_TIMEOUT_NUM_KEY = "asyncTimeoutNum";
5757

58+
public static final String ASYNC_REQ_POOL_KEY = "asyncPoolSize";
59+
5860
private String cacheType = "none";//None or LRU or ALL
5961

6062
private int cacheSize = 10000;
@@ -65,6 +67,11 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements
6567

6668
private int asyncTimeout=10000;
6769

70+
/**
71+
* async operator req outside conn pool size, egg rdb conn pool size
72+
*/
73+
private int asyncPoolSize = 0;
74+
6875
private int asyncTimeoutNumLimit = Integer.MAX_VALUE;
6976

7077
private boolean partitionedJoin = false;
@@ -156,4 +163,26 @@ public void setAsyncTimeoutNumLimit(int asyncTimeoutNumLimit) {
156163
this.asyncTimeoutNumLimit = asyncTimeoutNumLimit;
157164
}
158165

166+
public int getAsyncPoolSize() {
167+
return asyncPoolSize;
168+
}
169+
170+
public void setAsyncPoolSize(int asyncPoolSize) {
171+
this.asyncPoolSize = asyncPoolSize;
172+
}
173+
174+
@Override
175+
public String toString() {
176+
return "Cache Info{" +
177+
"cacheType='" + cacheType + '\'' +
178+
", cacheSize=" + cacheSize +
179+
", cacheTimeout=" + cacheTimeout +
180+
", asyncCapacity=" + asyncCapacity +
181+
", asyncTimeout=" + asyncTimeout +
182+
", asyncPoolSize=" + asyncPoolSize +
183+
", asyncTimeoutNumLimit=" + asyncTimeoutNumLimit +
184+
", partitionedJoin=" + partitionedJoin +
185+
", cacheMode='" + cacheMode + '\'' +
186+
'}';
187+
}
159188
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,12 @@
3030
import org.apache.flink.metrics.Counter;
3131
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3232
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
33-
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
3433
import org.apache.flink.table.runtime.types.CRow;
3534
import org.apache.flink.types.Row;
3635
import org.slf4j.Logger;
3736
import org.slf4j.LoggerFactory;
3837

3938
import java.util.Collections;
40-
import java.util.concurrent.TimeoutException;
4139

4240
/**
4341
* All interfaces inherit naming rules: type + "AsyncReqRow" such as == "MysqlAsyncReqRow

core/src/main/java/com/dtstack/flink/sql/table/AbstractSideTableParser.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.dtstack.flink.sql.enums.ECacheType;
2424
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2525
import com.dtstack.flink.sql.util.MathUtil;
26+
import org.apache.flink.util.Preconditions;
27+
2628
import java.util.Map;
2729
import java.util.regex.Matcher;
2830
import java.util.regex.Pattern;
@@ -114,6 +116,13 @@ protected void parseCacheProp(AbstractSideTableInfo sideTableInfo, Map<String, O
114116
sideTableInfo.setAsyncTimeoutNumLimit(asyncTimeoutNum);
115117
}
116118
}
119+
120+
if (props.containsKey(AbstractSideTableInfo.ASYNC_REQ_POOL_KEY.toLowerCase())) {
121+
Integer asyncPoolSize = MathUtil.getIntegerVal(props.get(AbstractSideTableInfo.ASYNC_REQ_POOL_KEY.toLowerCase()));
122+
Preconditions.checkArgument(asyncPoolSize > 0 && asyncPoolSize <= 20, "asyncPoolSize size limit (0,20]");
123+
sideTableInfo.setAsyncPoolSize(asyncPoolSize);
124+
}
125+
117126
}
118127
}
119128
}

db2/db2-side/db2-async-side/src/main/java/com/dtstack/flink/sql/side/db2/Db2AsyncReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void open(Configuration parameters) throws Exception {
5959
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
6060
db2lientConfig.put("url", rdbSideTableInfo.getUrl())
6161
.put("driver_class", DB2_DRIVER)
62-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
62+
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
6363
.put("user", rdbSideTableInfo.getUserName())
6464
.put("password", rdbSideTableInfo.getPassword())
6565
.put("provider_class", DT_PROVIDER_CLASS)
@@ -71,7 +71,7 @@ public void open(Configuration parameters) throws Exception {
7171

7272
VertxOptions vo = new VertxOptions();
7373
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
74-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
74+
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
7575
vo.setFileResolverCachingEnabled(false);
7676
Vertx vertx = Vertx.vertx(vo);
7777
setRdbSqlClient(JDBCClient.createNonShared(vertx, db2lientConfig));

hbase/pom.xml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,6 @@
2929
<version>1.0-SNAPSHOT</version>
3030
<scope>provided</scope>
3131
</dependency>
32-
<dependency>
33-
<groupId>jdk.tools</groupId>
34-
<artifactId>jdk.tools</artifactId>
35-
<version>1.6</version>
36-
<scope>system</scope>
37-
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
38-
</dependency>
3932

4033
<dependency>
4134
<groupId>org.apache.hbase</groupId>

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -57,31 +57,28 @@ public ImpalaAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldI
5757
@Override
5858
public void open(Configuration parameters) throws Exception {
5959
super.open(parameters);
60-
JsonObject impalaClientConfig = getClientConfig();
61-
62-
System.setProperty("vertx.disableFileCPResolving", "true");
63-
64-
VertxOptions vo = new VertxOptions();
65-
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
66-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
67-
vo.setFileResolverCachingEnabled(false);
68-
Vertx vertx = Vertx.vertx(vo);
69-
setRdbSqlClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
70-
}
60+
ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo) sideInfo.getSideTableInfo();
7161

72-
public JsonObject getClientConfig() {
7362
JsonObject impalaClientConfig = new JsonObject();
7463
impalaClientConfig.put("url", getUrl())
7564
.put("driver_class", IMPALA_DRIVER)
76-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
65+
.put("max_pool_size", impalaSideTableInfo.getAsyncPoolSize())
7766
.put("provider_class", DT_PROVIDER_CLASS)
7867
.put("idle_connection_test_period", 300)
7968
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN)
8069
.put("max_idle_time", 600);
8170

82-
return impalaClientConfig;
71+
System.setProperty("vertx.disableFileCPResolving", "true");
72+
73+
VertxOptions vo = new VertxOptions();
74+
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
75+
vo.setWorkerPoolSize(impalaSideTableInfo.getAsyncPoolSize());
76+
vo.setFileResolverCachingEnabled(false);
77+
Vertx vertx = Vertx.vertx(vo);
78+
setRdbSqlClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
8379
}
8480

81+
8582
public String getUrl() {
8683
ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo) sideInfo.getSideTableInfo();
8784

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncReqRow.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030
import io.vertx.ext.jdbc.JDBCClient;
3131
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3232
import org.apache.flink.configuration.Configuration;
33-
import org.slf4j.Logger;
34-
import org.slf4j.LoggerFactory;
3533

3634
import java.util.List;
3735

@@ -44,9 +42,6 @@
4442
*/
4543

4644
public class MysqlAsyncReqRow extends RdbAsyncReqRow {
47-
48-
private static final Logger LOG = LoggerFactory.getLogger(MysqlAsyncReqRow.class);
49-
5045
private final static String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
5146

5247
public MysqlAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
@@ -61,7 +56,7 @@ public void open(Configuration parameters) throws Exception {
6156
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
6257
mysqlClientConfig.put("url", rdbSideTableInfo.getUrl())
6358
.put("driver_class", MYSQL_DRIVER)
64-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
59+
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
6560
.put("user", rdbSideTableInfo.getUserName())
6661
.put("password", rdbSideTableInfo.getPassword())
6762
.put("provider_class", DT_PROVIDER_CLASS)
@@ -73,7 +68,7 @@ public void open(Configuration parameters) throws Exception {
7368

7469
VertxOptions vo = new VertxOptions();
7570
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
76-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
71+
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
7772
vo.setFileResolverCachingEnabled(false);
7873
Vertx vertx = Vertx.vertx(vo);
7974
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncReqRow.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,11 @@
3030
import io.vertx.ext.jdbc.JDBCClient;
3131
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3232
import org.apache.flink.configuration.Configuration;
33-
import org.slf4j.Logger;
34-
import org.slf4j.LoggerFactory;
3533

3634
import java.util.List;
3735

3836

3937
public class OracleAsyncReqRow extends RdbAsyncReqRow {
40-
41-
private static final Logger LOG = LoggerFactory.getLogger(OracleAsyncReqRow.class);
42-
4338
private static final String ORACLE_DRIVER = "oracle.jdbc.driver.OracleDriver";
4439

4540
public OracleAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
@@ -53,7 +48,7 @@ public void open(Configuration parameters) throws Exception {
5348
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
5449
oracleClientConfig.put("url", rdbSideTableInfo.getUrl())
5550
.put("driver_class", ORACLE_DRIVER)
56-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
51+
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
5752
.put("user", rdbSideTableInfo.getUserName())
5853
.put("password", rdbSideTableInfo.getPassword())
5954
.put("provider_class", DT_PROVIDER_CLASS)
@@ -65,7 +60,7 @@ public void open(Configuration parameters) throws Exception {
6560

6661
VertxOptions vo = new VertxOptions();
6762
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
68-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
63+
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
6964
vo.setFileResolverCachingEnabled(false);
7065
Vertx vertx = Vertx.vertx(vo);
7166
setRdbSqlClient(JDBCClient.createNonShared(vertx, oracleClientConfig));

polardb/polardb-side/polardb-async-side/src/main/java/com/dtstack/flink/sql/side/polardb/PolardbAsyncReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void open(Configuration parameters) throws Exception {
5555
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
5656
mysqlClientConfig.put("url", rdbSideTableInfo.getUrl())
5757
.put("driver_class", POLARDB_DRIVER)
58-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
58+
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
5959
.put("user", rdbSideTableInfo.getUserName())
6060
.put("password", rdbSideTableInfo.getPassword())
6161
.put("provider_class", DT_PROVIDER_CLASS)
@@ -67,7 +67,7 @@ public void open(Configuration parameters) throws Exception {
6767

6868
VertxOptions vo = new VertxOptions();
6969
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
70-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
70+
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
7171
vo.setFileResolverCachingEnabled(false);
7272
Vertx vertx = Vertx.vertx(vo);
7373
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));

0 commit comments

Comments
 (0)