Skip to content

Commit af3dc96

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.0.x_31007' into 1.10_release_4.0.x
2 parents 6e174be + a633747 commit af3dc96

File tree

3 files changed

+62
-2
lines changed

3 files changed

+62
-2
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.exec;
2020

21+
import com.dtstack.flink.sql.util.SqlCheckUtils;
2122
import org.apache.calcite.sql.SqlIdentifier;
2223
import org.apache.calcite.sql.SqlInsert;
2324
import org.apache.flink.sql.parser.dml.RichSqlInsert;
@@ -58,7 +59,7 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throw
5859
StreamTableEnvironmentImpl tableEnvImpl = ((StreamTableEnvironmentImpl) tableEnv);
5960
StreamPlanner streamPlanner = (StreamPlanner)tableEnvImpl.getPlanner();
6061
FlinkPlannerImpl flinkPlanner = streamPlanner.createFlinkPlanner();
61-
62+
SqlCheckUtils.check(stmt);
6263
RichSqlInsert insert = (RichSqlInsert) flinkPlanner.validate(flinkPlanner.parser().parse(stmt));
6364
TableImpl queryResult = extractQueryTableFromInsertCaluse(tableEnvImpl, flinkPlanner, insert);
6465

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.dtstack.flink.sql.side.operator.SideWithAllCacheOperator;
2929
import com.dtstack.flink.sql.util.ClassUtil;
3030
import com.dtstack.flink.sql.util.ParseUtils;
31+
import com.dtstack.flink.sql.util.SqlCheckUtils;
3132
import com.dtstack.flink.sql.util.TableUtils;
3233
import com.google.common.base.Preconditions;
3334
import com.google.common.collect.HashBasedTable;
@@ -151,7 +152,7 @@ public void exec(String sql,
151152

152153
} else if (pollSqlNode.getKind() == SELECT) {
153154
Preconditions.checkState(createView != null, "select sql must included by create view");
154-
Table table = tableEnv.sqlQuery(pollObj.toString());
155+
Table table = SqlCheckUtils.sqlQueryWithCheck(tableEnv, pollObj.toString());
155156

156157
if (createView.getFieldsInfoStr() == null) {
157158
tableEnv.registerTable(createView.getTableName(), table);
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.util;
20+
21+
import com.google.common.base.Preconditions;
22+
import org.apache.flink.table.api.Table;
23+
import org.apache.flink.table.api.TableEnvironment;
24+
25+
import java.util.regex.Matcher;
26+
import java.util.regex.Pattern;
27+
28+
/**
29+
* @program: flinkStreamSQL
30+
* @author: wuren
31+
* @create: 2020/10/13
32+
**/
33+
public class SqlCheckUtils {
34+
35+
private final static Pattern NULL_AS_PATTERN = Pattern.compile("(?i)NULL\\s+AS");
36+
37+
/**
38+
* check SQL before call sqlQuery
39+
* @param tEnv
40+
* @param query
41+
* @return
42+
*/
43+
public static Table sqlQueryWithCheck(TableEnvironment tEnv, String query) {
44+
check(query);
45+
return tEnv.sqlQuery(query);
46+
}
47+
48+
/**
49+
* check SQL before pass into flink planner
50+
* 在传入原生Flink之前校验SQL合法性。
51+
* @param stmt
52+
*/
53+
public static void check(String stmt) {
54+
Matcher matcher = NULL_AS_PATTERN.matcher(stmt);
55+
Preconditions.checkState(!matcher.find(),"NULL AS is not supported. error SQL is [%s]", stmt);
56+
}
57+
58+
}

0 commit comments

Comments
 (0)