@@ -64,41 +64,46 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt, Strea
6464
6565 TableSourceSinkTable targetTable = (TableSourceSinkTable ) sinkTab .get ();
6666 TableSinkTable tableSinkTable = (TableSinkTable )targetTable .tableSinkTable ().get ();
67- String [] fieldNames = tableSinkTable .tableSink ().getFieldNames ();
67+
68+ StreamQueryConfig config = null == queryConfig ? tableEnv .queryConfig () : queryConfig ;
69+ String [] sinkFieldNames = tableSinkTable .tableSink ().getFieldNames ();
70+ String [] queryFieldNames = queryResult .getSchema ().getColumnNames ();
71+ if (sinkFieldNames .length != queryFieldNames .length ) {
72+ throw new ValidationException (
73+ "Field name of query result and registered TableSink " + targetTableName + " do not match.\n " +
74+ "Query result schema: " + String .join ("," , queryFieldNames ) + "\n " +
75+ "TableSink schema: " + String .join ("," , sinkFieldNames ));
76+ }
6877
6978 Table newTable = null ;
7079 try {
71-
72- newTable = queryResult .select (String .join ("," , fieldNames ));
80+ // sinkFieldNames not in queryResult error
81+ newTable = queryResult .select (String .join ("," , sinkFieldNames ));
7382 } catch (Exception e ) {
74- try {
75-
76- newTable = queryResult .select ( String .join ("," , ignoreCase (queryResult , fieldNames )));
77- }catch (Exception ex ){
78- throw new ValidationException (
79- "Field name of query result and registered TableSink " +targetTableName +" do not match.\n " +
80- "Query result schema: " + String .join ("," , queryResult .getSchema ().getColumnNames ()) + "\n " +
81- "TableSink schema: " + String .join ("," , fieldNames ));
82- }
83+ throw new ValidationException (
84+ "Field name of query result and registered TableSink " + targetTableName + " do not match.\n " +
85+ "Query result schema: " + String .join ("," , queryResult .getSchema ().getColumnNames ()) + "\n " +
86+ "TableSink schema: " + String .join ("," , sinkFieldNames ));
8387 }
84- StreamQueryConfig config = null == queryConfig ? tableEnv .queryConfig () : queryConfig ;
85- tableEnv .insertInto (newTable , targetTableName , config );
88+
89+ try {
90+ tableEnv .insertInto (newTable , targetTableName , config );
91+ } catch (Exception ex ) {
92+ newTable = queryResult .select (String .join ("," , ignoreCase (queryFieldNames , sinkFieldNames )));
93+ tableEnv .insertInto (newTable , targetTableName , config );
94+ }
95+
8696 }
8797
88- public static String [] ignoreCase (Table queryResult , String [] fieldNames ){
89- String [] newFieldNames = new String [fieldNames .length ];
90- String [] queryFieldNames = queryResult .getSchema ().getColumnNames ();
91- for (int i =0 ; i <fieldNames .length ; i ++){
92- boolean flag = true ;
93- for (String queryFieldName : queryFieldNames ){
94- if (fieldNames [i ].equalsIgnoreCase (queryFieldName )){
98+ public static String [] ignoreCase (String [] queryFieldNames , String [] sinkFieldNames ) {
99+ String [] newFieldNames = sinkFieldNames ;
100+ for (int i = 0 ; i < newFieldNames .length ; i ++) {
101+ for (String queryFieldName : queryFieldNames ) {
102+ if (newFieldNames [i ].equalsIgnoreCase (queryFieldName )) {
95103 newFieldNames [i ] = queryFieldName ;
96- continue ;
104+ break ;
97105 }
98106 }
99- if (flag ){
100- newFieldNames [i ] = fieldNames [i ];
101- }
102107 }
103108 return newFieldNames ;
104109 }
0 commit comments