Skip to content

Commit 4f0d359

Browse files
committed
jdbc connection check
1 parent 82e5534 commit 4f0d359

File tree

4 files changed

+28
-1
lines changed

4 files changed

+28
-1
lines changed

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ private void checkConnectionOpen() {
170170
if (connection.isClosed()) {
171171
LOG.info("db connection reconnect..");
172172
establishConnection();
173+
jdbcWriter.prepareStatement(connection);
173174
}
174175
} catch (SQLException e) {
175176
LOG.error("check connection open failed..", e);

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AppendOnlyWriter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ public AppendOnlyWriter(String insertSQL, int[] fieldTypes, DtRichOutputFormat m
5858
@Override
5959
public void open(Connection connection) throws SQLException {
6060
this.rows = new ArrayList();
61+
prepareStatement(connection);
62+
}
63+
64+
@Override
65+
public void prepareStatement(Connection connection) throws SQLException {
6166
this.statement = connection.prepareStatement(insertSQL);
6267
}
6368

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/JDBCWriter.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,16 @@ public interface JDBCWriter extends Serializable {
3333
int DIRTYDATA_PRINT_FREQUENTY = 1000;
3434

3535
/**
36-
* Open the writer by JDBC Connection. It can create Statement from Connection.
36+
* Open the writer by JDBC Connection.
3737
*/
3838
void open(Connection connection) throws SQLException;
3939

40+
/**
41+
* Create Statement from Connection. Used where the connection is created
42+
* @throws SQLException
43+
*/
44+
void prepareStatement(Connection connection) throws SQLException;
45+
4046
/**
4147
* Add record to writer, the writer may cache the data.
4248
*/

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/UpsertWriter.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ private UpsertWriter(int[] fieldTypes, int[] pkFields, int[] pkTypes, String del
110110
@Override
111111
public void open(Connection connection) throws SQLException {
112112
this.keyToRows = new HashMap<>();
113+
prepareStatement(connection);
114+
}
115+
116+
@Override
117+
public void prepareStatement(Connection connection) throws SQLException {
113118
this.deleteStatement = connection.prepareStatement(deleteSQL);
114119
}
115120

@@ -224,6 +229,11 @@ private UpsertWriterUsingUpsertStatement(
224229
@Override
225230
public void open(Connection connection) throws SQLException {
226231
super.open(connection);
232+
}
233+
234+
@Override
235+
public void prepareStatement(Connection connection) throws SQLException {
236+
super.prepareStatement(connection);
227237
upsertStatement = connection.prepareStatement(upsertSQL);
228238
}
229239

@@ -284,6 +294,11 @@ private UpsertWriterUsingInsertUpdateStatement(
284294
@Override
285295
public void open(Connection connection) throws SQLException {
286296
super.open(connection);
297+
}
298+
299+
@Override
300+
public void prepareStatement(Connection connection) throws SQLException {
301+
super.prepareStatement(connection);
287302
existStatement = connection.prepareStatement(existSQL);
288303
insertStatement = connection.prepareStatement(insertSQL);
289304
updateStatement = connection.prepareStatement(updateSQL);

0 commit comments

Comments
 (0)