1818
1919package com .dtstack .flink .sql .side .impala ;
2020
21- import com .dtstack .flink .sql .factory .DTThreadFactory ;
2221import com .dtstack .flink .sql .side .FieldInfo ;
2322import com .dtstack .flink .sql .side .JoinInfo ;
2423import com .dtstack .flink .sql .side .AbstractSideTableInfo ;
2524import com .dtstack .flink .sql .side .impala .table .ImpalaSideTableInfo ;
2625import com .dtstack .flink .sql .side .rdb .async .RdbAsyncReqRow ;
26+ import com .dtstack .flink .sql .util .KrbUtils ;
2727import io .vertx .core .Vertx ;
2828import io .vertx .core .VertxOptions ;
2929import io .vertx .core .json .JsonObject ;
3030import io .vertx .ext .jdbc .JDBCClient ;
31+ import io .vertx .ext .sql .SQLClient ;
3132import org .apache .flink .api .java .typeutils .RowTypeInfo ;
3233import org .apache .flink .configuration .Configuration ;
34+ import org .apache .flink .streaming .api .functions .async .ResultFuture ;
35+ import org .apache .flink .table .dataformat .BaseRow ;
36+ import org .apache .flink .types .Row ;
3337import org .apache .hadoop .security .UserGroupInformation ;
3438import org .slf4j .Logger ;
3539import org .slf4j .LoggerFactory ;
3640
37- import java .io . IOException ;
41+ import java .security . PrivilegedAction ;
3842import java .util .List ;
39- import java .util .concurrent .LinkedBlockingQueue ;
40- import java .util .concurrent .ThreadPoolExecutor ;
41- import java .util .concurrent .TimeUnit ;
43+ import java .util .Map ;
44+ import java .util .concurrent .CountDownLatch ;
45+
46+ import java .util .concurrent .atomic .AtomicBoolean ;
47+ import java .util .concurrent .atomic .AtomicLong ;
4248
4349/**
4450 * Date: 2019/11/12
@@ -53,27 +59,40 @@ public class ImpalaAsyncReqRow extends RdbAsyncReqRow {
5359
5460 private final static String IMPALA_DRIVER = "com.cloudera.impala.jdbc41.Driver" ;
5561
62+ protected UserGroupInformation ugi = null ;
5663
5764 public ImpalaAsyncReqRow (RowTypeInfo rowTypeInfo , JoinInfo joinInfo , List <FieldInfo > outFieldInfoList , AbstractSideTableInfo sideTableInfo ) {
5865 super (new ImpalaAsyncSideInfo (rowTypeInfo , joinInfo , outFieldInfoList , sideTableInfo ));
5966 }
6067
6168 @ Override
6269 public void open (Configuration parameters ) throws Exception {
63- super .open (parameters );
6470 ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo ) sideInfo .getSideTableInfo ();
71+ if (impalaSideTableInfo .getAuthMech () == 1 ) {
72+ String keyTabFilePath = impalaSideTableInfo .getKeyTabFilePath ();
73+ String krb5FilePath = impalaSideTableInfo .getKrb5FilePath ();
74+ String principal = impalaSideTableInfo .getPrincipal ();
75+ ugi = KrbUtils .getUgi (principal , keyTabFilePath , krb5FilePath );
76+ openJdbc (parameters );
77+ } else {
78+ openJdbc (parameters );
79+ }
80+ }
6581
82+ public void openJdbc (Configuration parameters ) throws Exception {
83+ super .open (parameters );
84+ ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo ) sideInfo .getSideTableInfo ();
6685 JsonObject impalaClientConfig = new JsonObject ();
6786 impalaClientConfig .put ("url" , getUrl ())
68- .put ("driver_class" , IMPALA_DRIVER )
69- .put ("max_pool_size" , impalaSideTableInfo .getAsyncPoolSize ())
70- .put ("provider_class" , DT_PROVIDER_CLASS )
71- .put ("idle_connection_test_period" , 300 )
72- .put ("test_connection_on_checkin" , DEFAULT_TEST_CONNECTION_ON_CHECKIN )
73- .put ("max_idle_time" , 600 )
74- .put ("preferred_test_query" , PREFERRED_TEST_QUERY_SQL )
75- .put ("idle_connection_test_period" , DEFAULT_IDLE_CONNECTION_TEST_PEROID )
76- .put ("test_connection_on_checkin" , DEFAULT_TEST_CONNECTION_ON_CHECKIN );
87+ .put ("driver_class" , IMPALA_DRIVER )
88+ .put ("max_pool_size" , impalaSideTableInfo .getAsyncPoolSize ())
89+ .put ("provider_class" , DT_PROVIDER_CLASS )
90+ .put ("idle_connection_test_period" , 300 )
91+ .put ("test_connection_on_checkin" , DEFAULT_TEST_CONNECTION_ON_CHECKIN )
92+ .put ("max_idle_time" , 600 )
93+ .put ("preferred_test_query" , PREFERRED_TEST_QUERY_SQL )
94+ .put ("idle_connection_test_period" , DEFAULT_IDLE_CONNECTION_TEST_PEROID )
95+ .put ("test_connection_on_checkin" , DEFAULT_TEST_CONNECTION_ON_CHECKIN );
7796
7897 System .setProperty ("vertx.disableFileCPResolving" , "true" );
7998
@@ -85,7 +104,6 @@ public void open(Configuration parameters) throws Exception {
85104 setRdbSqlClient (JDBCClient .createNonShared (vertx , impalaClientConfig ));
86105 }
87106
88-
89107 public String getUrl () {
90108 ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo ) sideInfo .getSideTableInfo ();
91109
@@ -95,11 +113,7 @@ public String getUrl() {
95113 StringBuffer urlBuffer = new StringBuffer (impalaSideTableInfo .getUrl ());
96114 if (authMech == 0 ) {
97115 newUrl = urlBuffer .toString ();
98-
99116 } else if (authMech == 1 ) {
100- String keyTabFilePath = impalaSideTableInfo .getKeyTabFilePath ();
101- String krb5FilePath = impalaSideTableInfo .getKrb5FilePath ();
102- String principal = impalaSideTableInfo .getPrincipal ();
103117 String krbRealm = impalaSideTableInfo .getKrbRealm ();
104118 String krbHostFQDN = impalaSideTableInfo .getKrbHostFQDN ();
105119 String krbServiceName = impalaSideTableInfo .getKrbServiceName ();
@@ -110,16 +124,6 @@ public String getUrl() {
110124 .concat ("KrbServiceName=" ).concat (krbServiceName ).concat (";" )
111125 );
112126 newUrl = urlBuffer .toString ();
113- System .setProperty ("java.security.krb5.conf" , krb5FilePath );
114- org .apache .hadoop .conf .Configuration configuration = new org .apache .hadoop .conf .Configuration ();
115- configuration .set ("hadoop.security.authentication" , "Kerberos" );
116- UserGroupInformation .setConfiguration (configuration );
117- try {
118- UserGroupInformation .loginUserFromKeytab (principal , keyTabFilePath );
119- } catch (IOException e ) {
120- throw new RuntimeException ("kerberos login fail! e: " + e );
121- }
122-
123127 } else if (authMech == 2 ) {
124128 String uName = impalaSideTableInfo .getUserName ();
125129 urlBuffer .append (";"
@@ -129,7 +133,6 @@ public String getUrl() {
129133 .concat ("UseSasl=0" )
130134 );
131135 newUrl = urlBuffer .toString ();
132-
133136 } else if (authMech == 3 ) {
134137 String uName = impalaSideTableInfo .getUserName ();
135138 String pwd = impalaSideTableInfo .getPassword ();
@@ -139,11 +142,41 @@ public String getUrl() {
139142 .concat ("PWD=" ).concat (pwd )
140143 );
141144 newUrl = urlBuffer .toString ();
142-
143145 } else {
144146 throw new IllegalArgumentException ("The value of authMech is illegal, Please select 0, 1, 2, 3" );
145147 }
146-
147148 return newUrl ;
148149 }
150+
151+ @ Override
152+ protected void asyncQueryData (Map <String , Object > inputParams ,
153+ Row input ,
154+ ResultFuture <BaseRow > resultFuture ,
155+ SQLClient rdbSqlClient ,
156+ AtomicLong failCounter ,
157+ AtomicBoolean finishFlag ,
158+ CountDownLatch latch ) {
159+ if (ugi == null ) {
160+ doAsyncQueryData (inputParams ,
161+ input , resultFuture ,
162+ rdbSqlClient ,
163+ failCounter ,
164+ finishFlag ,
165+ latch );
166+ } else {
167+ // Kerberos
168+ ugi .doAs (new PrivilegedAction <Object >() {
169+ @ Override
170+ public Object run () {
171+ doAsyncQueryData (inputParams ,
172+ input , resultFuture ,
173+ rdbSqlClient ,
174+ failCounter ,
175+ finishFlag ,
176+ latch );
177+ return null ;
178+ }
179+ });
180+ }
181+ }
149182}
0 commit comments