Skip to content

Commit e89bb80

Browse files
author
sishu@dtstack.com
committed
optize
1 parent 03afb78 commit e89bb80

File tree

1 file changed

+4
-4
lines changed

1 file changed

+4
-4
lines changed

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
import org.apache.flink.streaming.api.windowing.time.Time;
2929
import org.apache.flink.types.Row;
3030
import org.apache.flink.util.Preconditions;
31-
32-
import java.util.TimeZone;
31+
import java.sql.Timestamp;
32+
import java.lang.Long;
3333

3434
/**
3535
* define watermarker
@@ -76,9 +76,9 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
7676
TypeInformation fieldType = fieldTypes[pos];
7777

7878
AbsCustomerWaterMarker waterMarker = null;
79-
if(fieldType.getTypeClass().getTypeName().equalsIgnoreCase("java.sql.Timestamp")){
79+
if(fieldType.getTypeClass().isAssignableFrom(Timestamp.class)){
8080
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos,timeZone);
81-
}else if(fieldType.getTypeClass().getTypeName().equalsIgnoreCase("java.lang.Long")){
81+
}else if(fieldType.getTypeClass().isAssignableFrom(Long.class)){
8282
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos,timeZone);
8383
}else{
8484
throw new IllegalArgumentException("not support type of " + fieldType + ", current only support(timestamp, long).");

0 commit comments

Comments
 (0)