Skip to content

Commit 3f64b12

Browse files
committed
add join field is not exist throw error
1 parent 64c00d3 commit 3f64b12

File tree

1 file changed

+59
-0
lines changed

1 file changed

+59
-0
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,64 @@ private List<String> convertPrimaryAlias(AbstractSideTableInfo sideTableInfo) {
293293
return res;
294294
}
295295

296+
/**
297+
* check whether all table fields exist in join condition.
298+
* @param conditionNode
299+
* @param joinScope
300+
*/
301+
public void checkConditionFieldsInTable(SqlNode conditionNode, JoinScope joinScope) {
302+
List<SqlNode> sqlNodeList = Lists.newArrayList();
303+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
304+
for (SqlNode sqlNode : sqlNodeList) {
305+
if (!SqlKind.COMPARISON.contains(sqlNode.getKind())) {
306+
throw new RuntimeException("not compare operator.");
307+
}
308+
309+
SqlNode leftNode = ((SqlBasicCall) sqlNode).getOperands()[0];
310+
SqlNode rightNode = ((SqlBasicCall) sqlNode).getOperands()[1];
311+
312+
if (leftNode.getKind() == SqlKind.IDENTIFIER) {
313+
checkFieldInTable((SqlIdentifier) leftNode, joinScope, conditionNode);
314+
}
315+
316+
if (rightNode.getKind() == SqlKind.IDENTIFIER) {
317+
checkFieldInTable((SqlIdentifier) rightNode, joinScope, conditionNode);
318+
}
319+
320+
}
321+
}
322+
323+
/**
324+
* check whether table exists and whether field is in table.
325+
* @param sqlNode
326+
* @param joinScope
327+
* @param conditionNode
328+
*/
329+
private void checkFieldInTable(SqlIdentifier sqlNode, JoinScope joinScope, SqlNode conditionNode) {
330+
String tableName = sqlNode.getComponent(0).getSimple();
331+
String fieldName = sqlNode.getComponent(1).getSimple();
332+
JoinScope.ScopeChild scopeChild = joinScope.getScope(tableName);
333+
String tableErrorMsg = "table [%s] is not exist. error condition is [%s]. if you find [%s] is exist, please check AS statement";
334+
Preconditions.checkState(
335+
scopeChild != null,
336+
tableErrorMsg,
337+
tableName,
338+
conditionNode.toString(),
339+
tableName
340+
);
341+
342+
String[] fieldNames = scopeChild.getRowTypeInfo().getFieldNames();
343+
boolean hasField = Arrays.asList(fieldNames).contains(fieldName);
344+
String fieldErrorMsg = "table [%s] has not [%s] field.\n error join condition is [%s]";
345+
Preconditions.checkState(
346+
hasField,
347+
fieldErrorMsg,
348+
tableName,
349+
fieldName,
350+
conditionNode.toString()
351+
);
352+
}
353+
296354
public List<String> getConditionFields(SqlNode conditionNode, String specifyTableName, AbstractSideTableInfo sideTableInfo) {
297355
List<SqlNode> sqlNodeList = Lists.newArrayList();
298356
ParseUtils.parseAnd(conditionNode, sqlNodeList);
@@ -395,6 +453,7 @@ private void joinFun(Object pollObj,
395453
joinScope.addScope(rightScopeChild);
396454

397455
HashBasedTable<String, String, String> mappingTable = ((JoinInfo) pollObj).getTableFieldRef();
456+
checkConditionFieldsInTable(joinInfo.getCondition(), joinScope);
398457

399458
//获取两个表的所有字段
400459
List<FieldInfo> sideJoinFieldInfo = ParserJoinField.getRowTypeInfo(joinInfo.getSelectNode(), joinScope, true);

0 commit comments

Comments
 (0)