2929import org .apache .kudu .client .AsyncKuduSession ;
3030import org .apache .kudu .client .KuduClient ;
3131import org .apache .kudu .client .KuduException ;
32+ import org .apache .kudu .client .KuduSession ;
3233import org .apache .kudu .client .KuduTable ;
3334import org .apache .kudu .client .Operation ;
3435import org .apache .kudu .client .PartialRow ;
4041import java .security .PrivilegedAction ;
4142import java .sql .Timestamp ;
4243import java .util .Date ;
44+ import java .util .Objects ;
4345
4446/**
45- * @author gituser
46- * @modify xiuzhu
47+ * @author gituser
48+ * @modify xiuzhu
4749 */
4850public class KuduOutputFormat extends AbstractDtRichOutputFormat <Tuple2 > {
4951
@@ -70,10 +72,12 @@ public enum WriteMode {
7072
7173 TypeInformation <?>[] fieldTypes ;
7274
73- private AsyncKuduClient client ;
75+ private KuduClient client ;
7476
7577 private KuduTable table ;
7678
79+ private volatile KuduSession session ;
80+
7781 private Integer workerCount ;
7882
7983 private Integer defaultOperationTimeoutMs ;
@@ -103,39 +107,34 @@ public void open(int taskNumber, int numTasks) throws IOException {
103107 }
104108
105109 private void establishConnection () throws IOException {
106- AsyncKuduClient . AsyncKuduClientBuilder asyncKuduClientBuilder = new AsyncKuduClient . AsyncKuduClientBuilder (kuduMasters );
110+ KuduClient . KuduClientBuilder kuduClientBuilder = new KuduClient . KuduClientBuilder (kuduMasters );
107111 if (null != workerCount ) {
108- asyncKuduClientBuilder .workerCount (workerCount );
112+ kuduClientBuilder .workerCount (workerCount );
109113 }
110114 if (null != defaultSocketReadTimeoutMs ) {
111- asyncKuduClientBuilder .workerCount (defaultSocketReadTimeoutMs );
115+ kuduClientBuilder .workerCount (defaultSocketReadTimeoutMs );
112116 }
113117
114118 if (null != defaultOperationTimeoutMs ) {
115- asyncKuduClientBuilder .workerCount (defaultOperationTimeoutMs );
119+ kuduClientBuilder .workerCount (defaultOperationTimeoutMs );
116120 }
117121
118122 if (enableKrb ) {
119123 UserGroupInformation ugi = KrbUtils .loginAndReturnUgi (
120- principal ,
121- keytab ,
122- krb5conf
124+ principal ,
125+ keytab ,
126+ krb5conf
123127 );
124128 client = ugi .doAs (
125- new PrivilegedAction <AsyncKuduClient >() {
126- @ Override
127- public AsyncKuduClient run () {
128- return asyncKuduClientBuilder .build ();
129- }
130- });
129+ (PrivilegedAction <KuduClient >) kuduClientBuilder ::build );
131130 } else {
132- client = asyncKuduClientBuilder .build ();
131+ client = kuduClientBuilder .build ();
133132 }
134- LOG .info ("connect kudu is successed!" );
135- KuduClient syncClient = client .syncClient ();
136- if (syncClient .tableExists (tableName )) {
137- table = syncClient .openTable (tableName );
133+ LOG .info ("connect kudu is succeed!" );
134+ if (client .tableExists (tableName )) {
135+ table = client .openTable (tableName );
138136 }
137+ session = client .newSession ();
139138 }
140139
141140 @ Override
@@ -147,26 +146,22 @@ public void writeRecord(Tuple2 record) throws IOException {
147146 }
148147 Row row = tupleTrans .getField (1 );
149148 if (row .getArity () != fieldNames .length ) {
150- if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
149+ if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
151150 LOG .error ("record insert failed ..{}" , row .toString ());
152151 LOG .error ("cause by row.getArity() != fieldNames.length" );
153152 }
154153 outDirtyRecords .inc ();
155154 return ;
156155 }
157- Operation operation = toOperation (writeMode , row );
158- AsyncKuduSession session = client .newSession ();
159156
160157 try {
161158 if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
162159 LOG .info ("Receive data : {}" , row );
163160 }
164-
165- session .apply (operation );
166- session .close ();
161+ session .apply (toOperation (writeMode , row ));
167162 outRecords .inc ();
168163 } catch (KuduException e ) {
169- if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ){
164+ if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
170165 LOG .error ("record insert failed, total dirty record:{} current row:{}" , outDirtyRecords .getCount (), row .toString ());
171166 LOG .error ("" , e );
172167 }
@@ -176,11 +171,19 @@ public void writeRecord(Tuple2 record) throws IOException {
176171
177172 @ Override
178173 public void close () {
174+ if (Objects .nonNull (session ) && !session .isClosed ()) {
175+ try {
176+ session .close ();
177+ } catch (Exception e ) {
178+ throw new IllegalArgumentException ("[closeKuduSession]: " + e .getMessage ());
179+ }
180+ }
181+
179182 if (null != client ) {
180183 try {
181- client .close ();
184+ client .shutdown ();
182185 } catch (Exception e ) {
183- throw new IllegalArgumentException ("[closeKudu ]:" + e .getMessage ());
186+ throw new IllegalArgumentException ("[closeKuduClient ]:" + e .getMessage ());
184187 }
185188 }
186189 }
0 commit comments