Skip to content

Commit 25fadc4

Browse files
committed
Merge branch 'hotfix_1.10_4.0.x_30358' into '1.10_release_4.0.x'
Hotfix 1.10 4.0.x 30358 See merge request dt-insight-engine/flinkStreamSQL!114
2 parents 25316fe + ad8a4c2 commit 25fadc4

File tree

3 files changed

+106
-70
lines changed

3 files changed

+106
-70
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.util;
20+
21+
import org.apache.flink.util.Preconditions;
22+
23+
import java.io.File;
24+
25+
/**
26+
* @program: flinkStreamSQL
27+
* @author: wuren
28+
* @create: 2020/09/21
29+
**/
30+
public class DtFileUtils {
31+
public static void checkExists(String path) {
32+
File file = new File(path);
33+
String errorMsg = "%s file is not exist!";
34+
Preconditions.checkState(file.exists(), errorMsg, path);
35+
}
36+
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,36 @@
2020

2121
package com.dtstack.flink.sql.side.hbase;
2222

23+
import com.dtstack.flink.sql.factory.DTThreadFactory;
24+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2325
import com.dtstack.flink.sql.side.BaseAsyncReqRow;
2426
import com.dtstack.flink.sql.side.FieldInfo;
2527
import com.dtstack.flink.sql.side.JoinInfo;
26-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2728
import com.dtstack.flink.sql.side.hbase.rowkeydealer.AbstractRowKeyModeDealer;
2829
import com.dtstack.flink.sql.side.hbase.rowkeydealer.PreRowKeyModeDealerDealer;
2930
import com.dtstack.flink.sql.side.hbase.rowkeydealer.RowKeyEqualModeDealer;
3031
import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
31-
import com.dtstack.flink.sql.factory.DTThreadFactory;
3232
import com.dtstack.flink.sql.side.hbase.utils.HbaseConfigUtils;
33+
import com.dtstack.flink.sql.util.DtFileUtils;
3334
import com.stumbleupon.async.Deferred;
35+
import org.apache.commons.collections.MapUtils;
3436
import org.apache.commons.lang3.StringUtils;
3537
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3638
import org.apache.flink.configuration.Configuration;
37-
import org.apache.flink.types.Row;
38-
import org.apache.flink.table.dataformat.BaseRow;
39+
import org.apache.flink.runtime.security.DynamicConfiguration;
40+
import org.apache.flink.runtime.security.KerberosUtils;
3941
import org.apache.flink.streaming.api.functions.async.ResultFuture;
42+
import org.apache.flink.table.dataformat.BaseRow;
43+
import org.apache.flink.types.Row;
4044
import org.apache.hadoop.security.authentication.util.KerberosName;
4145
import org.hbase.async.Config;
4246
import org.hbase.async.HBaseClient;
4347
import org.slf4j.Logger;
4448
import org.slf4j.LoggerFactory;
4549
import sun.security.krb5.KrbException;
4650

51+
import javax.security.auth.login.AppConfigurationEntry;
52+
import java.io.File;
4753
import java.util.List;
4854
import java.util.Map;
4955
import java.util.concurrent.ExecutorService;
@@ -95,21 +101,30 @@ public void open(Configuration parameters) throws Exception {
95101

96102
ExecutorService executorService =new ThreadPoolExecutor(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE,
97103
0L, TimeUnit.MILLISECONDS,
98-
new LinkedBlockingQueue<>(), new DTThreadFactory("hbase-aysnc"));
104+
new LinkedBlockingQueue<>(), new DTThreadFactory("hbase-async"));
99105

100106
Config config = new Config();
101107
config.overrideConfig(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, hbaseSideTableInfo.getHost());
102108
config.overrideConfig(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, hbaseSideTableInfo.getParent());
103-
HbaseConfigUtils.loadKrb5Conf(hbaseConfig);
104109
hbaseConfig.entrySet().forEach(entity -> {
105110
config.overrideConfig(entity.getKey(), (String) entity.getValue());
106111
});
107112

108-
if (HbaseConfigUtils.asyncOpenKerberos(hbaseConfig)) {
109-
String jaasStr = HbaseConfigUtils.buildJaasStr(hbaseConfig);
110-
String jaasFilePath = HbaseConfigUtils.creatJassFile(jaasStr);
111-
System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_AUTH_LOGIN_CONF, jaasFilePath);
112-
config.overrideConfig(HbaseConfigUtils.KEY_JAVA_SECURITY_AUTH_LOGIN_CONF, jaasFilePath);
113+
if (HbaseConfigUtils.isEnableKerberos(hbaseConfig)) {
114+
HbaseConfigUtils.loadKrb5Conf(hbaseConfig);
115+
String principal = MapUtils.getString(hbaseConfig, HbaseConfigUtils.KEY_PRINCIPAL);
116+
HbaseConfigUtils.checkOpt(principal, HbaseConfigUtils.KEY_PRINCIPAL);
117+
String regionserverPrincipal = MapUtils.getString(hbaseConfig, HbaseConfigUtils.KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL);
118+
HbaseConfigUtils.checkOpt(regionserverPrincipal, HbaseConfigUtils.KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL);
119+
String keytab = MapUtils.getString(hbaseConfig, HbaseConfigUtils.KEY_KEY_TAB);
120+
HbaseConfigUtils.checkOpt(keytab, HbaseConfigUtils.KEY_KEY_TAB);
121+
String keytabPath = System.getProperty("user.dir") + File.separator + keytab;
122+
DtFileUtils.checkExists(keytabPath);
123+
124+
LOG.info("Kerberos login with keytab: {} and principal: {}", keytab, principal);
125+
String name = "HBaseClient";
126+
config.overrideConfig("hbase.sasl.clientconfig", name);
127+
appendJaasConf(name, keytab, principal);
113128
refreshConfig();
114129
}
115130

@@ -144,7 +159,17 @@ private void refreshConfig() throws KrbException {
144159
sun.security.krb5.Config.refresh();
145160
KerberosName.resetDefaultRealm();
146161
//reload java.security.auth.login.config
147-
javax.security.auth.login.Configuration.setConfiguration(null);
162+
// javax.security.auth.login.Configuration.setConfiguration(null);
163+
}
164+
165+
private void appendJaasConf(String name, String keytab, String principal) {
166+
javax.security.auth.login.Configuration priorConfig = javax.security.auth.login.Configuration.getConfiguration();
167+
// construct a dynamic JAAS configuration
168+
DynamicConfiguration currentConfig = new DynamicConfiguration(priorConfig);
169+
// wire up the configured JAAS login contexts to use the krb5 entries
170+
AppConfigurationEntry krb5Entry = KerberosUtils.keytabEntry(keytab, principal);
171+
currentConfig.addAppConfigurationEntry(name, krb5Entry);
172+
javax.security.auth.login.Configuration.setConfiguration(currentConfig);
148173
}
149174

150175
@Override

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java

Lines changed: 33 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
package com.dtstack.flink.sql.side.hbase.utils;
2020

21+
import com.dtstack.flink.sql.util.DtFileUtils;
22+
import com.google.common.base.Preconditions;
23+
import com.google.common.base.Strings;
2124
import org.apache.commons.collections.MapUtils;
2225
import org.apache.commons.lang3.StringUtils;
2326
import org.apache.hadoop.conf.Configuration;
@@ -26,9 +29,7 @@
2629
import org.slf4j.Logger;
2730
import org.slf4j.LoggerFactory;
2831

29-
import java.io.BufferedWriter;
3032
import java.io.File;
31-
import java.io.FileWriter;
3233
import java.io.IOException;
3334
import java.util.Arrays;
3435
import java.util.List;
@@ -57,21 +58,15 @@ public class HbaseConfigUtils {
5758
// async side kerberos
5859
private final static String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable";
5960
private final static String KEY_HBASE_SASL_CLIENTCONFIG = "hbase.sasl.clientconfig";
60-
private final static String KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL = "hbase.kerberos.regionserver.principal";
61+
public final static String KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL = "hbase.kerberos.regionserver.principal";
6162
public static final String KEY_KEY_TAB = "hbase.keytab";
62-
private static final String KEY_PRINCIPAL = "hbase.principal";
63+
public static final String KEY_PRINCIPAL = "hbase.principal";
6364

6465
public final static String KEY_HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
6566
public final static String KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM = "hbase.zookeeper.znode.parent";
6667

6768

6869
private static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
69-
public static final String KEY_JAVA_SECURITY_AUTH_LOGIN_CONF = "java.security.auth.login.config";
70-
71-
72-
private static final String SP = File.separator;
73-
private static final String KEY_KRB5_CONF = "krb5.conf";
74-
7570

7671
private static List<String> KEYS_KERBEROS_REQUIRED = Arrays.asList(
7772
KEY_HBASE_SECURITY_AUTHENTICATION,
@@ -100,23 +95,29 @@ public static Configuration getConfig(Map<String, Object> hbaseConfigMap) {
10095
return hConfiguration;
10196
}
10297

103-
public static boolean openKerberos(Map<String, Object> hbaseConfigMap) {
104-
if (!MapUtils.getBooleanValue(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHORIZATION)) {
105-
return false;
98+
public static boolean isEnableKerberos(Map<String, Object> hbaseConfigMap) {
99+
boolean hasAuthorization = AUTHENTICATION_TYPE.equalsIgnoreCase(
100+
MapUtils.getString(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHORIZATION)
101+
);
102+
boolean hasAuthentication = AUTHENTICATION_TYPE.equalsIgnoreCase(
103+
MapUtils.getString(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHENTICATION)
104+
);
105+
boolean hasAuthEnable = MapUtils.getBooleanValue(hbaseConfigMap, KEY_HBASE_SECURITY_AUTH_ENABLE);
106+
107+
if(hasAuthentication || hasAuthorization || hasAuthEnable) {
108+
LOG.info("Enable kerberos for hbase.");
109+
setKerberosConf(hbaseConfigMap);
110+
return true;
106111
}
107-
return AUTHENTICATION_TYPE.equalsIgnoreCase(MapUtils.getString(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHENTICATION));
112+
return false;
108113
}
109114

110-
public static boolean asyncOpenKerberos(Map<String, Object> hbaseConfigMap) {
111-
if (!MapUtils.getBooleanValue(hbaseConfigMap, KEY_HBASE_SECURITY_AUTH_ENABLE)) {
112-
return false;
113-
}
114-
return AUTHENTICATION_TYPE.equalsIgnoreCase(MapUtils.getString(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHENTICATION));
115+
private static void setKerberosConf(Map<String,Object> hbaseConfigMap) {
116+
hbaseConfigMap.put(KEY_HBASE_SECURITY_AUTHORIZATION, AUTHENTICATION_TYPE);
117+
hbaseConfigMap.put(KEY_HBASE_SECURITY_AUTHENTICATION, AUTHENTICATION_TYPE);
118+
hbaseConfigMap.put(KEY_HBASE_SECURITY_AUTH_ENABLE, true);
115119
}
116120

117-
118-
119-
120121
public static Configuration getHadoopConfiguration(Map<String, Object> hbaseConfigMap) {
121122
for (String key : KEYS_KERBEROS_REQUIRED) {
122123
if (StringUtils.isEmpty(MapUtils.getString(hbaseConfigMap, key))) {
@@ -157,46 +158,20 @@ public static String getKeytab(Map<String, Object> hbaseConfigMap) {
157158
throw new IllegalArgumentException("");
158159
}
159160

160-
public static void loadKrb5Conf(Map<String, Object> kerberosConfig) {
161-
String krb5FilePath = System.getProperty("user.dir") + File.separator + MapUtils.getString(kerberosConfig, KEY_JAVA_SECURITY_KRB5_CONF);
162-
if (!org.apache.commons.lang.StringUtils.isEmpty(krb5FilePath)) {
163-
System.setProperty(KEY_JAVA_SECURITY_KRB5_CONF, krb5FilePath);;
164-
}
165-
}
166-
167-
public static String creatJassFile(String configStr) throws IOException {
168-
String fileName = System.getProperty("user.dir");
169-
File krbConf = new File(fileName);
170-
File temp = File.createTempFile("JAAS", ".conf", krbConf);
171-
temp.deleteOnExit();
172-
BufferedWriter out = new BufferedWriter(new FileWriter(temp, false));
173-
out.write(configStr + "\n");
174-
out.close();
175-
return temp.getAbsolutePath();
161+
public static void loadKrb5Conf(Map<String, Object> config) {
162+
String krb5conf = MapUtils.getString(config, KEY_JAVA_SECURITY_KRB5_CONF);
163+
checkOpt(krb5conf, KEY_JAVA_SECURITY_KRB5_CONF);
164+
String krb5FilePath = System.getProperty("user.dir") + File.separator + MapUtils.getString(config, KEY_JAVA_SECURITY_KRB5_CONF);
165+
DtFileUtils.checkExists(krb5FilePath);
166+
System.setProperty(KEY_JAVA_SECURITY_KRB5_CONF, krb5FilePath);
167+
LOG.info("{} is set to {}", KEY_JAVA_SECURITY_KRB5_CONF, krb5FilePath);
176168
}
177169

178-
public static String buildJaasStr(Map<String, Object> kerberosConfig) {
179-
for (String key : ASYNC_KEYS_KERBEROS_REQUIRED) {
180-
if (StringUtils.isEmpty(MapUtils.getString(kerberosConfig, key))) {
181-
throw new IllegalArgumentException(String.format("Must provide [%s] when authentication is Kerberos", key));
182-
}
183-
}
184-
185-
String keyTab = System.getProperty("user.dir") + File.separator + MapUtils.getString(kerberosConfig, KEY_KEY_TAB);
186-
String principal = MapUtils.getString(kerberosConfig, KEY_PRINCIPAL);
187-
188-
StringBuilder jaasSB = new StringBuilder("Client {\n" +
189-
" com.sun.security.auth.module.Krb5LoginModule required\n" +
190-
" useKeyTab=true\n" +
191-
" useTicketCache=false\n");
192-
jaasSB.append(" keyTab=\"").append(keyTab).append("\"").append("\n");
193-
jaasSB.append(" principal=\"").append(principal).append("\"").append(";\n");
194-
jaasSB.append("};");
195-
return jaasSB.toString();
170+
// TODO 日后改造可以下沉到Core模块
171+
public static void checkOpt(String opt, String key) {
172+
Preconditions.checkState(!Strings.isNullOrEmpty(opt), "%s must be set!", key);
196173
}
197174

198-
199-
200175
public static UserGroupInformation loginAndReturnUGI(Configuration conf, String principal, String keytab) throws IOException {
201176
if (conf == null) {
202177
throw new IllegalArgumentException("kerberos conf can not be null");

0 commit comments

Comments
 (0)