Skip to content

Commit 6d396e5

Browse files
author
gituser
committed
Merge branch '1.8_release_3.10.x' into 1.8_release_4.0.x
2 parents 40e8106 + e73b8ff commit 6d396e5

File tree

4 files changed

+26
-11
lines changed

4 files changed

+26
-11
lines changed

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,7 @@ private void parseTree(JsonNode jsonNode, String prefix) {
106106
String nodeKey = getNodeKey(prefix, next);
107107

108108
nodeAndJsonNodeMapping.put(nodeKey, child);
109-
if (child.isArray()) {
110-
parseTree(child, nodeKey);
111-
} else {
112-
parseTree(child, nodeKey);
113-
}
109+
parseTree(child, nodeKey);
114110
}
115111
}
116112

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfo.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
20-
2119
package com.dtstack.flink.sql.table;
2220

2321
import com.google.common.collect.Lists;
@@ -178,9 +176,9 @@ public void setPhysicalFields(Map<String, String> physicalFields) {
178176
}
179177

180178
public void finish(){
181-
this.fields = fieldList.toArray(new String[fieldList.size()]);
182-
this.fieldClasses = fieldClassList.toArray(new Class[fieldClassList.size()]);
183-
this.fieldTypes = fieldTypeList.toArray(new String[fieldTypeList.size()]);
179+
this.fields = fieldList.toArray(new String[0]);
180+
this.fieldClasses = fieldClassList.toArray(new Class[0]);
181+
this.fieldTypes = fieldTypeList.toArray(new String[0]);
184182
}
185183

186184
/**

core/src/main/java/com/dtstack/flink/sql/util/DateUtil.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,22 @@ public static Date localDateTimetoDate(LocalDateTime localDateTime){
108108
}
109109

110110
public static LocalDateTime dateToLocalDateTime(Date date){
111+
date = transformSqlDateToUtilDate(date);
111112
return date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
112113
}
113114

115+
/**
116+
* 将java.sql.Date 转化为 java.util.Date
117+
* @param date 不知道是java.sql.Date 还是 java.util.Date
118+
* @return 最后返回 java.util.Date
119+
*/
120+
public static Date transformSqlDateToUtilDate(Date date) {
121+
if (date instanceof java.sql.Date) {
122+
date = new Date(date.getTime());
123+
}
124+
return date;
125+
}
126+
114127
/**
115128
*
116129
*

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.sink.elasticsearch;
2020

21+
import com.dtstack.flink.sql.util.DateUtil;
2122
import org.apache.flink.api.common.functions.RuntimeContext;
2223
import org.apache.flink.api.java.tuple.Tuple2;
2324
import org.apache.flink.metrics.Counter;
@@ -31,6 +32,7 @@
3132
import org.slf4j.Logger;
3233
import org.slf4j.LoggerFactory;
3334

35+
import java.sql.Date;
3436
import java.util.List;
3537
import java.util.Map;
3638
import java.util.stream.Collectors;
@@ -42,7 +44,9 @@
4244
public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
4345

4446
private final Logger logger = LoggerFactory.getLogger(CustomerSinkFunc.class);
45-
/** 用作ID的属性值连接符号 */
47+
/**
48+
* 用作ID的属性值连接符号
49+
*/
4650
private static final String ID_VALUE_SPLIT = "_";
4751

4852
private String index;
@@ -107,6 +111,10 @@ private IndexRequest createIndexRequest(Row element) {
107111
Map<String, Object> dataMap = Es6Util.rowToJsonMap(element, fieldNames, fieldTypes);
108112
int length = Math.min(element.getArity(), fieldNames.size());
109113
for (int i = 0; i < length; i++) {
114+
if (element.getField(i) instanceof Date) {
115+
dataMap.put(fieldNames.get(i), DateUtil.transformSqlDateToUtilDate((Date) element.getField(i)));
116+
continue;
117+
}
110118
dataMap.put(fieldNames.get(i), element.getField(i));
111119
}
112120

0 commit comments

Comments
 (0)