Skip to content

Commit a54ce86

Browse files
committed
[fix-32570][core]关闭enableObjectReuse,并修复关闭之后带来的序列化问题
1 parent 76f133f commit a54ce86

File tree

2 files changed

+6
-2
lines changed

2 files changed

+6
-2
lines changed

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
7373

7474
confProperties = PropertiesUtils.propertiesTrim(confProperties);
7575
streamEnv.getConfig().disableClosureCleaner();
76-
// Disables reusing object
77-
streamEnv.getConfig().enableObjectReuse();
7876

7977
Configuration globalJobParameters = new Configuration();
8078
//Configuration unsupported set properties key-value

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.commons.lang3.StringUtils;
4646
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
4747
import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
48+
import org.apache.flink.api.common.typeinfo.TypeHint;
4849
import org.apache.flink.api.common.typeinfo.TypeInformation;
4950
import org.apache.flink.api.java.typeutils.RowTypeInfo;
5051
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -65,6 +66,7 @@
6566
import org.slf4j.Logger;
6667
import org.slf4j.LoggerFactory;
6768

69+
import java.sql.Timestamp;
6870
import java.time.LocalDateTime;
6971
import java.util.Arrays;
7072
import java.util.LinkedList;
@@ -442,6 +444,10 @@ private void joinFun(Object pollObj,
442444
if (fieldDataTypes[i].getClass().equals(LegacyLocalDateTimeTypeInfo.class)) {
443445
fieldDataTypes[i] = LocalTimeTypeInfo.LOCAL_DATE_TIME;
444446
}
447+
448+
if (fieldDataTypes[i].getClass().equals(TimeIndicatorTypeInfo.class)) {
449+
fieldDataTypes[i] = TypeInformation.of(new TypeHint<Timestamp>() {});
450+
}
445451
}
446452

447453
RowTypeInfo typeInfo = new RowTypeInfo(fieldDataTypes, targetTable.getSchema().getFieldNames());

0 commit comments

Comments
 (0)