Skip to content

Commit b590fd9

Browse files
flinksql字段名忽略大小写,增加warn日志
1 parent d1f6114 commit b590fd9

File tree

1 file changed

+5
-0
lines changed

1 file changed

+5
-0
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.flink.table.plan.logical.LogicalRelNode;
2828
import org.apache.flink.table.plan.schema.TableSinkTable;
2929
import org.apache.flink.table.plan.schema.TableSourceSinkTable;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
3032
import scala.Option;
3133

3234
import java.lang.reflect.Method;
@@ -38,6 +40,8 @@
3840
*/
3941
public class FlinkSQLExec {
4042

43+
private static final Logger LOG = LoggerFactory.getLogger(FlinkSQLExec.class);
44+
4145
public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt, StreamQueryConfig queryConfig) throws Exception {
4246

4347
FlinkPlannerImpl planner = new FlinkPlannerImpl(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
@@ -89,6 +93,7 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt, Strea
8993
try {
9094
tableEnv.insertInto(newTable, targetTableName, config);
9195
} catch (Exception ex) {
96+
LOG.warn("Field name case of query result and registered TableSink " + targetTableName + "do not match. " + ex.getMessage());
9297
newTable = queryResult.select(String.join(",", ignoreCase(queryFieldNames, sinkFieldNames)));
9398
tableEnv.insertInto(newTable, targetTableName, config);
9499
}

0 commit comments

Comments
 (0)