2929import org .apache .kudu .client .AsyncKuduSession ;
3030import org .apache .kudu .client .KuduClient ;
3131import org .apache .kudu .client .KuduException ;
32+ import org .apache .kudu .client .KuduSession ;
3233import org .apache .kudu .client .KuduTable ;
3334import org .apache .kudu .client .Operation ;
3435import org .apache .kudu .client .PartialRow ;
@@ -71,11 +72,11 @@ public enum WriteMode {
7172
7273 TypeInformation <?>[] fieldTypes ;
7374
74- private AsyncKuduClient client ;
75+ private KuduClient client ;
7576
7677 private KuduTable table ;
7778
78- private AsyncKuduSession session ;
79+ private volatile KuduSession session ;
7980
8081 private Integer workerCount ;
8182
@@ -106,16 +107,16 @@ public void open(int taskNumber, int numTasks) throws IOException {
106107 }
107108
108109 private void establishConnection () throws IOException {
109- AsyncKuduClient . AsyncKuduClientBuilder asyncKuduClientBuilder = new AsyncKuduClient . AsyncKuduClientBuilder (kuduMasters );
110+ KuduClient . KuduClientBuilder kuduClientBuilder = new KuduClient . KuduClientBuilder (kuduMasters );
110111 if (null != workerCount ) {
111- asyncKuduClientBuilder .workerCount (workerCount );
112+ kuduClientBuilder .workerCount (workerCount );
112113 }
113114 if (null != defaultSocketReadTimeoutMs ) {
114- asyncKuduClientBuilder .workerCount (defaultSocketReadTimeoutMs );
115+ kuduClientBuilder .workerCount (defaultSocketReadTimeoutMs );
115116 }
116117
117118 if (null != defaultOperationTimeoutMs ) {
118- asyncKuduClientBuilder .workerCount (defaultOperationTimeoutMs );
119+ kuduClientBuilder .workerCount (defaultOperationTimeoutMs );
119120 }
120121
121122 if (enableKrb ) {
@@ -125,14 +126,13 @@ private void establishConnection() throws IOException {
125126 krb5conf
126127 );
127128 client = ugi .doAs (
128- (PrivilegedAction <AsyncKuduClient >) asyncKuduClientBuilder ::build );
129+ (PrivilegedAction <KuduClient >) kuduClientBuilder ::build );
129130 } else {
130- client = asyncKuduClientBuilder .build ();
131+ client = kuduClientBuilder .build ();
131132 }
132- LOG .info ("connect kudu is successed!" );
133- KuduClient syncClient = client .syncClient ();
134- if (syncClient .tableExists (tableName )) {
135- table = syncClient .openTable (tableName );
133+ LOG .info ("connect kudu is succeed!" );
134+ if (client .tableExists (tableName )) {
135+ table = client .openTable (tableName );
136136 }
137137 session = client .newSession ();
138138 }
@@ -153,14 +153,12 @@ public void writeRecord(Tuple2 record) throws IOException {
153153 outDirtyRecords .inc ();
154154 return ;
155155 }
156- Operation operation = toOperation (writeMode , row );
157156
158157 try {
159158 if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
160159 LOG .info ("Receive data : {}" , row );
161160 }
162-
163- session .apply (operation );
161+ session .apply (toOperation (writeMode , row ));
164162 outRecords .inc ();
165163 } catch (KuduException e ) {
166164 if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
@@ -173,17 +171,19 @@ public void writeRecord(Tuple2 record) throws IOException {
173171
174172 @ Override
175173 public void close () {
176- if (Objects .nonNull (session )) {
177- // 先把未执行完的操作执行掉,防止操作不一致
178- session .flush ();
179- session .close ();
174+ if (Objects .nonNull (session ) && !session .isClosed ()) {
175+ try {
176+ session .close ();
177+ } catch (Exception e ) {
178+ throw new IllegalArgumentException ("[closeKuduSession]: " + e .getMessage ());
179+ }
180180 }
181181
182182 if (null != client ) {
183183 try {
184184 client .shutdown ();
185185 } catch (Exception e ) {
186- throw new IllegalArgumentException ("[closeKudu ]:" + e .getMessage ());
186+ throw new IllegalArgumentException ("[closeKuduClient ]:" + e .getMessage ());
187187 }
188188 }
189189 }
0 commit comments