|
20 | 20 | package com.dtstack.flink.sql.sink.hbase; |
21 | 21 |
|
22 | 22 | import com.dtstack.flink.sql.dirtyManager.manager.DirtyDataManager; |
| 23 | +import com.dtstack.flink.sql.exception.ExceptionTrace; |
23 | 24 | import com.dtstack.flink.sql.factory.DTThreadFactory; |
24 | 25 | import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat; |
25 | 26 | import com.google.common.collect.Maps; |
@@ -240,12 +241,12 @@ protected synchronized void dealBatchOperation(List<Row> records) { |
240 | 241 | } finally { |
241 | 242 | // 判断数据是否插入成功 |
242 | 243 | for (int i = 0; i < results.length; i++) { |
243 | | - if (results[i] != null) { |
| 244 | + if (results[i] instanceof Exception) { |
244 | 245 | dirtyDataManager.execute(); |
245 | 246 | // 脏数据记录 |
246 | 247 | dirtyDataManager.collectDirtyData( |
247 | | - records.get(i).toString(), |
248 | | - ((Exception) results[i]).getMessage() |
| 248 | + records.get(i).toString(), |
| 249 | + ExceptionTrace.traceOriginalCause((Exception) results[i]) |
249 | 250 | ); |
250 | 251 | outDirtyRecords.inc(); |
251 | 252 | } else { |
@@ -273,8 +274,8 @@ protected void dealInsert(Row record) { |
273 | 274 | } |
274 | 275 | } catch (Exception e) { |
275 | 276 | dirtyDataManager.collectDirtyData( |
276 | | - record.toString() |
277 | | - , e.getMessage()); |
| 277 | + record.toString(), |
| 278 | + ExceptionTrace.traceOriginalCause(e)); |
278 | 279 | outDirtyRecords.inc(); |
279 | 280 | } |
280 | 281 |
|
@@ -348,6 +349,10 @@ public synchronized void close() throws IOException { |
348 | 349 | conn.close(); |
349 | 350 | conn = null; |
350 | 351 | } |
| 352 | + |
| 353 | + if (dirtyDataManager != null) { |
| 354 | + dirtyDataManager.close(); |
| 355 | + } |
351 | 356 | } |
352 | 357 |
|
353 | 358 | private void fillSyncKerberosConfig(org.apache.hadoop.conf.Configuration config, |
|
0 commit comments