Skip to content

Commit adbb5d7

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.1.x_35526' into 1.10_release_4.1.x
2 parents c76fb0b + 402670c commit adbb5d7

File tree

6 files changed

+52
-51
lines changed

6 files changed

+52
-51
lines changed

rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/JdbcResourceCheck.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package com.dtstack.flink.sql.core.rdb;
2020

21-
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectUtil;
21+
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectionUtil;
2222
import com.dtstack.flink.sql.resource.ResourceCheck;
2323
import org.apache.commons.lang.StringUtils;
2424
import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -109,7 +109,7 @@ public void checkPrivilege(
109109
, String schema
110110
, List<String> privilegeList) {
111111
Connection connection =
112-
JdbcConnectUtil.getConnectWithRetry(driverName, url, userName, password);
112+
JdbcConnectionUtil.getConnectionWithRetry(driverName, url, userName, password);
113113
Statement statement = null;
114114
String tableInfo = Objects.isNull(schema) ? tableName : schema + "." + tableName;
115115
String privilege = null;
@@ -133,7 +133,7 @@ public void checkPrivilege(
133133

134134
throw new SuppressRestartsException(new IllegalArgumentException(sqlException.getMessage()));
135135
} finally {
136-
JdbcConnectUtil.closeConnectionResource(null, statement, connection, false);
136+
JdbcConnectionUtil.closeConnectionResource(null, statement, connection, false);
137137
}
138138
}
139139
}

rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/util/JdbcConnectUtil.java renamed to rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/util/JdbcConnectionUtil.java

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.dtstack.flink.sql.core.rdb.util;
2020

2121
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
22+
import com.dtstack.flink.sql.exception.ExceptionTrace;
2223
import com.dtstack.flink.sql.util.ThreadUtil;
2324
import com.google.common.base.Preconditions;
2425
import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -37,11 +38,11 @@
3738
* Date 2020-12-25
3839
* Company dtstack
3940
*/
40-
public class JdbcConnectUtil {
41+
public class JdbcConnectionUtil {
4142
private static final int DEFAULT_RETRY_NUM = 3;
4243
private static final long DEFAULT_RETRY_TIME_WAIT = 3L;
4344
private static final int DEFAULT_VALID_TIME = 10;
44-
private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectUtil.class);
45+
private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionUtil.class);
4546

4647
/**
4748
* 关闭连接资源
@@ -118,43 +119,49 @@ public static void rollBack(Connection conn) {
118119
}
119120

120121
/**
121-
* get connect from datasource and retry when failed.
122+
* get connection from datasource and retry when failed.
122123
*
123124
* @param driverName driver name for rdb datasource
124125
* @param url connect url
125126
* @param userName connect user name
126127
* @param password password for user name
127128
* @return a valid connection
128129
*/
129-
public static Connection getConnectWithRetry(
130-
String driverName
131-
, String url
132-
, String userName
133-
, String password) {
134-
String errorMessage = "\nGet connect failed with properties: \nurl: " + url
135-
+ (Objects.isNull(userName) ? "" : "\nuserName: " + userName
136-
+ "\nerror message: ");
137-
String errorCause = null;
130+
public static Connection getConnectionWithRetry(String driverName,
131+
String url,
132+
String userName,
133+
String password) {
134+
String message = "Get connection failed. " +
135+
"\nurl: [%s]" +
136+
"\nuserName: [%s]" +
137+
"\ncause: [%s]";
138+
String errorCause;
139+
String errorMessage = "";
138140

139-
ClassLoaderManager.forName(driverName, JdbcConnectUtil.class.getClassLoader());
141+
ClassLoaderManager.forName(driverName, JdbcConnectionUtil.class.getClassLoader());
140142
Preconditions.checkNotNull(url, "url can't be null!");
141143

142144
for (int i = 0; i < DEFAULT_RETRY_NUM; i++) {
143145
try {
144-
return Objects.isNull(userName) ?
145-
DriverManager.getConnection(url) : DriverManager.getConnection(url, userName, password);
146+
Connection connection =
147+
Objects.isNull(userName) ?
148+
DriverManager.getConnection(url) :
149+
DriverManager.getConnection(url, userName, password);
150+
connection.setAutoCommit(false);
151+
return connection;
146152
} catch (Exception e) {
147-
if (Objects.isNull(e.getCause())) {
148-
errorCause = e.getMessage();
149-
} else {
150-
errorCause = e.getCause().toString();
151-
}
152-
153-
LOG.warn(errorMessage + errorCause);
153+
errorCause = ExceptionTrace.traceOriginalCause(e);
154+
errorMessage = String.format(
155+
message,
156+
url,
157+
userName,
158+
errorCause
159+
);
160+
LOG.warn(errorMessage);
154161
LOG.warn("Connect will retry after [{}] s. Retry time [{}] ...", DEFAULT_RETRY_TIME_WAIT, i + 1);
155162
ThreadUtil.sleepSeconds(DEFAULT_RETRY_TIME_WAIT);
156163
}
157164
}
158-
throw new SuppressRestartsException(new Throwable(errorMessage + errorCause));
165+
throw new SuppressRestartsException(new SQLException(errorMessage));
159166
}
160167
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package com.dtstack.flink.sql.side.rdb.all;
2020

2121
import com.dtstack.flink.sql.core.rdb.JdbcResourceCheck;
22-
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectUtil;
22+
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectionUtil;
2323
import com.dtstack.flink.sql.side.BaseAllReqRow;
2424
import com.dtstack.flink.sql.side.BaseSideInfo;
2525
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
@@ -208,7 +208,7 @@ private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, C
208208
tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList())
209209
.add(oneRow);
210210
}
211-
JdbcConnectUtil.closeConnectionResource(resultSet, statement, connection, false);
211+
JdbcConnectionUtil.closeConnectionResource(resultSet, statement, connection, false);
212212
}
213213

214214
public int getFetchSize() {

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/AbstractJDBCOutputFormat.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package com.dtstack.flink.sql.sink.rdb.format;
2020

21-
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2221
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
2322
import org.apache.flink.configuration.Configuration;
2423
import org.apache.flink.types.Row;
@@ -63,16 +62,6 @@ public AbstractJDBCOutputFormat(String username, String password, String driverN
6362
public void configure(Configuration parameters) {
6463
}
6564

66-
protected void establishConnection() throws SQLException, ClassNotFoundException, IOException {
67-
ClassLoaderManager.forName(driverName, getClass().getClassLoader());
68-
if (username == null) {
69-
connection = DriverManager.getConnection(dbURL);
70-
} else {
71-
connection = DriverManager.getConnection(dbURL, username, password);
72-
}
73-
connection.setAutoCommit(false);
74-
}
75-
7665
protected void closeDbConnection() throws IOException {
7766
if (connection != null) {
7867
try {

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121

2222
import com.dtstack.flink.sql.core.rdb.JdbcResourceCheck;
23+
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectionUtil;
2324
import com.dtstack.flink.sql.enums.EUpdateMode;
2425
import com.dtstack.flink.sql.exception.ExceptionTrace;
2526
import com.dtstack.flink.sql.factory.DTThreadFactory;
@@ -134,7 +135,12 @@ public void open(int taskNumber, int numTasks) throws IOException {
134135

135136
public void openJdbc() throws IOException {
136137
try {
137-
establishConnection();
138+
connection = JdbcConnectionUtil.getConnectionWithRetry(
139+
driverName,
140+
dbURL,
141+
username,
142+
password
143+
);
138144
initMetric();
139145
if(jdbcWriter == null){
140146
if (StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.APPEND.name()) || keyFields == null || keyFields.length == 0) {
@@ -152,8 +158,6 @@ public void openJdbc() throws IOException {
152158
jdbcWriter.open(connection);
153159
} catch (SQLException sqe) {
154160
throw new IllegalArgumentException("open() failed.", sqe);
155-
} catch (ClassNotFoundException cnfe) {
156-
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
157161
}
158162

159163
if (flushIntervalMills != 0) {
@@ -190,15 +194,16 @@ private void checkConnectionOpen() {
190194
try {
191195
if (!connection.isValid(10)) {
192196
LOG.info("db connection reconnect..");
193-
establishConnection();
197+
connection = JdbcConnectionUtil.getConnectionWithRetry(
198+
driverName,
199+
dbURL,
200+
username,
201+
password
202+
);
194203
jdbcWriter.prepareStatement(connection);
195204
}
196205
} catch (SQLException e) {
197206
LOG.error("check connection open failed..", e);
198-
} catch (ClassNotFoundException e) {
199-
LOG.error("load jdbc class error when reconnect db..", e);
200-
} catch (IOException e) {
201-
LOG.error("jdbc io exception..", e);
202207
}
203208
}
204209

@@ -208,7 +213,7 @@ public synchronized void flush() {
208213
batchCount = 0;
209214
} catch (Exception e) {
210215
String errorMsg = String.format(
211-
"Writing records to JDBC failed. %s",
216+
"Writing records to JDBC failed. Cause: [%s]",
212217
ExceptionTrace.traceOriginalCause(e));
213218

214219
ExceptionTrace.dealExceptionWithSuppressStart(

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/JDBCWriter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package com.dtstack.flink.sql.sink.rdb.writer;
2020

21-
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectUtil;
21+
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectionUtil;
2222
import com.dtstack.flink.sql.exception.ExceptionTrace;
2323
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
2424
import org.apache.flink.api.java.tuple.Tuple2;
@@ -82,8 +82,8 @@ default void dealExecuteError(Connection connection,
8282
Row row,
8383
long errorLimit,
8484
Logger LOG) {
85-
JdbcConnectUtil.rollBack(connection);
86-
JdbcConnectUtil.commit(connection);
85+
JdbcConnectionUtil.rollBack(connection);
86+
JdbcConnectionUtil.commit(connection);
8787

8888
if (metricOutputFormat.outDirtyRecords.getCount() % DIRTYDATA_PRINT_FREQUENTY == 0 ||
8989
LOG.isDebugEnabled()) {

0 commit comments

Comments
 (0)