1919package com .dtstack .flink .sql .exec ;
2020
2121import org .apache .calcite .sql .SqlIdentifier ;
22- import org .apache .calcite .sql .SqlInsert ;
2322import org .apache .flink .sql .parser .dml .RichSqlInsert ;
2423import org .apache .flink .table .api .Table ;
25- import org .apache .flink .table .api .ValidationException ;
2624import org .apache .flink .table .api .internal .TableEnvironmentImpl ;
2725import org .apache .flink .table .api .internal .TableImpl ;
2826import org .apache .flink .table .api .java .StreamTableEnvironment ;
4240
4341import java .lang .reflect .InvocationTargetException ;
4442import java .lang .reflect .Method ;
43+ import java .util .Arrays ;
4544
4645
4746/**
48- * @description: mapping by name when insert into sink table
47+ * @description: mapping by name when insert into sink table
4948 * @author: maqi
5049 * @create: 2019/08/15 11:09
5150 */
@@ -54,44 +53,26 @@ public class FlinkSQLExec {
5453
5554 public static void sqlUpdate (StreamTableEnvironment tableEnv , String stmt ) throws Exception {
5655 StreamTableEnvironmentImpl tableEnvImpl = ((StreamTableEnvironmentImpl ) tableEnv );
57- StreamPlanner streamPlanner = (StreamPlanner )tableEnvImpl .getPlanner ();
56+ StreamPlanner streamPlanner = (StreamPlanner ) tableEnvImpl .getPlanner ();
5857 FlinkPlannerImpl flinkPlanner = streamPlanner .createFlinkPlanner ();
5958
6059 RichSqlInsert insert = (RichSqlInsert ) flinkPlanner .validate (flinkPlanner .parser ().parse (stmt ));
6160 TableImpl queryResult = extractQueryTableFromInsertCaluse (tableEnvImpl , flinkPlanner , insert );
6261
63- String targetTableName = ((SqlIdentifier ) (( SqlInsert ) insert ) .getTargetTable ()).names .get (0 );
62+ String targetTableName = ((SqlIdentifier ) insert .getTargetTable ()).names .get (0 );
6463 TableSink tableSink = getTableSinkByPlanner (streamPlanner , targetTableName );
6564
6665 String [] sinkFieldNames = tableSink .getTableSchema ().getFieldNames ();
6766 String [] queryFieldNames = queryResult .getSchema ().getFieldNames ();
68-
69- if (sinkFieldNames .length != queryFieldNames .length ) {
70- throw new ValidationException (
71- "Field name of query result and registered TableSink " + targetTableName + " do not match.\n " +
72- "Query result schema: " + String .join ("," , queryFieldNames ) + "\n " +
73- "TableSink schema: " + String .join ("," , sinkFieldNames ));
74- }
75-
76-
77- Table newTable = null ;
67+ Table newTable ;
7868 try {
7969 newTable = queryResult .select (String .join ("," , sinkFieldNames ));
80- } catch (Exception e ) {
81- throw new ValidationException (
82- "Field name of query result and registered TableSink " +targetTableName +" do not match.\n " +
83- "Query result schema: " + String .join ("," , queryFieldNames ) + "\n " +
84- "TableSink schema: " + String .join ("," , sinkFieldNames ));
85- }
86-
87- try {
8870 tableEnv .insertInto (targetTableName , newTable );
8971 } catch (Exception e ) {
90- LOG .warn ("Field name case of query result and registered TableSink do not match. " , e );
72+ LOG .warn (String . format ( "Query result and registered TableSink do not match \n input field list:%s \n output field list:%s " , Arrays . toString ( queryFieldNames ), Arrays . toString ( sinkFieldNames )) );
9173 newTable = queryResult .select (String .join ("," , ignoreCase (queryFieldNames , sinkFieldNames )));
9274 tableEnv .insertInto (targetTableName , newTable );
9375 }
94-
9576 }
9677
9778 private static TableSink getTableSinkByPlanner (StreamPlanner streamPlanner , String targetTableName )
0 commit comments