Skip to content

Commit 354f43f

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.0.x_34265' into 1.10_release_4.0.x
2 parents 95cb682 + 65123ba commit 354f43f

File tree

5 files changed

+61
-3
lines changed

5 files changed

+61
-3
lines changed

docs/plugin/redisSink.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ redis5.0
4444
|minIdle|最小空闲连接数|||0|
4545
|masterName| 哨兵模式下的masterName|||
4646
|primarykeys|主键字段,多个字段以逗号分割|||
47+
|keyExpiredTime|redis sink的key的过期时间。默认是0(永不过期),单位是s。|||
4748
4849

4950
## 5.样例:

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,35 @@
4747
*/
4848
public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
4949
private static final Logger LOG = LoggerFactory.getLogger(RedisOutputFormat.class);
50+
5051
protected String[] fieldNames;
52+
5153
protected TypeInformation<?>[] fieldTypes;
54+
5255
protected List<String> primaryKeys;
56+
5357
protected int timeout = 10000;
58+
5459
private String url;
60+
5561
private String database = "0";
62+
5663
private String tableName;
64+
5765
private String password;
66+
5867
private int redisType;
68+
5969
private String maxTotal;
70+
6071
private String maxIdle;
72+
6173
private String minIdle;
74+
6275
private String masterName;
76+
77+
protected int keyExpiredTime;
78+
6379
private JedisPool pool;
6480

6581
private JedisCommands jedis;
@@ -146,15 +162,31 @@ public void writeRecord(Tuple2 record) throws IOException {
146162
for (int i = 0; i < fieldNames.length; i++) {
147163
refData.put(fieldNames[i], row.getField(i));
148164
}
149-
String redisKey = buildCacheKey(refData);
150-
refData.forEach((key, value) -> jedis.hset(redisKey, key, String.valueOf(value)));
151-
165+
save(refData);
152166
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
153167
LOG.info(record.toString());
154168
}
155169
outRecords.inc();
156170
}
157171

172+
173+
/**
174+
* 1. build key from map.
175+
* 2. save key and value.
176+
* 3. set expired time for key when keyExpiredTime has been set.
177+
* @param refData
178+
*/
179+
private synchronized void save(Map<String, Object> refData) {
180+
String key = buildCacheKey(refData);
181+
try {
182+
refData.forEach((field, value) -> jedis.hset(key, field, String.valueOf(value)));
183+
} finally {
184+
if (keyExpiredTime != 0) {
185+
jedis.expire(key, keyExpiredTime);
186+
}
187+
}
188+
}
189+
158190
@Override
159191
public void close() throws IOException {
160192
if (jedisSentinelPool != null) {
@@ -254,6 +286,11 @@ public RedisOutputFormatBuilder setMasterName(String masterName) {
254286
return this;
255287
}
256288

289+
public RedisOutputFormatBuilder setKeyExpiredTime(int keyExpiredTime){
290+
redisOutputFormat.keyExpiredTime = keyExpiredTime;
291+
return this;
292+
}
293+
257294
public RedisOutputFormat finish() {
258295
if (redisOutputFormat.url == null) {
259296
throw new IllegalArgumentException("No URL supplied.");

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisSink.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ public class RedisSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
7171

7272
protected String registerTableName;
7373

74+
protected int keyExpiredTime;
75+
7476
public RedisSink() {
7577

7678
}
@@ -92,6 +94,7 @@ public RedisSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
9294
this.parallelism = Objects.isNull(redisTableInfo.getParallelism()) ?
9395
parallelism : redisTableInfo.getParallelism();
9496
this.registerTableName = redisTableInfo.getName();
97+
this.keyExpiredTime = redisTableInfo.getKeyExpiredTime();
9598
return this;
9699
}
97100

@@ -121,6 +124,7 @@ public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<
121124
.setMaxIdle(this.maxIdle)
122125
.setMinIdle(this.minIdle)
123126
.setMasterName(this.masterName)
127+
.setKeyExpiredTime(this.keyExpiredTime)
124128
.finish();
125129
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(redisOutputFormat);
126130
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction)

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisSinkParser.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
4949
redisTableInfo.setRedisType(MathUtil.getString(props.get(RedisTableInfo.REDIS_TYPE.toLowerCase())));
5050
redisTableInfo.setMasterName(MathUtil.getString(props.get(RedisTableInfo.MASTER_NAME.toLowerCase())));
5151

52+
if (props.get(RedisTableInfo.KEY_EXPIRED_TIME.toLowerCase()) != null) {
53+
redisTableInfo.setKeyExpiredTime(Integer.parseInt(MathUtil.getString(props.get(RedisTableInfo.KEY_EXPIRED_TIME.toLowerCase()))));
54+
}
55+
5256
String primaryKeysStr = MathUtil.getString(props.get(RedisTableInfo.PRIMARY_KEYS_NAME));
5357
List<String> primaryKeysList = Lists.newArrayList();
5458
if (!StringUtils.isEmpty(primaryKeysStr)) {

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisTableInfo.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class RedisTableInfo extends AbstractTargetTableInfo {
4949

5050
public static final String PRIMARY_KEYS_NAME = "primarykeys";
5151

52+
public static final String KEY_EXPIRED_TIME = "keyExpiredTime";
53+
5254
public RedisTableInfo(){
5355
setType(CURR_TYPE);
5456
}
@@ -73,6 +75,8 @@ public RedisTableInfo(){
7375

7476
private String masterName;
7577

78+
private int keyExpiredTime;
79+
7680
public String getUrl() {
7781
return url;
7882
}
@@ -153,6 +157,14 @@ public void setMasterName(String masterName) {
153157
this.masterName = masterName;
154158
}
155159

160+
public int getKeyExpiredTime() {
161+
return keyExpiredTime;
162+
}
163+
164+
public void setKeyExpiredTime(int keyExpiredTime) {
165+
this.keyExpiredTime = keyExpiredTime;
166+
}
167+
156168
@Override
157169
public boolean check() {
158170
Preconditions.checkNotNull(url, "redis field of URL is required");

0 commit comments

Comments
 (0)