Skip to content

Commit 1e23fe6

Browse files
author
dapeng
committed
Merge branch 'feat_1.8_asyncException_mergeTest' into 1.8_release_3.10.x_mergedTest_new
2 parents 7f544e3 + 374065a commit 1e23fe6

File tree

9 files changed

+62
-14
lines changed

9 files changed

+62
-14
lines changed

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.clickhouse;
2121

22+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2223
import com.dtstack.flink.sql.side.FieldInfo;
2324
import com.dtstack.flink.sql.side.JoinInfo;
2425
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
@@ -32,6 +33,9 @@
3233
import org.apache.flink.configuration.Configuration;
3334

3435
import java.util.List;
36+
import java.util.concurrent.LinkedBlockingQueue;
37+
import java.util.concurrent.ThreadPoolExecutor;
38+
import java.util.concurrent.TimeUnit;
3539

3640

3741
public class ClickhouseAsyncReqRow extends RdbAsyncReqRow {
@@ -63,6 +67,8 @@ public void open(Configuration parameters) throws Exception {
6367
vo.setFileResolverCachingEnabled(false);
6468
Vertx vertx = Vertx.vertx(vo);
6569
setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
70+
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
71+
new LinkedBlockingQueue<>(10), new DTThreadFactory("clickhouseAsyncExec")));
6672
}
6773

6874
}

db2/db2-side/db2-async-side/src/main/java/com/dtstack/flink/sql/side/db2/Db2AsyncReqRow.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.side.db2;
2020

21+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2122
import com.dtstack.flink.sql.side.FieldInfo;
2223
import com.dtstack.flink.sql.side.JoinInfo;
2324
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
@@ -31,6 +32,9 @@
3132
import org.apache.flink.configuration.Configuration;
3233

3334
import java.util.List;
35+
import java.util.concurrent.LinkedBlockingQueue;
36+
import java.util.concurrent.ThreadPoolExecutor;
37+
import java.util.concurrent.TimeUnit;
3438

3539
/**
3640
* Reason:
@@ -72,6 +76,8 @@ public void open(Configuration parameters) throws Exception {
7276
vo.setFileResolverCachingEnabled(false);
7377
Vertx vertx = Vertx.vertx(vo);
7478
setRdbSqlClient(JDBCClient.createNonShared(vertx, db2lientConfig));
79+
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
80+
new LinkedBlockingQueue<>(10), new DTThreadFactory("dbAsyncExec")));
7581
}
7682

7783
}

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.side.impala;
2020

21+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2122
import com.dtstack.flink.sql.side.FieldInfo;
2223
import com.dtstack.flink.sql.side.JoinInfo;
2324
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
@@ -35,6 +36,9 @@
3536

3637
import java.io.IOException;
3738
import java.util.List;
39+
import java.util.concurrent.LinkedBlockingQueue;
40+
import java.util.concurrent.ThreadPoolExecutor;
41+
import java.util.concurrent.TimeUnit;
3842

3943
/**
4044
* Date: 2019/11/12
@@ -79,6 +83,8 @@ public void open(Configuration parameters) throws Exception {
7983
vo.setFileResolverCachingEnabled(false);
8084
Vertx vertx = Vertx.vertx(vo);
8185
setRdbSqlClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
86+
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
87+
new LinkedBlockingQueue<>(10), new DTThreadFactory("impalaAsyncExec")));
8288
}
8389

8490

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncReqRow.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.mysql;
2121

22+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2223
import com.dtstack.flink.sql.side.FieldInfo;
2324
import com.dtstack.flink.sql.side.JoinInfo;
2425
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
@@ -32,6 +33,9 @@
3233
import org.apache.flink.configuration.Configuration;
3334

3435
import java.util.List;
36+
import java.util.concurrent.LinkedBlockingQueue;
37+
import java.util.concurrent.ThreadPoolExecutor;
38+
import java.util.concurrent.TimeUnit;
3539

3640
/**
3741
* Mysql dim table
@@ -72,6 +76,8 @@ public void open(Configuration parameters) throws Exception {
7276
vo.setFileResolverCachingEnabled(false);
7377
Vertx vertx = Vertx.vertx(vo);
7478
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));
79+
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
80+
new LinkedBlockingQueue<>(10), new DTThreadFactory("mysqlAsyncExec")));
7581
}
7682

7783
}

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncReqRow.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.oracle;
2121

22+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2223
import com.dtstack.flink.sql.side.FieldInfo;
2324
import com.dtstack.flink.sql.side.JoinInfo;
2425
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
@@ -32,6 +33,9 @@
3233
import org.apache.flink.configuration.Configuration;
3334

3435
import java.util.List;
36+
import java.util.concurrent.LinkedBlockingQueue;
37+
import java.util.concurrent.ThreadPoolExecutor;
38+
import java.util.concurrent.TimeUnit;
3539

3640

3741
public class OracleAsyncReqRow extends RdbAsyncReqRow {
@@ -64,5 +68,7 @@ public void open(Configuration parameters) throws Exception {
6468
vo.setFileResolverCachingEnabled(false);
6569
Vertx vertx = Vertx.vertx(vo);
6670
setRdbSqlClient(JDBCClient.createNonShared(vertx, oracleClientConfig));
71+
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
72+
new LinkedBlockingQueue<>(10), new DTThreadFactory("oracleAsyncExec")));
6773
}
6874
}

polardb/polardb-side/polardb-async-side/src/main/java/com/dtstack/flink/sql/side/polardb/PolardbAsyncReqRow.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package com.dtstack.flink.sql.side.polardb;
1919

20+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2021
import com.dtstack.flink.sql.side.FieldInfo;
2122
import com.dtstack.flink.sql.side.JoinInfo;
2223
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
@@ -32,6 +33,9 @@
3233
import org.slf4j.LoggerFactory;
3334

3435
import java.util.List;
36+
import java.util.concurrent.LinkedBlockingQueue;
37+
import java.util.concurrent.ThreadPoolExecutor;
38+
import java.util.concurrent.TimeUnit;
3539

3640
/**
3741
* Date: 2019/12/20
@@ -71,5 +75,7 @@ public void open(Configuration parameters) throws Exception {
7175
vo.setFileResolverCachingEnabled(false);
7276
Vertx vertx = Vertx.vertx(vo);
7377
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));
78+
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
79+
new LinkedBlockingQueue<>(10), new DTThreadFactory("polardbAsyncExec")));
7480
}
7581
}

postgresql/postgresql-side/postgresql-async-side/src/main/java/com/dtstack/flink/sql/side/postgresql/PostgresqlAsyncReqRow.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.postgresql;
2121

22+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2223
import com.dtstack.flink.sql.side.FieldInfo;
2324
import com.dtstack.flink.sql.side.JoinInfo;
2425
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
@@ -34,6 +35,9 @@
3435
import org.slf4j.LoggerFactory;
3536

3637
import java.util.List;
38+
import java.util.concurrent.LinkedBlockingQueue;
39+
import java.util.concurrent.ThreadPoolExecutor;
40+
import java.util.concurrent.TimeUnit;
3741

3842
/**
3943
* Date: 2019-08-11
@@ -72,6 +76,8 @@ public void open(Configuration parameters) throws Exception {
7276
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
7377
Vertx vertx = Vertx.vertx(vo);
7478
setRdbSqlClient(JDBCClient.createNonShared(vertx, pgClientConfig));
79+
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
80+
new LinkedBlockingQueue<>(10), new DTThreadFactory("postgresqlAsyncExec")));
7581
}
7682

7783
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package com.dtstack.flink.sql.side.rdb.async;
2121

2222
import com.dtstack.flink.sql.enums.ECacheContentType;
23+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2324
import com.dtstack.flink.sql.side.BaseAsyncReqRow;
2425
import com.dtstack.flink.sql.side.BaseSideInfo;
2526
import com.dtstack.flink.sql.side.CacheMissVal;
@@ -46,8 +47,7 @@
4647
import java.time.Instant;
4748
import java.util.List;
4849
import java.util.Map;
49-
import java.util.concurrent.CountDownLatch;
50-
import java.util.concurrent.ScheduledFuture;
50+
import java.util.concurrent.*;
5151
import java.util.concurrent.atomic.AtomicBoolean;
5252
import java.util.concurrent.atomic.AtomicLong;
5353

@@ -84,6 +84,8 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
8484

8585
private AtomicBoolean connectionStatus = new AtomicBoolean(true);
8686

87+
private transient ThreadPoolExecutor executor;
88+
8789
public RdbAsyncReqRow(BaseSideInfo sideInfo) {
8890
super(sideInfo);
8991
init(sideInfo);
@@ -112,18 +114,7 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, Resul
112114
Thread.sleep(100);
113115
}
114116
Map<String, Object> params = formatInputParam(inputParams);
115-
rdbSqlClient.getConnection(conn -> {
116-
if(conn.failed()){
117-
connectionStatus.set(false);
118-
connectWithRetry(params, input, resultFuture, rdbSqlClient);
119-
return;
120-
}
121-
connectionStatus.set(true);
122-
ScheduledFuture<?> timerFuture = registerTimer(input, resultFuture);
123-
cancelTimerWhenComplete(resultFuture, timerFuture);
124-
handleQuery(conn.result(), params, input, resultFuture);
125-
});
126-
117+
executor.execute(() -> connectWithRetry(params, input, resultFuture, rdbSqlClient));
127118
}
128119

129120
private void connectWithRetry(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture, SQLClient rdbSqlClient) {
@@ -134,6 +125,7 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
134125
rdbSqlClient.getConnection(conn -> {
135126
try {
136127
if(conn.failed()){
128+
connectionStatus.set(false);
137129
if(failCounter.getAndIncrement() % 1000 == 0){
138130
LOG.error("getConnection error", conn.cause());
139131
}
@@ -246,12 +238,20 @@ public void close() throws Exception {
246238
rdbSqlClient.close();
247239
}
248240

241+
if(executor != null){
242+
executor.shutdown();
243+
}
244+
249245
}
250246

251247
public void setRdbSqlClient(SQLClient rdbSqlClient) {
252248
this.rdbSqlClient = rdbSqlClient;
253249
}
254250

251+
public void setExecutor(ThreadPoolExecutor executor) {
252+
this.executor = executor;
253+
}
254+
255255
private void handleQuery(SQLConnection connection, Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture){
256256
String key = buildCacheKey(inputParams);
257257
JsonArray params = new JsonArray(Lists.newArrayList(inputParams.values()));

sqlserver/sqlserver-side/sqlserver-async-side/src/main/java/com/dtstack/flink/sql/side/sqlserver/SqlserverAsyncReqRow.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.sqlserver;
2121

22+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2223
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2324
import com.dtstack.flink.sql.side.FieldInfo;
2425
import com.dtstack.flink.sql.side.JoinInfo;
@@ -34,6 +35,9 @@
3435
import org.slf4j.LoggerFactory;
3536

3637
import java.util.List;
38+
import java.util.concurrent.LinkedBlockingQueue;
39+
import java.util.concurrent.ThreadPoolExecutor;
40+
import java.util.concurrent.TimeUnit;
3741

3842
/**
3943
* Date: 2019/11/26
@@ -74,5 +78,7 @@ public void open(Configuration parameters) throws Exception {
7478
vo.setFileResolverCachingEnabled(false);
7579
Vertx vertx = Vertx.vertx(vo);
7680
setRdbSqlClient(JDBCClient.createNonShared(vertx, sqlserverClientConfig));
81+
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
82+
new LinkedBlockingQueue<>(10), new DTThreadFactory("sqlServerAsyncExec")));
7783
}
7884
}

0 commit comments

Comments
 (0)