@@ -77,7 +77,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
7777 // cast(value as string) -> cast('value' as string) cast(value as timestamp) -> cast('value' as timestamp)
7878 private static final Pattern TYPE_PATTERN = Pattern .compile ("cast\\ ((.*) as (.*)\\ )" );
7979 //specific type which values need to be quoted
80- private static final String [] NEED_QUOTE_TYPE = {"string" , " timestamp" , "varchar" };
80+ private static final String [] NEED_QUOTE_TYPE = {"timestamp" , "varchar" };
8181
8282 private static final Integer DEFAULT_CONN_TIME_OUT = 60 ;
8383 private static final int RECEIVE_DATA_PRINT_FREQUENCY = 1000 ;
@@ -162,7 +162,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
162162 init ();
163163 initMetric ();
164164 } catch (Exception e ) {
165- throw new RemoteException ("impala output format open error!" , e );
165+ throw new RuntimeException ("impala output format open error!" , e );
166166 }
167167 }
168168
@@ -189,19 +189,23 @@ private void init() throws SQLException {
189189 }
190190
191191 private void initScheduledTask (Long batchWaitInterval ) {
192- if (batchWaitInterval != 0 ) {
193- this .scheduler = new ScheduledThreadPoolExecutor (1 ,
194- new DTThreadFactory ("impala-upsert-output-format" ));
195- this .scheduledFuture = this .scheduler .scheduleWithFixedDelay (() -> {
196- synchronized (ImpalaOutputFormat .this ) {
197- try {
198- flush ();
199- } catch (Exception e ) {
200- LOG .error ("Writing records to impala jdbc failed." , e );
201- throw new RuntimeException ("Writing records to impala jdbc failed." , e );
192+ try {
193+ if (batchWaitInterval != 0 ) {
194+ this .scheduler = new ScheduledThreadPoolExecutor (1 ,
195+ new DTThreadFactory ("impala-upsert-output-format" ));
196+ this .scheduledFuture = this .scheduler .scheduleWithFixedDelay (() -> {
197+ synchronized (ImpalaOutputFormat .this ) {
198+ try {
199+ flush ();
200+ } catch (Exception e ) {
201+ LOG .error ("Writing records to impala jdbc failed." , e );
202+ throw new RuntimeException ("Writing records to impala jdbc failed." , e );
203+ }
202204 }
203- }
204- }, batchWaitInterval , batchWaitInterval , TimeUnit .MILLISECONDS );
205+ }, batchWaitInterval , batchWaitInterval , TimeUnit .MILLISECONDS );
206+ }
207+ } catch (Exception e ) {
208+ throw new RuntimeException (e );
205209 }
206210 }
207211
@@ -235,7 +239,7 @@ private void openJdbc() {
235239 }
236240 }
237241
238- private synchronized void flush () throws SQLException {
242+ private void flush () throws Exception {
239243 if (batchCount > 0 ) {
240244 if (updateMode .equalsIgnoreCase (UPDATE_MODE )) {
241245 executeUpdateBatch ();
@@ -387,7 +391,7 @@ private String valueConditionAddQuotation(String valueCondition) {
387391 }
388392
389393 @ Override
390- public synchronized void writeRecord (Tuple2 <Boolean , Row > record ) throws IOException {
394+ public void writeRecord (Tuple2 <Boolean , Row > record ) throws IOException {
391395 try {
392396 if (!record .f0 ) {
393397 return ;
@@ -433,7 +437,7 @@ public synchronized void writeRecord(Tuple2<Boolean, Row> record) throws IOExcep
433437 // Receive data
434438 outRecords .inc ();
435439 } catch (Exception e ) {
436- throw new RuntimeException ("Writing records to impala failed." , e );
440+ throw new IOException ("Writing records to impala failed." , e );
437441 }
438442 }
439443
@@ -469,7 +473,7 @@ public void close() throws IOException {
469473 updateStatement .close ();
470474 }
471475 } catch (SQLException e ) {
472- throw new RemoteException ("impala connection close failed!" );
476+ throw new RemoteException ("impala connection close failed!" , e );
473477 } finally {
474478 connection = null ;
475479 statement = null ;
@@ -489,17 +493,16 @@ public void close() throws IOException {
489493 * @param fieldNames field name list
490494 * @param partitionFields partition fields
491495 * @param rowDataMap row data map
492- * @throws SQLException throw sql exception
493496 */
494- private synchronized void executeBatchSql (Statement statement ,
495- String tempSql ,
496- String schema ,
497- String tableName ,
498- String storeType ,
499- Boolean enablePartition ,
500- List <String > fieldNames ,
501- String partitionFields ,
502- Map <String , ArrayList <String >> rowDataMap ) throws SQLException {
497+ private void executeBatchSql (Statement statement ,
498+ String tempSql ,
499+ String schema ,
500+ String tableName ,
501+ String storeType ,
502+ Boolean enablePartition ,
503+ List <String > fieldNames ,
504+ String partitionFields ,
505+ Map <String , ArrayList <String >> rowDataMap ) {
503506 StringBuilder valuesCondition = new StringBuilder ();
504507 StringBuilder partitionCondition = new StringBuilder ();
505508 String tableFieldsCondition = buildTableFieldsCondition (fieldNames , partitionFields );
@@ -510,14 +513,18 @@ private synchronized void executeBatchSql(Statement statement,
510513
511514 // kudu ${partitionCondition} is null
512515 if (storeType .equalsIgnoreCase (KUDU_TYPE ) || !enablePartition ) {
513- rowData = rowDataMap .get (NO_PARTITION );
514- rowData .forEach (row -> valuesCondition .append (row ).append (", " ));
515- String executeSql = tempSql .replace (VALUES_CONDITION , valuesCondition .toString ())
516- .replace (PARTITION_CONDITION , partitionCondition .toString ())
517- .replace (PARTITION_CONSTANT , "" )
518- .replace (TABLE_FIELDS_CONDITION , tableFieldsCondition );
519- String substring = executeSql .substring (0 , executeSql .length () - 2 );
520- statement .execute (substring );
516+ try {
517+ rowData = rowDataMap .get (NO_PARTITION );
518+ rowData .forEach (row -> valuesCondition .append (row ).append (", " ));
519+ String executeSql = tempSql .replace (VALUES_CONDITION , valuesCondition .toString ())
520+ .replace (PARTITION_CONDITION , partitionCondition .toString ())
521+ .replace (PARTITION_CONSTANT , "" )
522+ .replace (TABLE_FIELDS_CONDITION , tableFieldsCondition );
523+ String substring = executeSql .substring (0 , executeSql .length () - 2 );
524+ statement .execute (substring );
525+ } catch (Exception e ) {
526+ throw new RuntimeException ("execute impala SQL error!" , e );
527+ }
521528 return ;
522529 }
523530
@@ -535,7 +542,7 @@ private synchronized void executeBatchSql(Statement statement,
535542 statement .execute (executeSql );
536543 partitionCondition .delete (0 , partitionCondition .length ());
537544 } catch (SQLException sqlException ) {
538- throw new RuntimeException ("execute impala partition SQL error! " , sqlException );
545+ throw new RuntimeException ("execute impala SQL error! " , sqlException );
539546 }
540547 });
541548 }
@@ -583,7 +590,7 @@ private String buildTableFieldsCondition(List<String> fieldNames, String partiti
583590 private String buildValuesCondition (List <String > fieldTypes , Row row ) {
584591 String valuesCondition = fieldTypes .stream ().map (
585592 f -> {
586- for (String item : NEED_QUOTE_TYPE ) {
593+ for (String item : NEED_QUOTE_TYPE ) {
587594 if (f .toLowerCase ().contains (item )) {
588595 return String .format ("cast(? as %s)" , f .toLowerCase ());
589596 }
0 commit comments