2121package com .dtstack .flink .sql .sink .hbase ;
2222
2323import com .dtstack .flink .sql .sink .MetricOutputFormat ;
24+ import com .google .common .collect .Lists ;
2425import org .apache .commons .lang3 .StringUtils ;
2526import org .apache .flink .api .java .tuple .Tuple2 ;
2627import org .apache .flink .configuration .Configuration ;
3637import org .slf4j .LoggerFactory ;
3738import java .io .IOException ;
3839import java .text .SimpleDateFormat ;
39- import java .util .ArrayList ;
4040import java .util .List ;
4141import java .util .Map ;
4242import java .util .Set ;
@@ -65,9 +65,10 @@ public class HbaseOutputFormat extends MetricOutputFormat {
6565 private transient Table table ;
6666
6767 public final SimpleDateFormat ROWKEY_DATE_FORMAT = new SimpleDateFormat ("yyyyMMddHHmmss" );
68- public final SimpleDateFormat FIELD_DATE_FORMAT = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss" );
6968
7069 private static int rowLenth = 1000 ;
70+ private static int dirtyDataPrintFrequency = 1000 ;
71+
7172
7273 @ Override
7374 public void configure (Configuration parameters ) {
@@ -90,7 +91,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
9091 }
9192
9293 @ Override
93- public void writeRecord (Tuple2 tuple2 ) throws IOException {
94+ public void writeRecord (Tuple2 tuple2 ) {
9495
9596 Tuple2 <Boolean , Row > tupleTrans = tuple2 ;
9697 Boolean retract = tupleTrans .getField (0 );
@@ -100,52 +101,71 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
100101 }
101102
102103 Row record = tupleTrans .getField (1 );
103-
104- List <String > list = new ArrayList <>();
105- for (int i = 0 ; i < rowkey .length ; ++i ) {
106- String colName = rowkey [i ];
107- int j = 0 ;
108- for (; j < columnNames .length ; ++j ) {
109- if (columnNames [j ].equals (colName )) {
110- break ;
111- }
112- }
113- if (j != columnNames .length && record .getField (i ) != null ) {
114- Object field = record .getField (j );
115- if (field == null ) {
116- list .add ("null" );
117- } else if (field instanceof java .util .Date ){
118- java .util .Date d = (java .util .Date )field ;
119- list .add (ROWKEY_DATE_FORMAT .format (d ));
120- } else {
121- list .add (field .toString ());
122- }
123- }
104+ List <String > rowKeyValues = getRowKeyValues (record );
105+ // all rowkey not null
106+ if (rowKeyValues .size () != rowkey .length ) {
107+ LOG .error ("row key value must not null,record is .." , record );
108+ outDirtyRecords .inc ();
109+ return ;
124110 }
125111
126- String key = StringUtils .join (list , "-" );
112+ String key = StringUtils .join (rowKeyValues , "-" );
127113 Put put = new Put (key .getBytes ());
128114 for (int i = 0 ; i < record .getArity (); ++i ) {
129- Object field = record .getField (i );
130- byte [] val = null ;
131- if (field != null ) {
132- val = field .toString ().getBytes ();
115+ Object fieldVal = record .getField (i );
116+ if (fieldVal == null ) {
117+ continue ;
133118 }
119+ byte [] val = fieldVal .toString ().getBytes ();
134120 byte [] cf = families [i ].getBytes ();
135121 byte [] qualifier = qualifiers [i ].getBytes ();
136- put .addColumn (cf , qualifier , val );
137122
123+ put .addColumn (cf , qualifier , val );
138124 }
139125
140- table .put (put );
126+ try {
127+ table .put (put );
128+ } catch (IOException e ) {
129+ outDirtyRecords .inc ();
130+ if (outDirtyRecords .getCount () % dirtyDataPrintFrequency == 0 || LOG .isDebugEnabled ()) {
131+ LOG .error ("record insert failed .." , record .toString ());
132+ LOG .error ("" , e );
133+ }
134+ }
141135
142- if (outRecords .getCount ()% rowLenth == 0 ){
136+ if (outRecords .getCount () % rowLenth == 0 ) {
143137 LOG .info (record .toString ());
144138 }
145139 outRecords .inc ();
146140
147141 }
148142
143+ private List <String > getRowKeyValues (Row record ) {
144+ List <String > rowKeyValues = Lists .newArrayList ();
145+ for (int i = 0 ; i < rowkey .length ; ++i ) {
146+ String colName = rowkey [i ];
147+ int rowKeyIndex = 0 ; //rowkey index
148+ for (; rowKeyIndex < columnNames .length ; ++rowKeyIndex ) {
149+ if (columnNames [rowKeyIndex ].equals (colName )) {
150+ break ;
151+ }
152+ }
153+
154+ if (rowKeyIndex != columnNames .length && record .getField (rowKeyIndex ) != null ) {
155+ Object field = record .getField (rowKeyIndex );
156+ if (field == null ) {
157+ continue ;
158+ } else if (field instanceof java .util .Date ) {
159+ java .util .Date d = (java .util .Date ) field ;
160+ rowKeyValues .add (ROWKEY_DATE_FORMAT .format (d ));
161+ } else {
162+ rowKeyValues .add (field .toString ());
163+ }
164+ }
165+ }
166+ return rowKeyValues ;
167+ }
168+
149169 @ Override
150170 public void close () throws IOException {
151171 if (conn != null ) {
0 commit comments