Skip to content

Commit 0057f44

Browse files
committed
[hotfix-34789][elasticsearch5&6][sink]support timestamp datatype.
1 parent 1ef5408 commit 0057f44

File tree

5 files changed

+26
-4
lines changed

5 files changed

+26
-4
lines changed

core/src/main/java/com/dtstack/flink/sql/enums/ColumnType.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ public enum ColumnType {
102102
* timestamp
103103
*/
104104
TIMESTAMP,
105+
/**
106+
* time eg: 23:59:59
107+
*/
108+
TIME,
105109
/**
106110
* decimal
107111
*/

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,9 @@ public static String col2string(Object column, String type) {
243243
case DATE:
244244
result = DateUtil.dateToString((java.util.Date)column);
245245
break;
246+
case TIME:
247+
result = DateUtil.getTimeFromStr(String.valueOf(column));
248+
break;
246249
case TIMESTAMP:
247250
result = DateUtil.timestampToString((java.util.Date)column);
248251
break;

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package com.dtstack.flink.sql.sink.elasticsearch;
2222

23+
import com.dtstack.flink.sql.util.DateUtil;
2324
import org.apache.commons.lang3.StringUtils;
2425
import org.apache.flink.api.common.functions.RuntimeContext;
2526
import org.apache.flink.api.java.tuple.Tuple2;
@@ -32,6 +33,8 @@
3233
import org.slf4j.Logger;
3334
import org.slf4j.LoggerFactory;
3435

36+
import java.sql.Date;
37+
import java.sql.Timestamp;
3538
import java.util.List;
3639
import java.util.Map;
3740
import java.util.stream.Collectors;
@@ -107,6 +110,14 @@ private IndexRequest createIndexRequest(Row element) {
107110
Map<String, Object> dataMap = EsUtil.rowToJsonMap(element,fieldNames,fieldTypes);
108111
int length = Math.min(element.getArity(), fieldNames.size());
109112
for(int i=0; i<length; i++){
113+
if (element.getField(i) instanceof Date) {
114+
dataMap.put(fieldNames.get(i), DateUtil.transformSqlDateToUtilDate((Date) element.getField(i)));
115+
continue;
116+
}
117+
if (element.getField(i) instanceof Timestamp) {
118+
dataMap.put(fieldNames.get(i), ((Timestamp) element.getField(i)).getTime());
119+
continue;
120+
}
110121
dataMap.put(fieldNames.get(i), element.getField(i));
111122
}
112123

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.slf4j.LoggerFactory;
3434

3535
import java.sql.Date;
36+
import java.sql.Time;
37+
import java.sql.Timestamp;
3638
import java.util.List;
3739
import java.util.Map;
3840
import java.util.stream.Collectors;
@@ -115,6 +117,10 @@ private IndexRequest createIndexRequest(Row element) {
115117
dataMap.put(fieldNames.get(i), DateUtil.transformSqlDateToUtilDate((Date) element.getField(i)));
116118
continue;
117119
}
120+
if (element.getField(i) instanceof Timestamp) {
121+
dataMap.put(fieldNames.get(i), ((Timestamp) element.getField(i)).getTime());
122+
continue;
123+
}
118124
dataMap.put(fieldNames.get(i), element.getField(i));
119125
}
120126

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/Es6Util.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@
1818

1919
package com.dtstack.flink.sql.sink.elasticsearch;
2020

21+
import com.dtstack.flink.sql.util.DtStringUtil;
22+
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2123
import org.apache.flink.types.Row;
2224
import org.apache.flink.util.Preconditions;
2325

24-
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
25-
26-
import com.dtstack.flink.sql.util.DtStringUtil;
27-
2826
import java.util.HashMap;
2927
import java.util.List;
3028
import java.util.Map;

0 commit comments

Comments
 (0)