Skip to content

Commit 006236b

Browse files
committed
[fix] fix timezone
1 parent 25fadc4 commit 006236b

File tree

12 files changed

+62
-12
lines changed

12 files changed

+62
-12
lines changed

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.sql.SQLException;
3535
import java.sql.Timestamp;
3636
import java.time.LocalDateTime;
37+
import java.util.TimeZone;
3738
import java.util.concurrent.ScheduledExecutorService;
3839
import java.util.concurrent.ScheduledThreadPoolExecutor;
3940
import java.util.concurrent.TimeUnit;
@@ -52,6 +53,8 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<Row, BaseRow> im
5253

5354
public static final long LOAD_DATA_ERROR_SLEEP_TIME = 5_000L;
5455

56+
public static final TimeZone LOCAL_TZ = TimeZone.getDefault();
57+
5558
protected BaseSideInfo sideInfo;
5659

5760
private ScheduledExecutorService es;
@@ -82,7 +85,8 @@ protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
8285

8386
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
8487
if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
85-
obj = Timestamp.valueOf(((LocalDateTime) obj));
88+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
89+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
8690
}
8791
return obj;
8892
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.Collections;
5353
import java.util.List;
5454
import java.util.Map;
55+
import java.util.TimeZone;
5556
import java.util.concurrent.ScheduledFuture;
5657

5758
/**
@@ -71,6 +72,7 @@ public abstract class BaseAsyncReqRow extends RichAsyncFunction<Row, BaseRow> im
7172
private int timeOutNum = 0;
7273
protected BaseSideInfo sideInfo;
7374
protected transient Counter parseErrorRecords;
75+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
7476

7577
public BaseAsyncReqRow(BaseSideInfo sideInfo) {
7678
this.sideInfo = sideInfo;
@@ -117,7 +119,8 @@ protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
117119

118120
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
119121
if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
120-
obj = Timestamp.valueOf(((LocalDateTime) obj));
122+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
123+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
121124
}
122125
return obj;
123126
}

core/src/main/java/com/dtstack/flink/sql/table/AbstractSourceTableInfo.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.table;
2222

2323
import com.google.common.base.Strings;
24+
import com.google.common.collect.Lists;
2425
import com.google.common.collect.Maps;
26+
import org.apache.flink.util.StringUtils;
2527

28+
import java.util.ArrayList;
2629
import java.util.Map;
30+
import java.util.TimeZone;
2731

2832
/**
2933
* Reason:
@@ -36,6 +40,10 @@ public abstract class AbstractSourceTableInfo extends AbstractTableInfo {
3640

3741
public static final String SOURCE_SUFFIX = "Source";
3842

43+
public static final String TIME_ZONE_KEY = "timezone";
44+
45+
private String timeZone = TimeZone.getDefault().getID();
46+
3947
private String eventTimeField;
4048

4149
private Integer maxOutOrderness = 10;
@@ -101,4 +109,23 @@ public String getAdaptSelectSql(){
101109
public String getAdaptName(){
102110
return getName() + "_adapt";
103111
}
112+
113+
public String getTimeZone() {
114+
return timeZone;
115+
}
116+
117+
public void setTimeZone(String timeZone) {
118+
if (StringUtils.isNullOrWhitespaceOnly(timeZone)){
119+
return;
120+
}
121+
timeZoneCheck(timeZone);
122+
this.timeZone = timeZone;
123+
}
124+
125+
private void timeZoneCheck(String timeZone) {
126+
ArrayList<String> zones = Lists.newArrayList(TimeZone.getAvailableIDs());
127+
if (!zones.contains(timeZone)){
128+
throw new IllegalArgumentException(" timezone is Incorrect!");
129+
}
130+
}
104131
}

core/src/main/java/com/dtstack/flink/sql/watermarker/AbstractCustomerWaterMarker.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
3030
import org.apache.flink.streaming.api.windowing.time.Time;
3131

32+
import java.util.TimeZone;
33+
3234
/**
3335
* Reason:
3436
* Date: 2018/10/18
@@ -51,6 +53,8 @@ public abstract class AbstractCustomerWaterMarker<T> extends BoundedOutOfOrderne
5153

5254
protected long lastTime = 0;
5355

56+
protected TimeZone timezone;
57+
5458
public AbstractCustomerWaterMarker(Time maxOutOfOrderness) {
5559
super(maxOutOfOrderness);
5660
}
@@ -98,7 +102,8 @@ public void setFromSourceTag(String fromSourceTag) {
98102

99103
protected long getExtractTimestamp(Long extractTime){
100104

101-
lastTime = extractTime;
105+
lastTime = extractTime + timezone.getOffset(extractTime);
106+
102107
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
103108

104109
return lastTime;

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForLong.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

29+
import java.util.TimeZone;
30+
2931
/**
3032
* Custom watermark --- for eventtime
3133
* Date: 2017/12/28
@@ -37,9 +39,10 @@ public class CustomerWaterMarkerForLong extends AbstractCustomerWaterMarker<Row>
3739

3840
private static final Logger logger = LoggerFactory.getLogger(CustomerWaterMarkerForLong.class);
3941

40-
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos) {
42+
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos, String timezone) {
4143
super(maxOutOfOrderness);
4244
this.pos = pos;
45+
this.timezone= TimeZone.getTimeZone(timezone);
4346
}
4447

4548
@Override

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForTimeStamp.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.watermarker;
2222

@@ -39,9 +39,10 @@ public class CustomerWaterMarkerForTimeStamp extends AbstractCustomerWaterMarker
3939

4040
private static final Logger logger = LoggerFactory.getLogger(CustomerWaterMarkerForTimeStamp.class);
4141

42-
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos) {
42+
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos,String timezone) {
4343
super(maxOutOfOrderness);
4444
this.pos = pos;
45+
this.timezone= TimeZone.getTimeZone(timezone);
4546
}
4647

4748
@Override

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
4848

4949
int maxOutOrderness = sourceTableInfo.getMaxOutOrderness();
5050

51+
String timeZone = sourceTableInfo.getTimeZone();
52+
5153
String[] fieldNames = typeInfo.getFieldNames();
5254
TypeInformation<?>[] fieldTypes = typeInfo.getFieldTypes();
5355

@@ -69,9 +71,9 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
6971

7072
AbstractCustomerWaterMarker waterMarker = null;
7173
if(fieldType.getTypeClass().isAssignableFrom(Timestamp.class)){
72-
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos);
74+
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos, timeZone);
7375
}else if(fieldType.getTypeClass().isAssignableFrom(Long.class)){
74-
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos);
76+
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos, timeZone);
7577
}else{
7678
throw new IllegalArgumentException("not support type of " + fieldType + ", current only support(timestamp, long).");
7779
}

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ public Row fillData(Row input, Object sideInput) {
106106

107107
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
108108
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
109-
obj = ((Timestamp) obj).getTime();
109+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
110+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
110111
}
111112

112113
row.setField(entry.getKey(), obj);

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ public Row fillData(Row input, Object sideInput) {
100100

101101
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
102102
if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
103-
obj = Timestamp.valueOf(((LocalDateTime) obj));
103+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
104+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
104105
}
105106

106107
row.setField(entry.getKey(), obj);

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5555
kafkaSourceTableInfo.setSchemaString(MathUtil.getString(props.get(KafkaSourceTableInfo.SCHEMA_STRING_KEY.toLowerCase())));
5656
kafkaSourceTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), "|")));
5757
kafkaSourceTableInfo.setSourceDataType(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.SOURCE_DATA_TYPE_KEY.toLowerCase(), FormatType.DT_NEST.name())));
58+
kafkaSourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
5859

5960
if(props.containsKey(KafkaSourceTableInfo.TIMESTAMP_OFFSET.toLowerCase())){
6061
kafkaSourceTableInfo.setTimestampOffset(MathUtil.getLongVal(props.getOrDefault(KafkaSourceTableInfo.TIMESTAMP_OFFSET.toLowerCase(), System.currentTimeMillis())));

0 commit comments

Comments
 (0)