 package com.dtstack.flink.sql.sink.impala;
 
 import com.dtstack.flink.sql.classloader.ClassLoaderManager;
+import com.dtstack.flink.sql.exception.ExceptionTrace;
 import com.dtstack.flink.sql.factory.DTThreadFactory;
 import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
 import com.dtstack.flink.sql.sink.rdb.JDBCTypeConvertUtils;
 import com.dtstack.flink.sql.table.AbstractTableInfo;
+import com.dtstack.flink.sql.util.DtStringUtil;
 import com.dtstack.flink.sql.util.KrbUtils;
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -72,8 +73,6 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean, Row>> {
 
     // ${field}
     private static final Pattern STATIC_PARTITION_PATTERN = Pattern.compile("\\$\\{([^}]*)}");
-    // cast(value as string) -> cast('value' as string) cast(value as timestamp) -> cast('value' as timestamp)
-    private static final Pattern TYPE_PATTERN = Pattern.compile("cast\\((.*) as (.*)\\)");
     //specific type which values need to be quoted
     private static final String[] NEED_QUOTE_TYPE = {"string", "timestamp", "varchar"};
 
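
The ${field} pattern above captures the field name inside a static partition expression, presumably so the referenced field can be resolved from the incoming row. A minimal standalone sketch of how such a pattern behaves (the sample expression "pt=${pt}" and the class name are assumptions for illustration only):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class StaticPartitionPatternDemo {
        public static void main(String[] args) {
            // same regex as STATIC_PARTITION_PATTERN: captures whatever sits inside ${...}
            Pattern fieldPattern = Pattern.compile("\\$\\{([^}]*)}");
            Matcher matcher = fieldPattern.matcher("pt=${pt}");
            if (matcher.find()) {
                System.out.println(matcher.group(1)); // prints: pt
            }
        }
    }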
@@ -190,12 +189,7 @@ private void initScheduledTask(Long batchWaitInterval) {
                 new DTThreadFactory("impala-upsert-output-format"));
         this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
             synchronized (ImpalaOutputFormat.this) {
-                try {
-                    flush();
-                } catch (Exception e) {
-                    LOG.error("Writing records to impala jdbc failed.", e);
-                    throw new RuntimeException("Writing records to impala jdbc failed.", e);
-                }
+                flush();
             }
         }, batchWaitInterval, batchWaitInterval, TimeUnit.MILLISECONDS);
     }
@@ -234,16 +228,16 @@ private void openJdbc() {
         }
     }
 
-    private void flush() throws Exception {
-        if (batchCount > 0) {
-            if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
-                executeUpdateBatch();
-            }
-            if (!rowDataMap.isEmpty()) {
-                String templateSql =
+    private synchronized void flush() {
+        try {
+            if (batchCount > 0) {
+                if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
+                    executeUpdateBatch();
+                }
+                if (!rowDataMap.isEmpty()) {
+                    String templateSql =
                         "INSERT INTO tableName ${tableFieldsCondition} PARTITION ${partitionCondition} VALUES ${valuesCondition}";
-                executeBatchSql(
-                        statement,
+                    executeBatchSql(
                             templateSql,
                             schema,
                             tableName,
@@ -252,12 +246,15 @@ private void flush() throws Exception {
                             valueFieldNames,
                             partitionFields,
                             rowDataMap
-                );
-                rowDataMap.clear();
+                    );
+                    rowDataMap.clear();
+                }
             }
+            batchCount = 0;
+        } catch (Exception e) {
+            LOG.error("Writing records to impala jdbc failed.", e);
+            throw new RuntimeException("Writing records to impala jdbc failed.", e);
         }
-        batchCount = 0;
-
     }
 
     /**
@@ -285,8 +282,8 @@ private void executeUpdateBatch() throws SQLException {
             rows.clear();
         } catch (Exception e) {
             LOG.debug("impala jdbc execute batch error ", e);
-            connection.rollback();
-            connection.commit();
+            JDBCUtils.rollBack(connection);
+            JDBCUtils.commit(connection);
             updateStatement.clearBatch();
             executeUpdate(connection);
         }
@@ -297,14 +294,10 @@ public void executeUpdate(Connection connection) {
         try {
             setRecordToStatement(updateStatement, JDBCTypeConvertUtils.getSqlTypeFromFieldType(fieldTypes), row);
             updateStatement.executeUpdate();
-            connection.commit();
+            JDBCUtils.commit(connection);
         } catch (Exception e) {
-            try {
-                connection.rollback();
-                connection.commit();
-            } catch (SQLException e1) {
-                throw new RuntimeException(e1);
-            }
+            JDBCUtils.rollBack(connection);
+            JDBCUtils.commit(connection);
             if (metricOutputFormat.outDirtyRecords.getCount() % DIRTY_DATA_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
                 LOG.error("record insert failed ,this row is {}", row.toString());
                 LOG.error("", e);
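
The connection.rollback()/connection.commit() pairs above are now routed through JDBCUtils.rollBack and JDBCUtils.commit, whose implementation is not shown in this diff. A minimal sketch of what such helpers are assumed to look like (class name and exception handling here are illustrative assumptions, not the project's actual code):

    import java.sql.Connection;
    import java.sql.SQLException;

    // Illustrative stand-in for the JDBCUtils helpers referenced above.
    public final class JdbcTxHelper {

        private JdbcTxHelper() {
        }

        // Roll the current transaction back, wrapping the checked SQLException
        // so callers do not need a try/catch around every rollback.
        public static void rollBack(Connection connection) {
            try {
                if (connection != null) {
                    connection.rollback();
                }
            } catch (SQLException e) {
                throw new RuntimeException("roll back connection failed", e);
            }
        }

        // Commit the current transaction, wrapping the checked SQLException.
        public static void commit(Connection connection) {
            try {
                if (connection != null) {
                    connection.commit();
                }
            } catch (SQLException e) {
                throw new RuntimeException("commit connection failed", e);
            }
        }
    }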
@@ -351,40 +344,6 @@ private List<String> rebuildFieldNameListAndTypeList(List<String> fieldNames, Li
         return valueFields;
     }
 
-    /**
-     * Quote a specific type of value, like string, timestamp
-     * before: 1, cast(tiezhu as string), cast(2001-01-09 01:05:01 as timestamp), cast(123 as int)
-     * after: 1, cast('tiezhu' as string), cast('2001-01-09 01:05:01' as timestamp), cast(123 as int)
-     * if cast value is null, then cast(null as type)
-     *
-     * @param valueCondition original value condition
-     * @return quoted condition
-     */
-    private String valueConditionAddQuotation(String valueCondition) {
-        String[] temps = valueCondition.split(",");
-        List<String> replacedItem = new ArrayList<>();
-        Arrays.stream(temps).forEach(
-                item -> {
-                    Matcher matcher = TYPE_PATTERN.matcher(item);
-                    while (matcher.find()) {
-                        String value = matcher.group(1);
-                        String type = matcher.group(2);
-
-                        for (String needQuoteType : NEED_QUOTE_TYPE) {
-                            if (type.contains(needQuoteType)) {
-                                if (!"null".equals(value)) {
-                                    item = item.replace(value, "'" + value + "'");
-                                }
-                            }
-                        }
-                    }
-                    replacedItem.add(item);
-                }
-        );
-
-        return "(" + String.join(", ", replacedItem) + ")";
-    }
-
     @Override
     public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
         try {
@@ -419,7 +378,7 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
             for (int i = 0; i < fieldTypes.size(); i++) {
                 rowValue.setField(i, valueMap.get(valueFieldNames.get(i)));
             }
-            rowTuple2.f1 = valueConditionAddQuotation(buildValuesCondition(fieldTypes, rowValue));
+            rowTuple2.f1 = buildValuesCondition(fieldTypes, rowValue);
             putRowIntoMap(rowDataMap, rowTuple2);
         }
 
@@ -443,11 +402,7 @@ public void close() throws IOException {
         }
         // flush any SQL that has not been executed yet
         if (batchCount > 0) {
-            try {
-                flush();
-            } catch (Exception e) {
-                throw new RuntimeException("Writing records to impala failed.", e);
-            }
+            flush();
         }
         // cancel scheduled task
         if (this.scheduledFuture != null) {
@@ -481,63 +436,98 @@ public void close() throws IOException {
      * execute batch sql from row data map
      * sql like 'insert into tableName(f1, f2, f3) ${partitionCondition} values(v1, v2, v3), (v4, v5, v6)....
      *
-     * @param statement execute statement
      * @param tempSql template sql
      * @param storeType the store type of data
      * @param enablePartition enable partition or not
      * @param fieldNames field name list
      * @param partitionFields partition fields
      * @param rowDataMap row data map
      */
-    private void executeBatchSql(Statement statement,
-                                 String tempSql,
+    private void executeBatchSql(String tempSql,
                                  String schema,
                                  String tableName,
                                  String storeType,
                                  Boolean enablePartition,
                                  List<String> fieldNames,
                                  String partitionFields,
                                  Map<String, ArrayList<String>> rowDataMap) {
-        StringBuilder valuesCondition = new StringBuilder();
         StringBuilder partitionCondition = new StringBuilder();
         String tableFieldsCondition = buildTableFieldsCondition(fieldNames, partitionFields);
-        ArrayList<String> rowData;
+        ArrayList<String> rowData = new ArrayList<>();
         String tableNameInfo = Objects.isNull(schema) ?
                 tableName : quoteIdentifier(schema) + "." + tableName;
         tempSql = tempSql.replace("tableName", tableNameInfo);
+        // true when rows are written without an explicit partition (kudu tables or partitioning disabled)
+        boolean nonPartitioned = storeType.equalsIgnoreCase(KUDU_TYPE) || !enablePartition;
 
-        // kudu ${partitionCondition} is null
-        if (storeType.equalsIgnoreCase(KUDU_TYPE) || !enablePartition) {
-            try {
-                rowData = rowDataMap.get(NO_PARTITION);
-                rowData.forEach(row -> valuesCondition.append(row).append(", "));
-                String executeSql = tempSql.replace(VALUES_CONDITION, valuesCondition.toString())
+        try {
+            // kudu ${partitionCondition} is null
+            if (nonPartitioned) {
+                tempSql = tempSql
+                        .replace(PARTITION_CONDITION, partitionCondition.toString())
+                        .replace(PARTITION_CONSTANT, "")
+                        .replace(TABLE_FIELDS_CONDITION, tableFieldsCondition);
+                rowData.addAll(rowDataMap.get(NO_PARTITION));
+                String executeSql = tempSql.replace(VALUES_CONDITION, String.join(", ", rowData));
+                statement.execute(executeSql);
+                rowData.clear();
+            } else {
+                // partition sql
+                Set<String> keySet = rowDataMap.keySet();
+                for (String key : keySet) {
+                    rowData.addAll(rowDataMap.get(key));
+                    partitionCondition.append(key);
+                    tempSql = tempSql
                             .replace(PARTITION_CONDITION, partitionCondition.toString())
-                        .replace(PARTITION_CONSTANT, "")
                             .replace(TABLE_FIELDS_CONDITION, tableFieldsCondition);
-                String substring = executeSql.substring(0, executeSql.length() - 2);
-                statement.execute(substring);
-            } catch (Exception e) {
-                throw new RuntimeException("execute impala SQL error!", e);
+                    String executeSql = tempSql
+                            .replace(VALUES_CONDITION, String.join(", ", rowData));
+                    statement.execute(executeSql);
+                    partitionCondition.delete(0, partitionCondition.length());
+                }
             }
-            return;
+        } catch (Exception e) {
+            if (e instanceof SQLException) {
+                dealBatchSqlError(rowData, connection, statement, tempSql);
+            } else {
+                throw new RuntimeException("Insert into impala error!", e);
+            }
+        } finally {
+            rowData.clear();
         }
+    }
 
-        // partition sql
-        Set<String> keySet = rowDataMap.keySet();
-        String finalTempSql = tempSql;
-        for (String key : keySet) {
+    /**
+     * When a batch insert fails, split the batch into single-row statements and submit them
+     * one by one; any row that still fails is recorded as dirty data.
+     *
+     * @param rowData     the batched value conditions
+     * @param connection  the current database connection
+     * @param statement   the current statement
+     * @param templateSql template sql, e.g. insert into tableName(f1, f2, f3) [partition] values $valueCondition
+     */
+    private void dealBatchSqlError(List<String> rowData,
+                                   Connection connection,
+                                   Statement statement,
+                                   String templateSql) {
+        String errorMsg = "Insert into impala error. \nCause: [%s]\nRow: [%s]";
+        JDBCUtils.rollBack(connection);
+        JDBCUtils.commit(connection);
+        for (String rowDatum : rowData) {
+            String executeSql = templateSql.replace(VALUES_CONDITION, rowDatum);
             try {
-                String executeSql = String.copyValueOf(finalTempSql.toCharArray());
-                ArrayList<String> valuesConditionList = rowDataMap.get(key);
-                partitionCondition.append(key);
-                executeSql = executeSql.replace(PARTITION_CONDITION, partitionCondition.toString())
-                        .replace(TABLE_FIELDS_CONDITION, tableFieldsCondition)
-                        .replace(VALUES_CONDITION, String.join(", ", valuesConditionList));
                 statement.execute(executeSql);
-                partitionCondition.delete(0, partitionCondition.length());
-            } catch (SQLException sqlException) {
-                throw new RuntimeException("execute impala SQL error! ", sqlException);
+                JDBCUtils.commit(connection);
+            } catch (SQLException e) {
+                JDBCUtils.rollBack(connection);
+                JDBCUtils.commit(connection);
+                if (metricOutputFormat.outDirtyRecords.getCount() % DIRTY_DATA_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
+                    LOG.error(
+                            String.format(
+                                    errorMsg,
+                                    ExceptionTrace.traceOriginalCause(e),
+                                    rowDatum)
+                    );
+                }
+                metricOutputFormat.outDirtyRecords.inc();
             }
         }
     }
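
To see what executeBatchSql actually sends to Impala, here is an illustrative walk-through of the template substitution performed above (schema, table, fields, partition and values are assumed sample data; the ${...} tokens are taken from the template literal in flush()):

    public class ImpalaInsertSqlDemo {
        public static void main(String[] args) {
            // same template as in flush()
            String templateSql =
                    "INSERT INTO tableName ${tableFieldsCondition} PARTITION ${partitionCondition} VALUES ${valuesCondition}";

            // assumed sample data, for illustration only
            String sql = templateSql
                    .replace("tableName", "dt.student")
                    .replace("${tableFieldsCondition}", "(id, name, birthday)")
                    .replace("${partitionCondition}", "(pt='20210101')")
                    .replace("${valuesCondition}",
                            "(1, cast('tiezhu' as string), cast('2001-01-09 01:05:01' as timestamp))");

            // INSERT INTO dt.student (id, name, birthday) PARTITION (pt='20210101')
            // VALUES (1, cast('tiezhu' as string), cast('2001-01-09 01:05:01' as timestamp))
            System.out.println(sql);
        }
    }

If the multi-row statement fails with an SQLException, dealBatchSqlError re-issues the same template once per value group, so only the rows that still fail are counted as dirty data.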
@@ -580,22 +570,27 @@ private String buildTableFieldsCondition(List<String> fieldNames, String partitionFields) {
      * replace ${valuesCondition}
      *
      * @param fieldTypes field types
-     * @return condition like '(?, ?, cast(? as string))' and '?' will be replaced with row data
+     * @return condition like '(?, ?, cast('?' as string))' and '?' will be replaced with row data
      */
     private String buildValuesCondition(List<String> fieldTypes, Row row) {
         String valuesCondition = fieldTypes.stream().map(
                 f -> {
                     for (String item : NEED_QUOTE_TYPE) {
                         if (f.toLowerCase().contains(item)) {
-                            return String.format("cast(? as %s)", f.toLowerCase());
+                            return String.format("cast('?' as %s)", f.toLowerCase());
                         }
                     }
                     return "?";
                 }).collect(Collectors.joining(", "));
         for (int i = 0; i < row.getArity(); i++) {
-            valuesCondition = valuesCondition.replaceFirst("\\?", Objects.isNull(row.getField(i)) ? "null" : row.getField(i).toString());
+            Object rowField = row.getField(i);
+            if (DtStringUtil.isEmptyOrNull(rowField)) {
+                valuesCondition = valuesCondition.replaceFirst("'\\?'", "null");
+            } else {
+                valuesCondition = valuesCondition.replaceFirst("\\?", rowField.toString());
+            }
         }
-        return valuesCondition;
+        return "(" + valuesCondition + ")";
     }
 
     /**
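
A worked example of the value condition buildValuesCondition now produces (field types and row values are assumed for illustration): for fieldTypes (int, string, timestamp) the stream yields the pattern ?, cast('?' as string), cast('?' as timestamp), and a row (1, tiezhu, 2001-01-09 01:05:01) is then rendered as

    (1, cast('tiezhu' as string), cast('2001-01-09 01:05:01' as timestamp))

while a null value in the string field is rendered as cast(null as string). This matches the quoting that the removed valueConditionAddQuotation used to bolt on as a separate post-processing step.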