Skip to content

Commit e3f1b0e

Browse files
committed
[opt-34201][redis] Redis Sink 重构.
1 parent 8c4a25a commit e3f1b0e

File tree

3 files changed

+62
-97
lines changed

3 files changed

+62
-97
lines changed

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java

Lines changed: 57 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -37,44 +37,29 @@
3737

3838
import java.io.Closeable;
3939
import java.io.IOException;
40-
import java.util.HashMap;
4140
import java.util.HashSet;
42-
import java.util.LinkedList;
4341
import java.util.List;
42+
import java.util.Map;
4443
import java.util.Set;
4544

4645
/**
4746
* @author yanxi
4847
*/
4948
public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
5049
private static final Logger LOG = LoggerFactory.getLogger(RedisOutputFormat.class);
51-
50+
protected String[] fieldNames;
51+
protected TypeInformation<?>[] fieldTypes;
52+
protected List<String> primaryKeys;
53+
protected int timeout = 10000;
5254
private String url;
53-
54-
private String database;
55-
55+
private String database = "0";
5656
private String tableName;
57-
5857
private String password;
59-
6058
private int redisType;
61-
6259
private String maxTotal;
63-
6460
private String maxIdle;
65-
6661
private String minIdle;
67-
6862
private String masterName;
69-
70-
protected String[] fieldNames;
71-
72-
protected TypeInformation<?>[] fieldTypes;
73-
74-
protected List<String> primaryKeys;
75-
76-
protected int timeout;
77-
7863
private JedisPool pool;
7964

8065
private JedisCommands jedis;
@@ -83,8 +68,13 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
8368

8469
private GenericObjectPoolConfig poolConfig;
8570

86-
private RedisOutputFormat(){
71+
private RedisOutputFormat() {
8772
}
73+
74+
public static RedisOutputFormatBuilder buildRedisOutputFormat() {
75+
return new RedisOutputFormatBuilder();
76+
}
77+
8878
@Override
8979
public void configure(Configuration parameters) {
9080

@@ -96,15 +86,15 @@ public void open(int taskNumber, int numTasks) throws IOException {
9686
initMetric();
9787
}
9888

99-
private GenericObjectPoolConfig setPoolConfig(String maxTotal, String maxIdle, String minIdle){
89+
private GenericObjectPoolConfig setPoolConfig(String maxTotal, String maxIdle, String minIdle) {
10090
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
101-
if (maxTotal != null){
91+
if (maxTotal != null) {
10292
config.setMaxTotal(Integer.parseInt(maxTotal));
10393
}
104-
if (maxIdle != null){
94+
if (maxIdle != null) {
10595
config.setMaxIdle(Integer.parseInt(maxIdle));
10696
}
107-
if (minIdle != null){
97+
if (minIdle != null) {
10898
config.setMinIdle(Integer.parseInt(minIdle));
10999
}
110100
return config;
@@ -121,31 +111,23 @@ private void establishConnection() {
121111
for (String ipPort : nodes) {
122112
ipPorts.add(ipPort);
123113
String[] ipPortPair = StringUtils.split(ipPort, ":");
124-
addresses.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim())));
125-
}
126-
if (timeout == 0){
127-
timeout = 10000;
128-
}
129-
if (database == null)
130-
{
131-
database = "0";
114+
addresses.add(new HostAndPort(ipPortPair[0].trim(), Integer.parseInt(ipPortPair[1].trim())));
132115
}
133116

134-
switch (redisType){
135-
//单机
136-
case 1:
117+
switch (RedisType.parse(redisType)) {
118+
case STANDALONE:
137119
pool = new JedisPool(poolConfig, firstIp, Integer.parseInt(firstPort), timeout, password, Integer.parseInt(database));
138120
jedis = pool.getResource();
139121
break;
140-
//哨兵
141-
case 2:
122+
case SENTINEL:
142123
jedisSentinelPool = new JedisSentinelPool(masterName, ipPorts, poolConfig, timeout, password, Integer.parseInt(database));
143124
jedis = jedisSentinelPool.getResource();
144125
break;
145-
//集群
146-
case 3:
126+
case CLUSTER:
147127
jedis = new JedisCluster(addresses, timeout, timeout, 10, password, poolConfig);
128+
break;
148129
default:
130+
throw new RuntimeException("unsupported redis type[ " + redisType + "]");
149131
}
150132
}
151133

@@ -160,38 +142,14 @@ public void writeRecord(Tuple2 record) throws IOException {
160142
if (row.getArity() != fieldNames.length) {
161143
return;
162144
}
163-
164-
HashMap<String, Integer> map = new HashMap<>(8);
165-
for (String primaryKey : primaryKeys) {
166-
for (int i = 0; i < fieldNames.length; i++) {
167-
if (fieldNames[i].equals(primaryKey)) {
168-
map.put(primaryKey, i);
169-
}
170-
}
171-
}
172-
173-
List<String> kvList = new LinkedList<>();
174-
for (String primaryKey : primaryKeys){
175-
StringBuilder primaryKv = new StringBuilder();
176-
int index = map.get(primaryKey).intValue();
177-
primaryKv.append(primaryKey).append(":").append(row.getField(index));
178-
kvList.add(primaryKv.toString());
179-
}
180-
181-
String perKey = String.join(":", kvList);
145+
Map<String, Object> refData = Maps.newHashMap();
182146
for (int i = 0; i < fieldNames.length; i++) {
183-
StringBuilder key = new StringBuilder();
184-
key.append(tableName).append(":").append(perKey).append(":").append(fieldNames[i]);
185-
186-
String value = "null";
187-
Object field = row.getField(i);
188-
if (field != null) {
189-
value = field.toString();
190-
}
191-
jedis.set(key.toString(), value);
147+
refData.put(fieldNames[i], row.getField(i));
192148
}
149+
String redisKey = buildCacheKey(refData);
150+
refData.forEach((key, value) -> jedis.hset(redisKey, key, String.valueOf(value)));
193151

194-
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0){
152+
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
195153
LOG.info(record.toString());
196154
}
197155
outRecords.inc();
@@ -205,96 +163,103 @@ public void close() throws IOException {
205163
if (pool != null) {
206164
pool.close();
207165
}
208-
if (jedis != null){
209-
if (jedis instanceof Closeable){
166+
if (jedis != null) {
167+
if (jedis instanceof Closeable) {
210168
((Closeable) jedis).close();
211169
}
212170
}
213171

214172
}
215173

216-
public static RedisOutputFormatBuilder buildRedisOutputFormat(){
217-
return new RedisOutputFormatBuilder();
174+
public String buildCacheKey(Map<String, Object> refData) {
175+
StringBuilder keyBuilder = new StringBuilder(tableName);
176+
for (String primaryKey : primaryKeys) {
177+
if (!refData.containsKey(primaryKey)) {
178+
return null;
179+
}
180+
keyBuilder.append("_").append(refData.get(primaryKey));
181+
}
182+
return keyBuilder.toString();
218183
}
219184

220-
public static class RedisOutputFormatBuilder{
185+
public static class RedisOutputFormatBuilder {
221186
private final RedisOutputFormat redisOutputFormat;
222187

223-
protected RedisOutputFormatBuilder(){
188+
protected RedisOutputFormatBuilder() {
224189
this.redisOutputFormat = new RedisOutputFormat();
225190
}
226191

227-
public RedisOutputFormatBuilder setUrl(String url){
192+
public RedisOutputFormatBuilder setUrl(String url) {
228193
redisOutputFormat.url = url;
229194
return this;
230195
}
231196

232-
public RedisOutputFormatBuilder setDatabase(String database){
197+
public RedisOutputFormatBuilder setDatabase(String database) {
233198
redisOutputFormat.database = database;
234199
return this;
235200
}
236201

237-
public RedisOutputFormatBuilder setTableName(String tableName){
202+
public RedisOutputFormatBuilder setTableName(String tableName) {
238203
redisOutputFormat.tableName = tableName;
239204
return this;
240205
}
241206

242-
public RedisOutputFormatBuilder setPassword(String password){
207+
public RedisOutputFormatBuilder setPassword(String password) {
243208
redisOutputFormat.password = password;
244209
return this;
245210
}
246211

247-
public RedisOutputFormatBuilder setFieldNames(String[] fieldNames){
212+
public RedisOutputFormatBuilder setFieldNames(String[] fieldNames) {
248213
redisOutputFormat.fieldNames = fieldNames;
249214
return this;
250215
}
251216

252-
public RedisOutputFormatBuilder setFieldTypes(TypeInformation<?>[] fieldTypes){
217+
public RedisOutputFormatBuilder setFieldTypes(TypeInformation<?>[] fieldTypes) {
253218
redisOutputFormat.fieldTypes = fieldTypes;
254219
return this;
255220
}
256221

257-
public RedisOutputFormatBuilder setPrimaryKeys(List<String > primaryKeys){
222+
public RedisOutputFormatBuilder setPrimaryKeys(List<String> primaryKeys) {
258223
redisOutputFormat.primaryKeys = primaryKeys;
259224
return this;
260225
}
261226

262-
public RedisOutputFormatBuilder setTimeout(int timeout){
227+
public RedisOutputFormatBuilder setTimeout(int timeout) {
263228
redisOutputFormat.timeout = timeout;
264229
return this;
265230
}
266231

267-
public RedisOutputFormatBuilder setRedisType(int redisType){
232+
public RedisOutputFormatBuilder setRedisType(int redisType) {
268233
redisOutputFormat.redisType = redisType;
269234
return this;
270235
}
271236

272-
public RedisOutputFormatBuilder setMaxTotal(String maxTotal){
237+
public RedisOutputFormatBuilder setMaxTotal(String maxTotal) {
273238
redisOutputFormat.maxTotal = maxTotal;
274239
return this;
275240
}
276241

277-
public RedisOutputFormatBuilder setMaxIdle(String maxIdle){
242+
public RedisOutputFormatBuilder setMaxIdle(String maxIdle) {
278243
redisOutputFormat.maxIdle = maxIdle;
279244
return this;
280245
}
281246

282-
public RedisOutputFormatBuilder setMinIdle(String minIdle){
247+
public RedisOutputFormatBuilder setMinIdle(String minIdle) {
283248
redisOutputFormat.minIdle = minIdle;
284249
return this;
285250
}
286251

287-
public RedisOutputFormatBuilder setMasterName(String masterName){
252+
public RedisOutputFormatBuilder setMasterName(String masterName) {
288253
redisOutputFormat.masterName = masterName;
289254
return this;
290255
}
291256

292-
public RedisOutputFormat finish(){
293-
if (redisOutputFormat.url == null){
257+
public RedisOutputFormat finish() {
258+
if (redisOutputFormat.url == null) {
294259
throw new IllegalArgumentException("No URL supplied.");
295260
}
296261

297-
if (redisOutputFormat.tableName == null){
262+
if (redisOutputFormat.tableName == null) {
298263
throw new IllegalArgumentException("No tablename supplied.");
299264
}
300265

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisSink.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class RedisSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
7171

7272
protected String registerTableName;
7373

74-
public RedisSink(){
74+
public RedisSink() {
7575

7676
}
7777

@@ -108,7 +108,7 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
108108
@Override
109109
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
110110
RedisOutputFormat.RedisOutputFormatBuilder builder = RedisOutputFormat.buildRedisOutputFormat();
111-
builder.setUrl(this.url)
111+
RedisOutputFormat redisOutputFormat = builder.setUrl(this.url)
112112
.setDatabase(this.database)
113113
.setTableName(this.tableName)
114114
.setPassword(this.password)
@@ -120,8 +120,8 @@ public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<
120120
.setMaxTotal(this.maxTotal)
121121
.setMaxIdle(this.maxIdle)
122122
.setMinIdle(this.minIdle)
123-
.setMasterName(this.masterName);
124-
RedisOutputFormat redisOutputFormat = builder.finish();
123+
.setMasterName(this.masterName)
124+
.finish();
125125
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(redisOutputFormat);
126126
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction)
127127
.setParallelism(parallelism)

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/enums/RedisType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,6 @@ public static RedisType parse(int redisType){
2828
return type;
2929
}
3030
}
31-
throw new RuntimeException("unsupport redis type["+ redisType + "]");
31+
throw new RuntimeException("unsupported redis type["+ redisType + "]");
3232
}
3333
}

0 commit comments

Comments
 (0)