Skip to content

Commit cd4573c

Browse files
committed
Merge branch 'feat_1.8_autoJsonArray_mergedTest' into '1.8_test_3.10.x'
Feat 1.8 auto JSON array merged test. See merge request dt-insight-engine/flinkStreamSQL!76
2 parents 6749990 + 9484dcc commit cd4573c

File tree

20 files changed

+682
-427
lines changed

20 files changed

+682
-427
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@ target/
88
*.eclipse.*
99
*.iml
1010
plugins/
11+
sqlplugins/
1112
lib/
1213
.vertx/
1314
.DS_Store

core/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -122,6 +122,7 @@
122122
<artifactId>junit</artifactId>
123123
<version>4.12</version>
124124
</dependency>
125+
125126
</dependencies>
126127

127128
<build>

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 57 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
2525
import org.apache.flink.api.common.typeinfo.TypeInformation;
2626
import org.apache.flink.api.common.typeinfo.Types;
27+
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
2728
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2829
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
2930
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -34,7 +35,7 @@
3435
import org.apache.flink.types.Row;
3536

3637
import java.io.IOException;
37-
import java.nio.charset.StandardCharsets;
38+
import java.lang.reflect.Array;
3839
import java.sql.Date;
3940
import java.sql.Time;
4041
import java.sql.Timestamp;
@@ -77,30 +78,7 @@ public Row deserialize(byte[] message) throws IOException {
7778
String decoderStr = new String(message, charsetName);
7879
JsonNode root = objectMapper.readTree(decoderStr);
7980
this.parseTree(root, null);
80-
Row row = new Row(fieldNames.length);
81-
82-
try {
83-
for (int i = 0; i < fieldNames.length; i++) {
84-
JsonNode node = getIgnoreCase(fieldNames[i]);
85-
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
86-
87-
if (node == null) {
88-
if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
89-
throw new IllegalStateException("Failed to find field with name '"
90-
+ fieldNames[i] + "'.");
91-
} else {
92-
row.setField(i, null);
93-
}
94-
} else {
95-
// Read the value as specified type
96-
Object value = convert(node, fieldTypes[i]);
97-
row.setField(i, value);
98-
}
99-
}
100-
return row;
101-
} finally {
102-
nodeAndJsonNodeMapping.clear();
103-
}
81+
return convertTopRow();
10482
}
10583

10684
private void parseTree(JsonNode jsonNode, String prefix) {
@@ -174,6 +152,10 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
174152
} else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
175153
// local zone
176154
return Timestamp.valueOf(node.asText());
155+
} else if (info instanceof RowTypeInfo) {
156+
return convertRow(node, (RowTypeInfo) info);
157+
} else if (info instanceof ObjectArrayTypeInfo) {
158+
return convertObjectArray(node, ((ObjectArrayTypeInfo) info).getComponentInfo());
177159
} else {
178160
// for types that were specified without JSON schema
179161
// e.g. POJOs
@@ -185,5 +167,55 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
185167
}
186168
}
187169

170+
private Row convertTopRow() {
171+
Row row = new Row(fieldNames.length);
172+
try {
173+
for (int i = 0; i < fieldNames.length; i++) {
174+
JsonNode node = getIgnoreCase(fieldNames[i]);
175+
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
188176

177+
if (node == null) {
178+
if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
179+
throw new IllegalStateException("Failed to find field with name '"
180+
+ fieldNames[i] + "'.");
181+
} else {
182+
row.setField(i, null);
183+
}
184+
} else {
185+
// Read the value as specified type
186+
Object value = convert(node, fieldTypes[i]);
187+
row.setField(i, value);
188+
}
189+
}
190+
return row;
191+
} finally {
192+
nodeAndJsonNodeMapping.clear();
193+
}
194+
}
195+
196+
private Row convertRow(JsonNode node, RowTypeInfo info) {
197+
final String[] names = info.getFieldNames();
198+
final TypeInformation<?>[] types = info.getFieldTypes();
199+
200+
final Row row = new Row(names.length);
201+
for (int i = 0; i < names.length; i++) {
202+
final String name = names[i];
203+
final JsonNode subNode = node.get(name);
204+
if (subNode == null) {
205+
row.setField(i, null);
206+
} else {
207+
row.setField(i, convert(subNode, types[i]));
208+
}
209+
}
210+
211+
return row;
212+
}
213+
214+
private Object convertObjectArray(JsonNode node, TypeInformation<?> elementType) {
215+
final Object[] array = (Object[]) Array.newInstance(elementType.getTypeClass(), node.size());
216+
for (int i = 0; i < node.size(); i++) {
217+
array[i] = convert(node.get(i), elementType);
218+
}
219+
return array;
220+
}
189221
}

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -22,11 +22,7 @@
2222

2323
import com.dtstack.flink.sql.util.DtStringUtil;
2424
import org.apache.calcite.config.Lex;
25-
import org.apache.calcite.sql.SqlBasicCall;
26-
import org.apache.calcite.sql.SqlJoin;
27-
import org.apache.calcite.sql.SqlKind;
28-
import org.apache.calcite.sql.SqlNode;
29-
import org.apache.calcite.sql.SqlSelect;
25+
import org.apache.calcite.sql.*;
3026
import org.apache.calcite.sql.parser.SqlParseException;
3127
import org.apache.calcite.sql.parser.SqlParser;
3228
import com.google.common.collect.Lists;
@@ -164,6 +160,10 @@ private static void parseNode(SqlNode sqlNode, CreateTmpTableParser.SqlParserRes
164160
parseNode(unionRight, sqlParseResult);
165161
}
166162
break;
163+
case MATCH_RECOGNIZE:
164+
SqlMatchRecognize node = (SqlMatchRecognize) sqlNode;
165+
sqlParseResult.addSourceTable(node.getTableRef().toString());
166+
break;
167167
default:
168168
//do nothing
169169
break;

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 16 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -16,8 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
20-
2119
package com.dtstack.flink.sql.table;
2220

2321
import com.dtstack.flink.sql.util.ClassUtil;
@@ -46,7 +44,7 @@ public abstract class AbstractTableParser {
4644
private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
4745

4846
private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
49-
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
47+
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(.+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
5048
private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
5149
private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");
5250

@@ -84,32 +82,32 @@ public boolean dealKeyPattern(String fieldRow, AbstractTableInfo tableInfo){
8482
return false;
8583
}
8684

87-
public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
85+
public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
8886

8987
List<String> fieldRows = DtStringUtil.splitIgnoreQuota(fieldsInfo, ',');
90-
for(String fieldRow : fieldRows){
88+
89+
for (String fieldRow : fieldRows) {
9190
fieldRow = fieldRow.trim();
9291

93-
if(StringUtils.isBlank(fieldRow)){
92+
if (StringUtils.isBlank(fieldRow)) {
9493
throw new RuntimeException(String.format("table [%s],exists field empty.", tableInfo.getName()));
9594
}
9695

97-
String[] filedInfoArr = fieldRow.split("\\s+");
98-
if(filedInfoArr.length < 2 ){
99-
throw new RuntimeException(String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow));
100-
}
96+
String[] fieldInfoArr = fieldRow.split("\\s+");
97+
98+
String errorMsg = String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow);
99+
Preconditions.checkState(fieldInfoArr.length >= 2, errorMsg);
101100

102101
boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
103-
if(isMatcherKey){
102+
if (isMatcherKey) {
104103
continue;
105104
}
106105

107106
//Compatible situation may arise in space in the fieldName
108-
String[] filedNameArr = new String[filedInfoArr.length - 1];
109-
System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
107+
String[] filedNameArr = new String[fieldInfoArr.length - 1];
108+
System.arraycopy(fieldInfoArr, 0, filedNameArr, 0, fieldInfoArr.length - 1);
110109
String fieldName = String.join(" ", filedNameArr);
111-
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
112-
110+
String fieldType = fieldInfoArr[fieldInfoArr.length - 1 ].trim();
113111

114112
Class fieldClass = null;
115113
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null;
@@ -123,7 +121,7 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
123121
fieldClass = dbTypeConvertToJavaType(fieldType);
124122
}
125123

126-
tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
124+
tableInfo.addPhysicalMappings(fieldInfoArr[0], fieldInfoArr[0]);
127125
tableInfo.addField(fieldName);
128126
tableInfo.addFieldClass(fieldClass);
129127
tableInfo.addFieldType(fieldType);
@@ -133,7 +131,7 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
133131
tableInfo.finish();
134132
}
135133

136-
public void dealPrimaryKey(Matcher matcher, AbstractTableInfo tableInfo){
134+
public void dealPrimaryKey(Matcher matcher, AbstractTableInfo tableInfo) {
137135
String primaryFields = matcher.group(1).trim();
138136
String[] splitArry = primaryFields.split(",");
139137
List<String> primaryKes = Lists.newArrayList(splitArry);
@@ -172,4 +170,5 @@ protected void addParserHandler(String parserName, Pattern pattern, ITableFieldD
172170
patternMap.put(parserName, pattern);
173171
handlerMap.put(parserName, handler);
174172
}
173+
175174
}

core/src/main/java/com/dtstack/flink/sql/util/ClassUtil.java

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020

2121
package com.dtstack.flink.sql.util;
2222

23+
import java.lang.reflect.Array;
2324
import java.math.BigDecimal;
2425
import java.sql.Date;
2526
import java.sql.Time;
@@ -34,7 +35,14 @@
3435
public class ClassUtil {
3536

3637
public static Class<?> stringConvertClass(String str) {
37-
switch (str.toLowerCase()) {
38+
39+
// 这部分主要是告诉Class转TypeInfomation的方法,字段是Array类型
40+
String lowerStr = str.toLowerCase();
41+
if (lowerStr.startsWith("array")) {
42+
return Array.newInstance(Integer.class, 0).getClass();
43+
}
44+
45+
switch (lowerStr) {
3846
case "boolean":
3947
case "bit":
4048
return Boolean.class;
@@ -61,6 +69,7 @@ public static Class<?> stringConvertClass(String str) {
6169
case "varchar":
6270
case "char":
6371
case "text":
72+
case "string":
6473
return String.class;
6574

6675
case "real":

0 commit comments

Comments (0)