Commit 643c285

Merge branch 'v1.10.0_dev' into feat_1.10_impalaSinkKerberos_mergedDev
# Conflicts:
#	core/src/main/java/com/dtstack/flink/sql/util/KrbUtils.java

2 parents: 91c87cf + d66d689

41 files changed (+1368, -54 lines)

core/src/main/java/com/dtstack/flink/sql/constant/PluginParamConsts.java (new file)

Lines changed: 30 additions & 0 deletions

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.constant;

/**
 * @program: flinkStreamSQL
 * @author: wuren
 * @create: 2020/09/15
 **/
public class PluginParamConsts {
    public static final String PRINCIPAL = "principal";
    public static final String KEYTAB = "keytab";
    public static final String KRB5_CONF = "krb5conf";
}
```
core/src/main/java/com/dtstack/flink/sql/krb/KerberosTable.java (new file)

Lines changed: 65 additions & 0 deletions

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.krb;

import com.google.common.base.Strings;

/**
 * @program: flinkStreamSQL
 * @author: wuren
 * @create: 2020/09/15
 **/
public interface KerberosTable {

    String getPrincipal();

    void setPrincipal(String principal);

    String getKeytab();

    void setKeytab(String keytab);

    String getKrb5conf();

    void setKrb5conf(String krb5conf);

    boolean isEnableKrb();

    void setEnableKrb(boolean enableKrb);

    default void judgeKrbEnable() {
        boolean allSet =
                !Strings.isNullOrEmpty(getPrincipal()) &&
                !Strings.isNullOrEmpty(getKeytab()) &&
                !Strings.isNullOrEmpty(getKrb5conf());

        boolean allNotSet =
                Strings.isNullOrEmpty(getPrincipal()) &&
                Strings.isNullOrEmpty(getKeytab()) &&
                Strings.isNullOrEmpty(getKrb5conf());

        if (allSet) {
            setEnableKrb(true);
        } else if (allNotSet) {
            setEnableKrb(false);
        } else {
            throw new RuntimeException("Missing kerberos parameter! Either all kerberos params must be set, or none of them.");
        }
    }
}
```
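The judgeKrbEnable default method implements the all-or-nothing rule the kudu docs below describe. A minimal sketch of how a plugin table-info class might wire it up, assuming WITH-clause properties keyed by the PluginParamConsts constants above; DemoKuduTableInfo and parseKerberosParams are illustrative names, not part of this commit:

```java
import com.dtstack.flink.sql.constant.PluginParamConsts;
import com.dtstack.flink.sql.krb.KerberosTable;

import java.util.Map;

// Hypothetical table-info class; the commit's real implementors are not shown here.
public class DemoKuduTableInfo implements KerberosTable {
    private String principal;
    private String keytab;
    private String krb5conf;
    private boolean enableKrb;

    @Override public String getPrincipal() { return principal; }
    @Override public void setPrincipal(String principal) { this.principal = principal; }
    @Override public String getKeytab() { return keytab; }
    @Override public void setKeytab(String keytab) { this.keytab = keytab; }
    @Override public String getKrb5conf() { return krb5conf; }
    @Override public void setKrb5conf(String krb5conf) { this.krb5conf = krb5conf; }
    @Override public boolean isEnableKrb() { return enableKrb; }
    @Override public void setEnableKrb(boolean enableKrb) { this.enableKrb = enableKrb; }

    // Copy the three Kerberos params from the WITH clause, then derive enableKrb:
    // all three set -> true, none set -> false, a partial set -> RuntimeException.
    public void parseKerberosParams(Map<String, String> props) {
        setPrincipal(props.get(PluginParamConsts.PRINCIPAL));
        setKeytab(props.get(PluginParamConsts.KEYTAB));
        setKrb5conf(props.get(PluginParamConsts.KRB5_CONF));
        judgeKrbEnable();
    }
}
```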

core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java

Lines changed: 20 additions & 0 deletions

```diff
@@ -28,8 +28,11 @@
 import com.google.common.collect.Lists;
 import com.google.common.base.Strings;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Reason:
@@ -51,6 +54,8 @@ public static void setLocalSqlPluginRoot(String localSqlPluginRoot){
         LOCAL_SQL_PLUGIN_ROOT = localSqlPluginRoot;
     }
 
+    private static final Pattern ADD_FIlE_PATTERN = Pattern.compile("(?i).*add\\s+file\\s+.+");
+
     /**
      * flink support sql syntax
      * CREATE TABLE sls_stream() with ();
@@ -70,6 +75,7 @@ public static SqlTree parseSql(String sql, String pluginLoadMode) throws Excepti
                 .replace("\t", " ").trim();
 
         List<String> sqlArr = DtStringUtil.splitIgnoreQuota(sql, SQL_DELIMITER);
+        sqlArr = removeAddFileStmt(sqlArr);
         SqlTree sqlTree = new SqlTree();
         AbstractTableInfoParser tableInfoParser = new AbstractTableInfoParser();
         for(String childSql : sqlArr){
@@ -150,4 +156,18 @@ public static SqlTree parseSql(String sql, String pluginLoadMode) throws Excepti
 
         return sqlTree;
     }
+
+    /**
+     * Remove "add file" statements, e.g. add file /etc/krb5.conf;
+     */
+    private static List<String> removeAddFileStmt(List<String> stmts) {
+        List<String> cleanedStmts = new ArrayList<>();
+        for (String stmt : stmts) {
+            Matcher matcher = ADD_FIlE_PATTERN.matcher(stmt);
+            if (!matcher.matches()) {
+                cleanedStmts.add(stmt);
+            }
+        }
+        return cleanedStmts;
+    }
 }
```
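A quick sanity check of what the (?i).*add\s+file\s+.+ pattern accepts; the sample statements are made up, and matches() requires the whole statement to fit the pattern (the new SqlParserTest below exercises the real method via Whitebox):

```java
import java.util.regex.Pattern;

public class AddFilePatternDemo {
    // Same pattern as SqlParser: case-insensitive, any prefix, "add",
    // whitespace, "file", whitespace, then at least one more character.
    private static final Pattern ADD_FILE_PATTERN =
            Pattern.compile("(?i).*add\\s+file\\s+.+");

    public static void main(String[] args) {
        System.out.println(ADD_FILE_PATTERN.matcher(" add file /etc/krb5.conf ").matches()); // true
        System.out.println(ADD_FILE_PATTERN.matcher(" ADD FILE asb ").matches());            // true
        System.out.println(ADD_FILE_PATTERN.matcher("INSERT INTO dwd_foo SELECT id, name FROM ods_foo").matches()); // false
    }
}
```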

core/src/main/java/com/dtstack/flink/sql/side/SidePredicatesParser.java

Lines changed: 1 addition & 1 deletion (the removed and added lines differ only in trailing whitespace)

```diff
@@ -140,7 +140,7 @@ private void extractPredicateInfo(SqlNode whereNode, List<PredicateInfo> predica
         }
     }
 
-    private void fillPredicateInfoToList(SqlBasicCall whereNode, List<PredicateInfo> predicatesInfoList, String operatorName, SqlKind operatorKind, 
+    private void fillPredicateInfoToList(SqlBasicCall whereNode, List<PredicateInfo> predicatesInfoList, String operatorName, SqlKind operatorKind,
                                          int fieldIndex, int conditionIndex) {
         SqlNode sqlNode = whereNode.getOperands()[fieldIndex];
         if (sqlNode.getKind() == SqlKind.IDENTIFIER) {
```

core/src/main/java/com/dtstack/flink/sql/util/KrbUtils.java

Lines changed: 1 addition & 0 deletions (this is the file named in the merge conflict above; the resolved version only adds a blank line before the closing brace)

```diff
@@ -50,4 +50,5 @@ public static UserGroupInformation getUgi(String principal, String keytabPath, S
         UserGroupInformation.setConfiguration(configuration);
         return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
     }
+
 }
```
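The hunk header truncates the getUgi signature after the third parameter's leading "S"; judging from the krb5conf plugin param it is presumably a krb5.conf path, but that is an assumption. A hedged usage sketch under that assumption:

```java
import com.dtstack.flink.sql.util.KrbUtils;
import org.apache.hadoop.security.UserGroupInformation;

import java.security.PrivilegedExceptionAction;

public class KrbUtilsUsageSketch {
    public static void main(String[] args) throws Exception {
        // Values taken from the kudu docs examples below; the third argument
        // being a krb5.conf path is assumed, not confirmed by this diff.
        UserGroupInformation ugi = KrbUtils.getUgi(
                "kudu/host1@DTSTACK.COM",
                "foo/foobar.keytab",
                "bar/krb5.conf");

        // Run cluster access as the authenticated user.
        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
            // e.g. open a Kudu or Impala connection here
            return null;
        });
    }
}
```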
core/src/test/java/com/dtstack/flink/sql/parser/SqlParserTest.java (new file)

Lines changed: 51 additions & 0 deletions

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.parser;

import org.junit.Assert;
import org.junit.Test;
import org.powermock.reflect.Whitebox;

import java.util.ArrayList;
import java.util.List;

/**
 * @program: flink.sql
 * @author: wuren
 * @create: 2020/09/15
 **/
public class SqlParserTest {

    @Test
    public void testRemoveAddFileStmt() throws Exception {
        List<String> rawStmts = new ArrayList<>();
        String sql1 = " add file asdasdasd ";
        String sql2 = " aDd fIle With asdasdasd ";
        String sql3 = " INSERT INTO dwd_foo SELECT id, name FROM ods_foo";
        String sql4 = " ADD FILE asb ";
        rawStmts.add(sql1);
        rawStmts.add(sql2);
        rawStmts.add(sql3);
        rawStmts.add(sql4);

        List<String> stmts = Whitebox.invokeMethod(SqlParser.class, "removeAddFileStmt", rawStmts);
        Assert.assertEquals(stmts.get(0), sql3);
    }

}
```

docs/plugin/kuduSide.md

Lines changed: 22 additions & 0 deletions

````diff
@@ -60,6 +60,11 @@
 | isFaultTolerant | whether queries are fault tolerant (whether to scan the second replica when a query fails); default false |||
 | cache | dimension-table cache strategy (NONE/LRU/ALL) || NONE |
 | partitionedJoin | whether to keyBy on the configured key before the dimension-table join (reduces the amount of dimension-table data cached) || false |
+| principal | principal used for the Kerberos login |||
+| keytab | path to the keytab file |||
+| krb5conf | path to the krb5.conf file |||
+If all three Kerberos parameters are set, Kerberos authentication is enabled; if any one of them is missing, a missing-parameter error is raised.
+If none of them are set, the connection to the Kudu cluster is made without Kerberos.
 --------------
 
 ## 5. Example
@@ -163,3 +168,20 @@ into
 on t1.id = t2.id;
 ```
 
+## 7. Kerberos example
+```
+create table dim (
+    name varchar,
+    id int,
+    PERIOD FOR SYSTEM_TIME
+) WITH (
+    type='kudu',
+    kuduMasters='host1',
+    tableName='foo',
+    parallelism ='1',
+    cache ='ALL',
+    keytab='foo/foobar.keytab',
+    krb5conf='bar/krb5.conf',
+    principal='kudu/host1@DTSTACK.COM'
+);
+```
````

docs/plugin/kuduSink.md

Lines changed: 23 additions & 2 deletions

````diff
@@ -42,7 +42,11 @@ kudu 1.9.0+cdh6.2.0
 | defaultOperationTimeoutMs | operation timeout ||
 | defaultSocketReadTimeoutMs | socket read timeout ||
 | parallelism | parallelism setting || 1 |
-
+| principal | principal used for the Kerberos login |||
+| keytab | path to the keytab file |||
+| krb5conf | path to the krb5.conf file |||
+If all three Kerberos parameters are set, Kerberos authentication is enabled; if any one of them is missing, a missing-parameter error is raised.
+If none of them are set, the connection to the Kudu cluster is made without Kerberos.
 
 ## 5. Example:
 ```
@@ -123,4 +127,21 @@ into
 ### Result data
 ```
 {"a":"2","b":"2","c":"3","d":"4"}
-```
+```
+
+## 7. Kerberos example
+```
+create table dwd (
+    name varchar,
+    id int
+) WITH (
+    type='kudu',
+    kuduMasters='host1',
+    tableName='foo',
+    writeMode='insert',
+    parallelism ='1',
+    keytab='foo/foobar.keytab',
+    krb5conf='bar/krb5.conf',
+    principal='kudu/host1@DTSTACK.COM'
+);
+```
````

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaSource.java

Lines changed: 6 additions & 2 deletions

```diff
@@ -54,6 +54,8 @@ protected Properties getKafkaProperties(KafkaSourceTableInfo kafkaSourceTableInf
 
         if (DtStringUtil.isJson(kafkaSourceTableInfo.getOffsetReset())) {
             props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EKafkaOffset.NONE.name().toLowerCase());
+        } else if (StringUtils.equalsIgnoreCase(EKafkaOffset.TIMESTAMP.name().toLowerCase(), kafkaSourceTableInfo.getOffsetReset())) {
+            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EKafkaOffset.EARLIEST.name().toLowerCase());
         } else {
             props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaSourceTableInfo.getOffsetReset());
         }
@@ -95,10 +97,12 @@ protected void setParallelism(Integer parallelism, DataStreamSource kafkaSource)
         }
     }
 
-    protected void setStartPosition(String offset, String topicName, FlinkKafkaConsumerBase<Row> kafkaSrc) {
+    protected void setStartPosition(String offset, String topicName, FlinkKafkaConsumerBase<Row> kafkaSrc, Runnable runnable) {
         if (StringUtils.equalsIgnoreCase(offset, EKafkaOffset.EARLIEST.name())) {
             kafkaSrc.setStartFromEarliest();
-        } else if (DtStringUtil.isJson(offset)) {
+        } else if (StringUtils.equalsIgnoreCase(offset, EKafkaOffset.TIMESTAMP.name())) {
+            runnable.run();
+        } else if (DtStringUtil.isJson(offset)) {
             Map<KafkaTopicPartition, Long> specificStartupOffsets = buildOffsetMap(offset, topicName);
             kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
         } else {
```
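Two things change here: when offsetReset is "timestamp", the consumer property auto.offset.reset falls back to earliest, and setStartPosition defers the actual positioning to a caller-supplied Runnable. The diff does not show the call sites, so how the timestamp gets applied is an assumption; a standalone sketch, assuming the Runnable wraps something like Flink's FlinkKafkaConsumerBase#setStartFromTimestamp:

```java
public class StartPositionSketch {

    // Mirrors the EKafkaOffset enum below.
    enum EKafkaOffset { LATEST, EARLIEST, TIMESTAMP, NONE }

    // Simplified setStartPosition: only the new TIMESTAMP branch is shown.
    static void setStartPosition(String offset, Runnable timestampAction) {
        if (EKafkaOffset.TIMESTAMP.name().equalsIgnoreCase(offset)) {
            // Defer the consumer-specific positioning to the caller.
            timestampAction.run();
        }
        // ... EARLIEST / JSON offset map / default branches as in the diff above
    }

    public static void main(String[] args) {
        long startMillis = 1_600_000_000_000L; // illustrative epoch millis
        setStartPosition("timestamp",
                () -> System.out.println("kafkaSrc.setStartFromTimestamp(" + startMillis + ")"));
    }
}
```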

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/enums/EKafkaOffset.java

Lines changed: 1 addition & 0 deletions

```diff
@@ -27,5 +27,6 @@ public enum EKafkaOffset {
 
     LATEST,
     EARLIEST,
+    TIMESTAMP,
     NONE
 }
```
