Skip to content

Commit 82cb3fe

Browse files
flinkSql字段名忽略大小写
1 parent 3713301 commit 82cb3fe

File tree

1 file changed

+16
-15
lines changed

1 file changed

+16
-15
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -70,35 +70,36 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt, Strea
7070
try {
7171

7272
newTable = queryResult.select(String.join(",", fieldNames));
73+
7374
} catch (Exception e) {
74-
try{
75-
76-
newTable = queryResult.select( String.join(",", ignoreCase(queryResult, fieldNames)));
77-
}catch (Exception ex){
78-
throw new ValidationException(
79-
"Field name of query result and registered TableSink "+targetTableName +" do not match.\n" +
80-
"Query result schema: " + String.join(",", queryResult.getSchema().getColumnNames()) + "\n" +
81-
"TableSink schema: " + String.join(",", fieldNames));
82-
}
75+
76+
throw new ValidationException(
77+
"Field name of query result and registered TableSink "+targetTableName +" do not match.\n" +
78+
"Query result schema: " + String.join(",", queryResult.getSchema().getColumnNames()) + "\n" +
79+
"TableSink schema: " + String.join(",", fieldNames));
80+
8381
}
8482
StreamQueryConfig config = null == queryConfig ? tableEnv.queryConfig() : queryConfig;
85-
tableEnv.insertInto(newTable, targetTableName, config);
83+
try{
84+
tableEnv.insertInto(newTable, targetTableName, config);
85+
}catch (Exception ex){
86+
newTable = queryResult.select( String.join(",", ignoreCase(queryResult, fieldNames)));
87+
System.out.println(String.join(",",fieldNames));
88+
tableEnv.insertInto(newTable, targetTableName, config);
89+
}
90+
8691
}
8792

8893
public static String[] ignoreCase(Table queryResult, String[] fieldNames){
89-
String[] newFieldNames = new String[fieldNames.length];
94+
String[] newFieldNames = fieldNames.clone();
9095
String[] queryFieldNames = queryResult.getSchema().getColumnNames();
9196
for(int i=0; i<fieldNames.length; i++){
92-
boolean flag = true;
9397
for(String queryFieldName : queryFieldNames){
9498
if(fieldNames[i].equalsIgnoreCase(queryFieldName)){
9599
newFieldNames[i] = queryFieldName;
96100
continue;
97101
}
98102
}
99-
if(flag){
100-
newFieldNames[i] = fieldNames[i];
101-
}
102103
}
103104
return newFieldNames;
104105
}

0 commit comments

Comments
 (0)