1919
2020package com .dtstack .flink .sql .sink .hbase ;
2121
22+ import com .dtstack .flink .sql .factory .DTThreadFactory ;
2223import com .dtstack .flink .sql .outputformat .AbstractDtRichOutputFormat ;
2324import com .google .common .collect .Maps ;
2425import org .apache .commons .lang3 .StringUtils ;
2526import org .apache .flink .api .java .tuple .Tuple2 ;
2627import org .apache .flink .configuration .Configuration ;
27- import org .apache .flink .runtime .util .ExecutorThreadFactory ;
2828import org .apache .flink .types .Row ;
2929import org .apache .flink .util .Preconditions ;
3030import org .apache .hadoop .hbase .AuthUtil ;
4747import java .util .LinkedList ;
4848import java .util .List ;
4949import java .util .Map ;
50- import java .util .concurrent .CopyOnWriteArrayList ;
51- import java .util .concurrent .Executors ;
5250import java .util .concurrent .ScheduledExecutorService ;
5351import java .util .concurrent .ScheduledFuture ;
52+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
5453import java .util .concurrent .TimeUnit ;
5554
5655/**
5756 * @author: jingzhen@dtstack.com
5857 * date: 2017-6-29
5958 */
60- public class HbaseOutputFormat extends AbstractDtRichOutputFormat <Tuple2 > {
59+ public class HbaseOutputFormat extends AbstractDtRichOutputFormat <Tuple2 < Boolean , Row > > {
6160
6261 private static final Logger LOG = LoggerFactory .getLogger (HbaseOutputFormat .class );
63-
62+ private final List < Row > records = new ArrayList <>();
6463 private String host ;
6564 private String zkParent ;
6665 private String rowkey ;
6766 private String tableName ;
6867 private String [] columnNames ;
69- private String updateMode ;
70- private String [] columnTypes ;
7168 private Map <String , String > columnNameFamily ;
72-
7369 private boolean kerberosAuthEnable ;
7470 private String regionserverKeytabFile ;
7571 private String regionserverPrincipal ;
7672 private String securityKrb5Conf ;
7773 private String zookeeperSaslClient ;
7874 private String clientPrincipal ;
7975 private String clientKeytabFile ;
80-
8176 private String [] families ;
8277 private String [] qualifiers ;
83-
8478 private transient org .apache .hadoop .conf .Configuration conf ;
8579 private transient Connection conn ;
8680 private transient Table table ;
87-
8881 private transient ChoreService choreService ;
89-
82+ /**
83+ * 批量写入的参数
84+ */
9085 private Integer batchSize ;
9186 private Long batchWaitInterval ;
87+ /**
88+ * 定时任务
89+ */
90+ private transient ScheduledExecutorService scheduler ;
91+ private transient ScheduledFuture <?> scheduledFuture ;
9292
93- private transient ScheduledExecutorService executor ;
94- private transient ScheduledFuture scheduledFuture ;
95-
96- private final List <Row > records = new CopyOnWriteArrayList <>();
/** Instances must be created through {@link HbaseOutputFormatBuilder}; direct construction is disallowed. */
private HbaseOutputFormat() {
}
9795
/**
 * Entry point for constructing a {@link HbaseOutputFormat}.
 *
 * @return a fresh builder instance
 */
public static HbaseOutputFormatBuilder buildHbaseOutputFormat() {
    return new HbaseOutputFormatBuilder();
}
9899
@Override
public void configure(Configuration parameters) {
    // Intentionally empty: avoid any long-running work here,
    // otherwise it can cause AKKA communication timeouts.
    // DO NOTHING
}
104105
105106 @ Override
106107 public void open (int taskNumber , int numTasks ) throws IOException {
107108 LOG .warn ("---open---" );
109+ conf = HBaseConfiguration .create ();
108110 openConn ();
109111 table = conn .getTable (TableName .valueOf (tableName ));
110112 LOG .warn ("---open end(get table from hbase) ---" );
111113 initMetric ();
112-
113- // 设置定时任务
114- if (batchWaitInterval > 0 ) {
115- this .executor = Executors .newScheduledThreadPool (
116- 1 , new ExecutorThreadFactory ("hbase-sink-flusher" ));
117- this .scheduledFuture = this .executor .scheduleAtFixedRate (() -> {
118- if (!records .isEmpty ()) {
119- dealBatchOperation (records );
120- records .clear ();
121- }
122- }, batchWaitInterval , batchWaitInterval , TimeUnit .MILLISECONDS );
123- }
124114 }
125115
126116 private void openConn () {
@@ -137,13 +127,44 @@ private void openConn() {
137127 } catch (Exception e ) {
138128 throw new RuntimeException (e );
139129 }
130+ initScheduledTask (batchWaitInterval );
131+ }
140132
133+ /**
134+ * 初始化定时写入任务
135+ *
136+ * @param batchWaitInterval 定时任务时间
137+ */
138+ private void initScheduledTask (Long batchWaitInterval ) {
139+ try {
140+ if (batchWaitInterval > 0 ) {
141+ this .scheduler = new ScheduledThreadPoolExecutor (
142+ 1 ,
143+ new DTThreadFactory ("hbase-batch-flusher" )
144+ );
145+
146+ this .scheduledFuture = this .scheduler .scheduleWithFixedDelay (
147+ () -> {
148+ synchronized (this ) {
149+ if (!records .isEmpty ()) {
150+ dealBatchOperation (records );
151+ records .clear ();
152+ }
153+ }
154+ }, batchWaitInterval , batchWaitInterval , TimeUnit .MILLISECONDS
155+ );
156+ }
157+ } catch (Exception e ) {
158+ LOG .error ("init schedule task failed !" );
159+ throw new RuntimeException (e );
160+ }
141161 }
162+
142163 private void openKerberosConn () throws Exception {
143164 conf .set (HbaseConfigUtils .KEY_HBASE_ZOOKEEPER_QUORUM , host );
144165 conf .set (HbaseConfigUtils .KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM , zkParent );
145166
146- LOG .info ("kerberos config:{}" , this .toString ());
167+ LOG .info ("kerberos config:{}" , this .conf . toString ());
147168 Preconditions .checkArgument (!StringUtils .isEmpty (clientPrincipal ), " clientPrincipal not null!" );
148169 Preconditions .checkArgument (!StringUtils .isEmpty (clientKeytabFile ), " clientKeytabFile not null!" );
149170
@@ -173,17 +194,13 @@ private void openKerberosConn() throws Exception {
173194 });
174195 }
175196
176-
177197 @ Override
178- public void writeRecord (Tuple2 tuple2 ) {
179- Tuple2 <Boolean , Row > tupleTrans = tuple2 ;
180- Boolean retract = tupleTrans .f0 ;
181- Row row = tupleTrans .f1 ;
182- if (retract ) {
198+ public void writeRecord (Tuple2 <Boolean , Row > record ) {
199+ if (record .f0 ) {
183200 if (this .batchSize != 0 ) {
184- writeBatchRecord (row );
201+ writeBatchRecord (Row . copy ( record . f1 ) );
185202 } else {
186- dealInsert (row );
203+ dealInsert (record . f1 );
187204 }
188205 }
189206 }
@@ -223,7 +240,8 @@ protected void dealBatchOperation(List<Row> records) {
223240 }
224241 // 打印结果
225242 if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
226- LOG .info (records .toString ());
243+ // 只打印最后一条数据
244+ LOG .info (records .get (records .size () - 1 ).toString ());
227245 }
228246 } catch (IOException | InterruptedException e ) {
229247 LOG .error ("" , e );
@@ -233,6 +251,7 @@ protected void dealBatchOperation(List<Row> records) {
233251 protected void dealInsert (Row record ) {
234252 Put put = getPutByRow (record );
235253 if (put == null || put .isEmpty ()) {
254+ // 记录脏数据
236255 outDirtyRecords .inc ();
237256 return ;
238257 }
@@ -306,8 +325,8 @@ public void close() throws IOException {
306325
307326 if (scheduledFuture != null ) {
308327 scheduledFuture .cancel (false );
309- if (executor != null ) {
310- executor .shutdownNow ();
328+ if (scheduler != null ) {
329+ scheduler .shutdownNow ();
311330 }
312331 }
313332
@@ -318,16 +337,48 @@ public void close() throws IOException {
318337
319338 }
320339
321- private HbaseOutputFormat () {
340+ private void fillSyncKerberosConfig (org .apache .hadoop .conf .Configuration config ,
341+ String regionserverPrincipal ,
342+ String zookeeperSaslClient ,
343+ String securityKrb5Conf ) {
344+ if (StringUtils .isEmpty (regionserverPrincipal )) {
345+ throw new IllegalArgumentException ("Must provide regionserverPrincipal when authentication is Kerberos" );
346+ }
347+ config .set (HbaseConfigUtils .KEY_HBASE_MASTER_KERBEROS_PRINCIPAL , regionserverPrincipal );
348+ config .set (HbaseConfigUtils .KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL , regionserverPrincipal );
349+ config .set (HbaseConfigUtils .KEY_HBASE_SECURITY_AUTHORIZATION , "true" );
350+ config .set (HbaseConfigUtils .KEY_HBASE_SECURITY_AUTHENTICATION , "kerberos" );
351+
352+
353+ if (!StringUtils .isEmpty (zookeeperSaslClient )) {
354+ System .setProperty (HbaseConfigUtils .KEY_ZOOKEEPER_SASL_CLIENT , zookeeperSaslClient );
355+ }
356+
357+ if (!StringUtils .isEmpty (securityKrb5Conf )) {
358+ String krb5ConfPath = System .getProperty ("user.dir" ) + File .separator + securityKrb5Conf ;
359+ LOG .info ("krb5ConfPath:{}" , krb5ConfPath );
360+ System .setProperty (HbaseConfigUtils .KEY_JAVA_SECURITY_KRB5_CONF , krb5ConfPath );
361+ }
322362 }
323363
324- public static HbaseOutputFormatBuilder buildHbaseOutputFormat () {
325- return new HbaseOutputFormatBuilder ();
364+ @ Override
365+ public String toString () {
366+ return "HbaseOutputFormat kerberos{" +
367+ "kerberosAuthEnable=" + kerberosAuthEnable +
368+ ", regionserverKeytabFile='" + regionserverKeytabFile + '\'' +
369+ ", regionserverPrincipal='" + regionserverPrincipal + '\'' +
370+ ", securityKrb5Conf='" + securityKrb5Conf + '\'' +
371+ ", zookeeperSaslClient='" + zookeeperSaslClient + '\'' +
372+ ", clientPrincipal='" + clientPrincipal + '\'' +
373+ ", clientKeytabFile='" + clientKeytabFile + '\'' +
374+ ", batchSize='" + batchSize + '\'' +
375+ ", batchWaitInterval='" + batchWaitInterval + '\'' +
376+ '}' ;
326377 }
327378
328379 public static class HbaseOutputFormatBuilder {
329380
330- private HbaseOutputFormat format ;
381+ private final HbaseOutputFormat format ;
331382
332383 private HbaseOutputFormatBuilder () {
333384 format = new HbaseOutputFormat ();
@@ -359,11 +410,6 @@ public HbaseOutputFormatBuilder setColumnNames(String[] columnNames) {
359410 return this ;
360411 }
361412
362- public HbaseOutputFormatBuilder setColumnTypes (String [] columnTypes ) {
363- format .columnTypes = columnTypes ;
364- return this ;
365- }
366-
/**
 * @param columnNameFamily mapping from column name to its HBase column family
 * @return this builder, for chaining
 */
public HbaseOutputFormatBuilder setColumnNameFamily(Map<String, String> columnNameFamily) {
    format.columnNameFamily = columnNameFamily;
    return this;
}
@@ -438,44 +484,5 @@ public HbaseOutputFormat finish() {
438484
439485 return format ;
440486 }
441-
442487 }
443-
444- private void fillSyncKerberosConfig (org .apache .hadoop .conf .Configuration config , String regionserverPrincipal ,
445- String zookeeperSaslClient , String securityKrb5Conf ) throws IOException {
446- if (StringUtils .isEmpty (regionserverPrincipal )) {
447- throw new IllegalArgumentException ("Must provide regionserverPrincipal when authentication is Kerberos" );
448- }
449- config .set (HbaseConfigUtils .KEY_HBASE_MASTER_KERBEROS_PRINCIPAL , regionserverPrincipal );
450- config .set (HbaseConfigUtils .KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL , regionserverPrincipal );
451- config .set (HbaseConfigUtils .KEY_HBASE_SECURITY_AUTHORIZATION , "true" );
452- config .set (HbaseConfigUtils .KEY_HBASE_SECURITY_AUTHENTICATION , "kerberos" );
453-
454-
455- if (!StringUtils .isEmpty (zookeeperSaslClient )) {
456- System .setProperty (HbaseConfigUtils .KEY_ZOOKEEPER_SASL_CLIENT , zookeeperSaslClient );
457- }
458-
459- if (!StringUtils .isEmpty (securityKrb5Conf )) {
460- String krb5ConfPath = System .getProperty ("user.dir" ) + File .separator + securityKrb5Conf ;
461- LOG .info ("krb5ConfPath:{}" , krb5ConfPath );
462- System .setProperty (HbaseConfigUtils .KEY_JAVA_SECURITY_KRB5_CONF , krb5ConfPath );
463- }
464- }
465-
466- @ Override
467- public String toString () {
468- return "HbaseOutputFormat kerberos{" +
469- "kerberosAuthEnable=" + kerberosAuthEnable +
470- ", regionserverKeytabFile='" + regionserverKeytabFile + '\'' +
471- ", regionserverPrincipal='" + regionserverPrincipal + '\'' +
472- ", securityKrb5Conf='" + securityKrb5Conf + '\'' +
473- ", zookeeperSaslClient='" + zookeeperSaslClient + '\'' +
474- ", clientPrincipal='" + clientPrincipal + '\'' +
475- ", clientKeytabFile='" + clientKeytabFile + '\'' +
476- ", batchSize='" + batchSize + '\'' +
477- ", batchWaitInterval='" + batchWaitInterval + '\'' +
478- '}' ;
479- }
480-
481488}
0 commit comments