Skip to content

Commit 212ad6f

Browse files
committed
Merge remote-tracking branch 'origin/1.10_release_4.0.x' into 1.10_release_4.0.x
2 parents 801fd6f + e6a4c6a commit 212ad6f

File tree

15 files changed

+529
-14
lines changed

15 files changed

+529
-14
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
4343
import com.dtstack.flink.sql.util.DtStringUtil;
4444
import com.dtstack.flink.sql.util.PluginUtil;
45+
import com.dtstack.flink.sql.util.TypeInfoDataTypeConverter;
4546
import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
4647
import com.fasterxml.jackson.databind.ObjectMapper;
4748
import com.google.common.base.Preconditions;
@@ -64,6 +65,7 @@
6465
import org.apache.flink.table.api.java.StreamTableEnvironment;
6566
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
6667
import org.apache.flink.table.sinks.TableSink;
68+
import org.apache.flink.table.types.DataType;
6769
import org.slf4j.Logger;
6870
import org.slf4j.LoggerFactory;
6971

@@ -80,6 +82,7 @@
8082
import java.util.Properties;
8183
import java.util.Set;
8284
import java.util.TimeZone;
85+
import java.util.stream.Stream;
8386

8487
/**
8588
* 任务执行时的流程方法
@@ -295,7 +298,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
295298
String adaptSql = sourceTableInfo.getAdaptSelectSql();
296299
Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
297300

298-
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
301+
RowTypeInfo typeInfo = new RowTypeInfo(fromDataTypeToLegacyInfo(adaptTable.getSchema().getFieldDataTypes()), adaptTable.getSchema().getFieldNames());
299302
DataStream adaptStream = tableEnv.toAppendStream(adaptTable, typeInfo);
300303

301304
String fields = String.join(",", typeInfo.getFieldNames());
@@ -389,4 +392,10 @@ private static void timeZoneCheck(String timeZone) {
389392
throw new IllegalArgumentException(String.format(" timezone of %s is Incorrect!", timeZone));
390393
}
391394
}
395+
396+
private static TypeInformation<?>[] fromDataTypeToLegacyInfo(DataType[] dataType) {
397+
return Stream.of(dataType)
398+
.map(TypeInfoDataTypeConverter::toLegacyTypeInfo)
399+
.toArray(TypeInformation[]::new);
400+
}
392401
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.time.LocalDateTime;
3838
import java.util.Map;
3939
import java.util.TimeZone;
40+
import java.util.TimeZone;
4041
import java.util.concurrent.ScheduledExecutorService;
4142
import java.util.concurrent.ScheduledThreadPoolExecutor;
4243
import java.util.concurrent.TimeUnit;
@@ -45,6 +46,7 @@
4546
* Reason:
4647
* Date: 2018/9/18
4748
* Company: www.dtstack.com
49+
*
4850
* @author xuchao
4951
*/
5052

@@ -86,7 +88,8 @@ protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
8688

8789
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
8890
if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
89-
obj = Timestamp.valueOf(((LocalDateTime) obj));
91+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
92+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
9093
}
9194
return obj;
9295
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.Collections;
5353
import java.util.List;
5454
import java.util.Map;
55+
import java.util.TimeZone;
5556
import java.util.concurrent.ScheduledFuture;
5657

5758
/**
@@ -71,6 +72,7 @@ public abstract class BaseAsyncReqRow extends RichAsyncFunction<Row, BaseRow> im
7172
private int timeOutNum = 0;
7273
protected BaseSideInfo sideInfo;
7374
protected transient Counter parseErrorRecords;
75+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
7476

7577
public BaseAsyncReqRow(BaseSideInfo sideInfo) {
7678
this.sideInfo = sideInfo;
@@ -117,7 +119,8 @@ protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
117119

118120
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
119121
if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
120-
obj = Timestamp.valueOf(((LocalDateTime) obj));
122+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
123+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
121124
}
122125
return obj;
123126
}

core/src/main/java/com/dtstack/flink/sql/table/AbstractSourceTableInfo.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.table;
2222

2323
import com.google.common.base.Strings;
24+
import com.google.common.collect.Lists;
2425
import com.google.common.collect.Maps;
26+
import org.apache.flink.util.StringUtils;
2527

28+
import java.util.ArrayList;
2629
import java.util.Map;
30+
import java.util.TimeZone;
2731

2832
/**
2933
* Reason:
@@ -36,6 +40,10 @@ public abstract class AbstractSourceTableInfo extends AbstractTableInfo {
3640

3741
public static final String SOURCE_SUFFIX = "Source";
3842

43+
public static final String TIME_ZONE_KEY = "timezone";
44+
45+
private String timeZone = TimeZone.getDefault().getID();
46+
3947
private String eventTimeField;
4048

4149
private Integer maxOutOrderness = 10;
@@ -101,4 +109,23 @@ public String getAdaptSelectSql(){
101109
public String getAdaptName(){
102110
return getName() + "_adapt";
103111
}
112+
113+
public String getTimeZone() {
114+
return timeZone;
115+
}
116+
117+
public void setTimeZone(String timeZone) {
118+
if (StringUtils.isNullOrWhitespaceOnly(timeZone)){
119+
return;
120+
}
121+
timeZoneCheck(timeZone);
122+
this.timeZone = timeZone;
123+
}
124+
125+
private void timeZoneCheck(String timeZone) {
126+
ArrayList<String> zones = Lists.newArrayList(TimeZone.getAvailableIDs());
127+
if (!zones.contains(timeZone)){
128+
throw new IllegalArgumentException(" timezone is Incorrect!");
129+
}
130+
}
104131
}

0 commit comments

Comments
 (0)