Skip to content

Commit df06318

Browse files
author
xuchao
committed
修改维表AllCache 情况下。如果字段选择了ROWTIME, 框架本身在输出的时候会对rowtime进行时区处理,导致window无法触发
1 parent 7fafc92 commit df06318

File tree

7 files changed

+22
-14
lines changed

7 files changed

+22
-14
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,7 @@
5050
import java.net.InetAddress;
5151
import java.sql.SQLException;
5252
import java.sql.Timestamp;
53-
import java.util.ArrayList;
54-
import java.util.Calendar;
55-
import java.util.List;
56-
import java.util.Map;
53+
import java.util.*;
5754
import java.util.concurrent.atomic.AtomicReference;
5855

5956
/**
@@ -91,8 +88,10 @@ public Row fillData(Row input, Object sideInput) {
9188

9289
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
9390
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
94-
obj = ((Timestamp) obj).getTime();
91+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
92+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
9593
}
94+
9695
row.setField(entry.getKey(), obj);
9796
}
9897

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.calcite.sql.JoinType;
3333

3434
import java.sql.SQLException;
35+
import java.util.TimeZone;
3536
import java.util.concurrent.ScheduledExecutorService;
3637
import java.util.concurrent.ScheduledThreadPoolExecutor;
3738
import java.util.concurrent.TimeUnit;
@@ -49,6 +50,8 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<CRow, CRow> impl
4950

5051
public static final long LOAD_DATA_ERROR_SLEEP_TIME = 5_000L;
5152

53+
public static final TimeZone LOCAL_TZ = TimeZone.getDefault();
54+
5255
protected BaseSideInfo sideInfo;
5356

5457
private ScheduledExecutorService es;

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,11 @@ public Row fillData(Row input, Object sideInput) {
106106

107107
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
108108
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
109-
obj = ((Timestamp) obj).getTime();
109+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
110+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
110111
}
111112

113+
112114
row.setField(entry.getKey(), obj);
113115
}
114116

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,10 @@ public Row fillData(Row input, Object sideInput) {
8888

8989
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
9090
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
91-
obj = ((Timestamp)obj).getTime();
91+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
92+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
9293
}
94+
9395
row.setField(entry.getKey(), obj);
9496
}
9597

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,10 @@ public Row fillData(Row input, Object sideInput) {
7676

7777
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
7878
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
79-
obj = ((Timestamp) obj).getTime();
79+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
80+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
8081
}
82+
8183
row.setField(entry.getKey(), obj);
8284
}
8385

mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,10 @@ public Row fillData(Row input, Object sideInput) {
8888

8989
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
9090
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
91-
obj = ((Timestamp) obj).getTime();
91+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
92+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
9293
}
94+
9395
row.setField(entry.getKey(), obj);
9496
}
9597

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,7 @@
4343
import java.sql.SQLException;
4444
import java.sql.Statement;
4545
import java.sql.Timestamp;
46-
import java.util.ArrayList;
47-
import java.util.Calendar;
48-
import java.util.List;
49-
import java.util.Map;
46+
import java.util.*;
5047
import java.util.concurrent.atomic.AtomicReference;
5148
import java.util.stream.Collectors;
5249

@@ -161,7 +158,8 @@ public Row fillData(Row input, Object sideInput) {
161158
protected Object dealTimeAttributeType(Class<? extends TypeInformation> entry, Object obj) {
162159
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(entry);
163160
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
164-
obj = ((Timestamp) obj).getTime();
161+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
162+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
165163
}
166164
return obj;
167165
}

0 commit comments

Comments
 (0)