Skip to content

Commit 4f9022f

Browse files
author
dapeng
committed
Merge branch 'hotfix_1.8_3.10.x_26972' into 1.8_3.10_zy
2 parents 9fc7179 + b1b56ad commit 4f9022f

File tree

4 files changed

+14
-7
lines changed

4 files changed

+14
-7
lines changed

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.dtstack.flink.sql.enums.ECacheContentType;
2424
import com.dtstack.flink.sql.enums.ECacheType;
25+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2526
import com.dtstack.flink.sql.metric.MetricConstant;
2627
import com.dtstack.flink.sql.side.cache.AbstractSideCache;
2728
import com.dtstack.flink.sql.side.cache.CacheObj;
@@ -68,6 +69,7 @@ public abstract class BaseAsyncReqRow extends RichAsyncFunction<CRow, CRow> impl
6869
private int timeOutNum = 0;
6970
protected BaseSideInfo sideInfo;
7071
protected transient Counter parseErrorRecords;
72+
private transient ThreadPoolExecutor cancelExecutor;
7173

7274
public BaseAsyncReqRow(BaseSideInfo sideInfo){
7375
this.sideInfo = sideInfo;
@@ -82,6 +84,8 @@ public void open(Configuration parameters) throws Exception {
8284
super.open(parameters);
8385
initCache();
8486
initMetric();
87+
cancelExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100000),
88+
new DTThreadFactory("cancel-timer-executor"));
8589
LOG.info("async dim table config info: {} ", sideInfo.getSideTableInfo().toString());
8690
}
8791

@@ -248,12 +252,11 @@ public void onProcessingTime(long timestamp) throws Exception {
248252
}
249253

250254
protected void cancelTimerWhenComplete(ResultFuture<CRow> resultFuture, ScheduledFuture<?> timerFuture){
251-
ThreadPoolExecutor executors = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
252255
if(resultFuture instanceof StreamRecordQueueEntry){
253256
StreamRecordQueueEntry streamRecordBufferEntry = (StreamRecordQueueEntry) resultFuture;
254257
streamRecordBufferEntry.onComplete((Object value) -> {
255258
timerFuture.cancel(true);
256-
},executors);
259+
}, cancelExecutor);
257260
}
258261
}
259262

mongo/mongo-side/mongo-side-core/src/main/java/com/dtstack/flink/sql/side/mongo/table/MongoSideParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5959
mongoSideTableInfo.setDatabase(MathUtil.getString(props.get(DATABASE_KEY.toLowerCase())));
6060
mongoSideTableInfo.setUserName(MathUtil.getString(props.get(USER_NAME_KEY.toLowerCase())));
6161
mongoSideTableInfo.setPassword(MathUtil.getString(props.get(PASSWORD_KEY.toLowerCase())));
62-
62+
mongoSideTableInfo.check();
6363
return mongoSideTableInfo;
6464
}
6565
}

mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/table/MongoSinkParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
6060
mongoTableInfo.setDatabase(MathUtil.getString(props.get(DATABASE_KEY.toLowerCase())));
6161
mongoTableInfo.setUserName(MathUtil.getString(props.get(USER_NAME_KEY.toLowerCase())));
6262
mongoTableInfo.setPassword(MathUtil.getString(props.get(PASSWORD_KEY.toLowerCase())));
63-
63+
mongoTableInfo.check();
6464
return mongoTableInfo;
6565
}
6666
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import org.slf4j.LoggerFactory;
3434

3535
import java.io.IOException;
36+
import java.sql.ResultSet;
3637
import java.sql.SQLException;
38+
import java.sql.Statement;
3739
import java.util.List;
3840
import java.util.concurrent.ScheduledExecutorService;
3941
import java.util.concurrent.ScheduledFuture;
@@ -44,6 +46,7 @@
4446

4547
/**
4648
* An upsert OutputFormat for JDBC.
49+
*
4750
* @author maqi
4851
*/
4952
public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Boolean, Row>> {
@@ -103,7 +106,7 @@ public JDBCUpsertOutputFormat(
103106
*
104107
* @param taskNumber The number of the parallel instance.
105108
* @throws IOException Thrown, if the output could not be opened due to an
106-
* I/O problem.
109+
* I/O problem.
107110
*/
108111
@Override
109112
public void open(int taskNumber, int numTasks) throws IOException {
@@ -167,7 +170,7 @@ public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOExcep
167170

168171
private void checkConnectionOpen() {
169172
try {
170-
if (connection.isClosed()) {
173+
if (!connection.isValid(10)) {
171174
LOG.info("db connection reconnect..");
172175
establishConnection();
173176
jdbcWriter.prepareStatement(connection);
@@ -270,7 +273,8 @@ public Builder setFieldTypes(int[] fieldTypes) {
270273
}
271274

272275
/**
273-
* optional, partition Fields
276+
* optional, partition Fields
277+
*
274278
* @param partitionFields
275279
* @return
276280
*/

0 commit comments

Comments
 (0)