Skip to content

Commit e90eed1

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_3.10_27454' into 1.8_release_3.10.x
2 parents cb738ac + 9c13776 commit e90eed1

File tree

5 files changed

+43
-14
lines changed

5 files changed

+43
-14
lines changed

core/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,18 @@
108108
<groupId>org.apache.flink</groupId>
109109
<artifactId>flink-yarn_2.11</artifactId>
110110
<version>${flink.version}</version>
111+
<exclusions>
112+
<exclusion>
113+
<groupId>org.apache.flink</groupId>
114+
<artifactId>flink-shaded-hadoop2</artifactId>
115+
</exclusion>
116+
</exclusions>
117+
</dependency>
118+
119+
<dependency>
120+
<groupId>org.apache.flink</groupId>
121+
<artifactId>flink-shaded-hadoop2</artifactId>
122+
<version>2.7.5-1.8.1</version>
111123
</dependency>
112124

113125
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-statebackend-rocksdb -->

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,16 @@
2020

2121
package com.dtstack.flink.sql.side.hbase;
2222

23-
import com.dtstack.flink.sql.enums.ECacheContentType;
2423
import com.dtstack.flink.sql.side.BaseAsyncReqRow;
2524
import com.dtstack.flink.sql.side.FieldInfo;
2625
import com.dtstack.flink.sql.side.JoinInfo;
2726
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
28-
import com.dtstack.flink.sql.side.cache.CacheObj;
2927
import com.dtstack.flink.sql.side.hbase.rowkeydealer.AbstractRowKeyModeDealer;
3028
import com.dtstack.flink.sql.side.hbase.rowkeydealer.PreRowKeyModeDealerDealer;
3129
import com.dtstack.flink.sql.side.hbase.rowkeydealer.RowKeyEqualModeDealer;
3230
import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
3331
import com.dtstack.flink.sql.factory.DTThreadFactory;
3432
import com.dtstack.flink.sql.side.hbase.utils.HbaseConfigUtils;
35-
import com.dtstack.flink.sql.util.AuthUtil;
36-
import com.google.common.collect.Maps;
3733
import com.stumbleupon.async.Deferred;
3834
import org.apache.commons.lang3.StringUtils;
3935
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -42,15 +38,14 @@
4238
import org.apache.flink.table.runtime.types.CRow;
4339
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
4440
import org.apache.flink.types.Row;
41+
import org.apache.hadoop.security.authentication.util.KerberosName;
4542
import org.hbase.async.Config;
4643
import org.hbase.async.HBaseClient;
4744
import org.slf4j.Logger;
4845
import org.slf4j.LoggerFactory;
46+
import sun.security.krb5.KrbException;
4947

50-
import java.io.File;
51-
import java.io.IOException;
5248
import java.sql.Timestamp;
53-
import java.util.Collections;
5449
import java.util.List;
5550
import java.util.Map;
5651
import java.util.TimeZone;
@@ -118,9 +113,10 @@ public void open(Configuration parameters) throws Exception {
118113
if (HbaseConfigUtils.asyncOpenKerberos(hbaseConfig)) {
119114
String jaasStr = HbaseConfigUtils.buildJaasStr(hbaseConfig);
120115
String jaasFilePath = HbaseConfigUtils.creatJassFile(jaasStr);
116+
System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_AUTH_LOGIN_CONF, jaasFilePath);
121117
config.overrideConfig(HbaseConfigUtils.KEY_JAVA_SECURITY_AUTH_LOGIN_CONF, jaasFilePath);
118+
refreshConfig();
122119
}
123-
124120
hBaseClient = new HBaseClient(config, executorService);
125121

126122
try {
@@ -148,6 +144,13 @@ public void open(Configuration parameters) throws Exception {
148144
}
149145
}
150146

147+
private void refreshConfig() throws KrbException {
148+
sun.security.krb5.Config.refresh();
149+
KerberosName.resetDefaultRealm();
150+
//reload java.security.auth.login.config
151+
javax.security.auth.login.Configuration.setConfiguration(null);
152+
}
153+
151154
@Override
152155
public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture) throws Exception {
153156
rowKeyMode.asyncGetData(tableName, buildCacheKey(inputParams), input, resultFuture, sideInfo.getSideCache());

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class HbaseConfigUtils {
5959
private final static String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable";
6060
private final static String KEY_HBASE_SASL_CLIENTCONFIG = "hbase.sasl.clientconfig";
6161
private final static String KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL = "hbase.kerberos.regionserver.principal";
62-
private static final String KEY_KEY_TAB = "hbase.keytab";
62+
public static final String KEY_KEY_TAB = "hbase.keytab";
6363
private static final String KEY_PRINCIPAL = "hbase.principal";
6464

6565
public final static String KEY_HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
@@ -159,7 +159,7 @@ public static String getKeytab(Map<String, Object> hbaseConfigMap) {
159159
}
160160

161161
public static void loadKrb5Conf(Map<String, Object> kerberosConfig) {
162-
String krb5FilePath = MapUtils.getString(kerberosConfig, KEY_JAVA_SECURITY_KRB5_CONF);
162+
String krb5FilePath = System.getProperty("user.dir") + File.separator + MapUtils.getString(kerberosConfig, KEY_JAVA_SECURITY_KRB5_CONF);
163163
if (!org.apache.commons.lang.StringUtils.isEmpty(krb5FilePath)) {
164164
System.setProperty(KEY_JAVA_SECURITY_KRB5_CONF, krb5FilePath);;
165165
}
@@ -183,7 +183,7 @@ public static String buildJaasStr(Map<String, Object> kerberosConfig) {
183183
}
184184
}
185185

186-
String keyTab = MapUtils.getString(kerberosConfig, KEY_KEY_TAB);
186+
String keyTab = System.getProperty("user.dir") + File.separator + MapUtils.getString(kerberosConfig, KEY_KEY_TAB);
187187
String principal = MapUtils.getString(kerberosConfig, KEY_PRINCIPAL);
188188

189189
StringBuilder jaasSB = new StringBuilder("Client {\n" +

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@
1919
package com.dtstack.flink.sql.sink.hbase;
2020

2121
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.security.HadoopKerberosName;
2223
import org.apache.hadoop.security.UserGroupInformation;
24+
import org.apache.hadoop.security.authentication.util.KerberosName;
2325
import org.slf4j.Logger;
2426
import org.slf4j.LoggerFactory;
27+
import sun.security.krb5.Config;
28+
import sun.security.krb5.KrbException;
2529

2630
import java.io.IOException;
2731

@@ -54,7 +58,11 @@ public class HbaseConfigUtils {
5458
public static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
5559
public static final String KEY_ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
5660

57-
public static UserGroupInformation loginAndReturnUGI(Configuration conf, String principal, String keytab) throws IOException {
61+
public static final String KEY_HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
62+
public static final String KEY_HADOOP_SECURITY_AUTH_TO_LOCAL = "hadoop.security.auth_to_local";
63+
public static final String KEY_HADOOP_SECURITY_AUTHORIZATION = "hadoop.security.authorization";
64+
65+
public static UserGroupInformation loginAndReturnUGI(Configuration conf, String principal, String keytab) throws IOException, KrbException {
5866
LOG.info("loginAndReturnUGI principal {}",principal);
5967
LOG.info("loginAndReturnUGI keytab {}",keytab);
6068
if (conf == null) {
@@ -69,7 +77,13 @@ public static UserGroupInformation loginAndReturnUGI(Configuration conf, String
6977
throw new IllegalArgumentException("keytab can not be null");
7078
}
7179

72-
conf.set("hadoop.security.authentication", "Kerberos");
80+
conf.set(KEY_HADOOP_SECURITY_AUTHENTICATION, "Kerberos");
81+
//conf.set("hadoop.security.auth_to_local", "DEFAULT");
82+
conf.set(KEY_HADOOP_SECURITY_AUTH_TO_LOCAL, "RULE:[1:$1] RULE:[2:$1]");
83+
conf.set(KEY_HADOOP_SECURITY_AUTHORIZATION, "true");
84+
85+
Config.refresh();
86+
KerberosName.resetDefaultRealm();
7387
UserGroupInformation.setConfiguration(conf);
7488

7589
return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ private void openConn(){
111111
}
112112

113113
}
114-
private void openKerberosConn() throws IOException {
114+
private void openKerberosConn() throws Exception {
115115
conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, host);
116116
conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent);
117117

0 commit comments

Comments
 (0)