Skip to content

Commit 6dfcac8

Browse files
FlechazoWHiLany
authored andcommitted
[hotfix-35526][rdb]调整rdb获取连接方法调用,统一走JdbcConnectUtil的getConnectWithRetry
1 parent bffeb0d commit 6dfcac8

File tree

3 files changed

+23
-29
lines changed

3 files changed

+23
-29
lines changed

rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/util/JdbcConnectUtil.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.dtstack.flink.sql.core.rdb.util;
2020

2121
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
22+
import com.dtstack.flink.sql.exception.ExceptionTrace;
2223
import com.dtstack.flink.sql.util.ThreadUtil;
2324
import com.google.common.base.Preconditions;
2425
import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -141,20 +142,19 @@ public static Connection getConnectWithRetry(
141142

142143
for (int i = 0; i < DEFAULT_RETRY_NUM; i++) {
143144
try {
144-
return Objects.isNull(userName) ?
145-
DriverManager.getConnection(url) : DriverManager.getConnection(url, userName, password);
145+
Connection connection =
146+
Objects.isNull(userName) ?
147+
DriverManager.getConnection(url) :
148+
DriverManager.getConnection(url, userName, password);
149+
connection.setAutoCommit(false);
150+
return connection;
146151
} catch (Exception e) {
147-
if (Objects.isNull(e.getCause())) {
148-
errorCause = e.getMessage();
149-
} else {
150-
errorCause = e.getCause().toString();
151-
}
152-
152+
errorCause = ExceptionTrace.traceOriginalCause(e);
153153
LOG.warn(errorMessage + errorCause);
154154
LOG.warn("Connect will retry after [{}] s. Retry time [{}] ...", DEFAULT_RETRY_TIME_WAIT, i + 1);
155155
ThreadUtil.sleepSeconds(DEFAULT_RETRY_TIME_WAIT);
156156
}
157157
}
158-
throw new SuppressRestartsException(new Throwable(errorMessage + errorCause));
158+
throw new SuppressRestartsException(new SQLException(errorMessage + errorCause));
159159
}
160160
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/AbstractJDBCOutputFormat.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package com.dtstack.flink.sql.sink.rdb.format;
2020

21-
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2221
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
2322
import org.apache.flink.configuration.Configuration;
2423
import org.apache.flink.types.Row;
@@ -63,16 +62,6 @@ public AbstractJDBCOutputFormat(String username, String password, String driverN
6362
public void configure(Configuration parameters) {
6463
}
6564

66-
protected void establishConnection() throws SQLException, ClassNotFoundException, IOException {
67-
ClassLoaderManager.forName(driverName, getClass().getClassLoader());
68-
if (username == null) {
69-
connection = DriverManager.getConnection(dbURL);
70-
} else {
71-
connection = DriverManager.getConnection(dbURL, username, password);
72-
}
73-
connection.setAutoCommit(false);
74-
}
75-
7665
protected void closeDbConnection() throws IOException {
7766
if (connection != null) {
7867
try {

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121

2222
import com.dtstack.flink.sql.core.rdb.JdbcResourceCheck;
23+
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectUtil;
2324
import com.dtstack.flink.sql.enums.EUpdateMode;
2425
import com.dtstack.flink.sql.exception.ExceptionTrace;
2526
import com.dtstack.flink.sql.factory.DTThreadFactory;
@@ -134,7 +135,12 @@ public void open(int taskNumber, int numTasks) throws IOException {
134135

135136
public void openJdbc() throws IOException {
136137
try {
137-
establishConnection();
138+
connection = JdbcConnectUtil.getConnectWithRetry(
139+
driverName,
140+
dbURL,
141+
username,
142+
password
143+
);
138144
initMetric();
139145
if(jdbcWriter == null){
140146
if (StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.APPEND.name()) || keyFields == null || keyFields.length == 0) {
@@ -152,8 +158,6 @@ public void openJdbc() throws IOException {
152158
jdbcWriter.open(connection);
153159
} catch (SQLException sqe) {
154160
throw new IllegalArgumentException("open() failed.", sqe);
155-
} catch (ClassNotFoundException cnfe) {
156-
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
157161
}
158162

159163
if (flushIntervalMills != 0) {
@@ -190,15 +194,16 @@ private void checkConnectionOpen() {
190194
try {
191195
if (!connection.isValid(10)) {
192196
LOG.info("db connection reconnect..");
193-
establishConnection();
197+
connection = JdbcConnectUtil.getConnectWithRetry(
198+
driverName,
199+
dbURL,
200+
username,
201+
password
202+
);
194203
jdbcWriter.prepareStatement(connection);
195204
}
196205
} catch (SQLException e) {
197206
LOG.error("check connection open failed..", e);
198-
} catch (ClassNotFoundException e) {
199-
LOG.error("load jdbc class error when reconnect db..", e);
200-
} catch (IOException e) {
201-
LOG.error("jdbc io exception..", e);
202207
}
203208
}
204209

@@ -208,7 +213,7 @@ public synchronized void flush() {
208213
batchCount = 0;
209214
} catch (Exception e) {
210215
String errorMsg = String.format(
211-
"Writing records to JDBC failed. %s",
216+
"Writing records to JDBC failed. Cause: [%s]",
212217
ExceptionTrace.traceOriginalCause(e));
213218

214219
ExceptionTrace.dealExceptionWithSuppressStart(

0 commit comments

Comments
 (0)