Skip to content

Commit aac3a27

Browse files
committed
redis can
1 parent 36549fb commit aac3a27

File tree

12 files changed

+56
-23
lines changed

12 files changed

+56
-23
lines changed

redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@
3939
import java.util.*;
4040
import java.util.concurrent.atomic.AtomicReference;
4141
import java.util.stream.Collectors;
42-
42+
/**
43+
* @author yanxi
44+
*/
4345
public class RedisAllReqRow extends AllReqRow{
4446

4547
private static final long serialVersionUID = 7578879189085344807L;
@@ -259,7 +261,8 @@ private JedisCommands getJedis(RedisSideTableInfo tableInfo) {
259261
break;
260262
//集群
261263
case 3:
262-
jedis = new JedisCluster(addresses, timeout, timeout,1, poolConfig);
264+
jedis = new JedisCluster(addresses, timeout, timeout, 1, poolConfig);
265+
default:
263266
}
264267

265268
return jedis;

redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllSideInfo.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import com.google.common.collect.Lists;
2929

3030
import java.util.List;
31-
31+
/**
32+
* @author yanxi
33+
*/
3234
public class RedisAllSideInfo extends SideInfo {
3335

3436
private static final long serialVersionUID = 1998703966487857613L;

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@
4343
import java.util.List;
4444
import java.util.Map;
4545
import java.util.function.Consumer;
46-
46+
/**
47+
* @author yanxi
48+
*/
4749
public class RedisAsyncReqRow extends AsyncReqRow {
4850

4951
private static final long serialVersionUID = -2079908694523987738L;
@@ -108,6 +110,7 @@ private void buildRedisClient(RedisSideTableInfo tableInfo){
108110
clusterClient = RedisClusterClient.create(clusterUri.toString());
109111
clusterConnection = clusterClient.connect();
110112
async = clusterConnection.async();
113+
default:
111114
}
112115
}
113116

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncSideInfo.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import com.google.common.collect.Lists;
2929

3030
import java.util.List;
31-
31+
/**
32+
* @author yanxi
33+
*/
3234
public class RedisAsyncSideInfo extends SideInfo {
3335
private static final long serialVersionUID = -4851348392924455039L;
3436

redis5/redis5-side/redis-side-core/src/main/java/com/dtstack/flink/sql/side/redis/table/RedisSideParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424

2525
import java.util.Map;
2626

27+
/**
28+
* @author yanxi
29+
*/
2730
public class RedisSideParser extends AbsSideTableParser {
2831

2932
@Override

redis5/redis5-side/redis-side-core/src/main/java/com/dtstack/flink/sql/side/redis/table/RedisSideTableInfo.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import com.dtstack.flink.sql.side.SideTableInfo;
2222
import com.google.common.base.Preconditions;
23-
23+
/**
24+
* @author yanxi
25+
*/
2426
public class RedisSideTableInfo extends SideTableInfo {
2527

2628
private static final long serialVersionUID = -1L;

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,23 @@
2626
import org.apache.flink.types.Row;
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
29-
import redis.clients.jedis.*;
29+
import redis.clients.jedis.HostAndPort;
30+
import redis.clients.jedis.JedisCluster;
31+
import redis.clients.jedis.JedisCommands;
32+
import redis.clients.jedis.JedisPool;
33+
import redis.clients.jedis.JedisSentinelPool;
34+
3035
import java.io.Closeable;
3136
import java.io.IOException;
32-
import java.util.*;
33-
37+
import java.util.HashMap;
38+
import java.util.HashSet;
39+
import java.util.LinkedList;
40+
import java.util.List;
41+
import java.util.Set;
42+
43+
/**
44+
* @author yanxi
45+
*/
3446
public class RedisOutputFormat extends DtRichOutputFormat<Tuple2> {
3547
private static final Logger LOG = LoggerFactory.getLogger(RedisOutputFormat.class);
3648

@@ -129,7 +141,8 @@ private void establishConnection() {
129141
break;
130142
//集群
131143
case 3:
132-
jedis = new JedisCluster(addresses, timeout, timeout,10, password, poolConfig);
144+
jedis = new JedisCluster(addresses, timeout, timeout, 10, password, poolConfig);
145+
default:
133146
}
134147
}
135148

@@ -145,21 +158,21 @@ public void writeRecord(Tuple2 record) throws IOException {
145158
return;
146159
}
147160

148-
HashMap<String, Integer> map = new HashMap<>();
149-
for (String primaryKey : primaryKeys){
150-
for (int i=0; i<fieldNames.length; i++){
151-
if (fieldNames[i].equals(primaryKey)){
161+
HashMap<String, Integer> map = new HashMap<>(8);
162+
for (String primaryKey : primaryKeys) {
163+
for (int i = 0; i < fieldNames.length; i++) {
164+
if (fieldNames[i].equals(primaryKey)) {
152165
map.put(primaryKey, i);
153166
}
154167
}
155168
}
156169

157170
List<String> kvList = new LinkedList<>();
158171
for (String primaryKey : primaryKeys){
159-
StringBuilder primaryKV = new StringBuilder();
172+
StringBuilder primaryKv = new StringBuilder();
160173
int index = map.get(primaryKey).intValue();
161-
primaryKV.append(primaryKey).append(":").append(row.getField(index));
162-
kvList.add(primaryKV.toString());
174+
primaryKv.append(primaryKey).append(":").append(row.getField(index));
175+
kvList.add(primaryKv.toString());
163176
}
164177

165178
String perKey = String.join(":", kvList);

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisSink.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import org.apache.flink.types.Row;
3434

3535
import java.util.List;
36-
36+
/**
37+
* @author yanxi
38+
*/
3739
public class RedisSink implements RetractStreamTableSink<Row>, IStreamSinkGener<RedisSink> {
3840

3941
protected String[] fieldNames;

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisSinkParser.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@
2424
import com.google.common.collect.Lists;
2525
import org.apache.commons.lang3.StringUtils;
2626

27-
import java.util.ArrayList;
2827
import java.util.Arrays;
2928
import java.util.List;
3029
import java.util.Map;
31-
30+
/**
31+
* @author yanxi
32+
*/
3233
public class RedisSinkParser extends AbsTableParser {
3334
@Override
3435
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisTableInfo.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import com.dtstack.flink.sql.table.TargetTableInfo;
2222
import com.google.common.base.Preconditions;
23-
23+
/**
24+
* @author yanxi
25+
*/
2426
public class RedisTableInfo extends TargetTableInfo {
2527

2628
private static final String CURR_TYPE = "redis";

0 commit comments

Comments
 (0)