Skip to content

Commit ad8a4c2

Browse files
committed
optmize code hbase async side
1 parent f9cd1d8 commit ad8a4c2

File tree

3 files changed

+103
-82
lines changed

3 files changed

+103
-82
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.util;
20+
21+
import org.apache.flink.util.Preconditions;
22+
23+
import java.io.File;
24+
25+
/**
26+
* @program: flinkStreamSQL
27+
* @author: wuren
28+
* @create: 2020/09/21
29+
**/
30+
public class DtFileUtils {
31+
public static void checkExists(String path) {
32+
File file = new File(path);
33+
String errorMsg = "%s file is not exist!";
34+
Preconditions.checkState(file.exists(), errorMsg, path);
35+
}
36+
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,34 +20,36 @@
2020

2121
package com.dtstack.flink.sql.side.hbase;
2222

23+
import com.dtstack.flink.sql.factory.DTThreadFactory;
24+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2325
import com.dtstack.flink.sql.side.BaseAsyncReqRow;
2426
import com.dtstack.flink.sql.side.FieldInfo;
2527
import com.dtstack.flink.sql.side.JoinInfo;
26-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2728
import com.dtstack.flink.sql.side.hbase.rowkeydealer.AbstractRowKeyModeDealer;
2829
import com.dtstack.flink.sql.side.hbase.rowkeydealer.PreRowKeyModeDealerDealer;
2930
import com.dtstack.flink.sql.side.hbase.rowkeydealer.RowKeyEqualModeDealer;
3031
import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
31-
import com.dtstack.flink.sql.factory.DTThreadFactory;
3232
import com.dtstack.flink.sql.side.hbase.utils.HbaseConfigUtils;
33+
import com.dtstack.flink.sql.util.DtFileUtils;
3334
import com.stumbleupon.async.Deferred;
3435
import org.apache.commons.collections.MapUtils;
3536
import org.apache.commons.lang3.StringUtils;
3637
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3738
import org.apache.flink.configuration.Configuration;
38-
import org.apache.flink.types.Row;
39-
import org.apache.flink.table.dataformat.BaseRow;
39+
import org.apache.flink.runtime.security.DynamicConfiguration;
40+
import org.apache.flink.runtime.security.KerberosUtils;
4041
import org.apache.flink.streaming.api.functions.async.ResultFuture;
41-
import org.apache.hadoop.security.UserGroupInformation;
42+
import org.apache.flink.table.dataformat.BaseRow;
43+
import org.apache.flink.types.Row;
4244
import org.apache.hadoop.security.authentication.util.KerberosName;
4345
import org.hbase.async.Config;
4446
import org.hbase.async.HBaseClient;
4547
import org.slf4j.Logger;
4648
import org.slf4j.LoggerFactory;
4749
import sun.security.krb5.KrbException;
4850

51+
import javax.security.auth.login.AppConfigurationEntry;
4952
import java.io.File;
50-
import java.security.PrivilegedExceptionAction;
5153
import java.util.List;
5254
import java.util.Map;
5355
import java.util.concurrent.ExecutorService;
@@ -99,39 +101,34 @@ public void open(Configuration parameters) throws Exception {
99101

100102
ExecutorService executorService =new ThreadPoolExecutor(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE,
101103
0L, TimeUnit.MILLISECONDS,
102-
new LinkedBlockingQueue<>(), new DTThreadFactory("hbase-aysnc"));
104+
new LinkedBlockingQueue<>(), new DTThreadFactory("hbase-async"));
103105

104106
Config config = new Config();
105107
config.overrideConfig(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, hbaseSideTableInfo.getHost());
106108
config.overrideConfig(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, hbaseSideTableInfo.getParent());
107-
HbaseConfigUtils.loadKrb5Conf(hbaseConfig);
108109
hbaseConfig.entrySet().forEach(entity -> {
109110
config.overrideConfig(entity.getKey(), (String) entity.getValue());
110111
});
111112

112-
String principal = null;
113-
String keyTab = null;
114-
if (HbaseConfigUtils.asyncOpenKerberos(hbaseConfig)) {
115-
principal = MapUtils.getString(hbaseConfig, HbaseConfigUtils.KEY_PRINCIPAL);
116-
keyTab = System.getProperty("user.dir") + File.separator + MapUtils.getString(hbaseConfig, HbaseConfigUtils.KEY_KEY_TAB);
117-
String jaasStr = HbaseConfigUtils.buildJaasStr(hbaseConfig, principal, keyTab);
118-
String jaasFilePath = HbaseConfigUtils.creatJassFile(jaasStr);
119-
System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_AUTH_LOGIN_CONF, jaasFilePath);
120-
config.overrideConfig(HbaseConfigUtils.KEY_JAVA_SECURITY_AUTH_LOGIN_CONF, jaasFilePath);
113+
if (HbaseConfigUtils.isEnableKerberos(hbaseConfig)) {
114+
HbaseConfigUtils.loadKrb5Conf(hbaseConfig);
115+
String principal = MapUtils.getString(hbaseConfig, HbaseConfigUtils.KEY_PRINCIPAL);
116+
HbaseConfigUtils.checkOpt(principal, HbaseConfigUtils.KEY_PRINCIPAL);
117+
String regionserverPrincipal = MapUtils.getString(hbaseConfig, HbaseConfigUtils.KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL);
118+
HbaseConfigUtils.checkOpt(regionserverPrincipal, HbaseConfigUtils.KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL);
119+
String keytab = MapUtils.getString(hbaseConfig, HbaseConfigUtils.KEY_KEY_TAB);
120+
HbaseConfigUtils.checkOpt(keytab, HbaseConfigUtils.KEY_KEY_TAB);
121+
String keytabPath = System.getProperty("user.dir") + File.separator + keytab;
122+
DtFileUtils.checkExists(keytabPath);
123+
124+
LOG.info("Kerberos login with keytab: {} and principal: {}", keytab, principal);
125+
String name = "HBaseClient";
126+
config.overrideConfig("hbase.sasl.clientconfig", name);
127+
appendJaasConf(name, keytab, principal);
121128
refreshConfig();
122129
}
123130

124-
hBaseClient = null;
125-
if (HbaseConfigUtils.asyncOpenKerberos(hbaseConfig)) {
126-
hBaseClient = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab).doAs(new PrivilegedExceptionAction<HBaseClient>() {
127-
@Override
128-
public HBaseClient run() throws Exception {
129-
return new HBaseClient(config, executorService);
130-
}
131-
});
132-
} else {
133-
hBaseClient = new HBaseClient(config, executorService);
134-
}
131+
hBaseClient = new HBaseClient(config, executorService);
135132

136133
try {
137134
Deferred deferred = hBaseClient.ensureTableExists(tableName)
@@ -165,6 +162,16 @@ private void refreshConfig() throws KrbException {
165162
// javax.security.auth.login.Configuration.setConfiguration(null);
166163
}
167164

165+
private void appendJaasConf(String name, String keytab, String principal) {
166+
javax.security.auth.login.Configuration priorConfig = javax.security.auth.login.Configuration.getConfiguration();
167+
// construct a dynamic JAAS configuration
168+
DynamicConfiguration currentConfig = new DynamicConfiguration(priorConfig);
169+
// wire up the configured JAAS login contexts to use the krb5 entries
170+
AppConfigurationEntry krb5Entry = KerberosUtils.keytabEntry(keytab, principal);
171+
currentConfig.addAppConfigurationEntry(name, krb5Entry);
172+
javax.security.auth.login.Configuration.setConfiguration(currentConfig);
173+
}
174+
168175
@Override
169176
public void handleAsyncInvoke(Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture) throws Exception {
170177
rowKeyMode.asyncGetData(tableName, buildCacheKey(inputParams), input, resultFuture, sideInfo.getSideCache());

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java

Lines changed: 32 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
package com.dtstack.flink.sql.side.hbase.utils;
2020

21+
import com.dtstack.flink.sql.util.DtFileUtils;
22+
import com.google.common.base.Preconditions;
23+
import com.google.common.base.Strings;
2124
import org.apache.commons.collections.MapUtils;
2225
import org.apache.commons.lang3.StringUtils;
2326
import org.apache.hadoop.conf.Configuration;
@@ -26,9 +29,7 @@
2629
import org.slf4j.Logger;
2730
import org.slf4j.LoggerFactory;
2831

29-
import java.io.BufferedWriter;
3032
import java.io.File;
31-
import java.io.FileWriter;
3233
import java.io.IOException;
3334
import java.util.Arrays;
3435
import java.util.List;
@@ -57,7 +58,7 @@ public class HbaseConfigUtils {
5758
// async side kerberos
5859
private final static String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable";
5960
private final static String KEY_HBASE_SASL_CLIENTCONFIG = "hbase.sasl.clientconfig";
60-
private final static String KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL = "hbase.kerberos.regionserver.principal";
61+
public final static String KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL = "hbase.kerberos.regionserver.principal";
6162
public static final String KEY_KEY_TAB = "hbase.keytab";
6263
public static final String KEY_PRINCIPAL = "hbase.principal";
6364

@@ -66,12 +67,6 @@ public class HbaseConfigUtils {
6667

6768

6869
private static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
69-
public static final String KEY_JAVA_SECURITY_AUTH_LOGIN_CONF = "java.security.auth.login.config";
70-
71-
72-
private static final String SP = File.separator;
73-
private static final String KEY_KRB5_CONF = "krb5.conf";
74-
7570

7671
private static List<String> KEYS_KERBEROS_REQUIRED = Arrays.asList(
7772
KEY_HBASE_SECURITY_AUTHENTICATION,
@@ -100,23 +95,29 @@ public static Configuration getConfig(Map<String, Object> hbaseConfigMap) {
10095
return hConfiguration;
10196
}
10297

103-
public static boolean openKerberos(Map<String, Object> hbaseConfigMap) {
104-
if (!MapUtils.getBooleanValue(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHORIZATION)) {
105-
return false;
98+
public static boolean isEnableKerberos(Map<String, Object> hbaseConfigMap) {
99+
boolean hasAuthorization = AUTHENTICATION_TYPE.equalsIgnoreCase(
100+
MapUtils.getString(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHORIZATION)
101+
);
102+
boolean hasAuthentication = AUTHENTICATION_TYPE.equalsIgnoreCase(
103+
MapUtils.getString(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHENTICATION)
104+
);
105+
boolean hasAuthEnable = MapUtils.getBooleanValue(hbaseConfigMap, KEY_HBASE_SECURITY_AUTH_ENABLE);
106+
107+
if(hasAuthentication || hasAuthorization || hasAuthEnable) {
108+
LOG.info("Enable kerberos for hbase.");
109+
setKerberosConf(hbaseConfigMap);
110+
return true;
106111
}
107-
return AUTHENTICATION_TYPE.equalsIgnoreCase(MapUtils.getString(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHENTICATION));
112+
return false;
108113
}
109114

110-
public static boolean asyncOpenKerberos(Map<String, Object> hbaseConfigMap) {
111-
if (!MapUtils.getBooleanValue(hbaseConfigMap, KEY_HBASE_SECURITY_AUTH_ENABLE)) {
112-
return false;
113-
}
114-
return AUTHENTICATION_TYPE.equalsIgnoreCase(MapUtils.getString(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHENTICATION));
115+
private static void setKerberosConf(Map<String,Object> hbaseConfigMap) {
116+
hbaseConfigMap.put(KEY_HBASE_SECURITY_AUTHORIZATION, AUTHENTICATION_TYPE);
117+
hbaseConfigMap.put(KEY_HBASE_SECURITY_AUTHENTICATION, AUTHENTICATION_TYPE);
118+
hbaseConfigMap.put(KEY_HBASE_SECURITY_AUTH_ENABLE, true);
115119
}
116120

117-
118-
119-
120121
public static Configuration getHadoopConfiguration(Map<String, Object> hbaseConfigMap) {
121122
for (String key : KEYS_KERBEROS_REQUIRED) {
122123
if (StringUtils.isEmpty(MapUtils.getString(hbaseConfigMap, key))) {
@@ -157,43 +158,20 @@ public static String getKeytab(Map<String, Object> hbaseConfigMap) {
157158
throw new IllegalArgumentException("");
158159
}
159160

160-
public static void loadKrb5Conf(Map<String, Object> kerberosConfig) {
161-
String krb5FilePath = System.getProperty("user.dir") + File.separator + MapUtils.getString(kerberosConfig, KEY_JAVA_SECURITY_KRB5_CONF);
162-
if (!org.apache.commons.lang.StringUtils.isEmpty(krb5FilePath)) {
163-
System.setProperty(KEY_JAVA_SECURITY_KRB5_CONF, krb5FilePath);;
164-
}
165-
}
166-
167-
public static String creatJassFile(String configStr) throws IOException {
168-
String fileName = System.getProperty("user.dir");
169-
File krbConf = new File(fileName);
170-
File temp = File.createTempFile("JAAS", ".conf", krbConf);
171-
temp.deleteOnExit();
172-
BufferedWriter out = new BufferedWriter(new FileWriter(temp, false));
173-
out.write(configStr + "\n");
174-
out.close();
175-
return temp.getAbsolutePath();
161+
public static void loadKrb5Conf(Map<String, Object> config) {
162+
String krb5conf = MapUtils.getString(config, KEY_JAVA_SECURITY_KRB5_CONF);
163+
checkOpt(krb5conf, KEY_JAVA_SECURITY_KRB5_CONF);
164+
String krb5FilePath = System.getProperty("user.dir") + File.separator + MapUtils.getString(config, KEY_JAVA_SECURITY_KRB5_CONF);
165+
DtFileUtils.checkExists(krb5FilePath);
166+
System.setProperty(KEY_JAVA_SECURITY_KRB5_CONF, krb5FilePath);
167+
LOG.info("{} is set to {}", KEY_JAVA_SECURITY_KRB5_CONF, krb5FilePath);
176168
}
177169

178-
public static String buildJaasStr(Map<String, Object> kerberosConfig,String principal,String keyTab) {
179-
for (String key : ASYNC_KEYS_KERBEROS_REQUIRED) {
180-
if (StringUtils.isEmpty(MapUtils.getString(kerberosConfig, key))) {
181-
throw new IllegalArgumentException(String.format("Must provide [%s] when authentication is Kerberos", key));
182-
}
183-
}
184-
185-
StringBuilder jaasSB = new StringBuilder("Client {\n" +
186-
" com.sun.security.auth.module.Krb5LoginModule required\n" +
187-
" useKeyTab=true\n" +
188-
" useTicketCache=false\n");
189-
jaasSB.append(" keyTab=\"").append(keyTab).append("\"").append("\n");
190-
jaasSB.append(" principal=\"").append(principal).append("\"").append(";\n");
191-
jaasSB.append("};");
192-
return jaasSB.toString();
170+
// TODO 日后改造可以下沉到Core模块
171+
public static void checkOpt(String opt, String key) {
172+
Preconditions.checkState(!Strings.isNullOrEmpty(opt), "%s must be set!", key);
193173
}
194174

195-
196-
197175
public static UserGroupInformation loginAndReturnUGI(Configuration conf, String principal, String keytab) throws IOException {
198176
if (conf == null) {
199177
throw new IllegalArgumentException("kerberos conf can not be null");

0 commit comments

Comments
 (0)