Skip to content

Commit 9f1efa6

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.0.x_32875' into 1.10_release_4.0.x
2 parents b335aa2 + b9d6473 commit 9f1efa6

File tree

1 file changed

+6
-25
lines changed

1 file changed

+6
-25
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919
package com.dtstack.flink.sql.exec;
2020

2121
import org.apache.calcite.sql.SqlIdentifier;
22-
import org.apache.calcite.sql.SqlInsert;
2322
import org.apache.flink.sql.parser.dml.RichSqlInsert;
2423
import org.apache.flink.table.api.Table;
25-
import org.apache.flink.table.api.ValidationException;
2624
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
2725
import org.apache.flink.table.api.internal.TableImpl;
2826
import org.apache.flink.table.api.java.StreamTableEnvironment;
@@ -42,10 +40,11 @@
4240

4341
import java.lang.reflect.InvocationTargetException;
4442
import java.lang.reflect.Method;
43+
import java.util.Arrays;
4544

4645

4746
/**
48-
* @description: mapping by name when insert into sink table
47+
* @description: mapping by name when insert into sink table
4948
* @author: maqi
5049
* @create: 2019/08/15 11:09
5150
*/
@@ -54,44 +53,26 @@ public class FlinkSQLExec {
5453

5554
public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
5655
StreamTableEnvironmentImpl tableEnvImpl = ((StreamTableEnvironmentImpl) tableEnv);
57-
StreamPlanner streamPlanner = (StreamPlanner)tableEnvImpl.getPlanner();
56+
StreamPlanner streamPlanner = (StreamPlanner) tableEnvImpl.getPlanner();
5857
FlinkPlannerImpl flinkPlanner = streamPlanner.createFlinkPlanner();
5958

6059
RichSqlInsert insert = (RichSqlInsert) flinkPlanner.validate(flinkPlanner.parser().parse(stmt));
6160
TableImpl queryResult = extractQueryTableFromInsertCaluse(tableEnvImpl, flinkPlanner, insert);
6261

63-
String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0);
62+
String targetTableName = ((SqlIdentifier) insert.getTargetTable()).names.get(0);
6463
TableSink tableSink = getTableSinkByPlanner(streamPlanner, targetTableName);
6564

6665
String[] sinkFieldNames = tableSink.getTableSchema().getFieldNames();
6766
String[] queryFieldNames = queryResult.getSchema().getFieldNames();
68-
69-
if (sinkFieldNames.length != queryFieldNames.length) {
70-
throw new ValidationException(
71-
"Field name of query result and registered TableSink " + targetTableName + " do not match.\n" +
72-
"Query result schema: " + String.join(",", queryFieldNames) + "\n" +
73-
"TableSink schema: " + String.join(",", sinkFieldNames));
74-
}
75-
76-
77-
Table newTable = null;
67+
Table newTable;
7868
try {
7969
newTable = queryResult.select(String.join(",", sinkFieldNames));
80-
} catch (Exception e) {
81-
throw new ValidationException(
82-
"Field name of query result and registered TableSink "+targetTableName +" do not match.\n" +
83-
"Query result schema: " + String.join(",", queryFieldNames) + "\n" +
84-
"TableSink schema: " + String.join(",", sinkFieldNames));
85-
}
86-
87-
try {
8870
tableEnv.insertInto(targetTableName, newTable);
8971
} catch (Exception e) {
90-
LOG.warn("Field name case of query result and registered TableSink do not match. ", e);
72+
LOG.warn(String.format("Query result and registered TableSink do not match \n input field list:%s \n output field list:%s ", Arrays.toString(queryFieldNames), Arrays.toString(sinkFieldNames)));
9173
newTable = queryResult.select(String.join(",", ignoreCase(queryFieldNames, sinkFieldNames)));
9274
tableEnv.insertInto(targetTableName, newTable);
9375
}
94-
9576
}
9677

9778
private static TableSink getTableSinkByPlanner(StreamPlanner streamPlanner, String targetTableName)

0 commit comments

Comments
 (0)