
Commit fa0a242: resolve conflicts
1 parent 5553ee5

15 files changed: 71 additions, 99 deletions


core/pom.xml

Lines changed: 0 additions & 32 deletions

@@ -105,43 +105,12 @@
       <version>${guava.version}</version>
     </dependency>

-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-cep-scala_2.11</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala_2.11</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-yarn_2.11</artifactId>
-      <version>${flink.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-shaded-hadoop2</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-shaded-hadoop2</artifactId>
       <version>2.7.5-1.8.1</version>
     </dependency>

-    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-statebackend-rocksdb -->
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
@@ -154,7 +123,6 @@
       <artifactId>joda-time</artifactId>
       <version>2.5</version>
     </dependency>
-
   </dependencies>

   <build>

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 6 additions & 1 deletion

@@ -21,7 +21,12 @@
 package com.dtstack.flink.sql.parser;

 import com.dtstack.flink.sql.util.DtStringUtil;
-import org.apache.calcite.sql.*;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlMatchRecognize;
 import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.regex.Matcher;

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 7 additions & 7 deletions

@@ -16,6 +16,8 @@
  * limitations under the License.
  */

+
+
 package com.dtstack.flink.sql.table;

 import com.dtstack.flink.sql.util.ClassUtil;
@@ -44,7 +46,7 @@ public abstract class AbstractTableParser {
     private static final String CHAR_TYPE_NO_LENGTH = "CHAR";

     private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
-    private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(.+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
+    private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
     private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
     private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");

@@ -82,14 +84,13 @@ public boolean dealKeyPattern(String fieldRow, AbstractTableInfo tableInfo){
         return false;
     }

-    public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
+    public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){

         List<String> fieldRows = DtStringUtil.splitIgnoreQuota(fieldsInfo, ',');
-
-        for (String fieldRow : fieldRows) {
+        for(String fieldRow : fieldRows){
             fieldRow = fieldRow.trim();

-            if (StringUtils.isBlank(fieldRow)) {
+            if(StringUtils.isBlank(fieldRow)){
                 throw new RuntimeException(String.format("table [%s],exists field empty.", tableInfo.getName()));
             }

@@ -131,7 +132,7 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
         tableInfo.finish();
     }

-    public void dealPrimaryKey(Matcher matcher, AbstractTableInfo tableInfo) {
+    public void dealPrimaryKey(Matcher matcher, AbstractTableInfo tableInfo){
         String primaryFields = matcher.group(1).trim();
         String[] splitArry = primaryFields.split(",");
         List<String> primaryKes = Lists.newArrayList(splitArry);
@@ -170,5 +171,4 @@ protected void addParserHandler(String parserName, Pattern pattern, ITableFieldD
         patternMap.put(parserName, pattern);
         handlerMap.put(parserName, handler);
     }
-
 }
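
The functional change in this file is the nestJsonFieldKeyPattern regex: the type group is tightened from the greedy (.+) to (\w+), so the declared type between the source field and AS must now be a single word token and can no longer swallow the rest of the field definition. A minimal sketch of what the new pattern captures, using an illustrative field declaration rather than one taken from the commit:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NestJsonPatternDemo {
    // The tightened pattern from this commit: the type group is \w+ instead of .+
    private static final Pattern NEST_JSON = Pattern.compile(
            "(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");

    public static void main(String[] args) {
        // illustrative nested-JSON field declaration, not from the commit
        Matcher m = NEST_JSON.matcher("data.user.id varchar AS userId");
        if (m.find()) {
            System.out.println("source field: " + m.group(1)); // data.user.id
            System.out.println("type:         " + m.group(3)); // varchar
            System.out.println("alias:        " + m.group(4)); // userId
        }
    }
}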

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 1 addition & 4 deletions

@@ -29,7 +29,6 @@
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
@@ -107,11 +106,9 @@ public Row fillData(Row input, Object sideInput) {

     //Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
     if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
-        //undo the time-zone shift introduced by the upstream OutputRowtimeProcessFunction call
-        obj = ((Timestamp) obj).getTime() + (long) LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
+        obj = ((Timestamp) obj).getTime();
     }

-
     row.setField(entry.getKey(), obj);
 }

elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AsyncReqRow.java

Lines changed: 1 addition & 5 deletions

@@ -19,7 +19,6 @@
 package com.dtstack.flink.sql.side.elasticsearch6;

 import com.dtstack.flink.sql.util.RowDataComplete;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
@@ -54,7 +53,6 @@
 import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
-import java.util.TimeZone;

 /**
  * @author yinxi
@@ -63,7 +61,6 @@
 public class Elasticsearch6AsyncReqRow extends BaseAsyncReqRow implements Serializable {

     private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6AsyncReqRow.class);
-    private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
     private transient RestHighLevelClient rhlClient;
     private SearchRequest searchRequest;
     private List<String> sqlJoinCompareOperate = Lists.newArrayList();
@@ -194,8 +191,7 @@ public Row fillData(Row input, Object line) {
     Object obj = input.getField(entry.getValue());
     boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
     if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
-        //undo the time-zone shift introduced by the upstream OutputRowtimeProcessFunction call
-        obj = ((Timestamp) obj).getTime() + (long) LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
+        obj = ((Timestamp) obj).getTime();
     }

     row.setField(entry.getKey(), obj);
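
Both Elasticsearch side implementations above make the same change: fillData previously added the JVM default-zone offset back onto the epoch value, compensating for the upstream OutputRowtimeProcessFunction, and now emits the raw Timestamp.getTime() instead. A minimal sketch of the behavioral difference, assuming a non-UTC default time zone (values illustrative):

import java.sql.Timestamp;
import java.util.TimeZone;

public class RowtimeOffsetDemo {
    public static void main(String[] args) {
        Timestamp ts = Timestamp.valueOf("2020-01-01 00:00:00");
        long epochMillis = ts.getTime();                        // what the commit now emits
        long shifted = epochMillis
                + TimeZone.getDefault().getOffset(epochMillis); // what was emitted before
        System.out.println("raw epoch millis: " + epochMillis);
        System.out.println("zone-shifted:     " + shifted);     // differs unless the JVM zone is UTC
    }
}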

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 0 additions & 5 deletions

@@ -29,8 +29,6 @@
 import com.dtstack.flink.sql.util.RowDataComplete;
 import org.apache.calcite.sql.JoinType;
 import org.apache.commons.collections.map.HashedMap;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.commons.lang.StringUtils;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Maps;
 import org.apache.flink.table.dataformat.BaseRow;
@@ -40,7 +38,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -53,7 +50,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.io.File;
 import java.io.IOException;

 import java.security.PrivilegedAction;
@@ -104,7 +100,6 @@ public Row fillData(Row input, Object sideInput) {
     if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
         obj = Timestamp.valueOf(((LocalDateTime) obj));
     }
-
     row.setField(entry.getKey(), obj);
 }
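
Unlike the Elasticsearch variants, the time-indicator branch here converts a LocalDateTime to java.sql.Timestamp rather than to epoch millis. A minimal sketch of that conversion (values illustrative):

import java.sql.Timestamp;
import java.time.LocalDateTime;

public class LocalDateTimeDemo {
    public static void main(String[] args) {
        LocalDateTime ldt = LocalDateTime.of(2020, 1, 1, 12, 30, 0);
        // the same conversion fillData applies to time-indicator fields
        Timestamp ts = Timestamp.valueOf(ldt);
        System.out.println(ts); // 2020-01-01 12:30:00.0
    }
}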

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java

Lines changed: 0 additions & 1 deletion

@@ -33,7 +33,6 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;

 /**
  *

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java

Lines changed: 0 additions & 1 deletion

@@ -19,7 +19,6 @@
 package com.dtstack.flink.sql.sink.hbase;

 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.HadoopKerberosName;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.slf4j.Logger;

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java

Lines changed: 1 addition & 1 deletion

@@ -129,7 +129,7 @@ public void parseFieldsInfo(String fieldsInfo, HbaseTableInfo tableInfo){
     }

     private Map<String, String> parseColumnFamily(Map<String, String> physicalFieldMap){
-        Map<String, String> columnFamiles = new LinkedHashMap<>();
+        Map<String, String> columnFamiles = Maps.newHashMap();
         physicalFieldMap.values().forEach(x -> {
             String[] columnFamily = StringUtils.split(x.trim(), ":");
             columnFamiles.put(x, columnFamily[1]);
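
Note that swapping new LinkedHashMap<>() for Guava's Maps.newHashMap() is not purely cosmetic: a LinkedHashMap iterates in insertion order, while a plain HashMap gives no ordering guarantee, which matters if any caller relies on column-family order. A minimal sketch of the difference (keys illustrative):

import com.google.common.collect.Maps;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapOrderDemo {
    public static void main(String[] args) {
        Map<String, String> linked = new LinkedHashMap<>();
        Map<String, String> plain = Maps.newHashMap();
        for (String k : new String[]{"cf:name", "cf:age", "info:city"}) {
            linked.put(k, k.split(":")[1]);
            plain.put(k, k.split(":")[1]);
        }
        System.out.println(linked.keySet()); // always [cf:name, cf:age, info:city]
        System.out.println(plain.keySet());  // iteration order is unspecified
    }
}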

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaSource.java

Lines changed: 2 additions & 11 deletions

@@ -21,7 +21,6 @@
 import com.dtstack.flink.sql.source.IStreamSourceGener;
 import com.dtstack.flink.sql.source.kafka.enums.EKafkaOffset;
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
-import com.dtstack.flink.sql.util.DataTypeUtils;
 import com.dtstack.flink.sql.util.DtStringUtil;
 import com.dtstack.flink.sql.util.PluginUtil;
 import org.apache.commons.lang3.StringUtils;
@@ -34,7 +33,6 @@
 import org.apache.flink.types.Row;
 import org.apache.kafka.clients.consumer.ConsumerConfig;

-import java.lang.reflect.Array;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -74,16 +72,9 @@ protected String generateOperatorName(String tabName, String topicName) {
     }

     protected TypeInformation<Row> getRowTypeInformation(KafkaSourceTableInfo kafkaSourceTableInfo) {
-        String[] fieldTypes = kafkaSourceTableInfo.getFieldTypes();
         Class<?>[] fieldClasses = kafkaSourceTableInfo.getFieldClasses();
-        TypeInformation[] types =
-                IntStream.range(0, fieldClasses.length)
-                        .mapToObj(i -> {
-                            if (fieldClasses[i].isArray()) {
-                                return DataTypeUtils.convertToArray(fieldTypes[i]);
-                            }
-                            return TypeInformation.of(fieldClasses[i]);
-                        })
+        TypeInformation[] types = IntStream.range(0, fieldClasses.length)
+                .mapToObj(i -> TypeInformation.of(fieldClasses[i]))
                 .toArray(TypeInformation[]::new);

         return new RowTypeInfo(types, kafkaSourceTableInfo.getFields());
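
After this change, getRowTypeInformation maps every declared field class straight through Flink's TypeInformation.of, with no special case for array columns (the DataTypeUtils.convertToArray branch is gone). A minimal sketch of the resulting row type construction, with illustrative field names and classes:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import java.util.stream.IntStream;

public class RowTypeDemo {
    public static void main(String[] args) {
        // illustrative schema, not from the commit
        Class<?>[] fieldClasses = {String.class, Long.class, Double.class};
        String[] fieldNames = {"id", "ts", "price"};

        // same mapping the simplified method applies to every field class
        TypeInformation[] types = IntStream.range(0, fieldClasses.length)
                .mapToObj(i -> TypeInformation.of(fieldClasses[i]))
                .toArray(TypeInformation[]::new);

        TypeInformation<Row> rowType = new RowTypeInfo(types, fieldNames);
        System.out.println(rowType); // e.g. Row(id: String, ts: Long, price: Double)
    }
}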
