Skip to content

Commit f59ab76

Browse files
committed
[fix] support timestamp
1 parent d369012 commit f59ab76

File tree

1 file changed

+41
-7
lines changed

1 file changed

+41
-7
lines changed

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.sql.SQLException;
4343
import java.sql.Statement;
4444
import java.util.ArrayList;
45+
import java.util.Arrays;
4546
import java.util.HashMap;
4647
import java.util.List;
4748
import java.util.Map;
@@ -72,8 +73,10 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
7273

7374
// ${field}
7475
private static final Pattern STATIC_PARTITION_PATTERN = Pattern.compile("\\$\\{([^}]*)}");
75-
// cast(value as string) -> cast('value' as string)
76-
private static final Pattern STRING_TYPE_PATTERN = Pattern.compile("cast\\((.*) as string\\)");
76+
// cast(value as string) -> cast('value' as string) cast(value as timestamp) -> cast('value' as timestamp)
77+
private static final Pattern TYPE_PATTERN = Pattern.compile("cast\\((.*) as (.*)\\)");
78+
//specific type which values need to be quoted
79+
private static final String[] NEED_QUOTE_TYPE = {"string", "timestamp"};
7780

7881
private static final Integer DEFAULT_CONN_TIME_OUT = 60;
7982
private static final int RECEIVE_DATA_PRINT_FREQUENCY = 1000;
@@ -83,6 +86,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
8386
private static final String UPDATE_MODE = "update";
8487
private static final String PARTITION_CONSTANT = "PARTITION";
8588
private static final String STRING_TYPE = "STRING";
89+
private static final String TIMESTAMP_TYPE = "TIMESTAMP";
8690
private static final String DRIVER_NAME = "com.cloudera.impala.jdbc41.Driver";
8791

8892
private static final String VALUES_CONDITION = "${valuesCondition}";
@@ -353,6 +357,32 @@ private List<String> rebuildFieldNameListAndTypeList(List<String> fieldNames, Li
353357
return valueFields;
354358
}
355359

360+
/**
361+
* Quote a specific type of value, like string, timestamp
362+
* before: 1, cast(tiezhu as string), cast(2001-01-09 01:05:01 as timestamp), cast(123 as int)
363+
* after: 1, cast('tiezhu' as string), cast('2001-01-09 01:05:01' as timestamp), cast(123 as int)
364+
*
365+
* @param valueCondition original value condition
366+
* @return quoted condition
367+
*/
368+
private String valueConditionAddQuotation(String valueCondition) {
369+
final String[] valueConditionCopy = {valueCondition};
370+
String[] temps = valueCondition.split(",");
371+
Arrays.stream(temps).forEach(
372+
item -> {
373+
Matcher matcher = TYPE_PATTERN.matcher(item);
374+
while (matcher.find()) {
375+
String value = matcher.group(1);
376+
String type = matcher.group(2);
377+
if (Arrays.asList(NEED_QUOTE_TYPE).contains(type)) {
378+
valueConditionCopy[0] = valueConditionCopy[0].replace(value, "'" + value + "'");
379+
}
380+
}
381+
}
382+
);
383+
return valueConditionCopy[0];
384+
}
385+
356386
@Override
357387
public synchronized void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
358388
try {
@@ -387,7 +417,7 @@ public synchronized void writeRecord(Tuple2<Boolean, Row> record) throws IOExcep
387417
for (int i = 0; i < fieldTypes.size(); i++) {
388418
rowValue.setField(i, valueMap.get(valueFieldNames.get(i)));
389419
}
390-
rowTuple2.f1 = buildValuesCondition(fieldTypes, rowValue);
420+
rowTuple2.f1 = valueConditionAddQuotation(buildValuesCondition(fieldTypes, rowValue));
391421
putRowIntoMap(rowDataMap, rowTuple2);
392422
}
393423

@@ -552,15 +582,19 @@ private String buildTableFieldsCondition(List<String> fieldNames, String partiti
552582
private String buildValuesCondition(List<String> fieldTypes, Row row) {
553583
String valuesCondition = "(" + fieldTypes.stream().map(
554584
f -> {
555-
if (STRING_TYPE.equals(f.toUpperCase())) {
556-
return "cast(? as string)";
585+
switch (f.toUpperCase()) {
586+
case STRING_TYPE:
587+
return "cast(? as string)";
588+
case TIMESTAMP_TYPE:
589+
return "cast(? as timestamp)";
590+
default:
591+
return "?";
557592
}
558-
return "?";
559593
}).collect(Collectors.joining(", ")) + ")";
560594
for (int i = 0; i < row.getArity(); i++) {
561595
valuesCondition = valuesCondition.replaceFirst("\\?", Objects.isNull(row.getField(i)) ? "null" : row.getField(i).toString());
562596
}
563-
Matcher matcher = STRING_TYPE_PATTERN.matcher(valuesCondition);
597+
Matcher matcher = TYPE_PATTERN.matcher(valuesCondition);
564598
while (matcher.find()) {
565599
valuesCondition = valuesCondition.replace(matcher.group(1), "'" + matcher.group(1) + "'");
566600
}

0 commit comments

Comments
 (0)