1919package com .dtstack .flink .sql .sink .redis ;
2020
2121import com .dtstack .flink .sql .outputformat .AbstractDtRichOutputFormat ;
22+ import com .dtstack .flink .sql .sink .redis .enums .RedisType ;
23+ import com .google .common .collect .Maps ;
2224import org .apache .commons .lang3 .StringUtils ;
2325import org .apache .commons .pool2 .impl .GenericObjectPoolConfig ;
2426import org .apache .flink .api .common .typeinfo .TypeInformation ;
3537
3638import java .io .Closeable ;
3739import java .io .IOException ;
38- import java .util .HashMap ;
39- import java .util .HashSet ;
40- import java .util .LinkedList ;
41- import java .util .List ;
42- import java .util .Set ;
40+ import java .util .*;
4341
4442/**
4543 * @author yanxi
@@ -49,7 +47,7 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
4947
5048 private String url ;
5149
52- private String database ;
50+ private String database = "0" ;
5351
5452 private String tableName ;
5553
@@ -71,7 +69,7 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
7169
7270 protected List <String > primaryKeys ;
7371
74- protected int timeout ;
72+ protected int timeout = 10000 ;
7573
7674 private JedisPool pool ;
7775
@@ -121,29 +119,21 @@ private void establishConnection() {
121119 String [] ipPortPair = StringUtils .split (ipPort , ":" );
122120 addresses .add (new HostAndPort (ipPortPair [0 ].trim (), Integer .valueOf (ipPortPair [1 ].trim ())));
123121 }
124- if (timeout == 0 ){
125- timeout = 10000 ;
126- }
127- if (database == null )
128- {
129- database = "0" ;
130- }
131122
132- switch (redisType ){
133- //单机
134- case 1 :
123+ switch (RedisType .parse (redisType )){
124+ case STANDALONE :
135125 pool = new JedisPool (poolConfig , firstIp , Integer .parseInt (firstPort ), timeout , password , Integer .parseInt (database ));
136126 jedis = pool .getResource ();
137127 break ;
138- //哨兵
139- case 2 :
128+ case SENTINEL :
140129 jedisSentinelPool = new JedisSentinelPool (masterName , ipPorts , poolConfig , timeout , password , Integer .parseInt (database ));
141130 jedis = jedisSentinelPool .getResource ();
142131 break ;
143- //集群
144- case 3 :
132+ case CLUSTER :
145133 jedis = new JedisCluster (addresses , timeout , timeout , 10 , password , poolConfig );
134+ break ;
146135 default :
136+ throw new RuntimeException ("unsupport redis type[ " + redisType + "]" );
147137 }
148138 }
149139
@@ -158,36 +148,14 @@ public void writeRecord(Tuple2 record) throws IOException {
158148 if (row .getArity () != fieldNames .length ) {
159149 return ;
160150 }
161-
162- HashMap <String , Integer > map = new HashMap <>(8 );
163- for (String primaryKey : primaryKeys ) {
164- for (int i = 0 ; i < fieldNames .length ; i ++) {
165- if (fieldNames [i ].equals (primaryKey )) {
166- map .put (primaryKey , i );
167- }
168- }
169- }
170-
171- List <String > kvList = new LinkedList <>();
172- for (String primaryKey : primaryKeys ){
173- StringBuilder primaryKv = new StringBuilder ();
174- int index = map .get (primaryKey ).intValue ();
175- primaryKv .append (primaryKey ).append (":" ).append (row .getField (index ));
176- kvList .add (primaryKv .toString ());
177- }
178-
179- String perKey = String .join (":" , kvList );
180- for (int i = 0 ; i < fieldNames .length ; i ++) {
181- StringBuilder key = new StringBuilder ();
182- key .append (tableName ).append (":" ).append (perKey ).append (":" ).append (fieldNames [i ]);
183-
184- String value = "null" ;
185- Object field = row .getField (i );
186- if (field != null ) {
187- value = field .toString ();
188- }
189- jedis .set (key .toString (), value );
151+ Map <String , Object > refData = Maps .newHashMap ();
152+ for (int i = 0 ; i < fieldNames .length ; i ++){
153+ refData .put (fieldNames [i ], row .getField (i ));
190154 }
155+ String redisKey = buildCacheKey (refData );
156+ refData .entrySet ().forEach (e ->{
157+ jedis .hset (redisKey , e .getKey (), String .valueOf (e .getValue ()));
158+ });
191159
192160 if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ){
193161 LOG .info (record .toString ());
@@ -211,6 +179,17 @@ public void close() throws IOException {
211179
212180 }
213181
182+ public String buildCacheKey (Map <String , Object > refData ) {
183+ StringBuilder keyBuilder = new StringBuilder (tableName );
184+ for (String primaryKey : primaryKeys ){
185+ if (!refData .containsKey (primaryKey )){
186+ return null ;
187+ }
188+ keyBuilder .append ("_" ).append (refData .get (primaryKey ));
189+ }
190+ return keyBuilder .toString ();
191+ }
192+
214193 public static RedisOutputFormatBuilder buildRedisOutputFormat (){
215194 return new RedisOutputFormatBuilder ();
216195 }
0 commit comments