Skip to content

Commit 8cb6d5a

Browse files
author
dapeng
committed
Merge branch '1.8_release_3.10.x' into feat_1.8_hbaseRefactor
# Conflicts: # hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java # hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java
2 parents 296f212 + 0ff459b commit 8cb6d5a

File tree

22 files changed

+107
-45
lines changed

22 files changed

+107
-45
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,7 @@
5050
import java.net.InetAddress;
5151
import java.sql.SQLException;
5252
import java.sql.Timestamp;
53-
import java.util.ArrayList;
54-
import java.util.Calendar;
55-
import java.util.List;
56-
import java.util.Map;
53+
import java.util.*;
5754
import java.util.concurrent.atomic.AtomicReference;
5855

5956
/**
@@ -91,8 +88,10 @@ public Row fillData(Row input, Object sideInput) {
9188

9289
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
9390
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
94-
obj = ((Timestamp) obj).getTime();
91+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
92+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
9593
}
94+
9695
row.setField(entry.getKey(), obj);
9796
}
9897

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.util.ArrayList;
6161
import java.util.List;
6262
import java.util.Map;
63+
import java.util.TimeZone;
6364

6465
/**
6566
* Reason:
@@ -73,6 +74,8 @@ public class CassandraAsyncReqRow extends BaseAsyncReqRow {
7374

7475
private static final Logger LOG = LoggerFactory.getLogger(CassandraAsyncReqRow.class);
7576

77+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
78+
7679
private final static int DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE = 10;
7780

7881
private final static int DEFAULT_VERTX_WORKER_POOL_SIZE = 20;
@@ -251,7 +254,8 @@ public Row fillData(Row input, Object line) {
251254
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
252255

253256
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
254-
obj = ((Timestamp) obj).getTime();
257+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
258+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
255259
}
256260

257261
row.setField(entry.getKey(), obj);

core/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,18 @@
108108
<groupId>org.apache.flink</groupId>
109109
<artifactId>flink-yarn_2.11</artifactId>
110110
<version>${flink.version}</version>
111+
<exclusions>
112+
<exclusion>
113+
<groupId>org.apache.flink</groupId>
114+
<artifactId>flink-shaded-hadoop2</artifactId>
115+
</exclusion>
116+
</exclusions>
117+
</dependency>
118+
119+
<dependency>
120+
<groupId>org.apache.flink</groupId>
121+
<artifactId>flink-shaded-hadoop2</artifactId>
122+
<version>2.7.5-1.8.1</version>
111123
</dependency>
112124

113125
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-statebackend-rocksdb -->

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.calcite.sql.JoinType;
3333

3434
import java.sql.SQLException;
35+
import java.util.TimeZone;
3536
import java.util.concurrent.ScheduledExecutorService;
3637
import java.util.concurrent.ScheduledThreadPoolExecutor;
3738
import java.util.concurrent.TimeUnit;
@@ -49,6 +50,8 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<CRow, CRow> impl
4950

5051
public static final long LOAD_DATA_ERROR_SLEEP_TIME = 5_000L;
5152

53+
public static final TimeZone LOCAL_TZ = TimeZone.getDefault();
54+
5255
protected BaseSideInfo sideInfo;
5356

5457
private ScheduledExecutorService es;

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
162162
}
163163
}
164164
if(equalFieldIndex == -1){
165-
throw new RuntimeException("can't find equal field " + rightField);
165+
throw new RuntimeException("can't find equal field " + leftField);
166166
}
167167

168168
equalValIndex.add(equalFieldIndex);

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public abstract class AbstractTableParser {
4444
private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
4545

4646
private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
47-
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(.+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
47+
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(.+?)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
4848
private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
4949
private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");
5050

core/src/main/java/com/dtstack/flink/sql/util/ClassUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class ClassUtil {
3737
public static Class<?> stringConvertClass(String str) {
3838

3939
// 这部分主要是告诉Class转TypeInfomation的方法,字段是Array类型
40-
String lowerStr = str.toLowerCase();
40+
String lowerStr = str.toLowerCase().trim();
4141
if (lowerStr.startsWith("array")) {
4242
return Array.newInstance(Integer.class, 0).getClass();
4343
}

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,11 @@ public Row fillData(Row input, Object sideInput) {
106106

107107
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
108108
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
109-
obj = ((Timestamp) obj).getTime();
109+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
110+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
110111
}
111112

113+
112114
row.setField(entry.getKey(), obj);
113115
}
114116

elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AsyncReqRow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.sql.Timestamp;
5858
import java.util.List;
5959
import java.util.Map;
60+
import java.util.TimeZone;
6061

6162
/**
6263
* @author yinxi
@@ -65,6 +66,7 @@
6566
public class Elasticsearch6AsyncReqRow extends BaseAsyncReqRow implements Serializable {
6667

6768
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6AsyncReqRow.class);
69+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
6870
private transient RestHighLevelClient rhlClient;
6971
private SearchRequest searchRequest;
7072
private List<String> sqlJoinCompareOperate = Lists.newArrayList();
@@ -195,7 +197,8 @@ public Row fillData(Row input, Object line) {
195197
Object obj = input.getField(entry.getValue());
196198
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
197199
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
198-
obj = ((Timestamp) obj).getTime();
200+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
201+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
199202
}
200203

201204
row.setField(entry.getKey(), obj);

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,10 @@ public Row fillData(Row input, Object sideInput) {
9191

9292
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
9393
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
94-
obj = ((Timestamp)obj).getTime();
94+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
95+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
9596
}
97+
9698
row.setField(entry.getKey(), obj);
9799
}
98100

0 commit comments

Comments
 (0)