Skip to content

Commit cba9993

Browse files
committed
Merge branch 'v1.8.0_dev' of ssh://git.dtstack.cn:10022/dtstack/dt-center-flinkStreamSQL into 1.8_pushdown
2 parents 97aca74 + bf848a9 commit cba9993

File tree

13 files changed

+313
-911
lines changed

13 files changed

+313
-911
lines changed

core/src/main/java/com/dtstack/flink/sql/source/AbsDeserialization.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.types.Row;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
2730

2831
/**
2932
* add metric for source, customer Deserialization which want add metric need to extends this abs class
@@ -34,9 +37,14 @@
3437
*/
3538

3639
public abstract class AbsDeserialization<T> extends AbstractDeserializationSchema<T> {
40+
private static final Logger LOG = LoggerFactory.getLogger(AbsDeserialization.class);
3741

3842
private static final long serialVersionUID = 2176278128811784415L;
3943

44+
private static int dataPrintFrequency = 1000;
45+
46+
protected JsonDataParser jsonDataParser;
47+
4048
private transient RuntimeContext runtimeContext;
4149

4250
protected transient Counter dirtyDataCounter;
@@ -75,4 +83,27 @@ public void initMetric(){
7583
numInResolveRecord = runtimeContext.getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_RESOVED_IN_COUNTER);
7684
numInResolveRate = runtimeContext.getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_RESOVED_IN_RATE, new MeterView(numInResolveRecord, 20));
7785
}
86+
87+
protected T parseSourceData (byte[] message) {
88+
try {
89+
if (numInRecord.getCount() % dataPrintFrequency == 0) {
90+
LOG.info("receive source data:" + new String(message, "UTF-8"));
91+
}
92+
numInRecord.inc();
93+
numInBytes.inc(message.length);
94+
Row row = jsonDataParser.parseData(message);
95+
numInResolveRecord.inc();
96+
return (T) row;
97+
} catch (Exception e) {
98+
//add metric of dirty data
99+
if (dirtyDataCounter.getCount() % dataPrintFrequency == 0) {
100+
LOG.info("dirtyData: " + new String(message));
101+
LOG.error("data parse error", e);
102+
}
103+
dirtyDataCounter.inc();
104+
return null;
105+
}
106+
}
107+
108+
78109
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.source;
20+
21+
import com.dtstack.flink.sql.table.TableInfo;
22+
import com.google.common.base.Strings;
23+
import com.google.common.collect.Maps;
24+
import org.apache.flink.api.common.typeinfo.TypeInformation;
25+
import org.apache.flink.api.common.typeinfo.Types;
26+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
27+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
28+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
29+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
30+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
31+
import org.apache.flink.types.Row;
32+
33+
import java.io.IOException;
34+
import java.io.Serializable;
35+
import java.sql.Date;
36+
import java.sql.Time;
37+
import java.sql.Timestamp;
38+
import java.util.Iterator;
39+
import java.util.List;
40+
import java.util.Map;
41+
42+
/**
43+
* source data parse to json format
44+
*
45+
* Date: 2019/12/12
46+
* Company: www.dtstack.com
47+
* @author maqi
48+
*/
49+
public class JsonDataParser implements Serializable {
50+
51+
private final ObjectMapper objectMapper = new ObjectMapper();
52+
53+
private Map<String, String> rowAndFieldMapping;
54+
private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
55+
56+
private final String[] fieldNames;
57+
private final TypeInformation<?>[] fieldTypes;
58+
private List<TableInfo.FieldExtraInfo> fieldExtraInfos;
59+
60+
public JsonDataParser(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos) {
61+
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
62+
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
63+
this.rowAndFieldMapping = rowAndFieldMapping;
64+
this.fieldExtraInfos = fieldExtraInfos;
65+
}
66+
67+
68+
public Row parseData(byte[] data) throws IOException {
69+
JsonNode root = objectMapper.readTree(data);
70+
parseTree(root, null);
71+
Row row = new Row(fieldNames.length);
72+
73+
try {
74+
for (int i = 0; i < fieldNames.length; i++) {
75+
JsonNode node = getIgnoreCase(fieldNames[i]);
76+
TableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
77+
78+
if (node == null) {
79+
if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
80+
throw new IllegalStateException("Failed to find field with name '"
81+
+ fieldNames[i] + "'.");
82+
} else {
83+
row.setField(i, null);
84+
}
85+
} else {
86+
// Read the value as specified type
87+
Object value = convert(node, fieldTypes[i]);
88+
row.setField(i, value);
89+
}
90+
}
91+
return row;
92+
} finally {
93+
nodeAndJsonNodeMapping.clear();
94+
}
95+
}
96+
97+
private void parseTree(JsonNode jsonNode, String prefix){
98+
if (jsonNode.isArray()) {
99+
ArrayNode array = (ArrayNode) jsonNode;
100+
for (int i = 0; i < array.size(); i++) {
101+
JsonNode child = array.get(i);
102+
String nodeKey = getNodeKey(prefix, i);
103+
104+
if (child.isValueNode()) {
105+
nodeAndJsonNodeMapping.put(nodeKey, child);
106+
} else {
107+
if (rowAndFieldMapping.containsValue(nodeKey)) {
108+
nodeAndJsonNodeMapping.put(nodeKey, child);
109+
}
110+
parseTree(child, nodeKey);
111+
}
112+
}
113+
return;
114+
}
115+
Iterator<String> iterator = jsonNode.fieldNames();
116+
while (iterator.hasNext()){
117+
String next = iterator.next();
118+
JsonNode child = jsonNode.get(next);
119+
String nodeKey = getNodeKey(prefix, next);
120+
121+
if (child.isValueNode()){
122+
nodeAndJsonNodeMapping.put(nodeKey, child);
123+
}else if(child.isArray()){
124+
parseTree(child, nodeKey);
125+
}else {
126+
parseTree(child, nodeKey);
127+
}
128+
}
129+
}
130+
131+
private JsonNode getIgnoreCase(String key) {
132+
String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
133+
return nodeAndJsonNodeMapping.get(nodeMappingKey);
134+
}
135+
136+
private String getNodeKey(String prefix, String nodeName){
137+
if(Strings.isNullOrEmpty(prefix)){
138+
return nodeName;
139+
}
140+
return prefix + "." + nodeName;
141+
}
142+
143+
private String getNodeKey(String prefix, int i) {
144+
if (Strings.isNullOrEmpty(prefix)) {
145+
return "[" + i + "]";
146+
}
147+
return prefix + "[" + i + "]";
148+
}
149+
150+
private Object convert(JsonNode node, TypeInformation<?> info) {
151+
if (info.getTypeClass().equals(Types.BOOLEAN.getTypeClass())) {
152+
return node.asBoolean();
153+
} else if (info.getTypeClass().equals(Types.STRING.getTypeClass())) {
154+
return node.asText();
155+
} else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
156+
return Date.valueOf(node.asText());
157+
} else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
158+
// local zone
159+
return Time.valueOf(node.asText());
160+
} else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
161+
// local zone
162+
return Timestamp.valueOf(node.asText());
163+
} else {
164+
// for types that were specified without JSON schema
165+
// e.g. POJOs
166+
try {
167+
return objectMapper.treeToValue(node, info.getTypeClass());
168+
} catch (JsonProcessingException e) {
169+
throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
170+
}
171+
}
172+
}
173+
}

core/src/main/java/com/dtstack/flink/sql/table/AbsSourceParser.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ public abstract class AbsSourceParser extends AbsTableParser {
3939
private static final String VIRTUAL_KEY = "virtualFieldKey";
4040
private static final String WATERMARK_KEY = "waterMarkKey";
4141
private static final String NOTNULL_KEY = "notNullKey";
42+
private static final String NEST_JSON_FIELD_KEY = "nestFieldKey";
4243

44+
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
4345
private static Pattern virtualFieldKeyPattern = Pattern.compile("(?i)^(\\S+\\([^\\)]+\\))\\s+AS\\s+(\\w+)$");
4446
private static Pattern waterMarkKeyPattern = Pattern.compile("(?i)^\\s*WATERMARK\\s+FOR\\s+(\\S+)\\s+AS\\s+withOffset\\(\\s*(\\S+)\\s*,\\s*(\\d+)\\s*\\)$");
4547
private static Pattern notNullKeyPattern = Pattern.compile("(?i)^(\\w+)\\s+(\\w+)\\s+NOT\\s+NULL?$");
@@ -48,10 +50,12 @@ public abstract class AbsSourceParser extends AbsTableParser {
4850
keyPatternMap.put(VIRTUAL_KEY, virtualFieldKeyPattern);
4951
keyPatternMap.put(WATERMARK_KEY, waterMarkKeyPattern);
5052
keyPatternMap.put(NOTNULL_KEY, notNullKeyPattern);
53+
keyPatternMap.put(NEST_JSON_FIELD_KEY, nestJsonFieldKeyPattern);
5154

5255
keyHandlerMap.put(VIRTUAL_KEY, AbsSourceParser::dealVirtualField);
5356
keyHandlerMap.put(WATERMARK_KEY, AbsSourceParser::dealWaterMark);
5457
keyHandlerMap.put(NOTNULL_KEY, AbsSourceParser::dealNotNull);
58+
keyHandlerMap.put(NEST_JSON_FIELD_KEY, AbsSourceParser::dealNestField);
5559
}
5660

5761
static void dealVirtualField(Matcher matcher, TableInfo tableInfo){
@@ -83,4 +87,25 @@ static void dealNotNull(Matcher matcher, TableInfo tableInfo) {
8387
tableInfo.addFieldType(fieldType);
8488
tableInfo.addFieldExtraInfo(fieldExtraInfo);
8589
}
90+
91+
/**
92+
* add parser for alias field
93+
* @param matcher
94+
* @param tableInfo
95+
*/
96+
static void dealNestField(Matcher matcher, TableInfo tableInfo) {
97+
String physicalField = matcher.group(1);
98+
String fieldType = matcher.group(3);
99+
String mappingField = matcher.group(4);
100+
Class fieldClass= ClassUtil.stringConvertClass(fieldType);
101+
boolean notNull = matcher.group(5) != null;
102+
TableInfo.FieldExtraInfo fieldExtraInfo = new TableInfo.FieldExtraInfo();
103+
fieldExtraInfo.setNotNull(notNull);
104+
105+
tableInfo.addPhysicalMappings(mappingField, physicalField);
106+
tableInfo.addField(mappingField);
107+
tableInfo.addFieldClass(fieldClass);
108+
tableInfo.addFieldType(fieldType);
109+
tableInfo.addFieldExtraInfo(fieldExtraInfo);
110+
}
86111
}

0 commit comments

Comments
 (0)