Skip to content

Commit a2bd7c9

Browse files
修竹修竹
authored andcommitted
impala维表增加权限认证,增加维表和结果表文档说明
1 parent b10e48b commit a2bd7c9

File tree

12 files changed

+839
-29
lines changed

12 files changed

+839
-29
lines changed

docs/impalaSide.md

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
2+
## 1.格式:
3+
```
4+
CREATE TABLE tableName(
5+
colName cloType,
6+
...
7+
PRIMARY KEY(keyInfo),
8+
PERIOD FOR SYSTEM_TIME
9+
)WITH(
10+
type='impala',
11+
url='jdbcUrl',
12+
userName='dbUserName',
13+
password='dbPwd',
14+
tableName='tableName',
15+
cache ='LRU',
16+
cacheSize ='10000',
17+
cacheTTLMs ='60000',
18+
parallelism ='1',
19+
partitionedJoin='false'
20+
);
21+
```
22+
23+
# 2.支持版本
24+
todo
25+
26+
## 3.表结构定义
27+
28+
|参数名称|含义|
29+
|----|---|
30+
| tableName | 注册到flink的表名称|
31+
| colName | 列名称|
32+
| colType | 列类型 [colType支持的类型](colType.md)|
33+
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
34+
| PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
35+
36+
## 4.参数
37+
38+
|参数名称|含义|是否必填|默认值|
39+
|----|---|---|----|
40+
| type | 表明维表的类型[impala] |||
41+
| url | 连接postgresql数据库 jdbcUrl |||
42+
| userName | postgresql连接用户名 |||
43+
| password | postgresql连接密码|||
44+
| tableName | postgresql表名称|||
45+
| authMech | 身份验证机制 (0, 1, 2, 3) ||0|
46+
| principal | kerberos用于登录的principal(authMech=1时独有) |authMech=1为必填|
47+
| keyTabFilePath | keytab文件的路径(authMech=1时独有) |authMech=1为必填 ||
48+
| krb5FilePath | krb5.conf文件路径(authMech=1时独有) |authMech=1为必填||
49+
| krbServiceName | Impala服务器的Kerberos principal名称(authMech=1时独有) |authMech=1为必填||
50+
| krbRealm | Kerberos的域名(authMech=1时独有) || HADOOP.COM |
51+
| cache | 维表缓存策略(NONE/LRU/ALL)||NONE|
52+
| partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)||false|
53+
54+
----------
55+
> 缓存策略
56+
* NONE: 不做内存缓存
57+
* LRU:
58+
* cacheSize: 缓存的条目数量
59+
* cacheTTLMs:缓存的过期时间(ms)
60+
61+
62+
## 5.样例
63+
```
64+
create table sideTable(
65+
channel varchar,
66+
xccount int,
67+
PRIMARY KEY(channel),
68+
PERIOD FOR SYSTEM_TIME
69+
)WITH(
70+
type='impala',
71+
url='jdbc:impala://localhost:21050/mytest',
72+
userName='dtstack',
73+
password='abc123',
74+
tableName='sidetest',
75+
authMech='3',
76+
cache ='LRU',
77+
cacheSize ='10000',
78+
cacheTTLMs ='60000',
79+
parallelism ='1',
80+
partitionedJoin='false'
81+
);
82+
83+
84+
```
85+
86+

docs/impalaSink.md

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
## 1.格式:
2+
```
3+
CREATE TABLE tableName(
4+
colName colType,
5+
...
6+
colNameX colType
7+
)WITH(
8+
type ='impala',
9+
url ='jdbcUrl',
10+
userName ='userName',
11+
password ='pwd',
12+
tableName ='tableName',
13+
parallelism ='parllNum'
14+
);
15+
16+
```
17+
18+
## 2.支持版本
19+
todo
20+
21+
## 3.表结构定义
22+
23+
|参数名称|含义|
24+
|----|---|
25+
| tableName| 在 sql 中使用的名称;即注册到flink-table-env上的名称|
26+
| colName | 列名称|
27+
| colType | 列类型 [colType支持的类型](colType.md)|
28+
29+
## 4.参数:
30+
31+
|参数名称|含义|是否必填|默认值|
32+
|----|----|----|----|
33+
| type |表明 输出表类型[impala]|||
34+
| url | 连接postgresql数据库 jdbcUrl |||
35+
| userName | postgresql连接用户名 |||
36+
| password | postgresql连接密码|||
37+
| tableName | postgresqll表名称|||
38+
| authMech | 身份验证机制 (0, 1, 2, 3) ||0|
39+
| principal | kerberos用于登录的principal(authMech=1时独有) |authMech=1为必填|
40+
| keyTabFilePath | keytab文件的路径(authMech=1时独有) |authMech=1为必填 ||
41+
| krb5FilePath | krb5.conf文件路径(authMech=1时独有) |authMech=1为必填||
42+
| krbHostFQDN | 主机的标准域名(authMech=1时独有) |authMech=1为必填 ||
43+
| krbServiceName | Impala服务器的Kerberos principal名称(authMech=1时独有) |authMech=1为必填||
44+
| krbRealm | Kerberos的域名(authMech=1时独有) || HADOOP.COM |
45+
| parallelism | 并行度设置||1|
46+
47+
48+
## 5.样例:
49+
```
50+
CREATE TABLE MyResult(
51+
channel VARCHAR,
52+
pv VARCHAR
53+
)WITH(
54+
type ='impala',
55+
url ='jdbc:impala://localhost:21050/mytest',
56+
userName ='dtstack',
57+
password ='abc123',
58+
authMech = '3',
59+
tableName ='pv2',
60+
parallelism ='1'
61+
)
62+
```

docs/postgresqlSide.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
|参数名称|含义|
2929
|----|---|
30-
| tableName | 注册到flink的表名称(可选填;不填默认和hbase对应的表名称相同)|
30+
| tableName | 注册到flink的表名称|
3131
| colName | 列名称|
3232
| colType | 列类型 [colType支持的类型](colType.md)|
3333
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
@@ -42,7 +42,6 @@
4242
| userName | postgresql连接用户名 |||
4343
| password | postgresql连接密码|||
4444
| tableName | postgresql表名称|||
45-
| tableName | postgresql 的表名称|||
4645
| cache | 维表缓存策略(NONE/LRU/ALL)||NONE|
4746
| partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)||false|
4847

impala/impala-side/impala-all-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAllReqRow.java

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,18 @@
2121
import com.dtstack.flink.sql.side.FieldInfo;
2222
import com.dtstack.flink.sql.side.JoinInfo;
2323
import com.dtstack.flink.sql.side.SideTableInfo;
24+
import com.dtstack.flink.sql.side.impala.table.ImpalaSideTableInfo;
2425
import com.dtstack.flink.sql.side.rdb.all.RdbAllReqRow;
2526
import com.dtstack.flink.sql.util.DtStringUtil;
27+
import com.dtstack.flink.sql.util.JDBCUtils;
2628
import com.google.common.collect.Maps;
2729
import org.apache.flink.api.java.typeutils.RowTypeInfo;
30+
import org.apache.hadoop.conf.Configuration;
31+
import org.apache.hadoop.security.UserGroupInformation;
2832
import org.slf4j.Logger;
2933
import org.slf4j.LoggerFactory;
3034

35+
import java.io.IOException;
3136
import java.sql.Connection;
3237
import java.sql.DriverManager;
3338
import java.util.List;
@@ -49,22 +54,81 @@ public class ImpalaAllReqRow extends RdbAllReqRow {
4954

5055
private static final String IMPALA_DRIVER = "com.cloudera.impala.jdbc41.Driver";
5156

57+
private ImpalaSideTableInfo impalaSideTableInfo;
58+
5259
public ImpalaAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
5360
super(new ImpalaAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
61+
this.impalaSideTableInfo = (ImpalaSideTableInfo) sideTableInfo;
5462
}
5563

5664
@Override
5765
public Connection getConn(String dbURL, String userName, String password) {
5866
try {
59-
Class.forName(IMPALA_DRIVER);
60-
//add param useCursorFetch=true
61-
Map<String, String> addParams = Maps.newHashMap();
62-
addParams.put("useCursorFetch", "true");
63-
//String targetDbUrl = DtStringUtil.addJdbcParam(dbURL, addParams, true);
64-
return DriverManager.getConnection(dbURL, userName, password);
67+
Connection connection ;
68+
String url = getUrl();
69+
JDBCUtils.forName(IMPALA_DRIVER);
70+
connection = DriverManager.getConnection(url);
71+
connection.setAutoCommit(false);
72+
return connection;
6573
} catch (Exception e) {
6674
LOG.error("", e);
6775
throw new RuntimeException("", e);
6876
}
6977
}
78+
79+
public String getUrl() throws IOException {
80+
81+
String newUrl = "";
82+
Integer authMech = impalaSideTableInfo.getAuthMech();
83+
84+
StringBuffer urlBuffer = new StringBuffer(impalaSideTableInfo.getUrl());
85+
if (authMech == 0) {
86+
newUrl = urlBuffer.toString();
87+
88+
} else if (authMech == 1) {
89+
String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
90+
String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
91+
String principal = impalaSideTableInfo.getPrincipal();
92+
String krbRealm = impalaSideTableInfo.getKrbRealm();
93+
String krbHostFQDN = impalaSideTableInfo.getKrbHostFQDN();
94+
String krbServiceName = impalaSideTableInfo.getKrbServiceName();
95+
urlBuffer.append(";"
96+
.concat("AuthMech=1;")
97+
.concat("KrbRealm=").concat(krbRealm).concat(";")
98+
.concat("KrbHostFQDN=").concat(krbHostFQDN).concat(";")
99+
.concat("KrbServiceName=").concat(krbServiceName).concat(";")
100+
);
101+
newUrl = urlBuffer.toString();
102+
System.setProperty("java.security.krb5.conf", krb5FilePath);
103+
Configuration configuration = new Configuration();
104+
configuration.set("hadoop.security.authentication" , "Kerberos");
105+
UserGroupInformation.setConfiguration(configuration);
106+
UserGroupInformation.loginUserFromKeytab(principal, keyTabFilePath);
107+
108+
} else if (authMech == 2) {
109+
String uName = impalaSideTableInfo.getUserName();
110+
urlBuffer.append(";"
111+
.concat("AuthMech=3;")
112+
.concat("UID=").concat(uName).concat(";")
113+
.concat("PWD=;")
114+
.concat("UseSasl=0")
115+
);
116+
newUrl = urlBuffer.toString();
117+
118+
} else if (authMech == 3) {
119+
String uName = impalaSideTableInfo.getUserName();
120+
String pwd = impalaSideTableInfo.getPassword();
121+
urlBuffer.append(";"
122+
.concat("AuthMech=3;")
123+
.concat("UID=").concat(uName).concat(";")
124+
.concat("PWD=").concat(pwd)
125+
);
126+
newUrl = urlBuffer.toString();
127+
128+
} else {
129+
throw new IllegalArgumentException("The value of authMech is illegal, Please select 0, 1, 2, 3");
130+
}
131+
132+
return newUrl;
133+
}
70134
}

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

Lines changed: 78 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.dtstack.flink.sql.side.JoinInfo;
2323
import com.dtstack.flink.sql.side.SideInfo;
2424
import com.dtstack.flink.sql.side.SideTableInfo;
25+
import com.dtstack.flink.sql.side.impala.table.ImpalaSideTableInfo;
2526
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
2627
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
2728
import io.vertx.core.Vertx;
@@ -30,9 +31,11 @@
3031
import io.vertx.ext.jdbc.JDBCClient;
3132
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3233
import org.apache.flink.configuration.Configuration;
34+
import org.apache.hadoop.security.UserGroupInformation;
3335
import org.slf4j.Logger;
3436
import org.slf4j.LoggerFactory;
3537

38+
import java.io.IOException;
3639
import java.util.List;
3740

3841
/**
@@ -56,17 +59,7 @@ public ImpalaAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldI
5659
@Override
5760
public void open(Configuration parameters) throws Exception {
5861
super.open(parameters);
59-
JsonObject impalaClientConfig = new JsonObject();
60-
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
61-
impalaClientConfig.put("url", rdbSideTableInfo.getUrl())
62-
.put("driver_class", IMPALA_DRIVER)
63-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
64-
.put("user", rdbSideTableInfo.getUserName())
65-
.put("password", rdbSideTableInfo.getPassword())
66-
.put("provider_class", DT_PROVIDER_CLASS)
67-
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
68-
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
69-
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
62+
JsonObject impalaClientConfig = getClientConfig();
7063

7164
System.setProperty("vertx.disableFileCPResolving", "true");
7265

@@ -77,4 +70,78 @@ public void open(Configuration parameters) throws Exception {
7770
Vertx vertx = Vertx.vertx(vo);
7871
setRdbSQLClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
7972
}
73+
74+
public JsonObject getClientConfig() {
75+
JsonObject impalaClientConfig = new JsonObject();
76+
impalaClientConfig.put("url", getUrl())
77+
.put("driver_class", IMPALA_DRIVER)
78+
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
79+
.put("provider_class", DT_PROVIDER_CLASS)
80+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
81+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
82+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
83+
84+
return impalaClientConfig;
85+
}
86+
87+
public String getUrl() {
88+
ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo) sideInfo.getSideTableInfo();
89+
90+
String newUrl = "";
91+
Integer authMech = impalaSideTableInfo.getAuthMech();
92+
93+
StringBuffer urlBuffer = new StringBuffer(impalaSideTableInfo.getUrl());
94+
if (authMech == 0) {
95+
newUrl = urlBuffer.toString();
96+
97+
} else if (authMech == 1) {
98+
String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
99+
String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
100+
String principal = impalaSideTableInfo.getPrincipal();
101+
String krbRealm = impalaSideTableInfo.getKrbRealm();
102+
String krbHostFQDN = impalaSideTableInfo.getKrbHostFQDN();
103+
String krbServiceName = impalaSideTableInfo.getKrbServiceName();
104+
urlBuffer.append(";"
105+
.concat("AuthMech=1;")
106+
.concat("KrbRealm=").concat(krbRealm).concat(";")
107+
.concat("KrbHostFQDN=").concat(krbHostFQDN).concat(";")
108+
.concat("KrbServiceName=").concat(krbServiceName).concat(";")
109+
);
110+
newUrl = urlBuffer.toString();
111+
System.setProperty("java.security.krb5.conf", krb5FilePath);
112+
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
113+
configuration.set("hadoop.security.authentication" , "Kerberos");
114+
UserGroupInformation.setConfiguration(configuration);
115+
try {
116+
UserGroupInformation.loginUserFromKeytab(principal, keyTabFilePath);
117+
} catch (IOException e) {
118+
throw new RuntimeException("kerberos login fail! e: " + e);
119+
}
120+
121+
} else if (authMech == 2) {
122+
String uName = impalaSideTableInfo.getUserName();
123+
urlBuffer.append(";"
124+
.concat("AuthMech=3;")
125+
.concat("UID=").concat(uName).concat(";")
126+
.concat("PWD=;")
127+
.concat("UseSasl=0")
128+
);
129+
newUrl = urlBuffer.toString();
130+
131+
} else if (authMech == 3) {
132+
String uName = impalaSideTableInfo.getUserName();
133+
String pwd = impalaSideTableInfo.getPassword();
134+
urlBuffer.append(";"
135+
.concat("AuthMech=3;")
136+
.concat("UID=").concat(uName).concat(";")
137+
.concat("PWD=").concat(pwd)
138+
);
139+
newUrl = urlBuffer.toString();
140+
141+
} else {
142+
throw new IllegalArgumentException("The value of authMech is illegal, Please select 0, 1, 2, 3");
143+
}
144+
145+
return newUrl;
146+
}
80147
}

0 commit comments

Comments
 (0)