2828import org .apache .flink .configuration .Configuration ;
2929import org .apache .flink .types .Row ;
3030import org .apache .flink .util .Preconditions ;
31- import org .apache .hadoop .hbase .*;
31+ import org .apache .hadoop .hbase .AuthUtil ;
32+ import org .apache .hadoop .hbase .ChoreService ;
33+ import org .apache .hadoop .hbase .HBaseConfiguration ;
34+ import org .apache .hadoop .hbase .ScheduledChore ;
35+ import org .apache .hadoop .hbase .TableName ;
3236import org .apache .hadoop .hbase .client .Connection ;
3337import org .apache .hadoop .hbase .client .ConnectionFactory ;
3438import org .apache .hadoop .hbase .client .Delete ;
4246import java .io .File ;
4347import java .io .IOException ;
4448import java .security .PrivilegedAction ;
49+ import java .util .LinkedHashMap ;
50+ import java .util .LinkedList ;
4551import java .util .List ;
4652import java .util .Map ;
47- import java .util .Set ;
4853
4954/**
5055 * @author: jingzhen@dtstack.com
@@ -61,7 +66,7 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
6166 private String [] columnNames ;
6267 private String updateMode ;
6368 private String [] columnTypes ;
64- private Map <String , String > columnNameFamily ;
69+ private LinkedHashMap <String , String > columnNameFamily ;
6570
6671 private boolean kerberosAuthEnable ;
6772 private String regionserverKeytabFile ;
@@ -129,21 +134,18 @@ private void openKerberosConn() throws Exception {
129134
130135 UserGroupInformation userGroupInformation = HbaseConfigUtils .loginAndReturnUGI (conf , clientPrincipal , clientKeytabFile );
131136 org .apache .hadoop .conf .Configuration finalConf = conf ;
132- conn = userGroupInformation .doAs (new PrivilegedAction <Connection >() {
133- @ Override
134- public Connection run () {
135- try {
136- ScheduledChore authChore = AuthUtil .getAuthChore (finalConf );
137- if (authChore != null ) {
138- choreService = new ChoreService ("hbaseKerberosSink" );
139- choreService .scheduleChore (authChore );
140- }
141-
142- return ConnectionFactory .createConnection (finalConf );
143- } catch (IOException e ) {
144- LOG .error ("Get connection fail with config:{}" , finalConf );
145- throw new RuntimeException (e );
137+ conn = userGroupInformation .doAs ((PrivilegedAction <Connection >) () -> {
138+ try {
139+ ScheduledChore authChore = AuthUtil .getAuthChore (finalConf );
140+ if (authChore != null ) {
141+ choreService = new ChoreService ("hbaseKerberosSink" );
142+ choreService .scheduleChore (authChore );
146143 }
144+
145+ return ConnectionFactory .createConnection (finalConf );
146+ } catch (IOException e ) {
147+ LOG .error ("Get connection fail with config:{}" , finalConf );
148+ throw new RuntimeException (e );
147149 }
148150 });
149151 }
@@ -304,7 +306,7 @@ public HbaseOutputFormatBuilder setColumnTypes(String[] columnTypes) {
304306 return this ;
305307 }
306308
307- public HbaseOutputFormatBuilder setColumnNameFamily (Map <String , String > columnNameFamily ) {
309+ public HbaseOutputFormatBuilder setColumnNameFamily (LinkedHashMap <String , String > columnNameFamily ) {
308310 format .columnNameFamily = columnNameFamily ;
309311 return this ;
310312 }
@@ -355,8 +357,8 @@ public HbaseOutputFormat finish() {
355357 String [] qualifiers = new String [format .columnNames .length ];
356358
357359 if (format .columnNameFamily != null ) {
358- Set <String > keySet = format .columnNameFamily .keySet ();
359- String [] columns = keySet .toArray (new String [keySet . size () ]);
360+ List <String > keyList = new LinkedList <>( format .columnNameFamily .keySet () );
361+ String [] columns = keyList .toArray (new String [0 ]);
360362 for (int i = 0 ; i < columns .length ; ++i ) {
361363 String col = columns [i ];
362364 String [] part = col .split (":" );
0 commit comments