Skip to content

Commit a926ddd

Browse files
author
sishu@dtstack.com
committed
abstract CustomerWaterMarker
1 parent e89bb80 commit a926ddd

File tree

3 files changed

+20
-29
lines changed

3 files changed

+20
-29
lines changed

core/src/main/java/com/dtstack/flink/sql/watermarker/AbsCustomerWaterMarker.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121

2222
import com.dtstack.flink.sql.metric.EventDelayGauge;
2323
import com.dtstack.flink.sql.metric.MetricConstant;
24+
import com.dtstack.flink.sql.util.MathUtil;
2425
import org.apache.flink.api.common.functions.IterationRuntimeContext;
2526
import org.apache.flink.api.common.functions.RichFunction;
2627
import org.apache.flink.api.common.functions.RuntimeContext;
2728
import org.apache.flink.configuration.Configuration;
2829
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
2930
import org.apache.flink.streaming.api.windowing.time.Time;
3031

32+
import java.util.TimeZone;
33+
3134
/**
3235
* Reason:
3336
* Date: 2018/10/18
@@ -46,6 +49,12 @@ public abstract class AbsCustomerWaterMarker<T> extends BoundedOutOfOrdernessTim
4649

4750
protected transient EventDelayGauge eventDelayGauge;
4851

52+
protected int pos;
53+
54+
protected long lastTime = 0;
55+
56+
protected TimeZone timezone;
57+
4958
public AbsCustomerWaterMarker(Time maxOutOfOrderness) {
5059
super(maxOutOfOrderness);
5160
}
@@ -90,4 +99,13 @@ public void setRuntimeContext(RuntimeContext t) {
9099
public void setFromSourceTag(String fromSourceTag) {
91100
this.fromSourceTag = fromSourceTag;
92101
}
102+
103+
protected long getExtractTimestamp(Long extractTime){
104+
105+
lastTime = extractTime + timezone.getOffset(extractTime);
106+
107+
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
108+
109+
return lastTime;
110+
}
93111
}

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForLong.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,6 @@ public class CustomerWaterMarkerForLong extends AbsCustomerWaterMarker<Row> {
3939

4040
private static final Logger logger = LoggerFactory.getLogger(CustomerWaterMarkerForLong.class);
4141

42-
private static final long serialVersionUID = 1L;
43-
44-
private int pos;
45-
46-
private long lastTime = 0;
47-
48-
private TimeZone timezone;
49-
5042
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos,String timezone) {
5143
super(maxOutOfOrderness);
5244
this.pos = pos;
@@ -58,12 +50,7 @@ public long extractTimestamp(Row row) {
5850

5951
try{
6052
Long extractTime = MathUtil.getLongVal(row.getField(pos));
61-
62-
lastTime = extractTime + timezone.getOffset(extractTime);
63-
64-
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
65-
66-
return lastTime;
53+
return getExtractTimestamp(extractTime);
6754
}catch (Exception e){
6855
logger.error("", e);
6956
}

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForTimeStamp.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,6 @@ public class CustomerWaterMarkerForTimeStamp extends AbsCustomerWaterMarker<Row>
4040

4141
private static final Logger logger = LoggerFactory.getLogger(CustomerWaterMarkerForTimeStamp.class);
4242

43-
private static final long serialVersionUID = 1L;
44-
45-
private int pos;
46-
47-
private long lastTime = 0;
48-
49-
private TimeZone timezone;
50-
5143
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos,String timezone) {
5244
super(maxOutOfOrderness);
5345
this.pos = pos;
@@ -58,14 +50,8 @@ public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos,String ti
5850
public long extractTimestamp(Row row) {
5951
try {
6052
Timestamp time = (Timestamp) row.getField(pos);
61-
6253
long extractTime=time.getTime();
63-
64-
lastTime = extractTime + timezone.getOffset(extractTime);
65-
66-
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
67-
68-
return lastTime;
54+
return getExtractTimestamp(extractTime);
6955
} catch (RuntimeException e) {
7056
logger.error("", e);
7157
}

0 commit comments

Comments
 (0)