File tree Expand file tree Collapse file tree 2 files changed +14
-13
lines changed
core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager
hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase Expand file tree Collapse file tree 2 files changed +14
-13
lines changed Original file line number Diff line number Diff line change @@ -129,15 +129,17 @@ public void close() {
129129 public void collectDirtyData (String dataInfo , String cause ) {
130130 DirtyDataEntity dirtyDataEntity = new DirtyDataEntity (dataInfo , System .currentTimeMillis (), cause );
131131 try {
132- consumer .collectDirtyData (dirtyDataEntity , blockingInterval );
133132 count .incrementAndGet ();
134- } catch (Exception ignored ) {
133+ consumer .collectDirtyData (dirtyDataEntity , blockingInterval );
134+ } catch (Exception e ) {
135135 LOG .warn ("dirty Data insert error ... Failed number: " + errorCount .incrementAndGet ());
136- LOG .warn ("error dirty data:" + dirtyDataEntity .toString ());
136+ LOG .warn ("error cause: " + e .getMessage ());
137+ LOG .warn ("error dirty data:" + dirtyDataEntity .getDirtyData ());
137138 if (errorCount .get () > Math .ceil (count .longValue () * errorLimitRate )) {
138139 // close consumer and manager
139140 close ();
140- throw new RuntimeException (String .format ("The number of failed number 【%s】 reaches the limit, manager fails" , errorCount .get ()));
141+ throw new RuntimeException (
142+ String .format ("The number of failed number 【%s】 reaches the limit, manager fails" , errorCount .get ()));
141143 }
142144 }
143145 }
Original file line number Diff line number Diff line change @@ -227,27 +227,26 @@ protected synchronized void dealBatchOperation(List<Row> records) {
227227 }
228228 table .batch (puts , results );
229229
230+ // 打印结果
231+ if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
232+ // 只打印最后一条数据
233+ LOG .info (records .get (records .size () - 1 ).toString ());
234+ }
235+ } catch (IOException | InterruptedException e ) {
230236 // 判断数据是否插入成功
231237 for (int i = 0 ; i < results .length ; i ++) {
232- if (results [i ] = = null ) {
238+ if (results [i ] ! = null ) {
233239 // 脏数据记录
234240 dirtyDataManager .collectDirtyData (
235241 records .get (i ).toString (),
236- "Batch Hbase Sink Error"
242+ (( Exception ) results [ i ]). getMessage ()
237243 );
238244 outDirtyRecords .inc ();
239245 } else {
240246 // 输出结果条数记录
241247 outRecords .inc ();
242248 }
243249 }
244- // 打印结果
245- if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
246- // 只打印最后一条数据
247- LOG .info (records .get (records .size () - 1 ).toString ());
248- }
249- } catch (IOException | InterruptedException e ) {
250- LOG .error ("" , e );
251250 } finally {
252251 // 添加完数据之后数据清空records
253252 records .clear ();
You can’t perform that action at this time.
0 commit comments