|
31 | 31 | import com.dtstack.flink.sql.factory.DTThreadFactory; |
32 | 32 | import com.dtstack.flink.sql.side.hbase.utils.HbaseConfigUtils; |
33 | 33 | import com.stumbleupon.async.Deferred; |
| 34 | +import org.apache.commons.collections.MapUtils; |
34 | 35 | import org.apache.commons.lang3.StringUtils; |
35 | 36 | import org.apache.flink.api.java.typeutils.RowTypeInfo; |
36 | 37 | import org.apache.flink.configuration.Configuration; |
37 | 38 | import org.apache.flink.types.Row; |
38 | 39 | import org.apache.flink.table.dataformat.BaseRow; |
39 | 40 | import org.apache.flink.streaming.api.functions.async.ResultFuture; |
| 41 | +import org.apache.hadoop.security.UserGroupInformation; |
40 | 42 | import org.apache.hadoop.security.authentication.util.KerberosName; |
41 | 43 | import org.hbase.async.Config; |
42 | 44 | import org.hbase.async.HBaseClient; |
43 | 45 | import org.slf4j.Logger; |
44 | 46 | import org.slf4j.LoggerFactory; |
45 | 47 | import sun.security.krb5.KrbException; |
46 | 48 |
|
| 49 | +import java.io.File; |
| 50 | +import java.security.PrivilegedExceptionAction; |
47 | 51 | import java.util.List; |
48 | 52 | import java.util.Map; |
49 | 53 | import java.util.concurrent.ExecutorService; |
@@ -105,15 +109,29 @@ public void open(Configuration parameters) throws Exception { |
105 | 109 | config.overrideConfig(entity.getKey(), (String) entity.getValue()); |
106 | 110 | }); |
107 | 111 |
|
| 112 | + String principal = null; |
| 113 | + String keyTab = null; |
108 | 114 | if (HbaseConfigUtils.asyncOpenKerberos(hbaseConfig)) { |
109 | | - String jaasStr = HbaseConfigUtils.buildJaasStr(hbaseConfig); |
| 115 | + principal = MapUtils.getString(hbaseConfig, HbaseConfigUtils.KEY_PRINCIPAL); |
| 116 | + keyTab = System.getProperty("user.dir") + File.separator + MapUtils.getString(hbaseConfig, HbaseConfigUtils.KEY_KEY_TAB); |
| 117 | + String jaasStr = HbaseConfigUtils.buildJaasStr(hbaseConfig, principal, keyTab); |
110 | 118 | String jaasFilePath = HbaseConfigUtils.creatJassFile(jaasStr); |
111 | 119 | System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_AUTH_LOGIN_CONF, jaasFilePath); |
112 | 120 | config.overrideConfig(HbaseConfigUtils.KEY_JAVA_SECURITY_AUTH_LOGIN_CONF, jaasFilePath); |
113 | 121 | refreshConfig(); |
114 | 122 | } |
115 | 123 |
|
116 | | - hBaseClient = new HBaseClient(config, executorService); |
| 124 | + hBaseClient = null; |
| 125 | + if (HbaseConfigUtils.asyncOpenKerberos(hbaseConfig)) { |
| 126 | + hBaseClient = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab).doAs(new PrivilegedExceptionAction<HBaseClient>() { |
| 127 | + @Override |
| 128 | + public HBaseClient run() throws Exception { |
| 129 | + return new HBaseClient(config, executorService); |
| 130 | + } |
| 131 | + }); |
| 132 | + } else { |
| 133 | + hBaseClient = new HBaseClient(config, executorService); |
| 134 | + } |
117 | 135 |
|
118 | 136 | try { |
119 | 137 | Deferred deferred = hBaseClient.ensureTableExists(tableName) |
@@ -144,7 +162,7 @@ private void refreshConfig() throws KrbException { |
144 | 162 | sun.security.krb5.Config.refresh(); |
145 | 163 | KerberosName.resetDefaultRealm(); |
146 | 164 | //reload java.security.auth.login.config |
147 | | - javax.security.auth.login.Configuration.setConfiguration(null); |
| 165 | + // javax.security.auth.login.Configuration.setConfiguration(null); |
148 | 166 | } |
149 | 167 |
|
150 | 168 | @Override |
|
0 commit comments