Skip to content

Commit 3613a79

Browse files
committed
add redis constant join
1 parent db6d8b0 commit 3613a79

File tree

5 files changed

+47
-47
lines changed

5 files changed

+47
-47
lines changed

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
package com.dtstack.flink.sql.util;
2121

22+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2223
import com.dtstack.flink.sql.side.FieldInfo;
2324
import com.dtstack.flink.sql.side.JoinInfo;
25+
import com.dtstack.flink.sql.side.PredicateInfo;
2426
import com.google.common.base.Preconditions;
2527
import com.google.common.base.Strings;
2628
import com.google.common.collect.HashBasedTable;
@@ -710,4 +712,20 @@ public static String buildTableNameWithScope(String tableName, String scope){
710712
return tableName + "_" + scope;
711713
}
712714

715+
/**
716+
* add constant join fields, using in such as hbase、redis etc kv database
717+
* @param keyMap
718+
*/
719+
public static void addConstant(Map<String, Object> keyMap, AbstractSideTableInfo sideTableInfo) {
720+
List<PredicateInfo> predicateInfos = sideTableInfo.getPredicateInfoes();
721+
final String name = sideTableInfo.getName();
722+
for (PredicateInfo info : predicateInfos) {
723+
if (info.getOwnerTable().equals(name) &&
724+
info.getOperatorName().equals("=")) {
725+
String condition = info.getCondition();
726+
String conditionWithoutQuota = condition.replaceAll("['\"]", "");
727+
keyMap.put(info.getFieldName(), conditionWithoutQuota);
728+
}
729+
}
730+
}
713731
}

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/RowKeyBuilder.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.dtstack.flink.sql.side.hbase.enums.EReplaceType;
2626
import com.dtstack.flink.sql.util.DtStringUtil;
2727
import com.dtstack.flink.sql.util.MD5Utils;
28+
import com.dtstack.flink.sql.util.TableUtils;
2829
import com.google.common.collect.Lists;
2930
import org.apache.commons.collections.CollectionUtils;
3031

@@ -66,27 +67,10 @@ public void init(String rowKeyTempl, AbstractSideTableInfo sideTableInfo) {
6667
* @return
6768
*/
6869
public String getRowKey(Map<String, Object> refData) {
69-
addConstant(refData);
70+
TableUtils.addConstant(refData, sideTableInfo);
7071
return buildStr(operatorChain, refData);
7172
}
7273

73-
/**
74-
* add constant join fields
75-
* @param rowkeyMap
76-
*/
77-
private void addConstant(Map<String, Object> rowkeyMap) {
78-
List<PredicateInfo> predicateInfos = sideTableInfo.getPredicateInfoes();
79-
final String name = sideTableInfo.getName();
80-
for (PredicateInfo info : predicateInfos) {
81-
if (info.getOwnerTable().equals(name) &&
82-
info.getOperatorName().equals("=")) {
83-
String condition = info.getCondition();
84-
String conditionWithoutQuota = condition.replaceAll("['\"]", "");
85-
rowkeyMap.put(info.getFieldName(), conditionWithoutQuota);
86-
}
87-
}
88-
}
89-
9074
private String buildStr(List<ReplaceInfo> fieldList, Map<String, Object> refData){
9175
if(CollectionUtils.isEmpty(fieldList)){
9276
return "";

redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public class RedisAllReqRow extends BaseAllReqRow {
7979

8080
public RedisAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
8181
super(new RedisAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
82-
this.redisSideReqRow = new RedisSideReqRow(super.sideInfo);
82+
this.redisSideReqRow = new RedisSideReqRow(super.sideInfo, (RedisSideTableInfo) sideTableInfo);
8383
}
8484

8585
@Override
@@ -111,7 +111,7 @@ protected void reloadCache() {
111111

112112
@Override
113113
public void flatMap(CRow input, Collector<CRow> out) throws Exception {
114-
Map<String, String> inputParams = Maps.newHashMap();
114+
Map<String, Object> inputParams = Maps.newHashMap();
115115
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
116116
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
117117
Object equalObj = input.row().getField(conValIndex);
@@ -122,9 +122,9 @@ public void flatMap(CRow input, Collector<CRow> out) throws Exception {
122122
}
123123
return;
124124
}
125-
inputParams.put(sideInfo.getEqualFieldList().get(i), equalObj.toString());
125+
inputParams.put(sideInfo.getEqualFieldList().get(i), equalObj);
126126
}
127-
String key = buildCacheKey(inputParams);
127+
String key = redisSideReqRow.buildCacheKey(inputParams);
128128
if(StringUtils.isBlank(key)){
129129
return;
130130
}
@@ -143,19 +143,6 @@ public void flatMap(CRow input, Collector<CRow> out) throws Exception {
143143
out.collect(new CRow(newRow, input.change()));
144144
}
145145

146-
private String buildCacheKey(Map<String, String> refData) {
147-
StringBuilder keyBuilder = new StringBuilder(tableInfo.getTableName());
148-
List<String> primaryKeys = tableInfo.getPrimaryKeys();
149-
for(String primaryKey : primaryKeys){
150-
if(!refData.containsKey(primaryKey)){
151-
return null;
152-
}
153-
keyBuilder.append("_").append(refData.get(primaryKey));
154-
}
155-
return keyBuilder.toString();
156-
}
157-
158-
159146
private void loadData(Map<String, Map<String, String>> tmpCache) throws SQLException {
160147
JedisCommands jedis = null;
161148
try {

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class RedisAsyncReqRow extends BaseAsyncReqRow {
7070

7171
public RedisAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
7272
super(new RedisAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
73-
redisSideReqRow = new RedisSideReqRow(super.sideInfo);
73+
redisSideReqRow = new RedisSideReqRow(super.sideInfo, (RedisSideTableInfo) sideTableInfo);
7474
}
7575

7676
@Override
@@ -126,6 +126,7 @@ public Row fillData(Row input, Object sideInput) {
126126

127127
@Override
128128
public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture) throws Exception {
129+
129130
String key = buildCacheKey(inputParams);
130131
if(StringUtils.isBlank(key)){
131132
return;
@@ -137,7 +138,7 @@ public void accept(Map<String, String> values) {
137138
if (MapUtils.isNotEmpty(values)) {
138139
try {
139140
Row row = fillData(input.row(), values);
140-
dealCacheData(key,CacheObj.buildCacheObj(ECacheContentType.MultiLine, values));
141+
dealCacheData(key,CacheObj.buildCacheObj(ECacheContentType.SingleLine, values));
141142
resultFuture.complete(Collections.singleton(new CRow(row, input.change())));
142143
} catch (Exception e) {
143144
dealFillDataError(input, resultFuture, e);
@@ -153,15 +154,7 @@ public void accept(Map<String, String> values) {
153154
// TODO 升级对常量JOIN的支持
154155
@Override
155156
public String buildCacheKey(Map<String, Object> refData) {
156-
StringBuilder keyBuilder = new StringBuilder(redisSideTableInfo.getTableName());
157-
List<String> primaryKeys = redisSideTableInfo.getPrimaryKeys();
158-
for(String primaryKey : primaryKeys){
159-
if(!refData.containsKey(primaryKey)){
160-
return null;
161-
}
162-
keyBuilder.append("_").append(refData.get(primaryKey));
163-
}
164-
return keyBuilder.toString();
157+
return redisSideReqRow.buildCacheKey(refData);
165158
}
166159

167160
@Override

redis5/redis5-side/redis-side-core/src/main/java/com/dtstack/flink/sql/side/redis/table/RedisSideReqRow.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,18 @@
1818

1919
package com.dtstack.flink.sql.side.redis.table;
2020

21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2122
import com.dtstack.flink.sql.side.ISideReqRow;
2223
import com.dtstack.flink.sql.side.BaseSideInfo;
24+
import com.dtstack.flink.sql.util.TableUtils;
2325
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
2426
import org.apache.flink.types.Row;
2527

2628
import java.io.Serializable;
2729
import java.math.BigDecimal;
2830
import java.sql.Date;
2931
import java.sql.Timestamp;
32+
import java.util.List;
3033
import java.util.Map;
3134
import java.util.TimeZone;
3235

@@ -45,8 +48,10 @@ public class RedisSideReqRow implements ISideReqRow, Serializable {
4548

4649
private BaseSideInfo sideInfo;
4750

48-
public RedisSideReqRow(BaseSideInfo sideInfo){
51+
private RedisSideTableInfo sideTableInfo;
52+
public RedisSideReqRow(BaseSideInfo sideInfo, RedisSideTableInfo sideTableInfo) {
4953
this.sideInfo = sideInfo;
54+
this.sideTableInfo = sideTableInfo;
5055
}
5156

5257
@Override
@@ -76,6 +81,19 @@ public Row fillData(Row input, Object sideInput) {
7681
return row;
7782
}
7883

84+
public String buildCacheKey(Map<String, Object> refData) {
85+
TableUtils.addConstant(refData, sideTableInfo);
86+
StringBuilder keyBuilder = new StringBuilder(sideTableInfo.getTableName());
87+
List<String> primaryKeys = sideTableInfo.getPrimaryKeys();
88+
for(String primaryKey : primaryKeys){
89+
if(!refData.containsKey(primaryKey)){
90+
return null;
91+
}
92+
keyBuilder.append("_").append(refData.get(primaryKey));
93+
}
94+
return keyBuilder.toString();
95+
}
96+
7997
public void setRowField(Row row, Integer index, BaseSideInfo sideInfo, String value) {
8098
Integer keyIndex = sideInfo.getSideFieldIndex().get(index);
8199
String classType = sideInfo.getSideTableInfo().getFieldClassList().get(keyIndex).getName();

0 commit comments

Comments
 (0)