Skip to content

Commit a13823d

Browse files
committed
complete kudu kerberos
1 parent 67c2378 commit a13823d

File tree

13 files changed

+454
-45
lines changed

13 files changed

+454
-45
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.constant;
20+
21+
/**
22+
* @program: flinkStreamSQL
23+
* @author: wuren
24+
* @create: 2020/09/15
25+
**/
26+
public class PluginParamConsts {
27+
public static final String PRINCIPAL = "principal";
28+
public static final String KEYTAB = "keytab";
29+
public static final String KRB5_CONF = "krb5conf";
30+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.krb;
20+
21+
import com.google.common.base.Strings;
22+
23+
/**
24+
* @program: flinkStreamSQL
25+
* @author: wuren
26+
* @create: 2020/09/15
27+
**/
28+
public interface KerberosTable {
29+
30+
String getPrincipal();
31+
32+
void setPrincipal(String principal);
33+
34+
String getKeytab();
35+
36+
void setKeytab(String keytab);
37+
38+
String getKrb5conf();
39+
40+
void setKrb5conf(String krb5conf);
41+
42+
boolean isEnableKrb();
43+
44+
void setEnableKrb(boolean enableKrb);
45+
46+
default void judgeKrbEnable() {
47+
boolean allSet =
48+
!Strings.isNullOrEmpty(getPrincipal()) &&
49+
!Strings.isNullOrEmpty(getKeytab()) &&
50+
!Strings.isNullOrEmpty(getKrb5conf());
51+
52+
boolean allNotSet =
53+
Strings.isNullOrEmpty(getPrincipal()) &&
54+
Strings.isNullOrEmpty(getKeytab()) &&
55+
Strings.isNullOrEmpty(getKrb5conf());
56+
57+
if (allSet) {
58+
setEnableKrb(true);
59+
} else if (allNotSet) {
60+
setEnableKrb(false);
61+
} else {
62+
throw new RuntimeException("Missing kerberos parameter! all kerberos params must be set, or all kerberos params are not set");
63+
}
64+
}
65+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.util;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.security.UserGroupInformation;
23+
24+
import java.io.IOException;
25+
26+
/**
27+
* @program: flinkStreamSQL
28+
* @author: wuren
29+
* @create: 2020/09/14
30+
**/
31+
public class KrbUtils {
32+
33+
public static final String KRB5_CONF_KEY = "java.security.krb5.conf";
34+
public static final String HADOOP_AUTH_KEY = "hadoop.security.authentication";
35+
public static final String KRB_STR = "Kerberos";
36+
public static final String FALSE_STR = "false";
37+
public static final String SUBJECT_ONLY_KEY = "javax.security.auth.useSubjectCredsOnly";
38+
39+
public static UserGroupInformation getUgi(String principal, String keytabPath, String krb5confPath) throws IOException {
40+
System.setProperty(KRB5_CONF_KEY, krb5confPath);
41+
// System.setProperty(SUBJECT_ONLY_KEY, FALSE_STR);
42+
Configuration configuration = new Configuration();
43+
configuration.set(HADOOP_AUTH_KEY , KRB_STR);
44+
UserGroupInformation.setConfiguration(configuration);
45+
return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
46+
}
47+
48+
public void checkKrbParams() {
49+
50+
}
51+
52+
}

docs/plugin/kuduSide.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@
6060
| isFaultTolerant |查询是否容错 查询失败是否扫描第二个副本 默认false 容错 |||
6161
| cache | 维表缓存策略(NONE/LRU/ALL)||NONE|
6262
| partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)||false|
63+
| principal |kerberos用于登录的principal |||
64+
| keytab |keytab文件的路径 |||
65+
| krb5conf |conf文件路径 |||
66+
Kerberos三个参数全部设置则开启Kerberos认证,如果缺少任何一个则会提示缺少参数错误。
67+
如果全部未设置则不开启Kerberos连接Kudu集群。
6368
--------------
6469

6570
## 5.样例
@@ -163,3 +168,20 @@ into
163168
on t1.id = t2.id;
164169
```
165170

171+
## 7.kerberos示例
172+
```
173+
create table dim (
174+
name varchar,
175+
id int,
176+
PERIOD FOR SYSTEM_TIME
177+
) WITH (
178+
type='kudu',
179+
kuduMasters='host1',
180+
tableName='foo',
181+
parallelism ='1',
182+
cache ='ALL',
183+
keytab='foo/foobar.keytab',
184+
krb5conf='bar/krb5.conf',
185+
principal='kudu/host1@DTSTACK.COM'
186+
);
187+
```

docs/plugin/kuduSink.md

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ kudu 1.9.0+cdh6.2.0
4242
| defaultOperationTimeoutMs | 操作超时时间 ||
4343
| defaultSocketReadTimeoutMs | socket读取超时时间 ||
4444
| parallelism | 并行度设置||1|
45-
45+
| principal |kerberos用于登录的principal |||
46+
| keytab |keytab文件的路径 |||
47+
| krb5conf |conf文件路径 |||
48+
Kerberos三个参数全部设置则开启Kerberos认证,如果缺少任何一个则会提示缺少参数错误。
49+
如果全部未设置则不开启Kerberos连接Kudu集群。
4650

4751
## 5.样例:
4852
```
@@ -123,4 +127,21 @@ into
123127
### 结果数据
124128
```
125129
{"a":"2","b":"2","c":"3","d":"4"}
126-
```
130+
```
131+
132+
## 7.kerberos示例
133+
```
134+
create table dwd (
135+
name varchar,
136+
id int
137+
) WITH (
138+
type='kudu',
139+
kuduMasters='host1',
140+
tableName='foo',
141+
writeMode='insert',
142+
parallelism ='1',
143+
keytab='foo/foobar.keytab',
144+
krb5conf='bar/krb5.conf',
145+
principal='kudu/host1@DTSTACK.COM'
146+
);
147+
```

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
88
import com.dtstack.flink.sql.side.kudu.table.KuduSideTableInfo;
99
import com.dtstack.flink.sql.side.kudu.utils.KuduUtil;
10+
import com.dtstack.flink.sql.util.KrbUtils;
1011
import com.dtstack.flink.sql.util.RowDataComplete;
1112
import com.google.common.base.Preconditions;
1213
import com.google.common.collect.Lists;
@@ -18,6 +19,7 @@
1819
import org.apache.flink.table.dataformat.BaseRow;
1920
import org.apache.flink.types.Row;
2021
import org.apache.flink.util.Collector;
22+
import org.apache.hadoop.security.UserGroupInformation;
2123
import org.apache.kudu.ColumnSchema;
2224
import org.apache.kudu.Schema;
2325
import org.apache.kudu.client.KuduClient;
@@ -31,6 +33,8 @@
3133
import org.slf4j.Logger;
3234
import org.slf4j.LoggerFactory;
3335

36+
import java.io.IOException;
37+
import java.security.PrivilegedAction;
3438
import java.sql.SQLException;
3539
import java.util.Arrays;
3640
import java.util.Calendar;
@@ -211,24 +215,8 @@ private String buildKey(Map<String, Object> val, List<String> equalFieldList) {
211215
private KuduScanner getConn(KuduSideTableInfo tableInfo) {
212216
try {
213217
if (client == null) {
214-
String kuduMasters = tableInfo.getKuduMasters();
215218
String tableName = tableInfo.getTableName();
216-
Integer workerCount = tableInfo.getWorkerCount();
217-
Integer defaultSocketReadTimeoutMs = tableInfo.getDefaultSocketReadTimeoutMs();
218-
Integer defaultOperationTimeoutMs = tableInfo.getDefaultOperationTimeoutMs();
219-
220-
Preconditions.checkNotNull(kuduMasters, "kuduMasters could not be null");
221-
222-
KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(kuduMasters);
223-
if (null != workerCount) {
224-
kuduClientBuilder.workerCount(workerCount);
225-
}
226-
227-
if (null != defaultOperationTimeoutMs) {
228-
kuduClientBuilder.defaultOperationTimeoutMs(defaultOperationTimeoutMs);
229-
}
230-
client = kuduClientBuilder.build();
231-
219+
client = getClient(tableInfo);
232220
if (!client.tableExists(tableName)) {
233221
throw new IllegalArgumentException("Table Open Failed , please check table exists");
234222
}
@@ -243,7 +231,35 @@ private KuduScanner getConn(KuduSideTableInfo tableInfo) {
243231
}
244232
}
245233

234+
private KuduClient getClient(KuduSideTableInfo tableInfo) throws IOException {
235+
String kuduMasters = tableInfo.getKuduMasters();
236+
Integer workerCount = tableInfo.getWorkerCount();
237+
Integer defaultOperationTimeoutMs = tableInfo.getDefaultOperationTimeoutMs();
238+
239+
Preconditions.checkNotNull(kuduMasters, "kuduMasters could not be null");
240+
241+
KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(kuduMasters);
246242

243+
if (null != workerCount) {
244+
kuduClientBuilder.workerCount(workerCount);
245+
}
246+
247+
if (null != defaultOperationTimeoutMs) {
248+
kuduClientBuilder.defaultOperationTimeoutMs(defaultOperationTimeoutMs);
249+
}
250+
251+
if (tableInfo.isEnableKrb()) {
252+
UserGroupInformation ugi = KrbUtils.getUgi(tableInfo.getPrincipal(), tableInfo.getKeytab(), tableInfo.getKrb5conf());
253+
return ugi.doAs(new PrivilegedAction<KuduClient>() {
254+
@Override
255+
public KuduClient run() {
256+
return kuduClientBuilder.build();
257+
}
258+
});
259+
} else {
260+
return kuduClientBuilder.build();
261+
}
262+
}
247263
/**
248264
* @param builder 创建AsyncKuduScanner对象
249265
* @param schema kudu中表约束

0 commit comments

Comments
 (0)