4242import java .sql .SQLException ;
4343import java .sql .Statement ;
4444import java .util .ArrayList ;
45+ import java .util .Arrays ;
4546import java .util .HashMap ;
4647import java .util .List ;
4748import java .util .Map ;
@@ -72,8 +73,10 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
7273
7374 // ${field}
7475 private static final Pattern STATIC_PARTITION_PATTERN = Pattern .compile ("\\ $\\ {([^}]*)}" );
75- // cast(value as string) -> cast('value' as string)
76- private static final Pattern STRING_TYPE_PATTERN = Pattern .compile ("cast\\ ((.*) as string\\ )" );
76+ // cast(value as string) -> cast('value' as string) cast(value as timestamp) -> cast('value' as timestamp)
77+ private static final Pattern TYPE_PATTERN = Pattern .compile ("cast\\ ((.*) as (.*)\\ )" );
78+ //specific type which values need to be quoted
79+ private static final String [] NEED_QUOTE_TYPE = {"string" , "timestamp" };
7780
7881 private static final Integer DEFAULT_CONN_TIME_OUT = 60 ;
7982 private static final int RECEIVE_DATA_PRINT_FREQUENCY = 1000 ;
@@ -83,6 +86,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
8386 private static final String UPDATE_MODE = "update" ;
8487 private static final String PARTITION_CONSTANT = "PARTITION" ;
8588 private static final String STRING_TYPE = "STRING" ;
89+ private static final String TIMESTAMP_TYPE = "TIMESTAMP" ;
8690 private static final String DRIVER_NAME = "com.cloudera.impala.jdbc41.Driver" ;
8791
8892 private static final String VALUES_CONDITION = "${valuesCondition}" ;
@@ -353,6 +357,32 @@ private List<String> rebuildFieldNameListAndTypeList(List<String> fieldNames, Li
353357 return valueFields ;
354358 }
355359
360+ /**
361+ * Quote a specific type of value, like string, timestamp
362+ * before: 1, cast(tiezhu as string), cast(2001-01-09 01:05:01 as timestamp), cast(123 as int)
363+ * after: 1, cast('tiezhu' as string), cast('2001-01-09 01:05:01' as timestamp), cast(123 as int)
364+ *
365+ * @param valueCondition original value condition
366+ * @return quoted condition
367+ */
368+ private String valueConditionAddQuotation (String valueCondition ) {
369+ final String [] valueConditionCopy = {valueCondition };
370+ String [] temps = valueCondition .split ("," );
371+ Arrays .stream (temps ).forEach (
372+ item -> {
373+ Matcher matcher = TYPE_PATTERN .matcher (item );
374+ while (matcher .find ()) {
375+ String value = matcher .group (1 );
376+ String type = matcher .group (2 );
377+ if (Arrays .asList (NEED_QUOTE_TYPE ).contains (type )) {
378+ valueConditionCopy [0 ] = valueConditionCopy [0 ].replace (value , "'" + value + "'" );
379+ }
380+ }
381+ }
382+ );
383+ return valueConditionCopy [0 ];
384+ }
385+
356386 @ Override
357387 public synchronized void writeRecord (Tuple2 <Boolean , Row > record ) throws IOException {
358388 try {
@@ -387,7 +417,7 @@ public synchronized void writeRecord(Tuple2<Boolean, Row> record) throws IOExcep
387417 for (int i = 0 ; i < fieldTypes .size (); i ++) {
388418 rowValue .setField (i , valueMap .get (valueFieldNames .get (i )));
389419 }
390- rowTuple2 .f1 = buildValuesCondition (fieldTypes , rowValue );
420+ rowTuple2 .f1 = valueConditionAddQuotation ( buildValuesCondition (fieldTypes , rowValue ) );
391421 putRowIntoMap (rowDataMap , rowTuple2 );
392422 }
393423
@@ -552,15 +582,19 @@ private String buildTableFieldsCondition(List<String> fieldNames, String partiti
552582 private String buildValuesCondition (List <String > fieldTypes , Row row ) {
553583 String valuesCondition = "(" + fieldTypes .stream ().map (
554584 f -> {
555- if (STRING_TYPE .equals (f .toUpperCase ())) {
556- return "cast(? as string)" ;
585+ switch (f .toUpperCase ()) {
586+ case STRING_TYPE :
587+ return "cast(? as string)" ;
588+ case TIMESTAMP_TYPE :
589+ return "cast(? as timestamp)" ;
590+ default :
591+ return "?" ;
557592 }
558- return "?" ;
559593 }).collect (Collectors .joining (", " )) + ")" ;
560594 for (int i = 0 ; i < row .getArity (); i ++) {
561595 valuesCondition = valuesCondition .replaceFirst ("\\ ?" , Objects .isNull (row .getField (i )) ? "null" : row .getField (i ).toString ());
562596 }
563- Matcher matcher = STRING_TYPE_PATTERN .matcher (valuesCondition );
597+ Matcher matcher = TYPE_PATTERN .matcher (valuesCondition );
564598 while (matcher .find ()) {
565599 valuesCondition = valuesCondition .replace (matcher .group (1 ), "'" + matcher .group (1 ) + "'" );
566600 }
0 commit comments