Skip to content

Commit 8c4a25a

Browse files
committed
[fix-33847][core] fix other sink except kafka table schema not found problem.
1 parent 6795386 commit 8c4a25a

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,9 +332,15 @@ public static Set<URL> registerTable(SqlTree sqlTree,
332332
URL sourceTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractSourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
333333
pluginClassPathSets.add(sourceTablePathUrl);
334334
} else if (tableInfo instanceof AbstractTargetTableInfo) {
335-
336335
TableSink tableSink = StreamSinkFactory.getTableSink((AbstractTargetTableInfo) tableInfo, localSqlPluginPath, pluginLoadMode);
337-
tableEnv.registerTableSink(tableInfo.getName(), tableSink);
336+
// TODO Kafka Sink直接注册,其他的Sink要修复才可以。
337+
if (tableInfo.getType().startsWith("kafka")) {
338+
tableEnv.registerTableSink(tableInfo.getName(), tableSink);
339+
} else {
340+
TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
341+
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
342+
}
343+
338344
URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractTargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
339345
pluginClassPathSets.add(sinkTablePathUrl);
340346
} else if (tableInfo instanceof AbstractSideTableInfo) {

core/src/main/java/com/dtstack/flink/sql/function/FunctionManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,12 @@ public static void registerAggregateUDF(String classPath, String funcName, Table
124124
}
125125
}
126126

127+
public static TypeInformation[] transformTypes(Class[] fieldTypes) {
128+
TypeInformation[] types = new TypeInformation[fieldTypes.length];
129+
for (int i = 0; i < fieldTypes.length; i++) {
130+
types[i] = TypeInformation.of(fieldTypes[i]);
131+
}
132+
return types;
133+
}
134+
127135
}

0 commit comments

Comments
 (0)