1919package com .dtstack .flink .sql .sink .impala ;
2020
2121import com .dtstack .flink .sql .classloader .ClassLoaderManager ;
22+ import com .dtstack .flink .sql .exception .ExceptionTrace ;
2223import com .dtstack .flink .sql .factory .DTThreadFactory ;
2324import com .dtstack .flink .sql .outputformat .AbstractDtRichOutputFormat ;
2425import com .dtstack .flink .sql .sink .rdb .JDBCTypeConvertUtils ;
2526import com .dtstack .flink .sql .table .AbstractTableInfo ;
27+ import com .dtstack .flink .sql .util .DtStringUtil ;
28+ import com .dtstack .flink .sql .util .JDBCUtils ;
2629import com .dtstack .flink .sql .util .KrbUtils ;
2730import com .google .common .collect .Maps ;
2831import org .apache .commons .collections .CollectionUtils ;
4144import java .sql .SQLException ;
4245import java .sql .Statement ;
4346import java .util .ArrayList ;
44- import java .util .Arrays ;
4547import java .util .HashMap ;
4648import java .util .List ;
4749import java .util .Map ;
@@ -72,8 +74,6 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
7274
7375 // ${field}
7476 private static final Pattern STATIC_PARTITION_PATTERN = Pattern .compile ("\\ $\\ {([^}]*)}" );
75- // cast(value as string) -> cast('value' as string) cast(value as timestamp) -> cast('value' as timestamp)
76- private static final Pattern TYPE_PATTERN = Pattern .compile ("cast\\ ((.*) as (.*)\\ )" );
7777 //specific type which values need to be quoted
7878 private static final String [] NEED_QUOTE_TYPE = {"string" , "timestamp" , "varchar" };
7979
@@ -190,12 +190,7 @@ private void initScheduledTask(Long batchWaitInterval) {
190190 new DTThreadFactory ("impala-upsert-output-format" ));
191191 this .scheduledFuture = this .scheduler .scheduleWithFixedDelay (() -> {
192192 synchronized (ImpalaOutputFormat .this ) {
193- try {
194- flush ();
195- } catch (Exception e ) {
196- LOG .error ("Writing records to impala jdbc failed." , e );
197- throw new RuntimeException ("Writing records to impala jdbc failed." , e );
198- }
193+ flush ();
199194 }
200195 }, batchWaitInterval , batchWaitInterval , TimeUnit .MILLISECONDS );
201196 }
@@ -234,16 +229,16 @@ private void openJdbc() {
234229 }
235230 }
236231
237- private void flush () throws Exception {
238- if (batchCount > 0 ) {
239- if (updateMode .equalsIgnoreCase (UPDATE_MODE )) {
240- executeUpdateBatch ();
241- }
242- if (!rowDataMap .isEmpty ()) {
243- String templateSql =
232+ private synchronized void flush () {
233+ try {
234+ if (batchCount > 0 ) {
235+ if (updateMode .equalsIgnoreCase (UPDATE_MODE )) {
236+ executeUpdateBatch ();
237+ }
238+ if (!rowDataMap .isEmpty ()) {
239+ String templateSql =
244240 "INSERT INTO tableName ${tableFieldsCondition} PARTITION ${partitionCondition} VALUES ${valuesCondition}" ;
245- executeBatchSql (
246- statement ,
241+ executeBatchSql (
247242 templateSql ,
248243 schema ,
249244 tableName ,
@@ -252,12 +247,15 @@ private void flush() throws Exception {
252247 valueFieldNames ,
253248 partitionFields ,
254249 rowDataMap
255- );
256- rowDataMap .clear ();
250+ );
251+ rowDataMap .clear ();
252+ }
257253 }
254+ batchCount = 0 ;
255+ } catch (Exception e ) {
256+ LOG .error ("Writing records to impala jdbc failed." , e );
257+ throw new RuntimeException ("Writing records to impala jdbc failed." , e );
258258 }
259- batchCount = 0 ;
260-
261259 }
262260
263261 /**
@@ -285,8 +283,8 @@ private void executeUpdateBatch() throws SQLException {
285283 rows .clear ();
286284 } catch (Exception e ) {
287285 LOG .debug ("impala jdbc execute batch error " , e );
288- connection . rollback ( );
289- connection .commit ();
286+ JDBCUtils . rollBack ( connection );
287+ JDBCUtils .commit (connection );
290288 updateStatement .clearBatch ();
291289 executeUpdate (connection );
292290 }
@@ -297,14 +295,10 @@ public void executeUpdate(Connection connection) {
297295 try {
298296 setRecordToStatement (updateStatement , JDBCTypeConvertUtils .getSqlTypeFromFieldType (fieldTypes ), row );
299297 updateStatement .executeUpdate ();
300- connection .commit ();
298+ JDBCUtils .commit (connection );
301299 } catch (Exception e ) {
302- try {
303- connection .rollback ();
304- connection .commit ();
305- } catch (SQLException e1 ) {
306- throw new RuntimeException (e1 );
307- }
300+ JDBCUtils .rollBack (connection );
301+ JDBCUtils .commit (connection );
308302 if (metricOutputFormat .outDirtyRecords .getCount () % DIRTY_DATA_PRINT_FREQUENCY == 0 || LOG .isDebugEnabled ()) {
309303 LOG .error ("record insert failed ,this row is {}" , row .toString ());
310304 LOG .error ("" , e );
@@ -351,40 +345,6 @@ private List<String> rebuildFieldNameListAndTypeList(List<String> fieldNames, Li
351345 return valueFields ;
352346 }
353347
354- /**
355- * Quote a specific type of value, like string, timestamp
356- * before: 1, cast(tiezhu as string), cast(2001-01-09 01:05:01 as timestamp), cast(123 as int)
357- * after: 1, cast('tiezhu' as string), cast('2001-01-09 01:05:01' as timestamp), cast(123 as int)
358- * if cast value is null, then cast(null as type)
359- *
360- * @param valueCondition original value condition
361- * @return quoted condition
362- */
363- private String valueConditionAddQuotation (String valueCondition ) {
364- String [] temps = valueCondition .split ("," );
365- List <String > replacedItem = new ArrayList <>();
366- Arrays .stream (temps ).forEach (
367- item -> {
368- Matcher matcher = TYPE_PATTERN .matcher (item );
369- while (matcher .find ()) {
370- String value = matcher .group (1 );
371- String type = matcher .group (2 );
372-
373- for (String needQuoteType : NEED_QUOTE_TYPE ) {
374- if (type .contains (needQuoteType )) {
375- if (!"null" .equals (value )) {
376- item = item .replace (value , "'" + value + "'" );
377- }
378- }
379- }
380- }
381- replacedItem .add (item );
382- }
383- );
384-
385- return "(" + String .join (", " , replacedItem ) + ")" ;
386- }
387-
388348 @ Override
389349 public void writeRecord (Tuple2 <Boolean , Row > record ) throws IOException {
390350 try {
@@ -419,7 +379,7 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
419379 for (int i = 0 ; i < fieldTypes .size (); i ++) {
420380 rowValue .setField (i , valueMap .get (valueFieldNames .get (i )));
421381 }
422- rowTuple2 .f1 = valueConditionAddQuotation ( buildValuesCondition (fieldTypes , rowValue ) );
382+ rowTuple2 .f1 = buildValuesCondition (fieldTypes , rowValue );
423383 putRowIntoMap (rowDataMap , rowTuple2 );
424384 }
425385
@@ -443,11 +403,7 @@ public void close() throws IOException {
443403 }
444404 // Flush any SQL that has not been executed yet
445405 if (batchCount > 0 ) {
446- try {
447- flush ();
448- } catch (Exception e ) {
449- throw new RuntimeException ("Writing records to impala failed." , e );
450- }
406+ flush ();
451407 }
452408 // cancel scheduled task
453409 if (this .scheduledFuture != null ) {
@@ -481,63 +437,98 @@ public void close() throws IOException {
481437 * execute batch sql from row data map
482438 * sql like 'insert into tableName(f1, f2, f3) ${partitionCondition} values(v1, v2, v3), (v4, v5, v6)....
483439 *
484- * @param statement execute statement
485440 * @param tempSql template sql
486441 * @param storeType the store type of data
487442 * @param enablePartition enable partition or not
488443 * @param fieldNames field name list
489444 * @param partitionFields partition fields
490445 * @param rowDataMap row data map
491446 */
492- private void executeBatchSql (Statement statement ,
493- String tempSql ,
447+ private void executeBatchSql (String tempSql ,
494448 String schema ,
495449 String tableName ,
496450 String storeType ,
497451 Boolean enablePartition ,
498452 List <String > fieldNames ,
499453 String partitionFields ,
500454 Map <String , ArrayList <String >> rowDataMap ) {
501- StringBuilder valuesCondition = new StringBuilder ();
502455 StringBuilder partitionCondition = new StringBuilder ();
503456 String tableFieldsCondition = buildTableFieldsCondition (fieldNames , partitionFields );
504- ArrayList <String > rowData ;
457+ ArrayList <String > rowData = new ArrayList <>() ;
505458 String tableNameInfo = Objects .isNull (schema ) ?
506459 tableName : quoteIdentifier (schema ) + "." + tableName ;
507460 tempSql = tempSql .replace ("tableName" , tableNameInfo );
461+ boolean isPartitioned = storeType .equalsIgnoreCase (KUDU_TYPE ) || !enablePartition ;
508462
509- // kudu ${partitionCondition} is null
510- if (storeType .equalsIgnoreCase (KUDU_TYPE ) || !enablePartition ) {
511- try {
512- rowData = rowDataMap .get (NO_PARTITION );
513- rowData .forEach (row -> valuesCondition .append (row ).append (", " ));
514- String executeSql = tempSql .replace (VALUES_CONDITION , valuesCondition .toString ())
463+ try {
464+ // kudu ${partitionCondition} is null
465+ if (isPartitioned ) {
466+ tempSql = tempSql
467+ .replace (PARTITION_CONDITION , partitionCondition .toString ())
468+ .replace (PARTITION_CONSTANT , "" )
469+ .replace (TABLE_FIELDS_CONDITION , tableFieldsCondition );
470+ rowData .addAll (rowDataMap .get (NO_PARTITION ));
471+ String executeSql = tempSql .replace (VALUES_CONDITION , String .join (", " , rowData ));
472+ statement .execute (executeSql );
473+ rowData .clear ();
474+ } else {
475+ // partition sql
476+ Set <String > keySet = rowDataMap .keySet ();
477+ for (String key : keySet ) {
478+ rowData .addAll (rowDataMap .get (key ));
479+ partitionCondition .append (key );
480+ tempSql = tempSql
515481 .replace (PARTITION_CONDITION , partitionCondition .toString ())
516- .replace (PARTITION_CONSTANT , "" )
517482 .replace (TABLE_FIELDS_CONDITION , tableFieldsCondition );
518- String substring = executeSql .substring (0 , executeSql .length () - 2 );
519- statement .execute (substring );
520- } catch (Exception e ) {
521- throw new RuntimeException ("execute impala SQL error!" , e );
483+ String executeSql = tempSql
484+ .replace (VALUES_CONDITION , String .join (", " , rowData ));
485+ statement .execute (executeSql );
486+ partitionCondition .delete (0 , partitionCondition .length ());
487+ }
522488 }
523- return ;
489+ } catch (Exception e ) {
490+ if (e instanceof SQLException ) {
491+ dealBatchSqlError (rowData , connection , statement , tempSql );
492+ } else {
493+ throw new RuntimeException ("Insert into impala error!" , e );
494+ }
495+ } finally {
496+ rowData .clear ();
524497 }
498+ }
525499
526- // partition sql
527- Set <String > keySet = rowDataMap .keySet ();
528- String finalTempSql = tempSql ;
529- for (String key : keySet ) {
500+ /**
501+ * 当批量写入失败时,把批量的sql拆解为单条sql提交,对于单条写入的sql记做脏数据
502+ *
503+ * @param rowData 批量的values
504+ * @param connection 当前数据库connect
505+ * @param statement 当前statement
506+ * @param templateSql 模版sql,例如insert into tableName(f1, f2, f3) [partition] values $valueCondition
507+ */
508+ private void dealBatchSqlError (List <String > rowData ,
509+ Connection connection ,
510+ Statement statement ,
511+ String templateSql ) {
512+ String errorMsg = "Insert into impala error. \n Cause: [%s]\n Row: [%s]" ;
513+ JDBCUtils .rollBack (connection );
514+ JDBCUtils .commit (connection );
515+ for (String rowDatum : rowData ) {
516+ String executeSql = templateSql .replace (VALUES_CONDITION , rowDatum );
530517 try {
531- String executeSql = String .copyValueOf (finalTempSql .toCharArray ());
532- ArrayList <String > valuesConditionList = rowDataMap .get (key );
533- partitionCondition .append (key );
534- executeSql = executeSql .replace (PARTITION_CONDITION , partitionCondition .toString ())
535- .replace (TABLE_FIELDS_CONDITION , tableFieldsCondition )
536- .replace (VALUES_CONDITION , String .join (", " , valuesConditionList ));
537518 statement .execute (executeSql );
538- partitionCondition .delete (0 , partitionCondition .length ());
539- } catch (SQLException sqlException ) {
540- throw new RuntimeException ("execute impala SQL error! " , sqlException );
519+ JDBCUtils .commit (connection );
520+ } catch (SQLException e ) {
521+ JDBCUtils .rollBack (connection );
522+ JDBCUtils .commit (connection );
523+ if (metricOutputFormat .outDirtyRecords .getCount () % DIRTY_DATA_PRINT_FREQUENCY == 0 || LOG .isDebugEnabled ()) {
524+ LOG .error (
525+ String .format (
526+ errorMsg ,
527+ ExceptionTrace .traceOriginalCause (e ),
528+ rowDatum )
529+ );
530+ }
531+ metricOutputFormat .outDirtyRecords .inc ();
541532 }
542533 }
543534 }
@@ -580,22 +571,27 @@ private String buildTableFieldsCondition(List<String> fieldNames, String partiti
580571 * replace ${valuesCondition}
581572 *
582573 * @param fieldTypes field types
583- * @return condition like '(?, ?, cast(? as string))' and '?' will be replaced with row data
574+ * @return condition like '(?, ?, cast('?' as string))' and '?' will be replaced with row data
584575 */
585576 private String buildValuesCondition (List <String > fieldTypes , Row row ) {
586577 String valuesCondition = fieldTypes .stream ().map (
587578 f -> {
588579 for (String item : NEED_QUOTE_TYPE ) {
589580 if (f .toLowerCase ().contains (item )) {
590- return String .format ("cast(? as %s)" , f .toLowerCase ());
581+ return String .format ("cast('?' as %s)" , f .toLowerCase ());
591582 }
592583 }
593584 return "?" ;
594585 }).collect (Collectors .joining (", " ));
595586 for (int i = 0 ; i < row .getArity (); i ++) {
596- valuesCondition = valuesCondition .replaceFirst ("\\ ?" , Objects .isNull (row .getField (i )) ? "null" : row .getField (i ).toString ());
587+ Object rowField = row .getField (i );
588+ if (DtStringUtil .isEmptyOrNull (rowField )) {
589+ valuesCondition = valuesCondition .replaceFirst ("'\\ ?'" , "null" );
590+ } else {
591+ valuesCondition = valuesCondition .replaceFirst ("\\ ?" , rowField .toString ());
592+ }
597593 }
598- return valuesCondition ;
594+ return "(" + valuesCondition + ")" ;
599595 }
600596
601597 /**
0 commit comments