2525import org .apache .flink .configuration .Configuration ;
2626import org .apache .flink .types .Row ;
2727import org .apache .hadoop .security .UserGroupInformation ;
28- import org .apache .kudu .client .AsyncKuduClient ;
29- import org .apache .kudu .client .AsyncKuduSession ;
3028import org .apache .kudu .client .KuduClient ;
3129import org .apache .kudu .client .KuduException ;
3230import org .apache .kudu .client .KuduSession ;
@@ -52,26 +50,12 @@ public class KuduOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
5250 private static final long serialVersionUID = 1L ;
5351
5452 private static final Logger LOG = LoggerFactory .getLogger (KuduOutputFormat .class );
55-
56- public enum WriteMode {
57- // insert
58- INSERT ,
59- // update
60- UPDATE ,
61- // update or insert
62- UPSERT
63- }
64-
53+ protected String [] fieldNames ;
54+ TypeInformation <?>[] fieldTypes ;
55+ boolean enableKrb ;
6556 private String kuduMasters ;
66-
6757 private String tableName ;
68-
6958 private WriteMode writeMode ;
70-
71- protected String [] fieldNames ;
72-
73- TypeInformation <?>[] fieldTypes ;
74-
7559 private KuduClient client ;
7660
7761 private KuduTable table ;
@@ -90,11 +74,14 @@ public enum WriteMode {
9074 private String principal ;
9175 private String keytab ;
9276 private String krb5conf ;
93- boolean enableKrb ;
9477
9578 private KuduOutputFormat () {
9679 }
9780
81+ public static KuduOutputFormatBuilder buildKuduOutputFormat () {
82+ return new KuduOutputFormatBuilder ();
83+ }
84+
9885 @ Override
9986 public void configure (Configuration parameters ) {
10087
@@ -112,11 +99,11 @@ private void establishConnection() throws IOException {
11299 kuduClientBuilder .workerCount (workerCount );
113100 }
114101 if (null != defaultSocketReadTimeoutMs ) {
115- kuduClientBuilder .workerCount (defaultSocketReadTimeoutMs );
102+ kuduClientBuilder .defaultSocketReadTimeoutMs (defaultSocketReadTimeoutMs );
116103 }
117104
118105 if (null != defaultOperationTimeoutMs ) {
119- kuduClientBuilder .workerCount (defaultOperationTimeoutMs );
106+ kuduClientBuilder .defaultOperationTimeoutMs (defaultOperationTimeoutMs );
120107 }
121108
122109 if (enableKrb ) {
@@ -130,10 +117,16 @@ private void establishConnection() throws IOException {
130117 } else {
131118 client = kuduClientBuilder .build ();
132119 }
133- LOG . info ( "connect kudu is succeed!" );
120+
134121 if (client .tableExists (tableName )) {
135122 table = client .openTable (tableName );
136123 }
124+ if (Objects .isNull (table )) {
125+ throw new IllegalArgumentException (
126+ String .format ("Table [%s] Open Failed , please check table exists" , tableName ));
127+ }
128+ LOG .info ("connect kudu is succeed!" );
129+
137130 session = client .newSession ();
138131 }
139132
@@ -188,8 +181,97 @@ public void close() {
188181 }
189182 }
190183
191- public static KuduOutputFormatBuilder buildKuduOutputFormat () {
192- return new KuduOutputFormatBuilder ();
184+ private Operation toOperation (WriteMode writeMode , Row row ) {
185+ Operation operation = toOperation (writeMode );
186+ PartialRow partialRow = operation .getRow ();
187+
188+ for (int index = 0 ; index < row .getArity (); index ++) {
189+ //解决kudu中全小写字段找不到的bug
190+ String fieldName = fieldNames [index ].toLowerCase ();
191+ if (row .getField (index ) == null ) {
192+ partialRow .setNull (fieldName );
193+ } else {
194+ if (fieldTypes [index ].getTypeClass () == String .class ) {
195+ partialRow .addString (fieldName , (String ) row .getField (index ));
196+ continue ;
197+ }
198+ if (fieldTypes [index ].getTypeClass () == Float .class ) {
199+ partialRow .addFloat (fieldName , (Float ) row .getField (index ));
200+ continue ;
201+ }
202+ if (fieldTypes [index ].getTypeClass () == Byte .class ) {
203+ partialRow .addByte (fieldName , (Byte ) row .getField (index ));
204+ continue ;
205+ }
206+
207+ if (fieldTypes [index ].getTypeClass () == Short .class ) {
208+ partialRow .addShort (fieldName , (Short ) row .getField (index ));
209+ continue ;
210+ }
211+
212+ if (fieldTypes [index ].getTypeClass () == Integer .class ) {
213+ partialRow .addInt (fieldName , (Integer ) row .getField (index ));
214+ continue ;
215+ }
216+
217+ if (fieldTypes [index ].getTypeClass () == Long .class ) {
218+ partialRow .addLong (fieldName , (Long ) row .getField (index ));
219+ continue ;
220+ }
221+
222+ if (fieldTypes [index ].getTypeClass () == Double .class ) {
223+ partialRow .addDouble (fieldName , (Double ) row .getField (index ));
224+ continue ;
225+ }
226+
227+ if (fieldTypes [index ].getTypeClass () == BigDecimal .class ) {
228+ partialRow .addDecimal (fieldName , (BigDecimal ) row .getField (index ));
229+ continue ;
230+ }
231+ if (fieldTypes [index ].getTypeClass () == Boolean .class ) {
232+ partialRow .addBoolean (fieldName , (Boolean ) row .getField (index ));
233+ continue ;
234+ }
235+
236+ if (fieldTypes [index ].getTypeClass () == Date .class ) {
237+ partialRow .addTimestamp (fieldName , new Timestamp (((Date ) row .getField (index )).getTime ()));
238+ continue ;
239+ }
240+
241+ if (fieldTypes [index ].getTypeClass () == Timestamp .class ) {
242+ partialRow .addTimestamp (fieldName , (Timestamp ) row .getField (index ));
243+ continue ;
244+ }
245+
246+ if (fieldTypes [index ].getTypeClass () == byte [].class ) {
247+ partialRow .addBinary (fieldName , (byte []) row .getField (index ));
248+ continue ;
249+ }
250+ throw new IllegalArgumentException ("Illegal var type: " + fieldTypes [index ]);
251+ }
252+ }
253+ return operation ;
254+
255+ }
256+
257+ private Operation toOperation (WriteMode writeMode ) {
258+ switch (writeMode ) {
259+ case INSERT :
260+ return table .newInsert ();
261+ case UPDATE :
262+ return table .newUpdate ();
263+ default :
264+ return table .newUpsert ();
265+ }
266+ }
267+
268+ public enum WriteMode {
269+ // insert
270+ INSERT ,
271+ // update
272+ UPDATE ,
273+ // update or insert
274+ UPSERT
193275 }
194276
195277 public static class KuduOutputFormatBuilder {
@@ -276,93 +358,4 @@ public KuduOutputFormat finish() {
276358 }
277359 }
278360
279- private Operation toOperation (WriteMode writeMode , Row row ) {
280- if (null == table ) {
281- throw new IllegalArgumentException ("Table Open Failed , please check table exists" );
282- }
283- Operation operation = toOperation (writeMode );
284- PartialRow partialRow = operation .getRow ();
285-
286- for (int index = 0 ; index < row .getArity (); index ++) {
287- //解决kudu中全小写字段找不到的bug
288- String fieldName = fieldNames [index ].toLowerCase ();
289- if (row .getField (index ) == null ) {
290- partialRow .setNull (fieldName );
291- } else {
292- if (fieldTypes [index ].getTypeClass () == String .class ) {
293- partialRow .addString (fieldName , (String ) row .getField (index ));
294- continue ;
295- }
296- if (fieldTypes [index ].getTypeClass () == Float .class ) {
297- partialRow .addFloat (fieldName , (Float ) row .getField (index ));
298- continue ;
299- }
300- if (fieldTypes [index ].getTypeClass () == Byte .class ) {
301- partialRow .addByte (fieldName , (Byte ) row .getField (index ));
302- continue ;
303- }
304-
305- if (fieldTypes [index ].getTypeClass () == Short .class ) {
306- partialRow .addShort (fieldName , (Short ) row .getField (index ));
307- continue ;
308- }
309-
310- if (fieldTypes [index ].getTypeClass () == Integer .class ) {
311- partialRow .addInt (fieldName , (Integer ) row .getField (index ));
312- continue ;
313- }
314-
315- if (fieldTypes [index ].getTypeClass () == Long .class ) {
316- partialRow .addLong (fieldName , (Long ) row .getField (index ));
317- continue ;
318- }
319-
320- if (fieldTypes [index ].getTypeClass () == Double .class ) {
321- partialRow .addDouble (fieldName , (Double ) row .getField (index ));
322- continue ;
323- }
324-
325- if (fieldTypes [index ].getTypeClass () == BigDecimal .class ) {
326- partialRow .addDecimal (fieldName , (BigDecimal ) row .getField (index ));
327- continue ;
328- }
329- if (fieldTypes [index ].getTypeClass () == Boolean .class ) {
330- partialRow .addBoolean (fieldName , (Boolean ) row .getField (index ));
331- continue ;
332- }
333-
334- if (fieldTypes [index ].getTypeClass () == Date .class ) {
335- partialRow .addTimestamp (fieldName , new Timestamp (((Date ) row .getField (index )).getTime ()));
336- continue ;
337- }
338-
339- if (fieldTypes [index ].getTypeClass () == Timestamp .class ) {
340- partialRow .addTimestamp (fieldName , (Timestamp ) row .getField (index ));
341- continue ;
342- }
343-
344- if (fieldTypes [index ].getTypeClass () == byte [].class ) {
345- partialRow .addBinary (fieldName , (byte []) row .getField (index ));
346- continue ;
347- }
348- throw new IllegalArgumentException ("Illegal var type: " + fieldTypes [index ]);
349- }
350- }
351- return operation ;
352-
353- }
354-
355- private Operation toOperation (WriteMode writeMode ) {
356- switch (writeMode ) {
357- case INSERT :
358- return table .newInsert ();
359- case UPDATE :
360- return table .newUpdate ();
361- case UPSERT :
362- return table .newUpsert ();
363- default :
364- return table .newUpsert ();
365- }
366- }
367-
368361}
0 commit comments