Skip to content

Commit da409ef

Browse files
author
dapeng
committed
重构redis维表cache=all的结构
1 parent af338b7 commit da409ef

File tree

4 files changed

+113
-94
lines changed

4 files changed

+113
-94
lines changed

docs/redisSide.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
CREATE TABLE tableName(
55
colName cloType,
66
...
7+
PRIMARY KEY(colName1,colName2) ,
78
PERIOD FOR SYSTEM_TIME
89
)WITH(
910
type ='redis',
@@ -27,9 +28,10 @@
2728
| tableName | 注册到flink的表名称(可选填;不填默认和redis对应的表名称相同)|
2829
| colName | 列名称,维表列名格式 表名:主键名:主键值:列名]|
2930
| colType | 列类型,当前只支持varchar|
31+
| PRIMARY KEY |主键,多个字段做为联合主键时以逗号分隔
3032
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
3133

32-
## 3.参数
34+
## 4.参数
3335

3436
|参数名称|含义|是否必填|默认值|
3537
|----|---|---|----|
@@ -51,7 +53,7 @@
5153
* cacheTTLMs:缓存的过期时间(ms)
5254
* ALL: 缓存全量表数据
5355

54-
## 4.样例
56+
## 5.样例
5557
```
5658
create table sideTable(
5759
channel varchar,
@@ -70,5 +72,10 @@ create table sideTable(
7072
);
7173
7274
```
75+
## 6.缓存redis的存储结构规则
76+
```
77+
redis使用散列类型 hash 数据结构,key=tableName_primaryKey1_primaryKey2,value={column1=value1, column2=value2}
78+
如果以班级class表为例,id和name作为联合主键,那么redis的结构为 <class_1_john ,{id=1, name=john, age=12}>
79+
```
7380

7481

redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java

Lines changed: 64 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,14 @@
1919
package com.dtstack.flink.sql.side.redis;
2020

2121
import com.dtstack.flink.sql.side.*;
22+
import com.dtstack.flink.sql.side.redis.enums.RedisType;
2223
import com.dtstack.flink.sql.side.redis.table.RedisSideReqRow;
2324
import com.dtstack.flink.sql.side.redis.table.RedisSideTableInfo;
2425
import com.esotericsoftware.minlog.Log;
2526
import org.apache.calcite.sql.JoinType;
27+
import org.apache.commons.collections.CollectionUtils;
28+
import org.apache.commons.collections.MapUtils;
29+
import org.apache.commons.lang.StringUtils;
2630
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
2731
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2832
import com.google.common.collect.Maps;
@@ -103,104 +107,51 @@ public void flatMap(Row row, Collector<Row> out) throws Exception {
103107
String columnName = sideInfo.getEqualFieldList().get(conValIndex);
104108
inputParams.put(columnName, equalObj.toString());
105109
}
106-
String key = buildKey(inputParams);
110+
String key = buildCacheKey(inputParams);
111+
if(StringUtils.isBlank(key)){
112+
return;
113+
}
107114

108115
Map<String, String> cacheMap = cacheRef.get().get(key);
109-
110-
if (cacheMap == null){
111-
if(sideInfo.getJoinType() == JoinType.LEFT){
112-
Row data = fillData(row, null);
113-
out.collect(data);
114-
}else{
116+
if(MapUtils.isEmpty(cacheMap)){
117+
if(sideInfo.getJoinType() != JoinType.LEFT){
115118
return;
116119
}
117-
120+
Row data = fillData(row, null);
121+
out.collect(data);
118122
return;
119-
}
120123

124+
}
121125
Row newRow = fillData(row, cacheMap);
122126
out.collect(newRow);
123127
}
124128

125-
private String buildKey(Map<String, String> inputParams) {
126-
String tableName = tableInfo.getTableName();
127-
StringBuilder key = new StringBuilder();
128-
for (int i=0; i<inputParams.size(); i++){
129-
key.append(tableName).append(":").append(inputParams.keySet().toArray()[i]).append(":")
130-
.append(inputParams.get(inputParams.keySet().toArray()[i]));
129+
private String buildCacheKey(Map<String, String> refData) {
130+
StringBuilder keyBuilder = new StringBuilder(tableInfo.getTableName());
131+
List<String> primaryKeys = tableInfo.getPrimaryKeys();
132+
for(String primaryKey : primaryKeys){
133+
if(!refData.containsKey(primaryKey)){
134+
return null;
135+
}
136+
keyBuilder.append("_").append(refData.get(primaryKey));
131137
}
132-
return key.toString();
138+
return keyBuilder.toString();
133139
}
134140

141+
135142
private void loadData(Map<String, Map<String, String>> tmpCache) throws SQLException {
136143
JedisCommands jedis = null;
137-
138144
try {
139-
for(int i=0; i<CONN_RETRY_NUM; i++){
140-
141-
try{
142-
jedis = getJedis(tableInfo);
143-
break;
144-
}catch (Exception e){
145-
if(i == CONN_RETRY_NUM - 1){
146-
throw new RuntimeException("", e);
147-
}
148-
149-
try {
150-
String jedisInfo = "url:" + tableInfo.getUrl() + ",pwd:" + tableInfo.getPassword() + ",database:" + tableInfo.getDatabase();
151-
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + jedisInfo);
152-
Thread.sleep(5 * 1000);
153-
} catch (InterruptedException e1) {
154-
LOG.error("", e1);
155-
}
156-
}
157-
}
158-
159-
if (tableInfo.getRedisType() != 3){
160-
String perKey = tableInfo.getTableName() + "*";
161-
Set<String> keys = ((Jedis) jedis).keys(perKey);
162-
List<String> newPerKeys = new LinkedList<>();
163-
for (String key : keys){
164-
String[] splitKey = key.split(":");
165-
String newKey = splitKey[0] + ":" + splitKey[1] + ":" + splitKey[2];
166-
newPerKeys.add(newKey);
167-
}
168-
List<String> list = newPerKeys.stream().distinct().collect(Collectors.toList());
169-
for(String key : list){
170-
Map<String, String> kv = Maps.newHashMap();
171-
String[] primaryKv = key.split(":");
172-
kv.put(primaryKv[1], primaryKv[2]);
173-
String pattern = key + "*";
174-
Set<String> realKeys = ((Jedis) jedis).keys(pattern);
175-
for (String realKey : realKeys){
176-
kv.put(realKey.split(":")[3], jedis.get(realKey));
177-
}
178-
tmpCache.put(key, kv);
179-
}
180-
} else {
181-
String perKey = tableInfo.getTableName() + "*";
182-
Set<String> keys = keys((JedisCluster) jedis, perKey);
183-
List<String> newPerKeys = new LinkedList<>();
184-
for (String key : keys){
185-
String[] splitKey = key.split(":");
186-
String newKey = splitKey[0] + ":" + splitKey[1] + ":" + splitKey[2];
187-
newPerKeys.add(newKey);
188-
}
189-
List<String> list = newPerKeys.stream().distinct().collect(Collectors.toList());
190-
for(String key : list){
191-
Map<String, String> kv = Maps.newHashMap();
192-
String[] primaryKv = key.split(":");
193-
kv.put(primaryKv[1], primaryKv[2]);
194-
String pattern = key + "*";
195-
Set<String> realKeys = keys((JedisCluster) jedis, pattern);
196-
for (String realKey : realKeys){
197-
kv.put(realKey.split(":")[3], jedis.get(realKey));
198-
}
199-
tmpCache.put(key, kv);
200-
}
145+
String keyPattern = tableInfo.getTableName() + "*";
146+
jedis = getJedisWithRetry(CONN_RETRY_NUM);
147+
Set<String> keys = getRedisKeys(RedisType.parse(tableInfo.getRedisType()), jedis, keyPattern);
148+
if(CollectionUtils.isEmpty(keys)){
149+
return;
201150
}
202-
203-
151+
JedisCommands redis = jedis;
152+
keys.forEach(k ->{
153+
tmpCache.put(k, redis.hgetAll(k));
154+
});
204155
} catch (Exception e){
205156
LOG.error("", e);
206157
} finally {
@@ -245,33 +196,58 @@ private JedisCommands getJedis(RedisSideTableInfo tableInfo) {
245196
}
246197
JedisCommands jedis = null;
247198
GenericObjectPoolConfig poolConfig = setPoolConfig(tableInfo.getMaxTotal(), tableInfo.getMaxIdle(), tableInfo.getMinIdle());
248-
switch (tableInfo.getRedisType()){
199+
switch (RedisType.parse(tableInfo.getRedisType())){
249200
//单机
250-
case 1:
201+
case STANDALONE:
251202
pool = new JedisPool(poolConfig, firstIp, Integer.parseInt(firstPort), timeout, password, Integer.parseInt(database));
252203
jedis = pool.getResource();
253204
break;
254205
//哨兵
255-
case 2:
206+
case SENTINEL:
256207
jedisSentinelPool = new JedisSentinelPool(tableInfo.getMasterName(), ipPorts, poolConfig, timeout, password, Integer.parseInt(database));
257208
jedis = jedisSentinelPool.getResource();
258209
break;
259210
//集群
260-
case 3:
211+
case CLUSTER:
261212
jedis = new JedisCluster(addresses, timeout, timeout,1, poolConfig);
213+
default:
214+
break;
262215
}
263216

264217
return jedis;
265218
}
266219

267-
private Set<String> keys(JedisCluster jedisCluster, String pattern){
220+
private JedisCommands getJedisWithRetry(int retryNum) {
221+
while (retryNum-- > 0){
222+
try {
223+
return getJedis(tableInfo);
224+
} catch (Exception e) {
225+
if(retryNum <= 0){
226+
throw new RuntimeException("getJedisWithRetry error", e);
227+
}
228+
try {
229+
String jedisInfo = "url:" + tableInfo.getUrl() + ",pwd:" + tableInfo.getPassword() + ",database:" + tableInfo.getDatabase();
230+
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + jedisInfo);
231+
Thread.sleep(5 * 1000);
232+
} catch (InterruptedException e1) {
233+
LOG.error("", e1);
234+
}
235+
}
236+
}
237+
return null;
238+
}
239+
240+
private Set<String> getRedisKeys(RedisType redisType, JedisCommands jedis, String keyPattern){
241+
if(!redisType.equals(RedisType.CLUSTER)){
242+
return ((Jedis) jedis).keys(keyPattern);
243+
}
268244
Set<String> keys = new TreeSet<>();
269-
Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
245+
Map<String, JedisPool> clusterNodes = ((JedisCluster)jedis).getClusterNodes();
270246
for(String k : clusterNodes.keySet()){
271247
JedisPool jp = clusterNodes.get(k);
272248
Jedis connection = jp.getResource();
273249
try {
274-
keys.addAll(connection.keys(pattern));
250+
keys.addAll(connection.keys(keyPattern));
275251
} catch (Exception e){
276252
LOG.error("Getting keys error: {}", e);
277253
} finally {

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.dtstack.flink.sql.enums.ECacheContentType;
2222
import com.dtstack.flink.sql.side.*;
2323
import com.dtstack.flink.sql.side.cache.CacheObj;
24+
import com.dtstack.flink.sql.side.redis.enums.RedisType;
2425
import com.dtstack.flink.sql.side.redis.table.RedisSideReqRow;
2526
import com.dtstack.flink.sql.side.redis.table.RedisSideTableInfo;
2627
import io.lettuce.core.RedisClient;
@@ -86,28 +87,30 @@ private void buildRedisClient(RedisSideTableInfo tableInfo){
8687
if (database == null){
8788
database = "0";
8889
}
89-
switch (tableInfo.getRedisType()){
90-
case 1:
90+
switch (RedisType.parse(tableInfo.getRedisType())){
91+
case STANDALONE:
9192
StringBuilder redisUri = new StringBuilder();
9293
redisUri.append("redis://").append(password).append(url).append("/").append(database);
9394
redisClient = RedisClient.create(redisUri.toString());
9495
connection = redisClient.connect();
9596
async = connection.async();
9697
break;
97-
case 2:
98+
case SENTINEL:
9899
StringBuilder sentinelUri = new StringBuilder();
99100
sentinelUri.append("redis-sentinel://").append(password)
100101
.append(url).append("/").append(database).append("#").append(redisSideTableInfo.getMasterName());
101102
redisClient = RedisClient.create(sentinelUri.toString());
102103
connection = redisClient.connect();
103104
async = connection.async();
104105
break;
105-
case 3:
106+
case CLUSTER:
106107
StringBuilder clusterUri = new StringBuilder();
107108
clusterUri.append("redis://").append(password).append(url);
108109
clusterClient = RedisClusterClient.create(clusterUri.toString());
109110
clusterConnection = clusterClient.connect();
110111
async = clusterConnection.async();
112+
default:
113+
break;
111114
}
112115
}
113116

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.dtstack.flink.sql.side.redis.enums;
2+
3+
public enum RedisType {
4+
/**
5+
* 单机
6+
*/
7+
STANDALONE(1),
8+
/**
9+
* 哨兵
10+
*/
11+
SENTINEL(2),
12+
/**
13+
* 集群
14+
*/
15+
CLUSTER(3);
16+
int type;
17+
RedisType(int type){
18+
this.type = type;
19+
}
20+
21+
public int getType(){
22+
return type;
23+
}
24+
25+
public static RedisType parse(int redisType){
26+
for(RedisType type : RedisType.values()){
27+
if(type.getType() == redisType){
28+
return type;
29+
}
30+
}
31+
throw new RuntimeException("unsupport redis type["+ redisType + "]");
32+
}
33+
}

0 commit comments

Comments
 (0)