Skip to content

Commit aa1b250

Browse files
author
dapeng
committed
hbase 解析PRIMARY KEY优化
1 parent a83e544 commit aa1b250

File tree

3 files changed

+43
-32
lines changed

3 files changed

+43
-32
lines changed

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/ReplaceInfo.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020

2121
package com.dtstack.flink.sql.side.hbase;
2222

23-
import com.dtstack.flink.sql.side.hbase.enums.EReplaceType;
24-
2523
import java.io.Serializable;
24+
import java.util.List;
25+
26+
import com.dtstack.flink.sql.side.hbase.enums.EReplaceType;
2627

2728
/**
2829
* Reason:
@@ -39,6 +40,8 @@ public class ReplaceInfo implements Serializable {
3940
private EReplaceType type;
4041

4142
private String param;
43+
44+
private List<ReplaceInfo> subReplaceInfos;
4245

4346
public ReplaceInfo(EReplaceType type){
4447
this.type = type;
@@ -59,4 +62,12 @@ public String getParam() {
5962
public void setParam(String param) {
6063
this.param = param;
6164
}
65+
66+
public List<ReplaceInfo> getSubReplaceInfos() {
67+
return subReplaceInfos;
68+
}
69+
70+
public void setSubReplaceInfos(List<ReplaceInfo> subReplaceInfos) {
71+
this.subReplaceInfos = subReplaceInfos;
72+
}
6273
}

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/RowKeyBuilder.java

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
package com.dtstack.flink.sql.side.hbase;
2222

2323
import com.dtstack.flink.sql.side.hbase.enums.EReplaceType;
24+
import com.dtstack.flink.sql.util.MD5Utils;
2425
import com.google.common.collect.Lists;
26+
import org.apache.commons.collections.CollectionUtils;
2527

2628
import java.io.Serializable;
2729
import java.util.List;
@@ -42,31 +44,10 @@ public class RowKeyBuilder implements Serializable{
4244

4345
private static Pattern Md5Operator = Pattern.compile("(?i)^md5\\(\\s*(.*)\\s*\\)$");
4446

45-
private List<List<ReplaceInfo>> operatorChain = Lists.newArrayList();
47+
private List<ReplaceInfo> operatorChain = Lists.newArrayList();
4648

4749
public void init(String rowKeyTempl){
48-
49-
String[] strArr = splitIgnoreQuotaBrackets(rowKeyTempl, "\\+");
50-
51-
for(String infoAlias : strArr){
52-
infoAlias = infoAlias.trim();
53-
Matcher matcher = Md5Operator.matcher(infoAlias);
54-
String fieldCols = null;
55-
if(matcher.find()){
56-
fieldCols = matcher.group(1);
57-
}else{
58-
fieldCols = infoAlias;
59-
}
60-
61-
String[] fieldArr = fieldCols.split("\\+");
62-
List<ReplaceInfo> fieldList = Lists.newArrayList();
63-
for(String oneField : fieldArr){
64-
ReplaceInfo replaceInfo = getReplaceInfo(oneField);
65-
fieldList.add(replaceInfo);
66-
}
67-
68-
operatorChain.add(fieldList);
69-
}
50+
operatorChain.addAll(makeFormula(rowKeyTempl));
7051
}
7152

7253
/**
@@ -75,18 +56,15 @@ public void init(String rowKeyTempl){
7556
* @return
7657
*/
7758
public String getRowKey(Map<String, Object> refData){
78-
79-
StringBuilder sb = new StringBuilder("");
80-
for(List<ReplaceInfo> fieldList : operatorChain){
81-
sb.append(buildStr(fieldList, refData));
82-
}
83-
84-
return sb.toString();
59+
return buildStr(operatorChain, refData);
8560
}
8661

8762

8863

8964
private String buildStr(List<ReplaceInfo> fieldList, Map<String, Object> refData){
65+
if(CollectionUtils.isEmpty(fieldList)){
66+
return "";
67+
}
9068
StringBuffer sb = new StringBuffer("");
9169
for(ReplaceInfo replaceInfo : fieldList){
9270

@@ -95,6 +73,10 @@ private String buildStr(List<ReplaceInfo> fieldList, Map<String, Object> refData
9573
continue;
9674
}
9775

76+
if(replaceInfo.getType() == EReplaceType.FUNC){
77+
sb.append(MD5Utils.getMD5String(buildStr(replaceInfo.getSubReplaceInfos(), refData)));
78+
continue;
79+
}
9880
String replaceName = replaceInfo.getParam();
9981
if(!refData.containsKey(replaceName)){
10082
throw new RuntimeException(String.format("build rowKey with field %s which value not found.", replaceName));
@@ -137,4 +119,21 @@ public ReplaceInfo getReplaceInfo(String field){
137119
return replaceInfo;
138120
}
139121

122+
private List<ReplaceInfo> makeFormula(String formula){
123+
if(formula == null || formula.length() <= 0){
124+
Lists.newArrayList();
125+
}
126+
List<ReplaceInfo> result = Lists.newArrayList();
127+
for(String meta: splitIgnoreQuotaBrackets(formula, "\\+")){
128+
Matcher matcher = Md5Operator.matcher(meta.trim());
129+
if(matcher.find()){
130+
ReplaceInfo replaceInfo = new ReplaceInfo(EReplaceType.FUNC);
131+
replaceInfo.setSubReplaceInfos(makeFormula(matcher.group(1)));
132+
result.add(replaceInfo);
133+
} else {
134+
result.add(getReplaceInfo(meta));
135+
}
136+
}
137+
return result;
138+
}
140139
}

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/enums/EReplaceType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,6 @@
2929
*/
3030
public enum EReplaceType {
3131
PARAM,
32+
FUNC,
3233
CONSTANT;
3334
}

0 commit comments

Comments
 (0)