|
20 | 20 |
|
21 | 21 | package com.dtstack.flink.sql.side.hbase; |
22 | 22 |
|
23 | | -import com.dtstack.flink.sql.enums.ECacheContentType; |
24 | 23 | import com.dtstack.flink.sql.side.BaseAsyncReqRow; |
25 | 24 | import com.dtstack.flink.sql.side.FieldInfo; |
26 | 25 | import com.dtstack.flink.sql.side.JoinInfo; |
27 | 26 | import com.dtstack.flink.sql.side.AbstractSideTableInfo; |
28 | | -import com.dtstack.flink.sql.side.cache.CacheObj; |
29 | 27 | import com.dtstack.flink.sql.side.hbase.rowkeydealer.AbstractRowKeyModeDealer; |
30 | 28 | import com.dtstack.flink.sql.side.hbase.rowkeydealer.PreRowKeyModeDealerDealer; |
31 | 29 | import com.dtstack.flink.sql.side.hbase.rowkeydealer.RowKeyEqualModeDealer; |
32 | 30 | import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo; |
33 | 31 | import com.dtstack.flink.sql.factory.DTThreadFactory; |
34 | 32 | import com.dtstack.flink.sql.side.hbase.utils.HbaseConfigUtils; |
35 | | -import com.dtstack.flink.sql.util.AuthUtil; |
36 | | -import com.google.common.collect.Maps; |
37 | 33 | import com.stumbleupon.async.Deferred; |
38 | 34 | import org.apache.commons.lang3.StringUtils; |
39 | 35 | import org.apache.flink.api.java.typeutils.RowTypeInfo; |
|
42 | 38 | import org.apache.flink.table.runtime.types.CRow; |
43 | 39 | import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; |
44 | 40 | import org.apache.flink.types.Row; |
| 41 | +import org.apache.hadoop.security.authentication.util.KerberosName; |
45 | 42 | import org.hbase.async.Config; |
46 | 43 | import org.hbase.async.HBaseClient; |
47 | 44 | import org.slf4j.Logger; |
48 | 45 | import org.slf4j.LoggerFactory; |
| 46 | +import sun.security.krb5.KrbException; |
49 | 47 |
|
50 | | -import java.io.File; |
51 | | -import java.io.IOException; |
52 | 48 | import java.sql.Timestamp; |
53 | | -import java.util.Collections; |
54 | 49 | import java.util.List; |
55 | 50 | import java.util.Map; |
56 | 51 | import java.util.TimeZone; |
@@ -118,9 +113,10 @@ public void open(Configuration parameters) throws Exception { |
118 | 113 | if (HbaseConfigUtils.asyncOpenKerberos(hbaseConfig)) { |
119 | 114 | String jaasStr = HbaseConfigUtils.buildJaasStr(hbaseConfig); |
120 | 115 | String jaasFilePath = HbaseConfigUtils.creatJassFile(jaasStr); |
| 116 | + System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_AUTH_LOGIN_CONF, jaasFilePath); |
121 | 117 | config.overrideConfig(HbaseConfigUtils.KEY_JAVA_SECURITY_AUTH_LOGIN_CONF, jaasFilePath); |
| 118 | + refreshConfig(); |
122 | 119 | } |
123 | | - |
124 | 120 | hBaseClient = new HBaseClient(config, executorService); |
125 | 121 |
|
126 | 122 | try { |
@@ -148,6 +144,13 @@ public void open(Configuration parameters) throws Exception { |
148 | 144 | } |
149 | 145 | } |
150 | 146 |
|
| 147 | + private void refreshConfig() throws KrbException { |
| 148 | + sun.security.krb5.Config.refresh(); |
| 149 | + KerberosName.resetDefaultRealm(); |
| 150 | + //reload java.security.auth.login.config |
| 151 | + javax.security.auth.login.Configuration.setConfiguration(null); |
| 152 | + } |
| 153 | + |
151 | 154 | @Override |
152 | 155 | public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture) throws Exception { |
153 | 156 | rowKeyMode.asyncGetData(tableName, buildCacheKey(inputParams), input, resultFuture, sideInfo.getSideCache()); |
|
0 commit comments