Skip to content

Commit 4ed25ae

Browse files
committed
[fix-33341][hbase]hbase 全量维表kerberos开启认证
1 parent 2d3c26a commit 4ed25ae

File tree

3 files changed

+106
-45
lines changed

3 files changed

+106
-45
lines changed

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import com.dtstack.flink.sql.side.JoinInfo;
2727
import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
2828
import com.dtstack.flink.sql.side.hbase.utils.HbaseConfigUtils;
29-
import com.dtstack.flink.sql.util.RowDataComplete;
3029
import com.dtstack.flink.sql.side.hbase.utils.HbaseUtils;
30+
import com.dtstack.flink.sql.util.RowDataComplete;
3131
import com.google.common.collect.Maps;
3232
import org.apache.calcite.sql.JoinType;
3333
import org.apache.commons.collections.map.HashedMap;
@@ -37,8 +37,11 @@
3737
import org.apache.flink.types.Row;
3838
import org.apache.flink.util.Collector;
3939
import org.apache.hadoop.conf.Configuration;
40+
import org.apache.hadoop.hbase.AuthUtil;
4041
import org.apache.hadoop.hbase.Cell;
4142
import org.apache.hadoop.hbase.CellUtil;
43+
import org.apache.hadoop.hbase.ChoreService;
44+
import org.apache.hadoop.hbase.ScheduledChore;
4245
import org.apache.hadoop.hbase.TableName;
4346
import org.apache.hadoop.hbase.client.Connection;
4447
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -50,9 +53,10 @@
5053
import org.apache.hadoop.security.UserGroupInformation;
5154
import org.slf4j.Logger;
5255
import org.slf4j.LoggerFactory;
56+
import sun.security.krb5.KrbException;
5357

58+
import java.io.File;
5459
import java.io.IOException;
55-
5660
import java.security.PrivilegedAction;
5761
import java.sql.SQLException;
5862
import java.sql.Timestamp;
@@ -75,7 +79,6 @@ public class HbaseAllReqRow extends BaseAllReqRow {
7579
private Connection conn = null;
7680
private Table table = null;
7781
private ResultScanner resultScanner = null;
78-
private Configuration conf = null;
7982

8083
public HbaseAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
8184
super(new HbaseAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
@@ -181,26 +184,40 @@ private void loadData(Map<String, Map<String, Object>> tmpCache) throws SQLExcep
181184
Map<String, String> colRefType = ((HbaseAllSideInfo)sideInfo).getColRefType();
182185
HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
183186
boolean openKerberos = hbaseSideTableInfo.isKerberosAuthEnable();
187+
Configuration conf;
184188
int loadDataCount = 0;
185189
try {
186190
if (openKerberos) {
187191
conf = HbaseConfigUtils.getHadoopConfiguration(hbaseSideTableInfo.getHbaseConfig());
188192
conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, hbaseSideTableInfo.getHost());
189193
conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, hbaseSideTableInfo.getParent());
194+
190195
String principal = HbaseConfigUtils.getPrincipal(hbaseSideTableInfo.getHbaseConfig());
191196
String keytab = HbaseConfigUtils.getKeytab(hbaseSideTableInfo.getHbaseConfig());
192197

193-
UserGroupInformation userGroupInformation = HbaseConfigUtils.loginAndReturnUGI(conf, principal, keytab);
198+
HbaseConfigUtils.fillSyncKerberosConfig(conf, hbaseSideTableInfo.getHbaseConfig());
199+
keytab = System.getProperty("user.dir") + File.separator + keytab;
200+
201+
LOG.info("kerberos principal:{},keytab:{}", principal, keytab);
202+
203+
conf.set(HbaseConfigUtils.KEY_HBASE_CLIENT_KEYTAB_FILE, keytab);
204+
conf.set(HbaseConfigUtils.KEY_HBASE_CLIENT_KERBEROS_PRINCIPAL, principal);
205+
206+
UserGroupInformation userGroupInformation = HbaseConfigUtils.loginAndReturnUGI2(conf, principal, keytab);
194207
Configuration finalConf = conf;
195-
conn = userGroupInformation.doAs(new PrivilegedAction<Connection>() {
196-
@Override
197-
public Connection run() {
198-
try {
199-
return ConnectionFactory.createConnection(finalConf);
200-
} catch (IOException e) {
201-
LOG.error("Get connection fail with config:{}", finalConf);
202-
throw new RuntimeException(e);
208+
conn = userGroupInformation.doAs((PrivilegedAction<Connection>) () -> {
209+
try {
210+
ScheduledChore authChore = AuthUtil.getAuthChore(finalConf);
211+
if (authChore != null) {
212+
ChoreService choreService = new ChoreService("hbaseKerberosSink");
213+
choreService.scheduleChore(authChore);
203214
}
215+
216+
return ConnectionFactory.createConnection(finalConf);
217+
218+
} catch (IOException e) {
219+
LOG.error("Get connection fail with config:{}", finalConf);
220+
throw new RuntimeException(e);
204221
}
205222
});
206223

@@ -226,7 +243,7 @@ public Connection run() {
226243
loadDataCount++;
227244
tmpCache.put(new String(r.getRow()), kv);
228245
}
229-
} catch (IOException e) {
246+
} catch (IOException | KrbException e) {
230247
throw new RuntimeException(e);
231248
} finally {
232249
LOG.info("load Data count: {}", loadDataCount);

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public class HbaseSideParser extends AbstractSideTableParser {
4747

4848
public static final String ZOOKEEPER_PARENT = "zookeeperParent";
4949

50+
public static final String KERBEROS_ENABLE = "hbase.security.auth.enable";
51+
5052
public static final String TABLE_NAME_KEY = "tableName";
5153

5254
public static final String PRE_ROW_KEY = "preRowKey";
@@ -69,6 +71,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
6971
hbaseTableInfo.setParent((String)props.get(ZOOKEEPER_PARENT.toLowerCase()));
7072
hbaseTableInfo.setPreRowKey(MathUtil.getBoolean(props.get(PRE_ROW_KEY.toLowerCase()), false));
7173
hbaseTableInfo.setCacheType((String) props.get(CACHE));
74+
hbaseTableInfo.setKerberosAuthEnable(MathUtil.getBoolean(props.get(KERBEROS_ENABLE), false));
7275
props.entrySet().stream()
7376
.filter(entity -> entity.getKey().contains("."))
7477
.map(entity -> hbaseTableInfo.getHbaseConfig().put(entity.getKey(), String.valueOf(entity.getValue())))

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java

Lines changed: 73 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,13 @@
2525
import org.apache.commons.lang3.StringUtils;
2626
import org.apache.hadoop.conf.Configuration;
2727
import org.apache.hadoop.hbase.HBaseConfiguration;
28+
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
2829
import org.apache.hadoop.security.UserGroupInformation;
30+
import org.apache.hadoop.security.authentication.util.KerberosName;
2931
import org.slf4j.Logger;
3032
import org.slf4j.LoggerFactory;
33+
import sun.security.krb5.Config;
34+
import sun.security.krb5.KrbException;
3135

3236
import java.io.File;
3337
import java.io.IOException;
@@ -51,38 +55,34 @@ public class HbaseConfigUtils {
5155
private final static String KEY_HBASE_SECURITY_AUTHENTICATION = "hbase.security.authentication";
5256
private final static String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization";
5357
private final static String KEY_HBASE_MASTER_KERBEROS_PRINCIPAL = "hbase.master.kerberos.principal";
54-
private final static String KEY_HBASE_MASTER_KEYTAB_FILE = "hbase.master.keytab.file";
55-
private final static String KEY_HBASE_REGIONSERVER_KEYTAB_FILE = "hbase.regionserver.keytab.file";
5658
private final static String KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL = "hbase.regionserver.kerberos.principal";
5759

60+
public final static String KEY_HBASE_CLIENT_KEYTAB_FILE = "hbase.client.keytab.file";
61+
public final static String KEY_HBASE_CLIENT_KERBEROS_PRINCIPAL = "hbase.client.kerberos.principal";
62+
63+
public static final String KEY_HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
64+
public static final String KEY_HADOOP_SECURITY_AUTH_TO_LOCAL = "hadoop.security.auth_to_local";
65+
public static final String KEY_HADOOP_SECURITY_AUTHORIZATION = "hadoop.security.authorization";
66+
5867
// async side kerberos
5968
private final static String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable";
60-
private final static String KEY_HBASE_SASL_CLIENTCONFIG = "hbase.sasl.clientconfig";
6169
public final static String KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL = "hbase.kerberos.regionserver.principal";
6270
public static final String KEY_KEY_TAB = "hbase.keytab";
6371
public static final String KEY_PRINCIPAL = "hbase.principal";
6472

6573
public final static String KEY_HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
6674
public final static String KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM = "hbase.zookeeper.znode.parent";
6775

68-
76+
public static final String KEY_ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
6977
private static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
7078

7179
private static List<String> KEYS_KERBEROS_REQUIRED = Arrays.asList(
7280
KEY_HBASE_SECURITY_AUTHENTICATION,
73-
KEY_HBASE_MASTER_KERBEROS_PRINCIPAL,
74-
KEY_HBASE_MASTER_KEYTAB_FILE,
75-
KEY_HBASE_REGIONSERVER_KEYTAB_FILE,
76-
KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL
77-
);
78-
79-
private static List<String> ASYNC_KEYS_KERBEROS_REQUIRED = Arrays.asList(
80-
KEY_HBASE_SECURITY_AUTH_ENABLE,
81-
KEY_HBASE_SASL_CLIENTCONFIG,
8281
KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL,
83-
KEY_HBASE_SECURITY_AUTHENTICATION,
84-
KEY_KEY_TAB);
85-
82+
KEY_PRINCIPAL,
83+
KEY_KEY_TAB,
84+
KEY_JAVA_SECURITY_KRB5_CONF
85+
);
8686

8787
public static Configuration getConfig(Map<String, Object> hbaseConfigMap) {
8888
Configuration hConfiguration = HBaseConfiguration.create();
@@ -124,24 +124,11 @@ public static Configuration getHadoopConfiguration(Map<String, Object> hbaseConf
124124
throw new IllegalArgumentException(String.format("Must provide [%s] when authentication is Kerberos", key));
125125
}
126126
}
127-
loadKrb5Conf(hbaseConfigMap);
128-
129-
Configuration conf = new Configuration();
130-
if (hbaseConfigMap == null) {
131-
return conf;
132-
}
133-
134-
hbaseConfigMap.forEach((key, val) -> {
135-
if (val != null) {
136-
conf.set(key, val.toString());
137-
}
138-
});
139-
140-
return conf;
127+
return HBaseConfiguration.create();
141128
}
142129

143130
public static String getPrincipal(Map<String, Object> hbaseConfigMap) {
144-
String principal = MapUtils.getString(hbaseConfigMap, KEY_HBASE_MASTER_KERBEROS_PRINCIPAL);
131+
String principal = MapUtils.getString(hbaseConfigMap, KEY_PRINCIPAL);
145132
if (StringUtils.isNotEmpty(principal)) {
146133
return principal;
147134
}
@@ -150,14 +137,37 @@ public static String getPrincipal(Map<String, Object> hbaseConfigMap) {
150137
}
151138

152139
public static String getKeytab(Map<String, Object> hbaseConfigMap) {
153-
String keytab = MapUtils.getString(hbaseConfigMap, KEY_HBASE_MASTER_KEYTAB_FILE);
140+
String keytab = MapUtils.getString(hbaseConfigMap, KEY_KEY_TAB);
154141
if (StringUtils.isNotEmpty(keytab)) {
155142
return keytab;
156143
}
157144

158145
throw new IllegalArgumentException("");
159146
}
160147

148+
public static void fillSyncKerberosConfig(org.apache.hadoop.conf.Configuration config, Map<String, Object> hbaseConfigMap) throws IOException {
149+
if (StringUtils.isEmpty(MapUtils.getString(hbaseConfigMap, KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL))) {
150+
throw new IllegalArgumentException("Must provide regionserverPrincipal when authentication is Kerberos");
151+
}
152+
153+
String regionserverPrincipal = MapUtils.getString(hbaseConfigMap, KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL);
154+
config.set(HbaseConfigUtils.KEY_HBASE_MASTER_KERBEROS_PRINCIPAL, regionserverPrincipal);
155+
config.set(HbaseConfigUtils.KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL, regionserverPrincipal);
156+
config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHORIZATION, "true");
157+
config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHENTICATION, "kerberos");
158+
159+
if (!StringUtils.isEmpty(MapUtils.getString(hbaseConfigMap, KEY_ZOOKEEPER_SASL_CLIENT))) {
160+
System.setProperty(HbaseConfigUtils.KEY_ZOOKEEPER_SASL_CLIENT, MapUtils.getString(hbaseConfigMap, KEY_ZOOKEEPER_SASL_CLIENT));
161+
}
162+
163+
String securityKrb5Conf = MapUtils.getString(hbaseConfigMap, KEY_JAVA_SECURITY_KRB5_CONF);
164+
if (!StringUtils.isEmpty(securityKrb5Conf)) {
165+
String krb5ConfPath = System.getProperty("user.dir") + File.separator + securityKrb5Conf;
166+
LOG.info("krb5ConfPath:{}", krb5ConfPath);
167+
System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, krb5ConfPath);
168+
}
169+
}
170+
161171
public static void loadKrb5Conf(Map<String, Object> config) {
162172
String krb5conf = MapUtils.getString(config, KEY_JAVA_SECURITY_KRB5_CONF);
163173
checkOpt(krb5conf, KEY_JAVA_SECURITY_KRB5_CONF);
@@ -190,4 +200,35 @@ public static UserGroupInformation loginAndReturnUGI(Configuration conf, String
190200

191201
return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
192202
}
203+
204+
public static UserGroupInformation loginAndReturnUGI2(Configuration conf, String principal, String keytab) throws IOException, KrbException {
205+
LOG.info("loginAndReturnUGI principal {}",principal);
206+
LOG.info("loginAndReturnUGI keytab {}",keytab);
207+
if (conf == null) {
208+
throw new IllegalArgumentException("kerberos conf can not be null");
209+
}
210+
211+
if (org.apache.commons.lang.StringUtils.isEmpty(principal)) {
212+
throw new IllegalArgumentException("principal can not be null");
213+
}
214+
215+
if (org.apache.commons.lang.StringUtils.isEmpty(keytab)) {
216+
throw new IllegalArgumentException("keytab can not be null");
217+
}
218+
219+
if (!new File(keytab).exists()){
220+
throw new IllegalArgumentIOException("keytab ["+ keytab + "] not exist");
221+
}
222+
223+
conf.set(KEY_HADOOP_SECURITY_AUTHENTICATION, "Kerberos");
224+
//conf.set("hadoop.security.auth_to_local", "DEFAULT");
225+
conf.set(KEY_HADOOP_SECURITY_AUTH_TO_LOCAL, "RULE:[1:$1] RULE:[2:$1]");
226+
conf.set(KEY_HADOOP_SECURITY_AUTHORIZATION, "true");
227+
228+
Config.refresh();
229+
KerberosName.resetDefaultRealm();
230+
UserGroupInformation.setConfiguration(conf);
231+
232+
return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
233+
}
193234
}

0 commit comments

Comments
 (0)