Skip to content

Commit db37a99

Browse files
committed
增加impala结果表和维表
1 parent a2bd7c9 commit db37a99

File tree

12 files changed

+350
-14
lines changed

12 files changed

+350
-14
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626

2727
# 已支持
2828
* 源表:kafka 0.9、0.10、0.11、1.x版本
29-
* 维表:mysql, SQlServer,oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse
30-
* 结果表:mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse
29+
* 维表:mysql, SQlServer,oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse, impala
30+
* 结果表:mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse, impala
3131

3232
# 后续开发计划
3333
* 维表快照
@@ -179,6 +179,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
179179
* [kudu 结果表插件](docs/kuduSink.md)
180180
* [postgresql 结果表插件](docs/postgresqlSink.md)
181181
* [clickhouse 结果表插件](docs/clickhouseSink.md)
182+
* [impala 结果表插件](docs/impalaSink.md)
182183

183184
### 2.3 维表插件
184185
* [hbase 维表插件](docs/hbaseSide.md)
@@ -189,6 +190,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
189190
* [kudu 维表插件](docs/kuduSide.md)
190191
* [postgresql 维表插件](docs/postgresqlSide.md)
191192
* [clickhouse 维表插件](docs/clickhouseSide.md)
193+
* [impala 维表插件](docs/impalaSide.md)
192194

193195
## 3 性能指标(新增)
194196

docs/impalaSide.md

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
```
2222

2323
# 2.支持版本
24-
todo
24+
2.10.0-cdh5.13.0
2525

2626
## 3.表结构定义
2727

@@ -48,6 +48,10 @@
4848
| krb5FilePath | krb5.conf文件路径(authMech=1时独有) |authMech=1为必填||
4949
| krbServiceName | Impala服务器的Kerberos principal名称(authMech=1时独有) |authMech=1为必填||
5050
| krbRealm | Kerberos的域名(authMech=1时独有) || HADOOP.COM |
51+
| enablePartition | 是否支持分区||false|
52+
| partitionfields | 分区字段名|否,enablePartition='true'时为必填||
53+
| partitionFieldTypes | 分区字段类型 |否,enablePartition='true'时为必填||
54+
| partitionValues | 分区值|否,enablePartition='true'时为必填||
5155
| cache | 维表缓存策略(NONE/LRU/ALL)||NONE|
5256
| partitionedJoin | 是否在维表join之前先根据设定的key做一次keyby操作(可以减少维表的数据缓存量)||false|
5357

@@ -83,4 +87,33 @@ create table sideTable(
8387
8488
```
8589

90+
## 6.分区样例
91+
92+
```
93+
create table sideTable(
94+
channel varchar,
95+
xccount int,
96+
name varchar,
97+
PRIMARY KEY(channel),
98+
PERIOD FOR SYSTEM_TIME
99+
)WITH(
100+
type='impala',
101+
url='jdbc:impala://localhost:21050/mytest',
102+
userName='dtstack',
103+
password='abc123',
104+
tableName='sidetest',
105+
authMech='3',
106+
cache ='LRU',
107+
cacheSize ='10000',
108+
cacheTTLMs ='60000',
109+
parallelism ='1',
110+
enablePartition='true',
111+
partitionfields='name',
112+
partitionFieldTypes='varchar',
113+
partitionValues='{"name":["tom","jeck"]}',
114+
partitionedJoin='false'
115+
);
116+
117+
```
118+
86119

docs/impalaSink.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ CREATE TABLE tableName(
1616
```
1717

1818
## 2.支持版本
19-
todo
19+
2.10.0-cdh5.13.0
2020

2121
## 3.表结构定义
2222

@@ -42,6 +42,8 @@ CREATE TABLE tableName(
4242
| krbHostFQDN | 主机的标准域名(authMech=1时独有) |authMech=1为必填 ||
4343
| krbServiceName | Impala服务器的Kerberos principal名称(authMech=1时独有) |authMech=1为必填||
4444
| krbRealm | Kerberos的域名(authMech=1时独有) || HADOOP.COM |
45+
| enablePartition | 是否支持分区 ||false|
46+
| partitionFields | 分区字段名|否,enablePartition='true'时为必填||
4547
| parallelism | 并行度设置||1|
4648

4749

impala/impala-side/impala-all-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public Connection getConn(String dbURL, String userName, String password) {
6666
try {
6767
Connection connection ;
6868
String url = getUrl();
69-
JDBCUtils.forName(IMPALA_DRIVER);
69+
JDBCUtils.forName(IMPALA_DRIVER, getClass().getClassLoader());
7070
connection = DriverManager.getConnection(url);
7171
connection.setAutoCommit(false);
7272
return connection;

impala/impala-side/impala-all-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAllSideInfo.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,61 @@
2121
import com.dtstack.flink.sql.side.FieldInfo;
2222
import com.dtstack.flink.sql.side.JoinInfo;
2323
import com.dtstack.flink.sql.side.SideTableInfo;
24+
import com.dtstack.flink.sql.side.impala.table.ImpalaSideTableInfo;
2425
import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
26+
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
2527
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2628

2729
import java.util.List;
30+
import java.util.Map;
2831

2932
public class ImpalaAllSideInfo extends RdbAllSideInfo {
3033

3134
public ImpalaAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
3235
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
3336
}
37+
38+
@Override
39+
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
40+
ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo) sideTableInfo;
41+
42+
boolean enablePartiton = impalaSideTableInfo.isEnablePartition();
43+
44+
String sqlTmp = "select ${selectField} from ${tableName} ";
45+
sqlCondition = sqlTmp.replace("${tableName}", impalaSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
46+
47+
if (enablePartiton){
48+
String whereTmp = "where ";
49+
String[] partitionfields = impalaSideTableInfo.getPartitionfields();
50+
String[] partitionFieldTypes = impalaSideTableInfo.getPartitionFieldTypes();
51+
Map<String, List> partitionVaules = impalaSideTableInfo.getPartitionValues();
52+
int fieldsSize = partitionfields.length;
53+
for (int i=0; i < fieldsSize; i++) {
54+
String fieldName = partitionfields[i];
55+
String fieldType = partitionFieldTypes[i];
56+
List values = partitionVaules.get(fieldName);
57+
String vauleAppend = getVauleAppend(fieldType, values);
58+
if (fieldsSize - 1 == i) {
59+
whereTmp = whereTmp + String.format("%s in (%s)", fieldName, vauleAppend);
60+
}else {
61+
whereTmp = whereTmp + String.format("%s in (%s) and ", fieldName, vauleAppend);
62+
}
63+
64+
}
65+
sqlCondition = sqlCondition + whereTmp;
66+
}
67+
}
68+
69+
public String getVauleAppend(String fieldType, List values) {
70+
String vauleAppend = "";
71+
for(int i=0; i < values.size(); i++) {
72+
if (fieldType.toLowerCase().equals("string") || fieldType.toLowerCase().equals("varchar")) {
73+
vauleAppend = vauleAppend + "," + "'" + values.get(i) + "'";
74+
continue;
75+
}
76+
vauleAppend = vauleAppend + "," + values.get(i).toString();
77+
}
78+
vauleAppend = vauleAppend.replaceFirst(",", "");
79+
return vauleAppend;
80+
}
3481
}

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncSideInfo.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,16 @@
2121
import com.dtstack.flink.sql.side.FieldInfo;
2222
import com.dtstack.flink.sql.side.JoinInfo;
2323
import com.dtstack.flink.sql.side.SideTableInfo;
24+
import com.dtstack.flink.sql.side.impala.table.ImpalaSideTableInfo;
2425
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
26+
import com.dtstack.flink.sql.util.ParseUtils;
27+
import com.google.common.collect.Lists;
28+
import org.apache.calcite.sql.SqlNode;
29+
import org.apache.calcite.sql.SqlSelect;
2530
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2631

2732
import java.util.List;
33+
import java.util.Map;
2834

2935
/**
3036
* Date: 2019/11/12
@@ -34,7 +40,94 @@
3440
*/
3541

3642
public class ImpalaAsyncSideInfo extends RdbAsyncSideInfo {
43+
3744
public ImpalaAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
3845
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
3946
}
47+
48+
@Override
49+
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
50+
ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo) sideTableInfo;
51+
52+
String sideTableName = joinInfo.getSideTableName();
53+
54+
SqlNode conditionNode = joinInfo.getCondition();
55+
56+
List<SqlNode> sqlNodeList = Lists.newArrayList();
57+
58+
List<String> sqlJoinCompareOperate= Lists.newArrayList();
59+
60+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
61+
ParseUtils.parseJoinCompareOperate(conditionNode, sqlJoinCompareOperate);
62+
63+
for (SqlNode sqlNode : sqlNodeList) {
64+
dealOneEqualCon(sqlNode, sideTableName);
65+
}
66+
67+
List<String> whereConditionList = Lists.newArrayList();;
68+
Map<String, String> physicalFields = impalaSideTableInfo.getPhysicalFields();
69+
SqlNode whereNode = ((SqlSelect) joinInfo.getSelectNode()).getWhere();
70+
if (whereNode != null) {
71+
// 解析维表中的过滤条件
72+
ParseUtils.parseSideWhere(whereNode, physicalFields, whereConditionList);
73+
}
74+
75+
sqlCondition = "select ${selectField} from ${tableName} where ";
76+
for (int i = 0; i < equalFieldList.size(); i++) {
77+
String equalField = sideTableInfo.getPhysicalFields().getOrDefault(equalFieldList.get(i), equalFieldList.get(i));
78+
79+
sqlCondition += equalField + " " + sqlJoinCompareOperate.get(i) + " ? ";
80+
if (i != equalFieldList.size() - 1) {
81+
sqlCondition += " and ";
82+
}
83+
}
84+
if (0 != whereConditionList.size()) {
85+
// 如果where条件中第一个符合条件的是维表中的条件
86+
String firstCondition = whereConditionList.get(0);
87+
if (!"and".equalsIgnoreCase(firstCondition) && !"or".equalsIgnoreCase(firstCondition)) {
88+
whereConditionList.add(0, "and (");
89+
} else {
90+
whereConditionList.add(1, "(");
91+
}
92+
whereConditionList.add(whereConditionList.size(), ")");
93+
sqlCondition += String.join(" ", whereConditionList);
94+
}
95+
96+
sqlCondition = sqlCondition.replace("${tableName}", impalaSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
97+
98+
boolean enablePartiton = impalaSideTableInfo.isEnablePartition();
99+
if (enablePartiton){
100+
String whereTmp = " ";
101+
String[] partitionfields = impalaSideTableInfo.getPartitionfields();
102+
String[] partitionFieldTypes = impalaSideTableInfo.getPartitionFieldTypes();
103+
Map<String, List> partitionVaules = impalaSideTableInfo.getPartitionValues();
104+
int fieldsSize = partitionfields.length;
105+
for (int i=0; i < fieldsSize; i++) {
106+
String fieldName = partitionfields[i];
107+
String fieldType = partitionFieldTypes[i];
108+
List values = partitionVaules.get(fieldName);
109+
String vauleAppend = getVauleAppend(fieldType, values);
110+
whereTmp = whereTmp + String.format("and %s in (%s) ", fieldName, vauleAppend);
111+
112+
}
113+
sqlCondition = sqlCondition + whereTmp;
114+
}
115+
116+
System.out.println("--------side sql query:-------------------");
117+
System.out.println(sqlCondition);
118+
}
119+
120+
public String getVauleAppend(String fieldType, List values) {
121+
String vauleAppend = "";
122+
for(int i=0; i < values.size(); i++) {
123+
if (fieldType.toLowerCase().equals("string") || fieldType.toLowerCase().equals("varchar")) {
124+
vauleAppend = vauleAppend + "," + "'" + values.get(i) + "'";
125+
continue;
126+
}
127+
vauleAppend = vauleAppend + "," + values.get(i).toString();
128+
}
129+
vauleAppend = vauleAppend.replaceFirst(",", "");
130+
return vauleAppend;
131+
}
132+
40133
}

impala/impala-side/impala-side-core/src/main/java/com/dtstack/flink/sql/side/impala/table/ImpalaSideParser.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
import com.dtstack.flink.sql.side.rdb.table.RdbSideParser;
2222
import com.dtstack.flink.sql.table.TableInfo;
2323
import com.dtstack.flink.sql.util.MathUtil;
24+
import com.fasterxml.jackson.databind.ObjectMapper;
2425

25-
import java.util.Arrays;
26-
import java.util.List;
27-
import java.util.Map;
26+
import java.io.IOException;
27+
import java.util.*;
2828

2929
/**
3030
* Reason:
@@ -53,6 +53,8 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
5353
impalaSideTableInfo.setPassword(MathUtil.getString(props.get(ImpalaSideTableInfo.PASSWORD_KEY.toLowerCase())));
5454
impalaSideTableInfo.setSchema(MathUtil.getString(props.get(ImpalaSideTableInfo.SCHEMA_KEY.toLowerCase())));
5555

56+
57+
//set authmech params
5658
Integer authMech = MathUtil.getIntegerVal(props.get(ImpalaSideTableInfo.AUTHMECH_KEY.toLowerCase()));
5759

5860
authMech = authMech == null? 0 : authMech;
@@ -77,7 +79,39 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
7779
impalaSideTableInfo.setPassword(MathUtil.getString(props.get(ImpalaSideTableInfo.PASSWORD_KEY.toLowerCase())));
7880
}
7981

82+
//set partition params
83+
String enablePartitionStr = (String) props.get(ImpalaSideTableInfo.ENABLEPARTITION_KEY.toLowerCase());
84+
boolean enablePartition = MathUtil.getBoolean(enablePartitionStr == null? "false":enablePartitionStr);
85+
impalaSideTableInfo.setEnablePartition(enablePartition);
86+
if (enablePartition) {
87+
String partitionfieldsStr = MathUtil.getString(props.get(ImpalaSideTableInfo.PARTITIONFIELDS_KEY.toLowerCase()));
88+
impalaSideTableInfo.setPartitionfields(partitionfieldsStr.split(","));
89+
String partitionfieldTypesStr = MathUtil.getString(props.get(ImpalaSideTableInfo.PARTITIONFIELDTYPES_KEY.toLowerCase()));
90+
impalaSideTableInfo.setPartitionFieldTypes(partitionfieldTypesStr.split(","));
91+
String partitionfieldValuesStr = MathUtil.getString(props.get(ImpalaSideTableInfo.PARTITIONVALUES_KEY.toLowerCase()));
92+
impalaSideTableInfo.setPartitionValues(setPartitionFieldValues(partitionfieldValuesStr));
93+
}
94+
8095
impalaSideTableInfo.check();
8196
return impalaSideTableInfo;
8297
}
98+
99+
public Map setPartitionFieldValues(String partitionfieldValuesStr){
100+
Map<String, Object> fieldValues = new HashMap();
101+
try {
102+
ObjectMapper objectMapper = new ObjectMapper();
103+
fieldValues = objectMapper.readValue(partitionfieldValuesStr, Map.class);
104+
for (String key : fieldValues.keySet()) {
105+
List value = (List)fieldValues.get(key);
106+
fieldValues.put(key, value);
107+
}
108+
return fieldValues;
109+
} catch (Exception e) {
110+
e.printStackTrace();
111+
throw new RuntimeException(e);
112+
}
113+
114+
115+
116+
}
83117
}

0 commit comments

Comments
 (0)