Skip to content

Commit c76fb0b

Browse files
committed
[hotfix-35512][impala] 1.增加批量写入时出现异常,将批量sql转为单条sql执行 2.fix inserting the value like '[a,b]' or '{a, b}' or ' ' error with impala sink.
1 parent f7d94b0 commit c76fb0b

File tree

1 file changed

+11
-10
lines changed

1 file changed

+11
-10
lines changed

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.dtstack.flink.sql.sink.impala;
2020

2121
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
22+
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectUtil;
2223
import com.dtstack.flink.sql.exception.ExceptionTrace;
2324
import com.dtstack.flink.sql.factory.DTThreadFactory;
2425
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
@@ -282,8 +283,8 @@ private void executeUpdateBatch() throws SQLException {
282283
rows.clear();
283284
} catch (Exception e) {
284285
LOG.debug("impala jdbc execute batch error ", e);
285-
JDBCUtils.rollBack(connection);
286-
JDBCUtils.commit(connection);
286+
JdbcConnectUtil.rollBack(connection);
287+
JdbcConnectUtil.commit(connection);
287288
updateStatement.clearBatch();
288289
executeUpdate(connection);
289290
}
@@ -294,10 +295,10 @@ public void executeUpdate(Connection connection) {
294295
try {
295296
setRecordToStatement(updateStatement, JDBCTypeConvertUtils.getSqlTypeFromFieldType(fieldTypes), row);
296297
updateStatement.executeUpdate();
297-
JDBCUtils.commit(connection);
298+
JdbcConnectUtil.commit(connection);
298299
} catch (Exception e) {
299-
JDBCUtils.rollBack(connection);
300-
JDBCUtils.commit(connection);
300+
JdbcConnectUtil.rollBack(connection);
301+
JdbcConnectUtil.commit(connection);
301302
if (metricOutputFormat.outDirtyRecords.getCount() % DIRTY_DATA_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
302303
LOG.error("record insert failed ,this row is {}", row.toString());
303304
LOG.error("", e);
@@ -509,16 +510,16 @@ private void dealBatchSqlError(List<String> rowData,
509510
Statement statement,
510511
String templateSql) {
511512
String errorMsg = "Insert into impala error. \nCause: [%s]\nRow: [%s]";
512-
JDBCUtils.rollBack(connection);
513-
JDBCUtils.commit(connection);
513+
JdbcConnectUtil.rollBack(connection);
514+
JdbcConnectUtil.commit(connection);
514515
for (String rowDatum : rowData) {
515516
String executeSql = templateSql.replace(VALUES_CONDITION, rowDatum);
516517
try {
517518
statement.execute(executeSql);
518-
JDBCUtils.commit(connection);
519+
JdbcConnectUtil.commit(connection);
519520
} catch (SQLException e) {
520-
JDBCUtils.rollBack(connection);
521-
JDBCUtils.commit(connection);
521+
JdbcConnectUtil.rollBack(connection);
522+
JdbcConnectUtil.commit(connection);
522523
if (metricOutputFormat.outDirtyRecords.getCount() % DIRTY_DATA_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
523524
LOG.error(
524525
String.format(

0 commit comments

Comments
 (0)