2020
2121package com .dtstack .flink .sql .sink .hbase ;
2222
23+ import com .dtstack .flink .sql .enums .EUpdateMode ;
2324import com .dtstack .flink .sql .outputformat .AbstractDtRichOutputFormat ;
2425import com .google .common .collect .Maps ;
2526import org .apache .commons .lang3 .StringUtils ;
3031import org .apache .hadoop .hbase .*;
3132import org .apache .hadoop .hbase .client .Connection ;
3233import org .apache .hadoop .hbase .client .ConnectionFactory ;
34+ import org .apache .hadoop .hbase .client .Delete ;
3335import org .apache .hadoop .hbase .client .Put ;
3436import org .apache .hadoop .hbase .client .Table ;
37+ import org .apache .hadoop .hbase .util .Bytes ;
3538import org .apache .hadoop .security .UserGroupInformation ;
3639import org .slf4j .Logger ;
3740import org .slf4j .LoggerFactory ;
@@ -150,8 +153,13 @@ public Connection run() {
150153 @ Override
151154 public void writeRecord (Tuple2 tuple2 ) {
152155 Tuple2 <Boolean , Row > tupleTrans = tuple2 ;
156+ Boolean retract = tupleTrans .f0 ;
153157 Row row = tupleTrans .f1 ;
154- dealInsert (row );
158+ if (retract ) {
159+ dealInsert (row );
160+ } else if (!retract && StringUtils .equalsIgnoreCase (updateMode , EUpdateMode .UPSERT .name ())) {
161+ dealDelete (row );
162+ }
155163 }
156164
157165 protected void dealInsert (Row record ) {
@@ -177,6 +185,26 @@ protected void dealInsert(Row record) {
177185 outRecords .inc ();
178186 }
179187
188+ protected void dealDelete (Row record ) {
189+ String rowKey = buildRowKey (record );
190+ if (!StringUtils .isEmpty (rowKey )) {
191+ Delete delete = new Delete (Bytes .toBytes (rowKey ));
192+ try {
193+ table .delete (delete );
194+ } catch (IOException e ) {
195+ if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 || LOG .isDebugEnabled ()) {
196+ LOG .error ("record insert failed ..{}" , record .toString ());
197+ LOG .error ("" , e );
198+ }
199+ outDirtyRecords .inc ();
200+ }
201+ if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
202+ LOG .info (record .toString ());
203+ }
204+ outRecords .inc ();
205+ }
206+ }
207+
180208 private Put getPutByRow (Row record ) {
181209 String rowKey = buildRowKey (record );
182210 if (StringUtils .isEmpty (rowKey )) {
@@ -187,7 +215,7 @@ private Put getPutByRow(Row record) {
187215 Object fieldVal = record .getField (i );
188216 byte [] val = null ;
189217 if (fieldVal != null ) {
190- val = HbaseUtil . toByte ( fieldVal );
218+ val = fieldVal . toString (). getBytes ( );
191219 }
192220 byte [] cf = families [i ].getBytes ();
193221 byte [] qualifier = qualifiers [i ].getBytes ();
0 commit comments