Skip to content

Commit 3c19ba7

Browse files
committed
[fix] timestamp 多表JOIN异常
1 parent a415265 commit 3c19ba7

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.commons.collections.CollectionUtils;
4545
import org.apache.commons.lang3.StringUtils;
4646
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
47+
import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
4748
import org.apache.flink.api.common.typeinfo.TypeInformation;
4849
import org.apache.flink.api.java.typeutils.RowTypeInfo;
4950
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -54,9 +55,11 @@
5455
import org.apache.flink.table.catalog.ObjectIdentifier;
5556
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
5657
import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
58+
import org.apache.flink.table.runtime.typeutils.LegacyLocalDateTimeTypeInfo;
5759
import org.apache.flink.table.types.logical.DecimalType;
5860
import org.apache.flink.table.types.logical.LegacyTypeInformationType;
5961
import org.apache.flink.table.types.logical.LogicalType;
62+
import org.apache.flink.table.types.logical.TimestampType;
6063
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
6164
import org.apache.flink.types.Row;
6265
import org.slf4j.Logger;
@@ -387,6 +390,11 @@ private void joinFun(Object pollObj,
387390
((LegacyTypeInformationType<?>) logicalTypes[i]).getTypeInformation().getClass().equals(BigDecimalTypeInfo.class)) {
388391
logicalTypes[i] = new DecimalType(38, 18);
389392
}
393+
394+
if (logicalTypes[i] instanceof LegacyTypeInformationType &&
395+
(((LegacyTypeInformationType<?>) logicalTypes[i]).getTypeInformation().getClass().equals(LegacyLocalDateTimeTypeInfo.class))) {
396+
logicalTypes[i] = new TimestampType(TimestampType.MAX_PRECISION);
397+
}
390398
}
391399

392400
BaseRowTypeInfo leftBaseTypeInfo = new BaseRowTypeInfo(logicalTypes, leftTable.getSchema().getFieldNames());
@@ -425,11 +433,15 @@ private void joinFun(Object pollObj,
425433
targetTable = localTableCache.get(joinInfo.getLeftTableName());
426434
}
427435

428-
TypeInformation[] fieldDataTypes = targetTable.getSchema().getFieldTypes();
436+
TypeInformation<?>[] fieldDataTypes = targetTable.getSchema().getFieldTypes();
429437
for (int i = 0; i < fieldDataTypes.length; i++) {
430438
if (fieldDataTypes[i].getClass().equals(BigDecimalTypeInfo.class)) {
431439
fieldDataTypes[i] = BasicTypeInfo.BIG_DEC_TYPE_INFO;
432440
}
441+
442+
if (fieldDataTypes[i].getClass().equals(LegacyLocalDateTimeTypeInfo.class)) {
443+
fieldDataTypes[i] = LocalTimeTypeInfo.LOCAL_DATE_TIME;
444+
}
433445
}
434446

435447
RowTypeInfo typeInfo = new RowTypeInfo(fieldDataTypes, targetTable.getSchema().getFieldNames());

0 commit comments

Comments
 (0)