Skip to content

Commit f4130eb

Browse files
committed
Merge branch 'hotfix_1.10_4.0.x_30250' into 'v1.10.0_dev'
Hotfix 1.10 4.0.x 30250 See merge request dt-insight-engine/flinkStreamSQL!92
2 parents 044fa68 + 2d45278 commit f4130eb

File tree

11 files changed

+142
-7
lines changed

11 files changed

+142
-7
lines changed

core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@
2828
import com.google.common.collect.Lists;
2929
import com.google.common.base.Strings;
3030

31+
import java.util.ArrayList;
3132
import java.util.List;
3233
import java.util.Set;
34+
import java.util.regex.Matcher;
35+
import java.util.regex.Pattern;
3336

3437
/**
3538
* Reason:
@@ -51,6 +54,8 @@ public static void setLocalSqlPluginRoot(String localSqlPluginRoot){
5154
LOCAL_SQL_PLUGIN_ROOT = localSqlPluginRoot;
5255
}
5356

57+
private static final Pattern ADD_FIlE_PATTERN = Pattern.compile("(?i).*add\\s+file\\s+.+");
58+
5459
/**
5560
* flink support sql syntax
5661
* CREATE TABLE sls_stream() with ();
@@ -70,6 +75,7 @@ public static SqlTree parseSql(String sql, String pluginLoadMode) throws Excepti
7075
.replace("\t", " ").trim();
7176

7277
List<String> sqlArr = DtStringUtil.splitIgnoreQuota(sql, SQL_DELIMITER);
78+
sqlArr = removeAddFileStmt(sqlArr);
7379
SqlTree sqlTree = new SqlTree();
7480
AbstractTableInfoParser tableInfoParser = new AbstractTableInfoParser();
7581
for(String childSql : sqlArr){
@@ -150,4 +156,18 @@ public static SqlTree parseSql(String sql, String pluginLoadMode) throws Excepti
150156

151157
return sqlTree;
152158
}
159+
160+
/**
161+
* remove "add file" statements, e.g. add file /etc/krb5.conf;
162+
*/
163+
private static List<String> removeAddFileStmt(List<String> stmts) {
164+
List<String> cleanedStmts = new ArrayList<>();
165+
for (String stmt : stmts) {
166+
Matcher matcher = ADD_FIlE_PATTERN.matcher(stmt);
167+
if(!matcher.matches()) {
168+
cleanedStmts.add(stmt);
169+
}
170+
}
171+
return cleanedStmts;
172+
}
153173
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.parser;
20+
21+
import org.junit.Assert;
22+
import org.junit.Test;
23+
import org.powermock.reflect.Whitebox;
24+
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
28+
/**
29+
* @program: flink.sql
30+
* @author: wuren
31+
* @create: 2020/09/15
32+
**/
33+
public class SqlParserTest {
34+
35+
@Test
36+
public void testRemoveAddFileStmt() throws Exception {
37+
List<String> rawStmts = new ArrayList<>();
38+
String sql1 = " add file asdasdasd ";
39+
String sql2 = " aDd fIle With asdasdasd ";
40+
String sql3 = " INSERT INTO dwd_foo SELECT id, name FROM ods_foo";
41+
String sql4 = " ADD FILE asb ";
42+
rawStmts.add(sql1);
43+
rawStmts.add(sql2);
44+
rawStmts.add(sql3);
45+
rawStmts.add(sql4);
46+
47+
List<String> stmts = Whitebox.invokeMethod(SqlParser.class, "removeAddFileStmt", rawStmts);
48+
Assert.assertEquals(stmts.get(0), sql3);
49+
}
50+
51+
}

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaSource.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ protected Properties getKafkaProperties(KafkaSourceTableInfo kafkaSourceTableInf
5454

5555
if (DtStringUtil.isJson(kafkaSourceTableInfo.getOffsetReset())) {
5656
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EKafkaOffset.NONE.name().toLowerCase());
57+
} else if(StringUtils.equalsIgnoreCase(EKafkaOffset.TIMESTAMP.name().toLowerCase(), kafkaSourceTableInfo.getOffsetReset())){
58+
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EKafkaOffset.EARLIEST.name().toLowerCase());
5759
} else {
5860
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaSourceTableInfo.getOffsetReset());
5961
}
@@ -95,10 +97,12 @@ protected void setParallelism(Integer parallelism, DataStreamSource kafkaSource)
9597
}
9698
}
9799

98-
protected void setStartPosition(String offset, String topicName, FlinkKafkaConsumerBase<Row> kafkaSrc) {
100+
protected void setStartPosition(String offset, String topicName, FlinkKafkaConsumerBase<Row> kafkaSrc, Runnable runnable) {
99101
if (StringUtils.equalsIgnoreCase(offset, EKafkaOffset.EARLIEST.name())) {
100102
kafkaSrc.setStartFromEarliest();
101-
} else if (DtStringUtil.isJson(offset)) {
103+
} else if(StringUtils.equalsIgnoreCase(offset, EKafkaOffset.TIMESTAMP.name())) {
104+
runnable.run();
105+
}else if (DtStringUtil.isJson(offset)) {
102106
Map<KafkaTopicPartition, Long> specificStartupOffsets = buildOffsetMap(offset, topicName);
103107
kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
104108
} else {

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/enums/EKafkaOffset.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,6 @@ public enum EKafkaOffset {
2727

2828
LATEST,
2929
EARLIEST,
30+
TIMESTAMP,
3031
NONE
3132
}

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5656
kafkaSourceTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), "|")));
5757
kafkaSourceTableInfo.setSourceDataType(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.SOURCE_DATA_TYPE_KEY.toLowerCase(), FormatType.DT_NEST.name())));
5858

59+
if(props.containsKey(KafkaSourceTableInfo.TIMESTAMP_OFFSET.toLowerCase())){
60+
kafkaSourceTableInfo.setTimestampOffset(MathUtil.getLongVal(props.getOrDefault(KafkaSourceTableInfo.TIMESTAMP_OFFSET.toLowerCase(), System.currentTimeMillis())));
61+
}
5962
Map<String, String> kafkaParams = props.keySet().stream()
6063
.filter(key -> !key.isEmpty() && key.startsWith("kafka."))
6164
.collect(Collectors.toMap(

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo {
5555

5656
public static final String CHARSET_NAME_KEY = "charsetName";
5757

58+
public static final String TIMESTAMP_OFFSET = "timestampOffset";
59+
5860
private String bootstrapServers;
5961

6062
private String topic;
@@ -75,6 +77,8 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo {
7577

7678
public String charsetName;
7779

80+
private Long timestampOffset;
81+
7882
public String getBootstrapServers() {
7983
return bootstrapServers;
8084
}
@@ -159,6 +163,14 @@ public void setCharsetName(String charsetName) {
159163
this.charsetName = charsetName;
160164
}
161165

166+
public Long getTimestampOffset() {
167+
return timestampOffset;
168+
}
169+
170+
public void setTimestampOffset(Long timestampOffset) {
171+
this.timestampOffset = timestampOffset;
172+
}
173+
162174
@Override
163175
public boolean check() {
164176
Preconditions.checkNotNull(getType(), "kafka of type is required");

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExec
5151
DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
5252

5353
setParallelism(kafkaSourceTableInfo.getParallelism(), kafkaSource);
54-
setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc);
54+
setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc, () -> kafkaSrc.setStartFromTimestamp(kafkaSourceTableInfo.getTimestampOffset()));
5555
String fields = StringUtils.join(kafkaSourceTableInfo.getFields(), ",");
5656

5757
return tableEnv.fromDataStream(kafkaSource, fields);

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExec
5252
DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
5353

5454
setParallelism(kafkaSourceTableInfo.getParallelism(), kafkaSource);
55-
setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc);
55+
setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc, () -> kafkaSrc.setStartFromLatest());
5656
String fields = StringUtils.join(kafkaSourceTableInfo.getFields(), ",");
5757

5858
return tableEnv.fromDataStream(kafkaSource, fields);

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExec
5555
DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
5656

5757
setParallelism(kafkaSourceTableInfo.getParallelism(), kafkaSource);
58-
setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc);
58+
setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc, () -> kafkaSrc.setStartFromTimestamp(kafkaSourceTableInfo.getTimestampOffset()));
5959
String fields = StringUtils.join(kafkaSourceTableInfo.getFields(), ",");
6060

6161
return tableEnv.fromDataStream(kafkaSource, fields);

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExec
5454
DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation);
5555

5656
setParallelism(kafkaSourceTableInfo.getParallelism(), kafkaSource);
57-
setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc);
57+
setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc, () -> kafkaSrc.setStartFromTimestamp(kafkaSourceTableInfo.getTimestampOffset()));
5858
String fields = StringUtils.join(kafkaSourceTableInfo.getFields(), ",");
5959

6060
return tableEnv.fromDataStream(kafkaSource, fields);

0 commit comments

Comments (0)