  * limitations under the License.
  */
 
-
 
 package com.dtstack.flink.sql.sink.hbase;
 
-import com.dtstack.flink.sql.enums.EUpdateMode;
 import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.security.PrivilegedAction;
-import java.util.LinkedHashMap;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author: jingzhen@dtstack.com
@@ -85,6 +87,15 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
 
     private transient ChoreService choreService;
 
+    private Integer batchSize;
+    private Long batchWaitInterval;
+
+    private transient ScheduledExecutorService executor;
+    private transient ScheduledFuture scheduledFuture;
+
+    private final List<Row> records = new CopyOnWriteArrayList<>();
+
+
     @Override
     public void configure(Configuration parameters) {
         LOG.warn("---configure---");
@@ -98,10 +109,22 @@ public void open(int taskNumber, int numTasks) throws IOException {
         table = conn.getTable(TableName.valueOf(tableName));
         LOG.warn("---open end(get table from hbase) ---");
         initMetric();
+
+        // schedule a periodic flush of the buffered rows
+        if (batchWaitInterval > 0) {
+            this.executor = Executors.newScheduledThreadPool(
+                    1, new ExecutorThreadFactory("hbase-sink-flusher"));
+            this.scheduledFuture = this.executor.scheduleAtFixedRate(() -> {
+                if (!records.isEmpty()) {
+                    dealBatchOperation(records);
+                    records.clear();
+                }
+            }, batchWaitInterval, batchWaitInterval, TimeUnit.MILLISECONDS);
+        }
     }
 
-    private void openConn(){
-        try {
+    private void openConn() {
+        try {
             if (kerberosAuthEnable) {
                 LOG.info("open kerberos conn");
                 openKerberosConn();
@@ -111,7 +134,7 @@ private void openConn(){
                 conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent);
                 conn = ConnectionFactory.createConnection(conf);
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
 
@@ -151,16 +174,59 @@ private void openKerberosConn() throws Exception {
     }
 
 
-
     @Override
     public void writeRecord(Tuple2 tuple2) {
         Tuple2<Boolean, Row> tupleTrans = tuple2;
         Boolean retract = tupleTrans.f0;
         Row row = tupleTrans.f1;
         if (retract) {
-            dealInsert(row);
-        } else if (!retract && StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) {
-            dealDelete(row);
+            if (this.batchSize != 0) {
+                writeBatchRecord(row);
+            } else {
+                dealInsert(row);
+            }
+        }
+    }
+
+    public void writeBatchRecord(Row row) {
+        records.add(row);
+        // flush once batchSize rows have accumulated
+        if (records.size() == this.batchSize) {
+            dealBatchOperation(records);
+            // clear the buffer after the batch has been written
+            records.clear();
+        }
+    }
+
+    protected void dealBatchOperation(List<Row> records) {
+        // A null in the result array means that the call for that action failed, even after retries.
+        Object[] results = new Object[records.size()];
+        try {
+            List<Put> puts = new ArrayList<>();
+            for (Row record : records) {
+                puts.add(getPutByRow(record));
+            }
+            table.batch(puts, results);
+
+            // check whether each row was written successfully
+            for (int i = 0; i < results.length; i++) {
+                if (results[i] == null) {
+                    if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
+                        LOG.error("record insert failed ..{}", records.get(i).toString());
+                    }
+                    // count the dirty record
+                    outDirtyRecords.inc();
+                } else {
+                    // count the successfully written record
+                    outRecords.inc();
+                }
+            }
+            // log the batch periodically
+            if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
+                LOG.info(records.toString());
+            }
+        } catch (IOException | InterruptedException e) {
+            LOG.error("", e);
         }
     }
 
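Note on the buffering above: writeBatchRecord runs on the task thread while the flush scheduled in open() runs on the flusher thread, and both call dealBatchOperation(records) followed by records.clear(), so a row added in between can be cleared without ever being written. A minimal sketch of one way to serialize the two paths behind a single lock inside this class; the helper names (bufferAndMaybeFlush, timedFlush, flushLocked) are illustrative only and not part of this change:

    // --- sketch only, not part of this diff ---
    private final Object flushLock = new Object();

    // called from writeRecord() on the task thread
    private void bufferAndMaybeFlush(Row row) {
        synchronized (flushLock) {
            records.add(row);
            if (records.size() >= batchSize) {
                flushLocked();
            }
        }
    }

    // called from the scheduled flusher thread
    private void timedFlush() {
        synchronized (flushLock) {
            if (!records.isEmpty()) {
                flushLocked();
            }
        }
    }

    // caller must hold flushLock: write a snapshot of the buffer, then clear it
    private void flushLocked() {
        dealBatchOperation(new ArrayList<>(records));
        records.clear();
    }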
@@ -187,26 +253,6 @@ protected void dealInsert(Row record) {
         outRecords.inc();
     }
 
-    protected void dealDelete(Row record) {
-        String rowKey = buildRowKey(record);
-        if (!StringUtils.isEmpty(rowKey)) {
-            Delete delete = new Delete(Bytes.toBytes(rowKey));
-            try {
-                table.delete(delete);
-            } catch (IOException e) {
-                if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
-                    LOG.error("record insert failed ..{}", record.toString());
-                    LOG.error("", e);
-                }
-                outDirtyRecords.inc();
-            }
-            if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
-                LOG.info(record.toString());
-            }
-            outRecords.inc();
-        }
-    }
-
     private Put getPutByRow(Row record) {
         String rowKey = buildRowKey(record);
         if (StringUtils.isEmpty(rowKey)) {
@@ -244,9 +290,9 @@ private String getRowKeyValues(Row record) {
         return rowKeyBuilder.getRowKey(row);
     }
 
-    private Map<String, Object> rowConvertMap(Row record){
+    private Map<String, Object> rowConvertMap(Row record) {
         Map<String, Object> rowValue = Maps.newHashMap();
-        for (int i = 0; i < columnNames.length; i++){
+        for (int i = 0; i < columnNames.length; i++) {
             rowValue.put(columnNames[i], record.getField(i));
         }
         return rowValue;
@@ -258,7 +304,15 @@ public void close() throws IOException {
             conn.close();
             conn = null;
         }
+
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+        }
     }
+
     private HbaseOutputFormat() {
     }
 
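Rows still sitting in records when close() is called are never written: the connection is closed and the flusher cancelled, but the buffer is not drained. A minimal sketch of a final flush at the top of close(), before conn.close(); purely illustrative, assuming dealBatchOperation is still safe to call at this point:

    // --- sketch only, not part of this diff ---
    @Override
    public void close() throws IOException {
        // drain whatever is still buffered before tearing anything down
        if (!records.isEmpty()) {
            dealBatchOperation(records);
            records.clear();
        }
        // ... then close the connection and stop the scheduled flusher as in the change above
    }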
@@ -345,6 +399,15 @@ public HbaseOutputFormatBuilder setClientKeytabFile(String clientKeytabFile) {
             return this;
         }
 
+        public HbaseOutputFormatBuilder setBatchSize(Integer batchSize) {
+            format.batchSize = batchSize;
+            return this;
+        }
+
+        public HbaseOutputFormatBuilder setBatchWaitInterval(Long batchWaitInterval) {
+            format.batchWaitInterval = batchWaitInterval;
+            return this;
+        }
 
         public HbaseOutputFormat finish() {
             Preconditions.checkNotNull(format.host, "zookeeperQuorum should be specified");
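For reference, a usage sketch showing how the new options flow through the builder. The buildHbaseOutputFormat() factory and the connection-related setters are assumed from the existing builder API and do not appear in this diff; only setBatchSize, setBatchWaitInterval and finish are shown above:

// --- usage sketch only; factory and connection setters are assumptions ---
HbaseOutputFormat outputFormat = HbaseOutputFormat.buildHbaseOutputFormat()
        .setHost("zk1:2181,zk2:2181,zk3:2181")  // zookeeperQuorum, checked in finish()
        .setBatchSize(100)                      // flush every 100 rows; 0 keeps the per-row dealInsert path
        .setBatchWaitInterval(1000L)            // also flush at most every second; 0 disables the timer
        .finish();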
@@ -405,6 +468,8 @@ public String toString() {
                 ", zookeeperSaslClient='" + zookeeperSaslClient + '\'' +
                 ", clientPrincipal='" + clientPrincipal + '\'' +
                 ", clientKeytabFile='" + clientKeytabFile + '\'' +
+                ", batchSize='" + batchSize + '\'' +
+                ", batchWaitInterval='" + batchWaitInterval + '\'' +
                 '}';
     }
 