Skip to content

Commit 8d6cc79

Browse files
committed
Merge branch 'hotfix_1.10_4.0.x_32869' into '1.10_release_4.0.x'
[fix-32869][rdb]fix rdb task hangs after connect retry. See merge request dt-insight-engine/flinkStreamSQL!193
2 parents d878bd3 + 36c4cdf commit 8d6cc79

File tree

2 files changed

+100
-27
lines changed

2 files changed

+100
-27
lines changed

core/src/main/java/com/dtstack/flink/sql/util/JDBCUtils.java

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,23 @@
1919

2020
package com.dtstack.flink.sql.util;
2121

22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.sql.Connection;
2226
import java.sql.DriverManager;
27+
import java.sql.ResultSet;
28+
import java.sql.SQLException;
29+
import java.sql.Statement;
30+
import java.util.Objects;
2331

2432
public class JDBCUtils {
33+
private static final Logger LOG = LoggerFactory.getLogger(JDBCUtils.class);
34+
2535
private static final Object LOCK = new Object();
2636

27-
public static void forName(String clazz, ClassLoader classLoader) {
28-
synchronized (LOCK){
37+
public static void forName(String clazz, ClassLoader classLoader) {
38+
synchronized (LOCK) {
2939
try {
3040
Class.forName(clazz, true, classLoader);
3141
DriverManager.setLoginTimeout(10);
@@ -44,4 +54,75 @@ public synchronized static void forName(String clazz) {
4454
throw new RuntimeException(e);
4555
}
4656
}
57+
58+
/**
59+
* 关闭连接资源
60+
*
61+
* @param rs ResultSet
62+
* @param stmt Statement
63+
* @param conn Connection
64+
* @param commit
65+
*/
66+
public static void closeConnectionResource(ResultSet rs, Statement stmt, Connection conn, boolean commit) {
67+
if (Objects.nonNull(rs)) {
68+
try {
69+
rs.close();
70+
} catch (SQLException e) {
71+
LOG.warn("Close resultSet error: {}", e.getMessage());
72+
}
73+
}
74+
75+
if (Objects.nonNull(stmt)) {
76+
try {
77+
stmt.close();
78+
} catch (SQLException e) {
79+
LOG.warn("Close statement error:{}", e.getMessage());
80+
}
81+
}
82+
83+
if (Objects.nonNull(conn)) {
84+
try {
85+
if (commit) {
86+
commit(conn);
87+
} else {
88+
rollBack(conn);
89+
}
90+
91+
conn.close();
92+
} catch (SQLException e) {
93+
LOG.warn("Close connection error:{}", e.getMessage());
94+
}
95+
}
96+
}
97+
98+
/**
99+
* 手动提交事物
100+
*
101+
* @param conn Connection
102+
*/
103+
public static void commit(Connection conn) {
104+
try {
105+
if (!conn.isClosed() && !conn.getAutoCommit()) {
106+
conn.commit();
107+
}
108+
} catch (SQLException e) {
109+
LOG.warn("commit error:{}", e.getMessage());
110+
}
111+
}
112+
113+
/**
114+
* 手动回滚事物
115+
*
116+
* @param conn Connection
117+
*/
118+
public static void rollBack(Connection conn) {
119+
try {
120+
if (!conn.isClosed() && !conn.getAutoCommit()) {
121+
conn.rollback();
122+
}
123+
} catch (SQLException e) {
124+
LOG.warn("rollBack error:{}", e.getMessage());
125+
}
126+
}
127+
47128
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.dtstack.flink.sql.side.BaseSideInfo;
2323
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
2424
import com.dtstack.flink.sql.side.rdb.util.SwitchUtil;
25+
import com.dtstack.flink.sql.util.JDBCUtils;
2526
import com.dtstack.flink.sql.util.RowDataComplete;
2627
import com.dtstack.flink.sql.util.RowDataConvert;
2728
import com.google.common.collect.Lists;
@@ -125,36 +126,26 @@ public void flatMap(Row value, Collector<BaseRow> out) throws Exception {
125126
}
126127

127128
private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQLException {
128-
RdbSideTableInfo tableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
129-
Connection connection = null;
129+
queryAndFillData(tmpCache, getConnectionWithRetry((RdbSideTableInfo) sideInfo.getSideTableInfo()));
130+
}
130131

131-
try {
132-
for (int i = 0; i < CONN_RETRY_NUM; i++) {
132+
private Connection getConnectionWithRetry(RdbSideTableInfo tableInfo) throws SQLException {
133+
String connInfo = "url:" + tableInfo.getUrl() + "; userName:" + tableInfo.getUserName();
134+
String errorMsg = null;
135+
for (int i = 0; i < CONN_RETRY_NUM; i++) {
136+
try {
137+
return getConn(tableInfo.getUrl(), tableInfo.getUserName(), tableInfo.getPassword());
138+
} catch (Exception e) {
133139
try {
134-
connection = getConn(tableInfo.getUrl(), tableInfo.getUserName(), tableInfo.getPassword());
135-
break;
136-
} catch (Exception e) {
137-
if (i == CONN_RETRY_NUM - 1) {
138-
throw new RuntimeException("", e);
139-
}
140-
try {
141-
String connInfo = "url:" + tableInfo.getUrl() + ";userName:" + tableInfo.getUserName() + ",pwd:" + tableInfo.getPassword();
142-
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
143-
Thread.sleep(5 * 1000);
144-
} catch (InterruptedException e1) {
145-
LOG.error("", e1);
146-
}
140+
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
141+
errorMsg = e.getCause().toString();
142+
Thread.sleep(5 * 1000);
143+
} catch (InterruptedException e1) {
144+
LOG.error("", e1);
147145
}
148146
}
149-
queryAndFillData(tmpCache, connection);
150-
} catch (Exception e) {
151-
LOG.error("", e);
152-
throw new SQLException(e);
153-
} finally {
154-
if (connection != null) {
155-
connection.close();
156-
}
157147
}
148+
throw new SQLException("get conn fail. connInfo: " + connInfo + "\ncause by: " + errorMsg);
158149
}
159150

160151
private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, Connection connection) throws SQLException {
@@ -188,6 +179,7 @@ private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, C
188179
tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList())
189180
.add(oneRow);
190181
}
182+
JDBCUtils.closeConnectionResource(resultSet, statement, connection, false);
191183
}
192184

193185
public int getFetchSize() {

0 commit comments

Comments
 (0)