Skip to content

Commit fd17f60

Browse files
committed
Merge branch '1.5_v3.7.1' into 1.5_v3.7.2
2 parents 25675b0 + c315e60 commit fd17f60

File tree

1 file changed

+14
-8
lines changed

1 file changed

+14
-8
lines changed

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
6363
private long batchWaitInterval = 10000l;
6464

6565
// batchNum
66-
private int batchInterval = 5000;
66+
private int batchNum = 100;
6767
private String insertQuery;
6868
public int[] typesArray;
6969

@@ -98,7 +98,7 @@ public void configure(Configuration parameters) {
9898
@Override
9999
public void open(int taskNumber, int numTasks) throws IOException {
100100
try {
101-
LOG.info("PreparedStatement execute batch num is {}", batchInterval);
101+
LOG.info("PreparedStatement execute batch num is {}", batchNum);
102102
dbConn = establishConnection();
103103
initMetric();
104104
if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
@@ -110,7 +110,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
110110
throw new SQLException("Table " + tableName + " doesn't exist");
111111
}
112112

113-
if (batchWaitInterval > 0 && batchInterval > 1) {
113+
if (batchWaitInterval > 0 && batchNum > 1) {
114114
LOG.info("open batch wait interval scheduled, interval is {} ms", batchWaitInterval);
115115

116116
timerService = new ScheduledThreadPoolExecutor(1);
@@ -182,13 +182,13 @@ public void writeRecord(Tuple2 tuple2) {
182182
private void insertWrite(Row row) {
183183
checkConnectionOpen(dbConn);
184184
try {
185-
if (batchInterval == 1) {
185+
if (batchNum == 1) {
186186
writeSingleRecord(row);
187187
} else {
188188
updatePreparedStmt(row, upload);
189189
rows.add(row);
190190
upload.addBatch();
191-
if (rows.size() >= batchInterval) {
191+
if (rows.size() >= batchNum) {
192192
submitExecuteBatch();
193193
}
194194
}
@@ -413,9 +413,6 @@ public void setDbSink(RdbSink dbSink) {
413413
this.dbSink = dbSink;
414414
}
415415

416-
public void setBatchInterval(int batchInterval) {
417-
this.batchInterval = batchInterval;
418-
}
419416

420417
public void setInsertQuery(String insertQuery) {
421418
this.insertQuery = insertQuery;
@@ -445,6 +442,15 @@ public Map<String, List<String>> getRealIndexes() {
445442
return realIndexes;
446443
}
447444

445+
446+
public void setBatchNum(int batchNum) {
447+
this.batchNum = batchNum;
448+
}
449+
450+
public void setBatchWaitInterval(long batchWaitInterval) {
451+
this.batchWaitInterval = batchWaitInterval;
452+
}
453+
448454
public List<String> getFullField() {
449455
return fullField;
450456
}

0 commit comments

Comments
 (0)