1818
1919package com .dtstack .flink .sql .sink .kudu ;
2020
21+ import com .dtstack .flink .sql .exception .ExceptionTrace ;
2122import com .dtstack .flink .sql .factory .DTThreadFactory ;
2223import com .dtstack .flink .sql .outputformat .AbstractDtRichOutputFormat ;
2324import com .dtstack .flink .sql .sink .kudu .table .KuduTableInfo ;
@@ -60,6 +61,8 @@ public class KuduOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean,
6061 private static final long serialVersionUID = 1L ;
6162
6263 private static final Logger LOG = LoggerFactory .getLogger (KuduOutputFormat .class );
64+ private static final String MANUAL_FLUSH_BUFFER_BIG_MSG = "MANUAL_FLUSH is enabled but the buffer is too big" ;
65+
6366 protected String [] fieldNames ;
6467 TypeInformation <?>[] fieldTypes ;
6568 boolean enableKrb ;
@@ -76,6 +79,8 @@ public class KuduOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean,
7679
7780 private Integer defaultOperationTimeoutMs ;
7881
82+ private int mutationBufferMaxOps ;
83+
7984 /**
8085 * kerberos
8186 */
@@ -186,9 +191,7 @@ private KuduSession buildSessionWithFlushMode(String flushMode, KuduClient kuduC
186191 KuduSession kuduSession = kuduClient .newSession ();
187192 if (flushMode .equalsIgnoreCase (KuduTableInfo .KuduFlushMode .MANUAL_FLUSH .name ())) {
188193 kuduSession .setFlushMode (SessionConfiguration .FlushMode .MANUAL_FLUSH );
189- kuduSession .setMutationBufferSpace (
190- Integer .parseInt (String .valueOf (Math .round (batchSize * 1.2 )))
191- );
194+ kuduSession .setMutationBufferSpace (mutationBufferMaxOps );
192195 }
193196
194197 if (flushMode .equalsIgnoreCase (KuduTableInfo .KuduFlushMode .AUTO_FLUSH_SYNC .name ())) {
@@ -197,8 +200,9 @@ private KuduSession buildSessionWithFlushMode(String flushMode, KuduClient kuduC
197200 }
198201
199202 if (flushMode .equalsIgnoreCase (KuduTableInfo .KuduFlushMode .AUTO_FLUSH_BACKGROUND .name ())) {
200- LOG .warn ("Unable to determine the order of data at AUTO_FLUSH_BACKGROUND mode." );
203+ LOG .warn ("Unable to determine the order of data at AUTO_FLUSH_BACKGROUND mode. Only [batchWaitInterval] will effect. " );
201204 kuduSession .setFlushMode (SessionConfiguration .FlushMode .AUTO_FLUSH_BACKGROUND );
205+ kuduSession .setFlushInterval (batchWaitInterval );
202206 }
203207
204208 return kuduSession ;
@@ -213,21 +217,26 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
213217 Row row = record .getField (1 );
214218
215219 try {
216- if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
217- LOG .info ("Receive data : {}" , row );
218- }
219220 if (rowCount .getAndIncrement () >= batchSize ) {
220221 flush ();
221222 }
222223 // At AUTO_FLUSH_SYNC mode, kudu automatically flush once session apply operation, then get the response from kudu server.
223224 if (flushMode .equalsIgnoreCase (SessionConfiguration .FlushMode .AUTO_FLUSH_SYNC .name ())) {
224225 dealResponse (session .apply (toOperation (writeMode , row )));
226+ } else {
227+ session .apply (toOperation (writeMode , row ));
225228 }
226-
227- session .apply (toOperation (writeMode , row ));
228229 outRecords .inc ();
230+ if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
231+ LOG .info ("Receive data : {}" , row );
232+ }
229233 } catch (KuduException e ) {
230- throw new RuntimeException (e );
234+ // 如果出现了buffer is too big的问题,需要将当前buffer里的数据flush掉即可。
235+ if (ExceptionTrace .traceOriginalCause (e ).contains (MANUAL_FLUSH_BUFFER_BIG_MSG )) {
236+ flush ();
237+ } else {
238+ throw new RuntimeException (e );
239+ }
231240 }
232241 }
233242
@@ -483,6 +492,11 @@ public KuduOutputFormatBuilder setFlushMode(String flushMode) {
483492 return this ;
484493 }
485494
495+ public KuduOutputFormatBuilder setMutationBufferMaxOps (Integer mutationBufferMaxOps ) {
496+ kuduOutputFormat .mutationBufferMaxOps = mutationBufferMaxOps ;
497+ return this ;
498+ }
499+
486500 public KuduOutputFormat finish () {
487501 if (kuduOutputFormat .kuduMasters == null ) {
488502 throw new IllegalArgumentException ("No kuduMasters supplied." );
0 commit comments