Skip to content

Commit db6d8b0

Browse files
committed
add hbase plugin constant join support
1 parent a314faa commit db6d8b0

File tree

5 files changed

+36
-10
lines changed

5 files changed

+36
-10
lines changed

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,9 @@ private void evalEquation(SqlIdentifier left, SqlIdentifier right, String sideTa
172172
*/
173173
private void evalConstantEquation(SqlLiteral literal, SqlIdentifier identifier) {
174174
String tableName = identifier.getComponent(0).getSimple();
175+
String sideTableName = sideTableInfo.getName();
176+
String errorMsg = "only support set side table constant field, error field " + identifier;
177+
Preconditions.checkState(tableName.equals(sideTableName), errorMsg);
175178
String fieldName = identifier.getComponent(1).getSimple();
176179
Object constant = literal.getValue();
177180
List<PredicateInfo> predicateInfos = sideTableInfo.getPredicateInfoes();

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllSideInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf
4646
throw new RuntimeException("Primary key dimension table must be filled");
4747
}
4848

49-
rowKeyBuilder.init(sideTableInfo.getPrimaryKeys().get(0));
49+
rowKeyBuilder.init(sideTableInfo.getPrimaryKeys().get(0), sideTableInfo);
5050

5151
String sideTableName = joinInfo.getSideTableName();
5252
SqlNode conditionNode = joinInfo.getCondition();

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncSideInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf
4141
}
4242

4343
HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
44-
rowKeyBuilder.init(sideTableInfo.getPrimaryKeys().get(0));
44+
rowKeyBuilder.init(sideTableInfo.getPrimaryKeys().get(0), sideTableInfo);
4545

4646
colRefType = Maps.newHashMap();
4747
for(int i=0; i<hbaseSideTableInfo.getColumnRealNames().length; i++){

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/RowKeyBuilder.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020

2121
package com.dtstack.flink.sql.side.hbase;
2222

23+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
24+
import com.dtstack.flink.sql.side.PredicateInfo;
2325
import com.dtstack.flink.sql.side.hbase.enums.EReplaceType;
26+
import com.dtstack.flink.sql.util.DtStringUtil;
2427
import com.dtstack.flink.sql.util.MD5Utils;
2528
import com.google.common.collect.Lists;
2629
import org.apache.commons.collections.CollectionUtils;
@@ -46,20 +49,43 @@ public class RowKeyBuilder implements Serializable{
4649

4750
private List<ReplaceInfo> operatorChain = Lists.newArrayList();
4851

49-
public void init(String rowKeyTempl){
52+
private AbstractSideTableInfo sideTableInfo;
53+
54+
public void init(String rowKeyTempl) {
5055
operatorChain.addAll(makeFormula(rowKeyTempl));
5156
}
5257

58+
public void init(String rowKeyTempl, AbstractSideTableInfo sideTableInfo) {
59+
this.init(rowKeyTempl);
60+
this.sideTableInfo = sideTableInfo;
61+
}
62+
5363
/**
5464
*
5565
* @param refData
5666
* @return
5767
*/
58-
public String getRowKey(Map<String, Object> refData){
68+
public String getRowKey(Map<String, Object> refData) {
69+
addConstant(refData);
5970
return buildStr(operatorChain, refData);
6071
}
6172

62-
73+
/**
74+
* add constant join fields
75+
* @param rowkeyMap
76+
*/
77+
private void addConstant(Map<String, Object> rowkeyMap) {
78+
List<PredicateInfo> predicateInfos = sideTableInfo.getPredicateInfoes();
79+
final String name = sideTableInfo.getName();
80+
for (PredicateInfo info : predicateInfos) {
81+
if (info.getOwnerTable().equals(name) &&
82+
info.getOperatorName().equals("=")) {
83+
String condition = info.getCondition();
84+
String conditionWithoutQuota = condition.replaceAll("['\"]", "");
85+
rowkeyMap.put(info.getFieldName(), conditionWithoutQuota);
86+
}
87+
}
88+
}
6389

6490
private String buildStr(List<ReplaceInfo> fieldList, Map<String, Object> refData){
6591
if(CollectionUtils.isEmpty(fieldList)){

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818

1919
package com.dtstack.flink.sql.side.redis;
2020

21-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22-
import com.dtstack.flink.sql.side.BaseAsyncReqRow;
21+
import com.dtstack.flink.sql.side.*;
2322
import io.lettuce.core.KeyValue;
2423
import io.lettuce.core.api.async.RedisStringAsyncCommands;
2524
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -29,9 +28,6 @@
2928
import org.apache.flink.types.Row;
3029

3130
import com.dtstack.flink.sql.enums.ECacheContentType;
32-
import com.dtstack.flink.sql.side.CacheMissVal;
33-
import com.dtstack.flink.sql.side.FieldInfo;
34-
import com.dtstack.flink.sql.side.JoinInfo;
3531
import com.dtstack.flink.sql.side.cache.CacheObj;
3632
import com.dtstack.flink.sql.side.redis.enums.RedisType;
3733
import com.dtstack.flink.sql.side.redis.table.RedisSideReqRow;
@@ -154,6 +150,7 @@ public void accept(Map<String, String> values) {
154150
});
155151
}
156152

153+
// TODO 升级对常量JOIN的支持
157154
@Override
158155
public String buildCacheKey(Map<String, Object> refData) {
159156
StringBuilder keyBuilder = new StringBuilder(redisSideTableInfo.getTableName());

0 commit comments

Comments
 (0)