Skip to content

Commit 83af3c0

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.0.x_31763' into 1.10_release_4.0.x
2 parents ef35b0c + c0f3b0e commit 83af3c0

File tree

3 files changed

+420
-4
lines changed

3 files changed

+420
-4
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
4343
import com.dtstack.flink.sql.util.DtStringUtil;
4444
import com.dtstack.flink.sql.util.PluginUtil;
45+
import com.dtstack.flink.sql.util.TypeInfoDataTypeConverter;
4546
import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
4647
import com.fasterxml.jackson.databind.ObjectMapper;
4748
import com.google.common.base.Preconditions;
@@ -64,6 +65,7 @@
6465
import org.apache.flink.table.api.java.StreamTableEnvironment;
6566
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
6667
import org.apache.flink.table.sinks.TableSink;
68+
import org.apache.flink.table.types.DataType;
6769
import org.slf4j.Logger;
6870
import org.slf4j.LoggerFactory;
6971

@@ -80,6 +82,7 @@
8082
import java.util.Properties;
8183
import java.util.Set;
8284
import java.util.TimeZone;
85+
import java.util.stream.Stream;
8386

8487
/**
8588
* 任务执行时的流程方法
@@ -295,7 +298,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
295298
String adaptSql = sourceTableInfo.getAdaptSelectSql();
296299
Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
297300

298-
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
301+
RowTypeInfo typeInfo = new RowTypeInfo(fromDataTypeToLegacyInfo(adaptTable.getSchema().getFieldDataTypes()), adaptTable.getSchema().getFieldNames());
299302
DataStream adaptStream = tableEnv.toAppendStream(adaptTable, typeInfo);
300303

301304
String fields = String.join(",", typeInfo.getFieldNames());
@@ -389,4 +392,10 @@ private static void timeZoneCheck(String timeZone) {
389392
throw new IllegalArgumentException(String.format(" timezone of %s is Incorrect!", timeZone));
390393
}
391394
}
395+
396+
private static TypeInformation<?>[] fromDataTypeToLegacyInfo(DataType[] dataType) {
397+
return Stream.of(dataType)
398+
.map(TypeInfoDataTypeConverter::toLegacyTypeInfo)
399+
.toArray(TypeInformation[]::new);
400+
}
392401
}

0 commit comments

Comments
 (0)