 
 package com.dtstack.flink.sql.sink.impala;
 
+import com.dtstack.flink.sql.exception.ExceptionTrace;
 import com.dtstack.flink.sql.factory.DTThreadFactory;
 import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
 import com.dtstack.flink.sql.sink.rdb.JDBCTypeConvertUtils;
 import com.dtstack.flink.sql.table.AbstractTableInfo;
+import com.dtstack.flink.sql.util.DtStringUtil;
 import com.dtstack.flink.sql.util.JDBCUtils;
 import com.dtstack.flink.sql.util.KrbUtils;
 import com.google.common.collect.Maps;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -72,8 +73,6 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
 
     // ${field}
     private static final Pattern STATIC_PARTITION_PATTERN = Pattern.compile("\\$\\{([^}]*)}");
-    // cast(value as string) -> cast('value' as string) cast(value as timestamp) -> cast('value' as timestamp)
-    private static final Pattern TYPE_PATTERN = Pattern.compile("cast\\((.*) as (.*)\\)");
     // specific types whose values need to be quoted
     private static final String[] NEED_QUOTE_TYPE = {"string", "timestamp", "varchar"};
 
@@ -192,12 +191,7 @@ private void initScheduledTask(Long batchWaitInterval) {
                 new DTThreadFactory("impala-upsert-output-format"));
         this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
             synchronized (ImpalaOutputFormat.this) {
-                try {
-                    flush();
-                } catch (Exception e) {
-                    LOG.error("Writing records to impala jdbc failed.", e);
-                    throw new RuntimeException("Writing records to impala jdbc failed.", e);
-                }
+                flush();
             }
         }, batchWaitInterval, batchWaitInterval, TimeUnit.MILLISECONDS);
     }
@@ -236,16 +230,16 @@ private void openJdbc() {
         }
     }
 
-    private void flush() throws Exception {
-        if (batchCount > 0) {
-            if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
-                executeUpdateBatch();
-            }
-            if (!rowDataMap.isEmpty()) {
-                String templateSql =
+    private synchronized void flush() {
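+        // drain the update-mode batch and any buffered insert rows, then reset the batch counter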
+        try {
+            if (batchCount > 0) {
+                if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
+                    executeUpdateBatch();
+                }
+                if (!rowDataMap.isEmpty()) {
+                    String templateSql =
                             "INSERT INTO tableName ${tableFieldsCondition} PARTITION ${partitionCondition} VALUES ${valuesCondition}";
-                    executeBatchSql(
-                            statement,
+                    executeBatchSql(
                             templateSql,
                             schema,
                             tableName,
@@ -254,12 +248,15 @@ private void flush() throws Exception {
                             valueFieldNames,
                             partitionFields,
                             rowDataMap
-                );
-                rowDataMap.clear();
+                    );
+                    rowDataMap.clear();
+                }
             }
+            batchCount = 0;
+        } catch (Exception e) {
+            LOG.error("Writing records to impala jdbc failed.", e);
+            throw new RuntimeException("Writing records to impala jdbc failed.", e);
         }
-        batchCount = 0;
-
     }
 
     /**
@@ -287,8 +284,8 @@ private void executeUpdateBatch() throws SQLException {
                 rows.clear();
             } catch (Exception e) {
                 LOG.debug("impala jdbc execute batch error ", e);
-                connection.rollback();
-                connection.commit();
+                JDBCUtils.rollBack(connection);
+                JDBCUtils.commit(connection);
                 updateStatement.clearBatch();
                 executeUpdate(connection);
             }
@@ -299,14 +296,10 @@ public void executeUpdate(Connection connection) {
         try {
             setRecordToStatement(updateStatement, JDBCTypeConvertUtils.getSqlTypeFromFieldType(fieldTypes), row);
             updateStatement.executeUpdate();
-            connection.commit();
+            JDBCUtils.commit(connection);
         } catch (Exception e) {
-            try {
-                connection.rollback();
-                connection.commit();
-            } catch (SQLException e1) {
-                throw new RuntimeException(e1);
-            }
+            JDBCUtils.rollBack(connection);
+            JDBCUtils.commit(connection);
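+            // the failed row has been rolled back; it is recorded as a dirty record below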
             if (metricOutputFormat.outDirtyRecords.getCount() % DIRTY_DATA_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
                 LOG.error("record insert failed, this row is {}", row.toString());
                 LOG.error("", e);
@@ -353,40 +346,6 @@ private List<String> rebuildFieldNameListAndTypeList(List<String> fieldNames, Li
         return valueFields;
     }
 
-    /**
-     * Quote a specific type of value, like string, timestamp
-     * before: 1, cast(tiezhu as string), cast(2001-01-09 01:05:01 as timestamp), cast(123 as int)
-     * after: 1, cast('tiezhu' as string), cast('2001-01-09 01:05:01' as timestamp), cast(123 as int)
-     * if cast value is null, then cast(null as type)
-     *
-     * @param valueCondition original value condition
-     * @return quoted condition
-     */
-    private String valueConditionAddQuotation(String valueCondition) {
-        String[] temps = valueCondition.split(",");
-        List<String> replacedItem = new ArrayList<>();
-        Arrays.stream(temps).forEach(
-                item -> {
-                    Matcher matcher = TYPE_PATTERN.matcher(item);
-                    while (matcher.find()) {
-                        String value = matcher.group(1);
-                        String type = matcher.group(2);
-
-                        for (String needQuoteType : NEED_QUOTE_TYPE) {
-                            if (type.contains(needQuoteType)) {
-                                if (!"null".equals(value)) {
-                                    item = item.replace(value, "'" + value + "'");
-                                }
-                            }
-                        }
-                    }
-                    replacedItem.add(item);
-                }
-        );
-
-        return "(" + String.join(", ", replacedItem) + ")";
-    }
-
     @Override
     public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
         try {
@@ -421,7 +380,7 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
             for (int i = 0; i < fieldTypes.size(); i++) {
                 rowValue.setField(i, valueMap.get(valueFieldNames.get(i)));
             }
-            rowTuple2.f1 = valueConditionAddQuotation(buildValuesCondition(fieldTypes, rowValue));
+            rowTuple2.f1 = buildValuesCondition(fieldTypes, rowValue);
             putRowIntoMap(rowDataMap, rowTuple2);
         }
 
@@ -445,11 +404,7 @@ public void close() throws IOException {
         }
         // flush any SQL that has not been executed yet
         if (batchCount > 0) {
-            try {
-                flush();
-            } catch (Exception e) {
-                throw new RuntimeException("Writing records to impala failed.", e);
-            }
+            flush();
         }
         // cancel scheduled task
         if (this.scheduledFuture != null) {
@@ -483,63 +438,98 @@ public void close() throws IOException {
      * execute batch sql from row data map
      * sql like 'insert into tableName(f1, f2, f3) ${partitionCondition} values(v1, v2, v3), (v4, v5, v6)....
      *
-     * @param statement       execute statement
      * @param tempSql         template sql
      * @param storeType       the store type of data
      * @param enablePartition enable partition or not
      * @param fieldNames      field name list
      * @param partitionFields partition fields
      * @param rowDataMap      row data map
      */
-    private void executeBatchSql(Statement statement,
-                                 String tempSql,
+    private void executeBatchSql(String tempSql,
                                  String schema,
                                  String tableName,
                                  String storeType,
                                  Boolean enablePartition,
                                  List<String> fieldNames,
                                  String partitionFields,
                                  Map<String, ArrayList<String>> rowDataMap) {
-        StringBuilder valuesCondition = new StringBuilder();
         StringBuilder partitionCondition = new StringBuilder();
         String tableFieldsCondition = buildTableFieldsCondition(fieldNames, partitionFields);
-        ArrayList<String> rowData;
+        ArrayList<String> rowData = new ArrayList<>();
         String tableNameInfo = Objects.isNull(schema) ?
                 tableName : quoteIdentifier(schema) + "." + tableName;
         tempSql = tempSql.replace("tableName", tableNameInfo);
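+        // Kudu tables, or tables written with partitioning disabled, take no PARTITION clause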
+        boolean withoutPartition = storeType.equalsIgnoreCase(KUDU_TYPE) || !enablePartition;
 
-        // kudu ${partitionCondition} is null
-        if (storeType.equalsIgnoreCase(KUDU_TYPE) || !enablePartition) {
-            try {
-                rowData = rowDataMap.get(NO_PARTITION);
-                rowData.forEach(row -> valuesCondition.append(row).append(", "));
-                String executeSql = tempSql.replace(VALUES_CONDITION, valuesCondition.toString())
+        try {
+            // kudu ${partitionCondition} is null
+            if (withoutPartition) {
+                tempSql = tempSql
+                        .replace(PARTITION_CONDITION, partitionCondition.toString())
+                        .replace(PARTITION_CONSTANT, "")
+                        .replace(TABLE_FIELDS_CONDITION, tableFieldsCondition);
+                rowData.addAll(rowDataMap.get(NO_PARTITION));
+                String executeSql = tempSql.replace(VALUES_CONDITION, String.join(", ", rowData));
+                statement.execute(executeSql);
+                rowData.clear();
+            } else {
+                // partition sql
+                Set<String> keySet = rowDataMap.keySet();
+                for (String key : keySet) {
+                    rowData.addAll(rowDataMap.get(key));
+                    partitionCondition.append(key);
+                    tempSql = tempSql
                         .replace(PARTITION_CONDITION, partitionCondition.toString())
-                        .replace(PARTITION_CONSTANT, "")
                         .replace(TABLE_FIELDS_CONDITION, tableFieldsCondition);
-                String substring = executeSql.substring(0, executeSql.length() - 2);
-                statement.execute(substring);
-            } catch (Exception e) {
-                throw new RuntimeException("execute impala SQL error!", e);
+                    String executeSql = tempSql
+                            .replace(VALUES_CONDITION, String.join(", ", rowData));
+                    statement.execute(executeSql);
+                    partitionCondition.delete(0, partitionCondition.length());
+                }
             }
-            return;
+        } catch (Exception e) {
+            if (e instanceof SQLException) {
+                dealBatchSqlError(rowData, connection, statement, tempSql);
+            } else {
+                throw new RuntimeException("Insert into impala error!", e);
+            }
+        } finally {
+            rowData.clear();
         }
+    }
 
-        // partition sql
-        Set<String> keySet = rowDataMap.keySet();
-        String finalTempSql = tempSql;
-        for (String key : keySet) {
+    /**
+     * When a batch write fails, split the batch SQL into single-row statements and execute them one by one; rows that still fail are recorded as dirty data.
+     *
+     * @param rowData     batched values
+     * @param connection  current database connection
+     * @param statement   current statement
+     * @param templateSql template sql, e.g. insert into tableName(f1, f2, f3) [partition] values $valueCondition
+     */
+    private void dealBatchSqlError(List<String> rowData,
+                                   Connection connection,
+                                   Statement statement,
+                                   String templateSql) {
+        String errorMsg = "Insert into impala error. \nCause: [%s]\nRow: [%s]";
+        JDBCUtils.rollBack(connection);
+        JDBCUtils.commit(connection);
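+        // replay the failed batch row by row so that one bad row does not discard the whole batch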
+        for (String rowDatum : rowData) {
+            String executeSql = templateSql.replace(VALUES_CONDITION, rowDatum);
             try {
-                String executeSql = String.copyValueOf(finalTempSql.toCharArray());
-                ArrayList<String> valuesConditionList = rowDataMap.get(key);
-                partitionCondition.append(key);
-                executeSql = executeSql.replace(PARTITION_CONDITION, partitionCondition.toString())
-                        .replace(TABLE_FIELDS_CONDITION, tableFieldsCondition)
-                        .replace(VALUES_CONDITION, String.join(", ", valuesConditionList));
                 statement.execute(executeSql);
-                partitionCondition.delete(0, partitionCondition.length());
-            } catch (SQLException sqlException) {
-                throw new RuntimeException("execute impala SQL error! ", sqlException);
+                JDBCUtils.commit(connection);
+            } catch (SQLException e) {
+                JDBCUtils.rollBack(connection);
+                JDBCUtils.commit(connection);
+                if (metricOutputFormat.outDirtyRecords.getCount() % DIRTY_DATA_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
+                    LOG.error(
+                            String.format(
+                                    errorMsg,
+                                    ExceptionTrace.traceOriginalCause(e),
+                                    rowDatum)
+                    );
+                }
+                metricOutputFormat.outDirtyRecords.inc();
             }
         }
     }
@@ -582,22 +572,27 @@ private String buildTableFieldsCondition(List<String> fieldNames, String partiti
      * replace ${valuesCondition}
      *
      * @param fieldTypes field types
-     * @return condition like '(?, ?, cast(? as string))' and '?' will be replaced with row data
+     * @return condition like '(?, ?, cast('?' as string))' and '?' will be replaced with row data
      */
     private String buildValuesCondition(List<String> fieldTypes, Row row) {
         String valuesCondition = fieldTypes.stream().map(
                 f -> {
                     for (String item : NEED_QUOTE_TYPE) {
                         if (f.toLowerCase().contains(item)) {
-                            return String.format("cast(? as %s)", f.toLowerCase());
+                            return String.format("cast('?' as %s)", f.toLowerCase());
                         }
                     }
                     return "?";
                 }).collect(Collectors.joining(", "));
         for (int i = 0; i < row.getArity(); i++) {
-            valuesCondition = valuesCondition.replaceFirst("\\?", Objects.isNull(row.getField(i)) ? "null" : row.getField(i).toString());
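+            // null or empty values are written as a bare null literal instead of a quoted placeholder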
+            Object rowField = row.getField(i);
+            if (DtStringUtil.isEmptyOrNull(rowField)) {
+                valuesCondition = valuesCondition.replaceFirst("'\\?'", "null");
+            } else {
+                valuesCondition = valuesCondition.replaceFirst("\\?", rowField.toString());
+            }
         }
-        return valuesCondition;
+        return "(" + valuesCondition + ")";
     }
 
     /**