5959public class HbaseOutputFormat extends AbstractDtRichOutputFormat <Tuple2 <Boolean , Row >> {
6060
6161 private static final Logger LOG = LoggerFactory .getLogger (HbaseOutputFormat .class );
62- private final List <Row > records = new ArrayList <>();
6362 private String host ;
6463 private String zkParent ;
6564 private String rowkey ;
@@ -79,6 +78,8 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean
7978 private transient Connection conn ;
8079 private transient Table table ;
8180 private transient ChoreService choreService ;
81+ private transient List <Row > records ;
82+ private transient volatile boolean closed = false ;
8283 /**
8384 * 批量写入的参数
8485 */
@@ -106,6 +107,7 @@ public void configure(Configuration parameters) {
106107 @ Override
107108 public void open (int taskNumber , int numTasks ) throws IOException {
108109 LOG .warn ("---open---" );
110+ records = new ArrayList <>();
109111 conf = HBaseConfiguration .create ();
110112 openConn ();
111113 table = conn .getTable (TableName .valueOf (tableName ));
@@ -145,10 +147,9 @@ private void initScheduledTask(Long batchWaitInterval) {
145147
146148 this .scheduledFuture = this .scheduler .scheduleWithFixedDelay (
147149 () -> {
148- synchronized (this ) {
150+ synchronized (HbaseOutputFormat . this ) {
149151 if (!records .isEmpty ()) {
150152 dealBatchOperation (records );
151- records .clear ();
152153 }
153154 }
154155 }, batchWaitInterval , batchWaitInterval , TimeUnit .MILLISECONDS
@@ -198,7 +199,7 @@ private void openKerberosConn() throws Exception {
198199 public void writeRecord (Tuple2 <Boolean , Row > record ) {
199200 if (record .f0 ) {
200201 if (this .batchSize != 0 ) {
201- writeBatchRecord (Row . copy ( record .f1 ) );
202+ writeBatchRecord (record .f1 );
202203 } else {
203204 dealInsert (record .f1 );
204205 }
@@ -210,12 +211,10 @@ public void writeBatchRecord(Row row) {
210211 // 数据累计到batchSize之后开始处理
211212 if (records .size () == this .batchSize ) {
212213 dealBatchOperation (records );
213- // 添加完数据之后数据清空records
214- records .clear ();
215214 }
216215 }
217216
218- protected void dealBatchOperation (List <Row > records ) {
217+ protected synchronized void dealBatchOperation (List <Row > records ) {
219218 // A null in the result array means that the call for that action failed, even after retries.
220219 Object [] results = new Object [records .size ()];
221220 try {
@@ -245,6 +244,9 @@ protected void dealBatchOperation(List<Row> records) {
245244 }
246245 } catch (IOException | InterruptedException e ) {
247246 LOG .error ("" , e );
247+ } finally {
248+ // 添加完数据之后数据清空records
249+ records .clear ();
248250 }
249251 }
250252
@@ -318,7 +320,12 @@ private Map<String, Object> rowConvertMap(Row record) {
318320 }
319321
320322 @ Override
321- public void close () throws IOException {
323+ public synchronized void close () throws IOException {
324+ if (closed ) {
325+ return ;
326+ }
327+
328+ closed = true ;
322329 if (!records .isEmpty ()) {
323330 dealBatchOperation (records );
324331 }
@@ -334,7 +341,6 @@ public void close() throws IOException {
334341 conn .close ();
335342 conn = null ;
336343 }
337-
338344 }
339345
340346 private void fillSyncKerberosConfig (org .apache .hadoop .conf .Configuration config ,
0 commit comments