Skip to content

Commit 837336b

Browse files
committed
[hotfix-34625][redis5][sink]add expire attribute for redis sink's key
1 parent 4c42a02 commit 837336b

File tree

5 files changed

+48
-2
lines changed

5 files changed

+48
-2
lines changed

docs/plugin/redisSink.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ redis5.0
4444
|minIdle|最小空闲连接数|||0|
4545
|masterName| 哨兵模式下的masterName|||
4646
|primarykeys|主键字段,多个字段以逗号分割|||
47+
|keyExpiredTime|redis sink的key的过期时间。默认是0(永不过期),单位是ms。|||
4748
4849

4950
## 5.样例:

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
7575

7676
protected int timeout;
7777

78+
protected long keyExpiredTime;
79+
7880
private JedisPool pool;
7981

8082
private JedisCommands jedis;
@@ -188,7 +190,7 @@ public void writeRecord(Tuple2 record) throws IOException {
188190
if (field != null) {
189191
value = field.toString();
190192
}
191-
jedis.set(key.toString(), value);
193+
saveKey(key.toString(), value);
192194
}
193195

194196
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0){
@@ -197,6 +199,24 @@ public void writeRecord(Tuple2 record) throws IOException {
197199
outRecords.inc();
198200
}
199201

202+
/**
203+
* 1. save key and value.
204+
* 2. set expired time for key when keyExpiredTime has been set.
205+
* @param key
206+
* @param value
207+
*/
208+
private void saveKey(String key, String value) {
209+
if (keyExpiredTime != 0L) {
210+
boolean keyExist = jedis.exists(key);
211+
if (keyExist) {
212+
jedis.del(key);
213+
}
214+
jedis.set(key, value, "NX", "PX", keyExpiredTime);
215+
} else {
216+
jedis.set(key, value);
217+
}
218+
}
219+
200220
@Override
201221
public void close() throws IOException {
202222
if (jedisSentinelPool != null) {
@@ -289,6 +309,11 @@ public RedisOutputFormatBuilder setMasterName(String masterName){
289309
return this;
290310
}
291311

312+
public RedisOutputFormatBuilder setKeyExpiredTime(long keyExpiredTime){
313+
redisOutputFormat.keyExpiredTime = keyExpiredTime;
314+
return this;
315+
}
316+
292317
public RedisOutputFormat finish(){
293318
if (redisOutputFormat.url == null){
294319
throw new IllegalArgumentException("No URL supplied.");

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisSink.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ public class RedisSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
7171

7272
protected String registerTableName;
7373

74+
protected long keyExpiredTime;
75+
7476
public RedisSink(){
7577

7678
}
@@ -92,6 +94,7 @@ public RedisSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
9294
this.parallelism = Objects.isNull(redisTableInfo.getParallelism()) ?
9395
parallelism : redisTableInfo.getParallelism();
9496
this.registerTableName = redisTableInfo.getName();
97+
this.keyExpiredTime = redisTableInfo.getKeyExpiredTime();
9598
return this;
9699
}
97100

@@ -120,7 +123,8 @@ public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<
120123
.setMaxTotal(this.maxTotal)
121124
.setMaxIdle(this.maxIdle)
122125
.setMinIdle(this.minIdle)
123-
.setMasterName(this.masterName);
126+
.setMasterName(this.masterName)
127+
.setKeyExpiredTime(this.keyExpiredTime);
124128
RedisOutputFormat redisOutputFormat = builder.finish();
125129
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(redisOutputFormat);
126130
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction)

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisSinkParser.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
4949
redisTableInfo.setRedisType(MathUtil.getString(props.get(RedisTableInfo.REDIS_TYPE.toLowerCase())));
5050
redisTableInfo.setMasterName(MathUtil.getString(props.get(RedisTableInfo.MASTER_NAME.toLowerCase())));
5151

52+
if (props.get(RedisTableInfo.KEY_EXPIRED_TIME.toLowerCase()) != null) {
53+
redisTableInfo.setKeyExpiredTime(Long.parseLong(MathUtil.getString(props.get(RedisTableInfo.KEY_EXPIRED_TIME.toLowerCase()))));
54+
}
55+
5256
String primaryKeysStr = MathUtil.getString(props.get(RedisTableInfo.PRIMARY_KEYS_NAME));
5357
List<String> primaryKeysList = Lists.newArrayList();
5458
if (!StringUtils.isEmpty(primaryKeysStr)) {

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisTableInfo.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class RedisTableInfo extends AbstractTargetTableInfo {
4949

5050
public static final String PRIMARY_KEYS_NAME = "primarykeys";
5151

52+
public static final String KEY_EXPIRED_TIME = "keyExpiredTime";
53+
5254
public RedisTableInfo(){
5355
setType(CURR_TYPE);
5456
}
@@ -73,6 +75,8 @@ public RedisTableInfo(){
7375

7476
private String masterName;
7577

78+
private long keyExpiredTime;
79+
7680
public String getUrl() {
7781
return url;
7882
}
@@ -153,6 +157,14 @@ public void setMasterName(String masterName) {
153157
this.masterName = masterName;
154158
}
155159

160+
public long getKeyExpiredTime() {
161+
return keyExpiredTime;
162+
}
163+
164+
public void setKeyExpiredTime(long keyExpiredTime) {
165+
this.keyExpiredTime = keyExpiredTime;
166+
}
167+
156168
@Override
157169
public boolean check() {
158170
Preconditions.checkNotNull(url, "redis field of URL is required");

0 commit comments

Comments
 (0)