Commit c0cfa1a: "resolve conflict" (merge commit, parents 74708fc and 8cb6d5a)

26 files changed: 642 additions, 141 deletions

core/pom.xml (12 additions, 0 deletions)
@@ -108,6 +108,18 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-yarn_2.11</artifactId>
             <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-shaded-hadoop2</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop2</artifactId>
+            <version>2.7.5-1.8.1</version>
         </dependency>

         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-statebackend-rocksdb -->
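This is the usual exclude-and-pin pattern for shaded Hadoop: flink-yarn_2.11 is stopped from pulling in whatever flink-shaded-hadoop2 it resolves transitively, and the artifact is then declared directly so the build controls the version. The 2.7.5-1.8.1 coordinate reads as Hadoop 2.7.5 shaded for Flink 1.8.1. To confirm which version actually lands on the classpath, mvn dependency:tree -Dincludes=org.apache.flink:flink-shaded-hadoop2 is a quick check.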

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java (0 additions, 1 deletion)
@@ -38,7 +38,6 @@
 import org.apache.flink.types.Row;

 import com.dtstack.flink.sql.classloader.ClassLoaderManager;
-import com.dtstack.flink.sql.constrant.ConfigConstrant;
 import com.dtstack.flink.sql.enums.ClusterMode;
 import com.dtstack.flink.sql.enums.ECacheType;
 import com.dtstack.flink.sql.enums.EPluginLoadMode;

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java (76 additions, 38 deletions)
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -34,6 +35,7 @@
 import org.apache.flink.types.Row;

 import java.io.IOException;
+import java.lang.reflect.Array;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
@@ -43,7 +45,7 @@

 /**
  * source data parse to json format
- *
+ * <p>
  * Date: 2019/12/12
  * Company: www.dtstack.com
  *
@@ -53,51 +55,33 @@ public class DtNestRowDeserializationSchema extends AbstractDeserializationSchem

     private final ObjectMapper objectMapper = new ObjectMapper();

-    private Map<String, String> rowAndFieldMapping;
-    private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
+    private final Map<String, String> rowAndFieldMapping;
+    private final Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();

     private final String[] fieldNames;
     private final TypeInformation<?>[] fieldTypes;
-    private List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
+    private final List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
+    private final String charsetName;

-    public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos) {
+    public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping,
+                                          List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos,
+                                          String charsetName) {
         this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
         this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
         this.rowAndFieldMapping = rowAndFieldMapping;
         this.fieldExtraInfos = fieldExtraInfos;
+        this.charsetName = charsetName;
     }

     @Override
     public Row deserialize(byte[] message) throws IOException {
-        JsonNode root = objectMapper.readTree(message);
+        String decoderStr = new String(message, charsetName);
+        JsonNode root = objectMapper.readTree(decoderStr);
         this.parseTree(root, null);
-        Row row = new Row(fieldNames.length);
-
-        try {
-            for (int i = 0; i < fieldNames.length; i++) {
-                JsonNode node = getIgnoreCase(fieldNames[i]);
-                AbstractTableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
-
-                if (node == null) {
-                    if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
-                        throw new IllegalStateException("Failed to find field with name '"
-                                + fieldNames[i] + "'.");
-                    } else {
-                        row.setField(i, null);
-                    }
-                } else {
-                    // Read the value as specified type
-                    Object value = convert(node, fieldTypes[i]);
-                    row.setField(i, value);
-                }
-            }
-            return row;
-        } finally {
-            nodeAndJsonNodeMapping.clear();
-        }
+        return convertTopRow();
     }

-    private void parseTree(JsonNode jsonNode, String prefix){
+    private void parseTree(JsonNode jsonNode, String prefix) {
         if (jsonNode.isArray()) {
             ArrayNode array = (ArrayNode) jsonNode;
             for (int i = 0; i < array.size(); i++) {
@@ -116,15 +100,15 @@ private void parseTree(JsonNode jsonNode, String prefix){
             return;
         }
         Iterator<String> iterator = jsonNode.fieldNames();
-        while (iterator.hasNext()){
+        while (iterator.hasNext()) {
             String next = iterator.next();
             JsonNode child = jsonNode.get(next);
             String nodeKey = getNodeKey(prefix, next);

             nodeAndJsonNodeMapping.put(nodeKey, child);
-            if(child.isArray()){
+            if (child.isArray()) {
                 parseTree(child, nodeKey);
-            }else {
+            } else {
                 parseTree(child, nodeKey);
             }
         }
@@ -135,8 +119,8 @@ private JsonNode getIgnoreCase(String key) {
         return nodeAndJsonNodeMapping.get(nodeMappingKey);
     }

-    private String getNodeKey(String prefix, String nodeName){
-        if(Strings.isNullOrEmpty(prefix)){
+    private String getNodeKey(String prefix, String nodeName) {
+        if (Strings.isNullOrEmpty(prefix)) {
             return nodeName;
         }
         return prefix + "." + nodeName;
@@ -160,15 +144,19 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
             } else {
                 return node.asText();
             }
-        } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
+        } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
             return Date.valueOf(node.asText());
         } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
             // local zone
             return Time.valueOf(node.asText());
         } else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
             // local zone
             return Timestamp.valueOf(node.asText());
-        } else {
+        } else if (info instanceof RowTypeInfo) {
+            return convertRow(node, (RowTypeInfo) info);
+        } else if (info instanceof ObjectArrayTypeInfo) {
+            return convertObjectArray(node, ((ObjectArrayTypeInfo) info).getComponentInfo());
+        } else {
             // for types that were specified without JSON schema
             // e.g. POJOs
             try {
@@ -179,5 +167,55 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
         }
     }

+    private Row convertTopRow() {
+        Row row = new Row(fieldNames.length);
+        try {
+            for (int i = 0; i < fieldNames.length; i++) {
+                JsonNode node = getIgnoreCase(fieldNames[i]);
+                AbstractTableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
+
+                if (node == null) {
+                    if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
+                        throw new IllegalStateException("Failed to find field with name '"
+                                + fieldNames[i] + "'.");
+                    } else {
+                        row.setField(i, null);
+                    }
+                } else {
+                    // Read the value as specified type
+                    Object value = convert(node, fieldTypes[i]);
+                    row.setField(i, value);
+                }
+            }
+            return row;
+        } finally {
+            nodeAndJsonNodeMapping.clear();
+        }
+    }
+
+    private Row convertRow(JsonNode node, RowTypeInfo info) {
+        final String[] names = info.getFieldNames();
+        final TypeInformation<?>[] types = info.getFieldTypes();
+
+        final Row row = new Row(names.length);
+        for (int i = 0; i < names.length; i++) {
+            final String name = names[i];
+            final JsonNode subNode = node.get(name);
+            if (subNode == null) {
+                row.setField(i, null);
+            } else {
+                row.setField(i, convert(subNode, types[i]));
+            }
+        }

+        return row;
+    }
+
+    private Object convertObjectArray(JsonNode node, TypeInformation<?> elementType) {
+        final Object[] array = (Object[]) Array.newInstance(elementType.getTypeClass(), node.size());
+        for (int i = 0; i < node.size(); i++) {
+            array[i] = convert(node.get(i), elementType);
+        }
+        return array;
+    }
 }
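The core idea of this class is that parseTree flattens the whole JSON tree into nodeAndJsonNodeMapping under dotted keys (built by getNodeKey), so a declared field such as "user.id" can be looked up directly, and the new convertRow/convertObjectArray branches let those fields themselves be ROW or ARRAY types. The following standalone sketch mirrors just the flattening step; it uses plain Jackson rather than Flink's shaded copy, simplifies away the array-element recursion the production class performs, and its class name and sample JSON are made up for illustration.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of the dotted-key flattening done by parseTree/getNodeKey.
public class FlattenSketch {

    static void parseTree(JsonNode node, String prefix, Map<String, JsonNode> out) {
        Iterator<String> names = node.fieldNames();
        while (names.hasNext()) {
            String name = names.next();
            JsonNode child = node.get(name);
            String key = (prefix == null) ? name : prefix + "." + name;
            out.put(key, child);          // every level stays addressable: "user" and "user.id"
            parseTree(child, key, out);   // value nodes have no field names, so recursion bottoms out
        }
    }

    public static void main(String[] args) throws Exception {
        JsonNode root = new ObjectMapper()
                .readTree("{\"user\": {\"id\": 7, \"tags\": [\"a\", \"b\"]}}");
        Map<String, JsonNode> mapping = new TreeMap<>();
        parseTree(root, null, mapping);
        System.out.println(mapping.keySet());   // [user, user.id, user.tags]
    }
}

The charsetName constructor argument is the other functional change: the raw bytes are decoded explicitly via new String(message, charsetName) before Jackson sees them, so sources that emit non-UTF-8 payloads (for example GBK) are decoded correctly instead of being read as UTF-8 bytes.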

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java (16 additions, 17 deletions)
@@ -16,8 +16,6 @@
  * limitations under the License.
  */

-
-
 package com.dtstack.flink.sql.table;

 import com.dtstack.flink.sql.util.ClassUtil;
@@ -46,7 +44,7 @@ public abstract class AbstractTableParser {
     private static final String CHAR_TYPE_NO_LENGTH = "CHAR";

     private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
-    private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
+    private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(.+?)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
     private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
     private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");
@@ -84,32 +82,32 @@ public boolean dealKeyPattern(String fieldRow, AbstractTableInfo tableInfo){
         return false;
     }

-    public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
+    public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {

         List<String> fieldRows = DtStringUtil.splitIgnoreQuota(fieldsInfo, ',');
-        for(String fieldRow : fieldRows){
+
+        for (String fieldRow : fieldRows) {
             fieldRow = fieldRow.trim();

-            if(StringUtils.isBlank(fieldRow)){
+            if (StringUtils.isBlank(fieldRow)) {
                 throw new RuntimeException(String.format("table [%s],exists field empty.", tableInfo.getName()));
             }

-            String[] filedInfoArr = fieldRow.split("\\s+");
-            if(filedInfoArr.length < 2 ){
-                throw new RuntimeException(String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow));
-            }
+            String[] fieldInfoArr = fieldRow.split("\\s+");
+
+            String errorMsg = String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow);
+            Preconditions.checkState(fieldInfoArr.length >= 2, errorMsg);

             boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
-            if(isMatcherKey){
+            if (isMatcherKey) {
                 continue;
             }

             //Compatible situation may arise in space in the fieldName
-            String[] filedNameArr = new String[filedInfoArr.length - 1];
-            System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
+            String[] filedNameArr = new String[fieldInfoArr.length - 1];
+            System.arraycopy(fieldInfoArr, 0, filedNameArr, 0, fieldInfoArr.length - 1);
             String fieldName = String.join(" ", filedNameArr);
-            String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
-
+            String fieldType = fieldInfoArr[fieldInfoArr.length - 1 ].trim();

             Class fieldClass = null;
             AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null;
@@ -123,7 +121,7 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
                 fieldClass = dbTypeConvertToJavaType(fieldType);
             }

-            tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
+            tableInfo.addPhysicalMappings(fieldInfoArr[0], fieldInfoArr[0]);
             tableInfo.addField(fieldName);
             tableInfo.addFieldClass(fieldClass);
             tableInfo.addFieldType(fieldType);
@@ -133,7 +131,7 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
         tableInfo.finish();
     }

-    public void dealPrimaryKey(Matcher matcher, AbstractTableInfo tableInfo){
+    public void dealPrimaryKey(Matcher matcher, AbstractTableInfo tableInfo) {
         String primaryFields = matcher.group(1).trim();
         String[] splitArry = primaryFields.split(",");
         List<String> primaryKes = Lists.newArrayList(splitArry);
@@ -172,4 +170,5 @@ protected void addParserHandler(String parserName, Pattern pattern, ITableFieldD
         patternMap.put(parserName, pattern);
         handlerMap.put(parserName, handler);
     }
+
 }
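The one behavioral change in this file is the type group of nestJsonFieldKeyPattern: (\w+) became (.+?), so a nested-field declaration can now carry a composite type token such as ARRAY<string> rather than a single word. A quick check of the new pattern (the field names in the input line are made up):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NestPatternCheck {
    public static void main(String[] args) {
        // Old type group (\w+) is a single word and can never match "ARRAY<string>";
        // the new lazy (.+?) stops at the following " AS ".
        Pattern p = Pattern.compile(
                "(?i)((@*\\S+\\.)*\\S+)\\s+(.+?)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");

        Matcher m = p.matcher("detail.items ARRAY<string> AS items");
        if (m.find()) {
            System.out.println(m.group(1));   // detail.items  -> nested JSON path
            System.out.println(m.group(3));   // ARRAY<string> -> type, rejected by the old \w+
            System.out.println(m.group(4));   // items         -> alias
        }
    }
}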

core/src/main/java/com/dtstack/flink/sql/util/ClassUtil.java (10 additions, 1 deletion)
@@ -20,6 +20,7 @@

 package com.dtstack.flink.sql.util;

+import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Time;
@@ -34,7 +35,14 @@
 public class ClassUtil {

     public static Class<?> stringConvertClass(String str) {
-        switch (str.toLowerCase()) {
+
+        // Tell the Class-to-TypeInformation mapping that this field is an array type
+        String lowerStr = str.toLowerCase().trim();
+        if (lowerStr.startsWith("array")) {
+            return Array.newInstance(Integer.class, 0).getClass();
+        }
+
+        switch (lowerStr) {
             case "boolean":
             case "bit":
                 return Boolean.class;
@@ -61,6 +69,7 @@ public static Class<?> stringConvertClass(String str) {
             case "varchar":
             case "char":
             case "text":
+            case "string":
                 return String.class;

             case "real":
