Skip to content

Commit 5540db8

Browse files
author
dapeng
committed
Merge remote-tracking branch 'origin/hotfix_1.8_3.10.x_28911' into 1.8_3.10_zy
2 parents 7983c8a + fd6fa5f commit 5540db8

File tree

33 files changed

+828
-487
lines changed

33 files changed

+828
-487
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ target/
88
*.eclipse.*
99
*.iml
1010
plugins/
11+
sqlplugins/
1112
lib/
1213
.vertx/
1314
.DS_Store

core/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@
134134
<artifactId>junit</artifactId>
135135
<version>4.12</version>
136136
</dependency>
137+
137138
</dependencies>
138139

139140
<build>

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.apache.flink.types.Row;
3939

4040
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
41-
import com.dtstack.flink.sql.constrant.ConfigConstrant;
4241
import com.dtstack.flink.sql.enums.ClusterMode;
4342
import com.dtstack.flink.sql.enums.ECacheType;
4443
import com.dtstack.flink.sql.enums.EPluginLoadMode;

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 76 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
2525
import org.apache.flink.api.common.typeinfo.TypeInformation;
2626
import org.apache.flink.api.common.typeinfo.Types;
27+
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
2728
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2829
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
2930
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -34,6 +35,7 @@
3435
import org.apache.flink.types.Row;
3536

3637
import java.io.IOException;
38+
import java.lang.reflect.Array;
3739
import java.sql.Date;
3840
import java.sql.Time;
3941
import java.sql.Timestamp;
@@ -43,7 +45,7 @@
4345

4446
/**
4547
* source data parse to json format
46-
*
48+
* <p>
4749
* Date: 2019/12/12
4850
* Company: www.dtstack.com
4951
*
@@ -53,51 +55,33 @@ public class DtNestRowDeserializationSchema extends AbstractDeserializationSchem
5355

5456
private final ObjectMapper objectMapper = new ObjectMapper();
5557

56-
private Map<String, String> rowAndFieldMapping;
57-
private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
58+
private final Map<String, String> rowAndFieldMapping;
59+
private final Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
5860

5961
private final String[] fieldNames;
6062
private final TypeInformation<?>[] fieldTypes;
61-
private List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
63+
private final List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
64+
private final String charsetName;
6265

63-
public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos) {
66+
public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping,
67+
List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos,
68+
String charsetName) {
6469
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
6570
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
6671
this.rowAndFieldMapping = rowAndFieldMapping;
6772
this.fieldExtraInfos = fieldExtraInfos;
73+
this.charsetName = charsetName;
6874
}
6975

7076
@Override
7177
public Row deserialize(byte[] message) throws IOException {
72-
JsonNode root = objectMapper.readTree(message);
78+
String decoderStr = new String(message, charsetName);
79+
JsonNode root = objectMapper.readTree(decoderStr);
7380
this.parseTree(root, null);
74-
Row row = new Row(fieldNames.length);
75-
76-
try {
77-
for (int i = 0; i < fieldNames.length; i++) {
78-
JsonNode node = getIgnoreCase(fieldNames[i]);
79-
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
80-
81-
if (node == null) {
82-
if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
83-
throw new IllegalStateException("Failed to find field with name '"
84-
+ fieldNames[i] + "'.");
85-
} else {
86-
row.setField(i, null);
87-
}
88-
} else {
89-
// Read the value as specified type
90-
Object value = convert(node, fieldTypes[i]);
91-
row.setField(i, value);
92-
}
93-
}
94-
return row;
95-
} finally {
96-
nodeAndJsonNodeMapping.clear();
97-
}
81+
return convertTopRow();
9882
}
9983

100-
private void parseTree(JsonNode jsonNode, String prefix){
84+
private void parseTree(JsonNode jsonNode, String prefix) {
10185
if (jsonNode.isArray()) {
10286
ArrayNode array = (ArrayNode) jsonNode;
10387
for (int i = 0; i < array.size(); i++) {
@@ -116,15 +100,15 @@ private void parseTree(JsonNode jsonNode, String prefix){
116100
return;
117101
}
118102
Iterator<String> iterator = jsonNode.fieldNames();
119-
while (iterator.hasNext()){
103+
while (iterator.hasNext()) {
120104
String next = iterator.next();
121105
JsonNode child = jsonNode.get(next);
122106
String nodeKey = getNodeKey(prefix, next);
123107

124108
nodeAndJsonNodeMapping.put(nodeKey, child);
125-
if(child.isArray()){
109+
if (child.isArray()) {
126110
parseTree(child, nodeKey);
127-
}else {
111+
} else {
128112
parseTree(child, nodeKey);
129113
}
130114
}
@@ -135,8 +119,8 @@ private JsonNode getIgnoreCase(String key) {
135119
return nodeAndJsonNodeMapping.get(nodeMappingKey);
136120
}
137121

138-
private String getNodeKey(String prefix, String nodeName){
139-
if(Strings.isNullOrEmpty(prefix)){
122+
private String getNodeKey(String prefix, String nodeName) {
123+
if (Strings.isNullOrEmpty(prefix)) {
140124
return nodeName;
141125
}
142126
return prefix + "." + nodeName;
@@ -160,15 +144,19 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
160144
} else {
161145
return node.asText();
162146
}
163-
} else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
147+
} else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
164148
return Date.valueOf(node.asText());
165149
} else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
166150
// local zone
167151
return Time.valueOf(node.asText());
168152
} else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
169153
// local zone
170154
return Timestamp.valueOf(node.asText());
171-
} else {
155+
} else if (info instanceof RowTypeInfo) {
156+
return convertRow(node, (RowTypeInfo) info);
157+
} else if (info instanceof ObjectArrayTypeInfo) {
158+
return convertObjectArray(node, ((ObjectArrayTypeInfo) info).getComponentInfo());
159+
} else {
172160
// for types that were specified without JSON schema
173161
// e.g. POJOs
174162
try {
@@ -179,5 +167,55 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
179167
}
180168
}
181169

170+
private Row convertTopRow() {
171+
Row row = new Row(fieldNames.length);
172+
try {
173+
for (int i = 0; i < fieldNames.length; i++) {
174+
JsonNode node = getIgnoreCase(fieldNames[i]);
175+
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
176+
177+
if (node == null) {
178+
if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
179+
throw new IllegalStateException("Failed to find field with name '"
180+
+ fieldNames[i] + "'.");
181+
} else {
182+
row.setField(i, null);
183+
}
184+
} else {
185+
// Read the value as specified type
186+
Object value = convert(node, fieldTypes[i]);
187+
row.setField(i, value);
188+
}
189+
}
190+
return row;
191+
} finally {
192+
nodeAndJsonNodeMapping.clear();
193+
}
194+
}
195+
196+
private Row convertRow(JsonNode node, RowTypeInfo info) {
197+
final String[] names = info.getFieldNames();
198+
final TypeInformation<?>[] types = info.getFieldTypes();
199+
200+
final Row row = new Row(names.length);
201+
for (int i = 0; i < names.length; i++) {
202+
final String name = names[i];
203+
final JsonNode subNode = node.get(name);
204+
if (subNode == null) {
205+
row.setField(i, null);
206+
} else {
207+
row.setField(i, convert(subNode, types[i]));
208+
}
209+
}
182210

211+
return row;
212+
}
213+
214+
private Object convertObjectArray(JsonNode node, TypeInformation<?> elementType) {
215+
final Object[] array = (Object[]) Array.newInstance(elementType.getTypeClass(), node.size());
216+
for (int i = 0; i < node.size(); i++) {
217+
array[i] = convert(node.get(i), elementType);
218+
}
219+
return array;
220+
}
183221
}

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,7 @@
2222

2323
import com.dtstack.flink.sql.util.DtStringUtil;
2424
import org.apache.calcite.config.Lex;
25-
import org.apache.calcite.sql.SqlBasicCall;
26-
import org.apache.calcite.sql.SqlJoin;
27-
import org.apache.calcite.sql.SqlKind;
28-
import org.apache.calcite.sql.SqlNode;
29-
import org.apache.calcite.sql.SqlSelect;
25+
import org.apache.calcite.sql.*;
3026
import org.apache.calcite.sql.parser.SqlParseException;
3127
import org.apache.calcite.sql.parser.SqlParser;
3228
import com.google.common.collect.Lists;
@@ -164,6 +160,10 @@ private static void parseNode(SqlNode sqlNode, CreateTmpTableParser.SqlParserRes
164160
parseNode(unionRight, sqlParseResult);
165161
}
166162
break;
163+
case MATCH_RECOGNIZE:
164+
SqlMatchRecognize node = (SqlMatchRecognize) sqlNode;
165+
sqlParseResult.addSourceTable(node.getTableRef().toString());
166+
break;
167167
default:
168168
//do nothing
169169
break;

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
162162
}
163163
}
164164
if(equalFieldIndex == -1){
165-
throw new RuntimeException("can't find equal field " + rightField);
165+
throw new RuntimeException("can't find equal field " + leftField);
166166
}
167167

168168
equalValIndex.add(equalFieldIndex);

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class JoinInfo implements Serializable {
6969

7070
private String scope = "";
7171

72+
private String newTableName = null;
73+
7274
/**
7375
* 左表需要查询的字段信息和output的时候对应的列名称
7476
*/
@@ -96,13 +98,12 @@ public String getNonSideTable(){
9698
}
9799

98100
public String getNewTableName(){
99-
//兼容左边表是as 的情况
100-
String leftStr = leftTableName;
101-
leftStr = Strings.isNullOrEmpty(leftStr) ? leftTableAlias : leftStr;
102-
String newName = leftStr + "_" + rightTableName;
103-
return TableUtils.buildTableNameWithScope(newName, scope);
101+
return this.newTableName;
104102
}
105103

104+
public void setNewTableName(String newTableName){
105+
this.newTableName = newTableName;
106+
}
106107

107108
public String getNewTableAlias(){
108109
String newName = leftTableAlias + "_" + rightTableAlias;

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
9191
Set<Tuple2<String, String>> joinFieldSet,
9292
Map<String, String> tableRef,
9393
Map<String, String> fieldRef,
94-
String scope) {
94+
String scope,
95+
Set<String> joinTableNames) {
9596

9697
SqlNode leftNode = joinNode.getLeft();
9798
SqlNode rightNode = joinNode.getRight();
@@ -108,13 +109,13 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
108109
if (leftNode.getKind() == JOIN) {
109110
//处理连续join
110111
dealNestJoin(joinNode, sideTableSet, queueInfo, parentWhere, parentSelectList,
111-
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
112+
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope, joinTableNames);
112113
leftNode = joinNode.getLeft();
113114
}
114115

115116
if (leftNode.getKind() == AS) {
116117
AliasInfo aliasInfo = (AliasInfo) sideSQLParser.parseSql(leftNode, sideTableSet, queueInfo,
117-
parentWhere, parentSelectList, parentGroupByList, scope);
118+
parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
118119
leftTbName = aliasInfo.getName();
119120
leftTbAlias = aliasInfo.getAlias();
120121
} else if(leftNode.getKind() == IDENTIFIER){
@@ -126,7 +127,7 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
126127
Preconditions.checkState(!leftIsSide, "side-table must be at the right of join operator");
127128

128129
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo,
129-
parentWhere, parentSelectList, parentGroupByList, scope);
130+
parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
130131
rightTableName = rightTableNameAndAlias.f0;
131132
rightTableAlias = rightTableNameAndAlias.f1;
132133

@@ -150,7 +151,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
150151
tableInfo.setJoinType(joinType);
151152
tableInfo.setCondition(joinNode.getCondition());
152153
tableInfo.setScope(scope);
153-
154+
tableInfo.setNewTableName(TableUtils.buildTableNameWithScope(leftTbName, leftTbAlias, rightTableName, scope, joinTableNames));
155+
joinTableNames.add(tableInfo.getNewTableName());
154156
TableUtils.replaceJoinFieldRefTableName(joinNode.getCondition(), fieldRef);
155157

156158
//extract 需要查询的字段信息
@@ -262,20 +264,21 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
262264
Set<Tuple2<String, String>> joinFieldSet,
263265
Map<String, String> tableRef,
264266
Map<String, String> fieldRef,
265-
String scope){
267+
String scope,
268+
Set<String> joinTableNames){
266269

267270
SqlJoin leftJoinNode = (SqlJoin) joinNode.getLeft();
268271
SqlNode parentRightJoinNode = joinNode.getRight();
269272
SqlNode rightNode = leftJoinNode.getRight();
270273

271274
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo,
272-
parentWhere, parentSelectList, parentGroupByList, scope);
275+
parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
273276
Tuple2<String, String> parentRightJoinInfo = parseRightNode(parentRightJoinNode, sideTableSet,
274-
queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
277+
queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
275278
boolean parentRightIsSide = checkIsSideTable(parentRightJoinInfo.f0, sideTableSet);
276279

277280
JoinInfo joinInfo = dealJoinNode(leftJoinNode, sideTableSet, queueInfo, parentWhere, parentSelectList,
278-
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
281+
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope, joinTableNames);
279282

280283
String rightTableName = rightTableNameAndAlias.f0;
281284
boolean rightIsSide = checkIsSideTable(rightTableName, sideTableSet);
@@ -670,12 +673,12 @@ private void extractSelectField(SqlNode selectNode,
670673

671674
private Tuple2<String, String> parseRightNode(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object> queueInfo,
672675
SqlNode parentWhere, SqlNodeList selectList, SqlNodeList parentGroupByList,
673-
String scope) {
676+
String scope, Set<String> joinTableNames) {
674677
Tuple2<String, String> tabName = new Tuple2<>("", "");
675678
if(sqlNode.getKind() == IDENTIFIER){
676679
tabName.f0 = sqlNode.toString();
677680
}else{
678-
AliasInfo aliasInfo = (AliasInfo)sideSQLParser.parseSql(sqlNode, sideTableSet, queueInfo, parentWhere, selectList, parentGroupByList, scope);
681+
AliasInfo aliasInfo = (AliasInfo)sideSQLParser.parseSql(sqlNode, sideTableSet, queueInfo, parentWhere, selectList, parentGroupByList, scope, joinTableNames);
679682
tabName.f0 = aliasInfo.getName();
680683
tabName.f1 = aliasInfo.getAlias();
681684
}

0 commit comments

Comments
 (0)