Skip to content

Commit 2b21311

Browse files
author
gituser
committed
Merge branch '1.8_release_3.10.x' into 1.8_release_4.0.x
2 parents fadcd03 + cb738ac commit 2b21311

File tree

14 files changed

+57
-23
lines changed

14 files changed

+57
-23
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,7 @@
5050
import java.net.InetAddress;
5151
import java.sql.SQLException;
5252
import java.sql.Timestamp;
53-
import java.util.ArrayList;
54-
import java.util.Calendar;
55-
import java.util.List;
56-
import java.util.Map;
53+
import java.util.*;
5754
import java.util.concurrent.atomic.AtomicReference;
5855

5956
/**
@@ -91,8 +88,10 @@ public Row fillData(Row input, Object sideInput) {
9188

9289
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
9390
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
94-
obj = ((Timestamp) obj).getTime();
91+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
92+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
9593
}
94+
9695
row.setField(entry.getKey(), obj);
9796
}
9897

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.util.ArrayList;
6161
import java.util.List;
6262
import java.util.Map;
63+
import java.util.TimeZone;
6364

6465
/**
6566
* Reason:
@@ -73,6 +74,8 @@ public class CassandraAsyncReqRow extends BaseAsyncReqRow {
7374

7475
private static final Logger LOG = LoggerFactory.getLogger(CassandraAsyncReqRow.class);
7576

77+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
78+
7679
private final static int DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE = 10;
7780

7881
private final static int DEFAULT_VERTX_WORKER_POOL_SIZE = 20;
@@ -251,7 +254,8 @@ public Row fillData(Row input, Object line) {
251254
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
252255

253256
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
254-
obj = ((Timestamp) obj).getTime();
257+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
258+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
255259
}
256260

257261
row.setField(entry.getKey(), obj);

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.calcite.sql.JoinType;
3333

3434
import java.sql.SQLException;
35+
import java.util.TimeZone;
3536
import java.util.concurrent.ScheduledExecutorService;
3637
import java.util.concurrent.ScheduledThreadPoolExecutor;
3738
import java.util.concurrent.TimeUnit;
@@ -49,6 +50,8 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<CRow, CRow> impl
4950

5051
public static final long LOAD_DATA_ERROR_SLEEP_TIME = 5_000L;
5152

53+
public static final TimeZone LOCAL_TZ = TimeZone.getDefault();
54+
5255
protected BaseSideInfo sideInfo;
5356

5457
private ScheduledExecutorService es;

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,11 @@ public Row fillData(Row input, Object sideInput) {
106106

107107
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
108108
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
109-
obj = ((Timestamp) obj).getTime();
109+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
110+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
110111
}
111112

113+
112114
row.setField(entry.getKey(), obj);
113115
}
114116

elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AsyncReqRow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.sql.Timestamp;
5858
import java.util.List;
5959
import java.util.Map;
60+
import java.util.TimeZone;
6061

6162
/**
6263
* @author yinxi
@@ -65,6 +66,7 @@
6566
public class Elasticsearch6AsyncReqRow extends BaseAsyncReqRow implements Serializable {
6667

6768
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6AsyncReqRow.class);
69+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
6870
private transient RestHighLevelClient rhlClient;
6971
private SearchRequest searchRequest;
7072
private List<String> sqlJoinCompareOperate = Lists.newArrayList();
@@ -195,7 +197,8 @@ public Row fillData(Row input, Object line) {
195197
Object obj = input.getField(entry.getValue());
196198
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
197199
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
198-
obj = ((Timestamp) obj).getTime();
200+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
201+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
199202
}
200203

201204
row.setField(entry.getKey(), obj);

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,10 @@ public Row fillData(Row input, Object sideInput) {
9898

9999
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
100100
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
101-
obj = ((Timestamp)obj).getTime();
101+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
102+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
102103
}
104+
103105
row.setField(entry.getKey(), obj);
104106
}
105107

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.Collections;
5454
import java.util.List;
5555
import java.util.Map;
56+
import java.util.TimeZone;
5657
import java.util.concurrent.ExecutorService;
5758
import java.util.concurrent.LinkedBlockingQueue;
5859
import java.util.concurrent.ThreadPoolExecutor;
@@ -68,6 +69,8 @@ public class HbaseAsyncReqRow extends BaseAsyncReqRow {
6869

6970
private static final long serialVersionUID = 2098635104857937717L;
7071

72+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
73+
7174
private static final Logger LOG = LoggerFactory.getLogger(HbaseAsyncReqRow.class);
7275

7376
//match to the rule of netty3
@@ -164,7 +167,8 @@ public Row fillData(Row input, Object sideInput){
164167
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
165168

166169
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
167-
obj = ((Timestamp)obj).getTime();
170+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
171+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
168172
}
169173

170174
row.setField(entry.getKey(), obj);

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,10 @@ public Row fillData(Row input, Object sideInput) {
7676

7777
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
7878
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
79-
obj = ((Timestamp) obj).getTime();
79+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
80+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
8081
}
82+
8183
row.setField(entry.getKey(), obj);
8284
}
8385

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,14 @@
3636
import org.slf4j.LoggerFactory;
3737

3838
import java.sql.Timestamp;
39-
import java.util.Arrays;
40-
import java.util.Collections;
41-
import java.util.List;
42-
import java.util.Map;
39+
import java.util.*;
4340

4441
public class KuduAsyncReqRow extends BaseAsyncReqRow {
4542

4643
private static final Logger LOG = LoggerFactory.getLogger(KuduAsyncReqRow.class);
44+
45+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
46+
4747
/**
4848
* 获取连接的尝试次数
4949
*/
@@ -184,6 +184,8 @@ public Row fillData(Row input, Object sideInput) {
184184
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
185185
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
186186
obj = ((Timestamp) obj).getTime();
187+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
188+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
187189
}
188190
row.setField(entry.getKey(), obj);
189191
}

mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,10 @@ public Row fillData(Row input, Object sideInput) {
8888

8989
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
9090
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
91-
obj = ((Timestamp) obj).getTime();
91+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
92+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
9293
}
94+
9395
row.setField(entry.getKey(), obj);
9496
}
9597

0 commit comments

Comments
 (0)