Skip to content

Commit 6e174be

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.0.x_31008' into 1.10_release_4.0.x
2 parents cbf1d20 + 3f64b12 commit 6e174be

File tree

1 file changed

+59
-0
lines changed

1 file changed

+59
-0
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,64 @@ private List<String> convertPrimaryAlias(AbstractSideTableInfo sideTableInfo) {
310310
return res;
311311
}
312312

313+
/**
314+
* check whether all table fields exist in join condition.
315+
* @param conditionNode
316+
* @param joinScope
317+
*/
318+
public void checkConditionFieldsInTable(SqlNode conditionNode, JoinScope joinScope) {
319+
List<SqlNode> sqlNodeList = Lists.newArrayList();
320+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
321+
for (SqlNode sqlNode : sqlNodeList) {
322+
if (!SqlKind.COMPARISON.contains(sqlNode.getKind())) {
323+
throw new RuntimeException("not compare operator.");
324+
}
325+
326+
SqlNode leftNode = ((SqlBasicCall) sqlNode).getOperands()[0];
327+
SqlNode rightNode = ((SqlBasicCall) sqlNode).getOperands()[1];
328+
329+
if (leftNode.getKind() == SqlKind.IDENTIFIER) {
330+
checkFieldInTable((SqlIdentifier) leftNode, joinScope, conditionNode);
331+
}
332+
333+
if (rightNode.getKind() == SqlKind.IDENTIFIER) {
334+
checkFieldInTable((SqlIdentifier) rightNode, joinScope, conditionNode);
335+
}
336+
337+
}
338+
}
339+
340+
/**
341+
* check whether table exists and whether field is in table.
342+
* @param sqlNode
343+
* @param joinScope
344+
* @param conditionNode
345+
*/
346+
private void checkFieldInTable(SqlIdentifier sqlNode, JoinScope joinScope, SqlNode conditionNode) {
347+
String tableName = sqlNode.getComponent(0).getSimple();
348+
String fieldName = sqlNode.getComponent(1).getSimple();
349+
JoinScope.ScopeChild scopeChild = joinScope.getScope(tableName);
350+
String tableErrorMsg = "table [%s] is not exist. error condition is [%s]. if you find [%s] is exist, please check AS statement";
351+
Preconditions.checkState(
352+
scopeChild != null,
353+
tableErrorMsg,
354+
tableName,
355+
conditionNode.toString(),
356+
tableName
357+
);
358+
359+
String[] fieldNames = scopeChild.getRowTypeInfo().getFieldNames();
360+
boolean hasField = Arrays.asList(fieldNames).contains(fieldName);
361+
String fieldErrorMsg = "table [%s] has not [%s] field.\n error join condition is [%s]";
362+
Preconditions.checkState(
363+
hasField,
364+
fieldErrorMsg,
365+
tableName,
366+
fieldName,
367+
conditionNode.toString()
368+
);
369+
}
370+
313371
public List<String> getConditionFields(SqlNode conditionNode, String specifyTableName, AbstractSideTableInfo sideTableInfo) {
314372
List<SqlNode> sqlNodeList = Lists.newArrayList();
315373
ParseUtils.parseAnd(conditionNode, sqlNodeList);
@@ -421,6 +479,7 @@ private void joinFun(Object pollObj,
421479
joinScope.addScope(rightScopeChild);
422480

423481
HashBasedTable<String, String, String> mappingTable = ((JoinInfo) pollObj).getTableFieldRef();
482+
checkConditionFieldsInTable(joinInfo.getCondition(), joinScope);
424483

425484
//获取两个表的所有字段
426485
List<FieldInfo> sideJoinFieldInfo = ParserJoinField.getRowTypeInfo(joinInfo.getSelectNode(), joinScope, true);

0 commit comments

Comments
 (0)