2727import org .apache .flink .table .plan .logical .LogicalRelNode ;
2828import org .apache .flink .table .plan .schema .TableSinkTable ;
2929import org .apache .flink .table .plan .schema .TableSourceSinkTable ;
30+ import org .slf4j .Logger ;
31+ import org .slf4j .LoggerFactory ;
3032import scala .Option ;
3133
3234import java .lang .reflect .Method ;
3840 */
3941public class FlinkSQLExec {
4042
43+ private static final Logger LOG = LoggerFactory .getLogger (FlinkSQLExec .class );
44+
4145 public static void sqlUpdate (StreamTableEnvironment tableEnv , String stmt , StreamQueryConfig queryConfig ) throws Exception {
4246
4347 FlinkPlannerImpl planner = new FlinkPlannerImpl (tableEnv .getFrameworkConfig (), tableEnv .getPlanner (), tableEnv .getTypeFactory ());
@@ -64,18 +68,47 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt, Strea
6468
6569 TableSourceSinkTable targetTable = (TableSourceSinkTable ) sinkTab .get ();
6670 TableSinkTable tableSinkTable = (TableSinkTable )targetTable .tableSinkTable ().get ();
67- String [] fieldNames = tableSinkTable .tableSink ().getFieldNames ();
71+
72+ StreamQueryConfig config = null == queryConfig ? tableEnv .queryConfig () : queryConfig ;
73+ String [] sinkFieldNames = tableSinkTable .tableSink ().getFieldNames ();
74+ String [] queryFieldNames = queryResult .getSchema ().getColumnNames ();
75+ if (sinkFieldNames .length != queryFieldNames .length ) {
76+ throw new ValidationException (
77+ "Field name of query result and registered TableSink " + targetTableName + " do not match.\n " +
78+ "Query result schema: " + String .join ("," , queryFieldNames ) + "\n " +
79+ "TableSink schema: " + String .join ("," , sinkFieldNames ));
80+ }
6881
6982 Table newTable = null ;
7083 try {
71- newTable = queryResult .select (String .join ("," , fieldNames ));
84+ // sinkFieldNames not in queryResult error
85+ newTable = queryResult .select (String .join ("," , sinkFieldNames ));
7286 } catch (Exception e ) {
7387 throw new ValidationException (
74- "Field name of query result and registered TableSink " + targetTableName +" do not match.\n " +
75- "Query result schema: " + String .join ("," , queryResult .getSchema ().getColumnNames ()) + "\n " +
76- "TableSink schema: " + String .join ("," , fieldNames ));
88+ "Field name of query result and registered TableSink " + targetTableName + " do not match.\n " +
89+ "Query result schema: " + String .join ("," , queryResult .getSchema ().getColumnNames ()) + "\n " +
90+ "TableSink schema: " + String .join ("," , sinkFieldNames ));
7791 }
78- StreamQueryConfig config = null == queryConfig ? tableEnv .queryConfig () : queryConfig ;
79- tableEnv .insertInto (newTable , targetTableName , config );
92+
93+ try {
94+ tableEnv .insertInto (newTable , targetTableName , config );
95+ } catch (Exception ex ) {
96+ LOG .warn ("Field name case of query result and registered TableSink " + targetTableName + "do not match. " + ex .getMessage ());
97+ newTable = queryResult .select (String .join ("," , ignoreCase (queryFieldNames , sinkFieldNames )));
98+ tableEnv .insertInto (newTable , targetTableName , config );
99+ }
100+ }
101+
102+ public static String [] ignoreCase (String [] queryFieldNames , String [] sinkFieldNames ) {
103+ String [] newFieldNames = sinkFieldNames ;
104+ for (int i = 0 ; i < newFieldNames .length ; i ++) {
105+ for (String queryFieldName : queryFieldNames ) {
106+ if (newFieldNames [i ].equalsIgnoreCase (queryFieldName )) {
107+ newFieldNames [i ] = queryFieldName ;
108+ break ;
109+ }
110+ }
111+ }
112+ return newFieldNames ;
80113 }
81114}
0 commit comments