@@ -68,14 +68,38 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt, Strea
6868
6969 Table newTable = null ;
7070 try {
71+
7172 newTable = queryResult .select (String .join ("," , fieldNames ));
7273 } catch (Exception e ) {
73- throw new ValidationException (
74- "Field name of query result and registered TableSink " +targetTableName +" do not match.\n " +
75- "Query result schema: " + String .join ("," , queryResult .getSchema ().getColumnNames ()) + "\n " +
76- "TableSink schema: " + String .join ("," , fieldNames ));
74+ try {
75+
76+ newTable = queryResult .select ( String .join ("," , ignoreCase (queryResult , fieldNames )));
77+ }catch (Exception ex ){
78+ throw new ValidationException (
79+ "Field name of query result and registered TableSink " +targetTableName +" do not match.\n " +
80+ "Query result schema: " + String .join ("," , queryResult .getSchema ().getColumnNames ()) + "\n " +
81+ "TableSink schema: " + String .join ("," , fieldNames ));
82+ }
7783 }
7884 StreamQueryConfig config = null == queryConfig ? tableEnv .queryConfig () : queryConfig ;
7985 tableEnv .insertInto (newTable , targetTableName , config );
8086 }
87+
88+ public static String [] ignoreCase (Table queryResult , String [] fieldNames ){
89+ String [] newFieldNames = new String [fieldNames .length ];
90+ String [] queryFieldNames = queryResult .getSchema ().getColumnNames ();
91+ for (int i =0 ; i <fieldNames .length ; i ++){
92+ boolean flag = true ;
93+ for (String queryFieldName : queryFieldNames ){
94+ if (fieldNames [i ].equalsIgnoreCase (queryFieldName )){
95+ newFieldNames [i ] = queryFieldName ;
96+ continue ;
97+ }
98+ }
99+ if (flag ){
100+ newFieldNames [i ] = fieldNames [i ];
101+ }
102+ }
103+ return newFieldNames ;
104+ }
81105}
0 commit comments