Skip to content

Commit 7fafc92

Browse files
author
xuchao
committed
修改异步维表情况下。如果字段选择了ROWTIME, 框架本身在输出的时候会对rowtime进行时区处理,导致window无法触发
1 parent 828062e commit 7fafc92

File tree

7 files changed

+36
-13
lines changed

7 files changed

+36
-13
lines changed

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.util.ArrayList;
6161
import java.util.List;
6262
import java.util.Map;
63+
import java.util.TimeZone;
6364

6465
/**
6566
* Reason:
@@ -73,6 +74,8 @@ public class CassandraAsyncReqRow extends BaseAsyncReqRow {
7374

7475
private static final Logger LOG = LoggerFactory.getLogger(CassandraAsyncReqRow.class);
7576

77+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
78+
7679
private final static int DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE = 10;
7780

7881
private final static int DEFAULT_VERTX_WORKER_POOL_SIZE = 20;
@@ -251,7 +254,8 @@ public Row fillData(Row input, Object line) {
251254
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
252255

253256
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
254-
obj = ((Timestamp) obj).getTime();
257+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
258+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
255259
}
256260

257261
row.setField(entry.getKey(), obj);

elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AsyncReqRow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.sql.Timestamp;
5858
import java.util.List;
5959
import java.util.Map;
60+
import java.util.TimeZone;
6061

6162
/**
6263
* @author yinxi
@@ -65,6 +66,7 @@
6566
public class Elasticsearch6AsyncReqRow extends BaseAsyncReqRow implements Serializable {
6667

6768
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6AsyncReqRow.class);
69+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
6870
private transient RestHighLevelClient rhlClient;
6971
private SearchRequest searchRequest;
7072
private List<String> sqlJoinCompareOperate = Lists.newArrayList();
@@ -195,7 +197,8 @@ public Row fillData(Row input, Object line) {
195197
Object obj = input.getField(entry.getValue());
196198
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
197199
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
198-
obj = ((Timestamp) obj).getTime();
200+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
201+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
199202
}
200203

201204
row.setField(entry.getKey(), obj);

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.Collections;
4949
import java.util.List;
5050
import java.util.Map;
51+
import java.util.TimeZone;
5152
import java.util.concurrent.ExecutorService;
5253
import java.util.concurrent.LinkedBlockingQueue;
5354
import java.util.concurrent.ThreadPoolExecutor;
@@ -63,6 +64,8 @@ public class HbaseAsyncReqRow extends BaseAsyncReqRow {
6364

6465
private static final long serialVersionUID = 2098635104857937717L;
6566

67+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
68+
6669
private static final Logger LOG = LoggerFactory.getLogger(HbaseAsyncReqRow.class);
6770

6871
//match to the rule of netty3
@@ -143,7 +146,8 @@ public Row fillData(Row input, Object sideInput){
143146
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
144147

145148
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
146-
obj = ((Timestamp)obj).getTime();
149+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
150+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
147151
}
148152

149153
row.setField(entry.getKey(), obj);

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,14 @@
3636
import org.slf4j.LoggerFactory;
3737

3838
import java.sql.Timestamp;
39-
import java.util.Arrays;
40-
import java.util.Collections;
41-
import java.util.List;
42-
import java.util.Map;
39+
import java.util.*;
4340

4441
public class KuduAsyncReqRow extends BaseAsyncReqRow {
4542

4643
private static final Logger LOG = LoggerFactory.getLogger(KuduAsyncReqRow.class);
44+
45+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
46+
4747
/**
4848
* 获取连接的尝试次数
4949
*/
@@ -184,6 +184,8 @@ public Row fillData(Row input, Object sideInput) {
184184
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
185185
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
186186
obj = ((Timestamp) obj).getTime();
187+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
188+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
187189
}
188190
row.setField(entry.getKey(), obj);
189191
}

mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,7 @@
5050
import org.slf4j.LoggerFactory;
5151

5252
import java.sql.Timestamp;
53-
import java.util.Collection;
54-
import java.util.Collections;
55-
import java.util.List;
56-
import java.util.Map;
53+
import java.util.*;
5754
import java.util.concurrent.atomic.AtomicInteger;
5855

5956
/**
@@ -63,8 +60,11 @@
6360
* @author xuqianjin
6461
*/
6562
public class MongoAsyncReqRow extends BaseAsyncReqRow {
63+
6664
private static final long serialVersionUID = -1183158242862673706L;
6765

66+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
67+
6868
private static final Logger LOG = LoggerFactory.getLogger(MongoAsyncReqRow.class);
6969

7070
private transient MongoClient mongoClient;
@@ -165,6 +165,8 @@ public Row fillData(Row input, Object line) {
165165

166166
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
167167
obj = ((Timestamp) obj).getTime();
168+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
169+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
168170
}
169171

170172
row.setField(entry.getKey(), obj);

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.time.Instant;
4848
import java.util.List;
4949
import java.util.Map;
50+
import java.util.TimeZone;
5051
import java.util.concurrent.*;
5152
import java.util.concurrent.atomic.AtomicBoolean;
5253
import java.util.concurrent.atomic.AtomicLong;
@@ -62,6 +63,8 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
6263

6364
private static final long serialVersionUID = 2098635244857937720L;
6465

66+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
67+
6568
private static final Logger LOG = LoggerFactory.getLogger(RdbAsyncReqRow.class);
6669

6770
public final static int DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE = 1;
@@ -211,7 +214,8 @@ public Row fillData(Row input, Object line) {
211214
Object obj = input.getField(entry.getValue());
212215
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
213216
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
214-
obj = ((Timestamp) obj).getTime();
217+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
218+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
215219
}
216220

217221
row.setField(entry.getKey(), obj);

redis5/redis5-side/redis-side-core/src/main/java/com/dtstack/flink/sql/side/redis/table/RedisSideReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.sql.Date;
2929
import java.sql.Timestamp;
3030
import java.util.Map;
31+
import java.util.TimeZone;
3132

3233
/**
3334
* redis fill row data
@@ -40,6 +41,8 @@ public class RedisSideReqRow implements ISideReqRow, Serializable {
4041

4142
private static final long serialVersionUID = 3751171828444748982L;
4243

44+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
45+
4346
private BaseSideInfo sideInfo;
4447

4548
public RedisSideReqRow(BaseSideInfo sideInfo){
@@ -55,7 +58,8 @@ public Row fillData(Row input, Object sideInput) {
5558
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
5659

5760
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
58-
obj = ((Timestamp)obj).getTime();
61+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
62+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
5963
}
6064
row.setField(entry.getKey(), obj);
6165
}

0 commit comments

Comments
 (0)