Skip to content

Commit 4d087dc

Browse files
author
dapeng
committed
修改flink.1.10数据类型转换问题
1 parent ea52aeb commit 4d087dc

File tree

2 files changed

+418
-1
lines changed

2 files changed

+418
-1
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.dtstack.flink.sql.parser.InsertSqlParser;
2525
import com.dtstack.flink.sql.parser.SqlParser;
2626
import com.dtstack.flink.sql.parser.SqlTree;
27+
import com.dtstack.flink.sql.util.TypeInfoDataTypeConverter;
2728
import org.apache.flink.api.common.typeinfo.TypeInformation;
2829
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2930
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -62,6 +63,8 @@
6263
import org.apache.calcite.sql.SqlNode;
6364
import org.apache.commons.io.Charsets;
6465
import org.apache.commons.lang3.StringUtils;
66+
import org.apache.flink.table.types.DataType;
67+
import org.apache.flink.table.types.utils.TypeConversions;
6568
import org.slf4j.Logger;
6669
import org.slf4j.LoggerFactory;
6770

@@ -78,6 +81,7 @@
7881
import java.util.Set;
7982
import java.util.TimeZone;
8083
import java.util.ArrayList;
84+
import java.util.stream.Stream;
8185

8286
/**
8387
* 任务执行时的流程方法
@@ -287,7 +291,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
287291
String adaptSql = sourceTableInfo.getAdaptSelectSql();
288292
Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
289293

290-
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
294+
RowTypeInfo typeInfo = new RowTypeInfo(fromDataTypeToLegacyInfo(adaptTable.getSchema().getFieldDataTypes()), adaptTable.getSchema().getFieldNames());
291295
DataStream adaptStream = tableEnv.toAppendStream(adaptTable, typeInfo);
292296

293297
String fields = String.join(",", typeInfo.getFieldNames());
@@ -380,4 +384,10 @@ private static void timeZoneCheck(String timeZone) {
380384
throw new IllegalArgumentException(String.format(" timezone of %s is Incorrect!", timeZone));
381385
}
382386
}
387+
388+
private static TypeInformation<?>[] fromDataTypeToLegacyInfo(DataType[] dataType) {
389+
return Stream.of(dataType)
390+
.map(TypeInfoDataTypeConverter::toLegacyTypeInfo)
391+
.toArray(TypeInformation[]::new);
392+
}
383393
}

0 commit comments

Comments
 (0)