1818
1919package com .dtstack .flink .sql .sink .kudu ;
2020
21+ import com .dtstack .flink .sql .factory .DTThreadFactory ;
2122import com .dtstack .flink .sql .outputformat .AbstractDtRichOutputFormat ;
23+ import com .dtstack .flink .sql .sink .kudu .table .KuduTableInfo ;
2224import com .dtstack .flink .sql .util .KrbUtils ;
2325import org .apache .flink .api .common .typeinfo .TypeInformation ;
2426import org .apache .flink .api .java .tuple .Tuple2 ;
3133import org .apache .kudu .client .KuduTable ;
3234import org .apache .kudu .client .Operation ;
3335import org .apache .kudu .client .PartialRow ;
36+ import org .apache .kudu .client .SessionConfiguration ;
3437import org .slf4j .Logger ;
3538import org .slf4j .LoggerFactory ;
3639
4043import java .sql .Timestamp ;
4144import java .util .Date ;
4245import java .util .Objects ;
46+ import java .util .concurrent .ScheduledExecutorService ;
47+ import java .util .concurrent .ScheduledFuture ;
48+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
49+ import java .util .concurrent .TimeUnit ;
50+ import java .util .concurrent .atomic .AtomicInteger ;
4351
4452/**
4553 * @author gituser
4654 * @modify xiuzhu
4755 */
48- public class KuduOutputFormat extends AbstractDtRichOutputFormat <Tuple2 > {
56+ public class KuduOutputFormat extends AbstractDtRichOutputFormat <Tuple2 < Boolean , Row > > {
4957
5058 private static final long serialVersionUID = 1L ;
5159
@@ -75,6 +83,24 @@ public class KuduOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
7583 private String keytab ;
7684 private String krb5conf ;
7785
86+ /**
87+ * batch size
88+ */
89+ private Integer batchSize ;
90+ private Integer batchWaitInterval ;
91+ /**
92+ * kudu session flush mode
93+ */
94+ private String flushMode ;
95+
96+ private transient AtomicInteger rowCount ;
97+
98+ /**
99+ * 定时任务
100+ */
101+ private transient ScheduledExecutorService scheduler ;
102+ private transient ScheduledFuture <?> scheduledFuture ;
103+
78104 private KuduOutputFormat () {
79105 }
80106
@@ -91,6 +117,33 @@ public void configure(Configuration parameters) {
91117 public void open (int taskNumber , int numTasks ) throws IOException {
92118 establishConnection ();
93119 initMetric ();
120+ initSchedulerTask ();
121+ rowCount = new AtomicInteger (0 );
122+ }
123+
124+ /**
125+ * init the scheduler task of {@link KuduOutputFormat#flush()}
126+ */
127+ private void initSchedulerTask () {
128+ try {
129+ if (batchWaitInterval > 0 ) {
130+ this .scheduler = new ScheduledThreadPoolExecutor (
131+ 1 ,
132+ new DTThreadFactory ("kudu-batch-flusher" )
133+ );
134+
135+ this .scheduledFuture = this .scheduler .scheduleWithFixedDelay (
136+ () -> {
137+ synchronized (KuduOutputFormat .this ) {
138+ flush ();
139+ }
140+ }, batchWaitInterval , batchWaitInterval , TimeUnit .MILLISECONDS
141+ );
142+ }
143+ } catch (Exception e ) {
144+ LOG .error ("init schedule task failed !" );
145+ throw new RuntimeException (e );
146+ }
94147 }
95148
96149 private void establishConnection () throws IOException {
@@ -127,17 +180,46 @@ private void establishConnection() throws IOException {
127180 }
128181 LOG .info ("connect kudu is succeed!" );
129182
130- session = client .newSession ();
183+ session = buildSessionWithFlushMode (flushMode , client );
184+ }
185+
186+ /**
187+ * According to the different flush mode, construct different session. Detail see {@link SessionConfiguration.FlushMode}
188+ *
189+ * @param flushMode flush mode
190+ * @param kuduClient kudu client
191+ * @return KuduSession with flush mode
192+ * @throws KuduException kudu exception when session flush
193+ */
194+ private KuduSession buildSessionWithFlushMode (String flushMode , KuduClient kuduClient ) throws KuduException {
195+ KuduSession kuduSession = kuduClient .newSession ();
196+ if (flushMode .equalsIgnoreCase (KuduTableInfo .KuduFlushMode .MANUAL_FLUSH .name ())) {
197+ kuduSession .setFlushMode (SessionConfiguration .FlushMode .MANUAL_FLUSH );
198+ kuduSession .setMutationBufferSpace (
199+ Integer .parseInt (String .valueOf (Math .round (batchSize * 1.2 )))
200+ );
201+ }
202+
203+ if (flushMode .equalsIgnoreCase (KuduTableInfo .KuduFlushMode .AUTO_FLUSH_SYNC .name ())) {
204+ LOG .warn ("Parameter [batchSize] will not take effect at AUTO_FLUSH_SYNC mode." );
205+ kuduSession .setFlushMode (SessionConfiguration .FlushMode .AUTO_FLUSH_SYNC );
206+ }
207+
208+ if (flushMode .equalsIgnoreCase (KuduTableInfo .KuduFlushMode .AUTO_FLUSH_BACKGROUND .name ())) {
209+ LOG .warn ("Unable to determine the order of data at AUTO_FLUSH_BACKGROUND mode." );
210+ kuduSession .setFlushMode (SessionConfiguration .FlushMode .AUTO_FLUSH_BACKGROUND );
211+ }
212+
213+ return kuduSession ;
131214 }
132215
133216 @ Override
134- public void writeRecord (Tuple2 record ) throws IOException {
135- Tuple2 <Boolean , Row > tupleTrans = record ;
136- Boolean retract = tupleTrans .getField (0 );
217+ public void writeRecord (Tuple2 <Boolean , Row > record ) throws IOException {
218+ Boolean retract = record .getField (0 );
137219 if (!retract ) {
138220 return ;
139221 }
140- Row row = tupleTrans .getField (1 );
222+ Row row = record .getField (1 );
141223 if (row .getArity () != fieldNames .length ) {
142224 if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
143225 LOG .error ("record insert failed ..{}" , row .toString ());
@@ -151,6 +233,9 @@ public void writeRecord(Tuple2 record) throws IOException {
151233 if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
152234 LOG .info ("Receive data : {}" , row );
153235 }
236+ if (rowCount .getAndIncrement () >= batchSize ) {
237+ flush ();
238+ }
154239 session .apply (toOperation (writeMode , row ));
155240 outRecords .inc ();
156241 } catch (KuduException e ) {
@@ -162,6 +247,22 @@ public void writeRecord(Tuple2 record) throws IOException {
162247 }
163248 }
164249
250+ private synchronized void flush () {
251+ try {
252+ if (session .isClosed ()) {
253+ throw new IllegalStateException ("session is closed! flush data error!" );
254+ }
255+
256+ session .flush ();
257+ // clear
258+ rowCount .set (0 );
259+ } catch (KuduException kuduException ) {
260+ LOG .error (
261+ "flush data error!" , kuduException );
262+ throw new RuntimeException (kuduException );
263+ }
264+ }
265+
165266 @ Override
166267 public void close () {
167268 if (Objects .nonNull (session ) && !session .isClosed ()) {
@@ -179,6 +280,14 @@ public void close() {
179280 throw new IllegalArgumentException ("[closeKuduClient]:" + e .getMessage ());
180281 }
181282 }
283+
284+ if (scheduledFuture != null ) {
285+ scheduledFuture .cancel (false );
286+ }
287+
288+ if (scheduler != null ) {
289+ scheduler .shutdownNow ();
290+ }
182291 }
183292
184293 private Operation toOperation (WriteMode writeMode , Row row ) {
@@ -345,6 +454,21 @@ public KuduOutputFormatBuilder setEnableKrb(boolean enableKrb) {
345454 return this ;
346455 }
347456
457+ public KuduOutputFormatBuilder setBatchSize (Integer batchSize ) {
458+ kuduOutputFormat .batchSize = batchSize ;
459+ return this ;
460+ }
461+
462+ public KuduOutputFormatBuilder setBatchWaitInterval (Integer batchWaitInterval ) {
463+ kuduOutputFormat .batchWaitInterval = batchWaitInterval ;
464+ return this ;
465+ }
466+
467+ public KuduOutputFormatBuilder setFlushMode (String flushMode ) {
468+ kuduOutputFormat .flushMode = flushMode ;
469+ return this ;
470+ }
471+
348472 public KuduOutputFormat finish () {
349473 if (kuduOutputFormat .kuduMasters == null ) {
350474 throw new IllegalArgumentException ("No kuduMasters supplied." );
0 commit comments