3737
3838import java .io .Closeable ;
3939import java .io .IOException ;
40- import java .util .HashMap ;
4140import java .util .HashSet ;
42- import java .util .LinkedList ;
4341import java .util .List ;
42+ import java .util .Map ;
4443import java .util .Set ;
4544
4645/**
4948public class RedisOutputFormat extends AbstractDtRichOutputFormat <Tuple2 > {
5049 private static final Logger LOG = LoggerFactory .getLogger (RedisOutputFormat .class );
5150
51+ protected String [] fieldNames ;
52+
53+ protected TypeInformation <?>[] fieldTypes ;
54+
55+ protected List <String > primaryKeys ;
56+
57+ protected int timeout = 10000 ;
58+
5259 private String url ;
5360
54- private String database ;
61+ private String database = "0" ;
5562
5663 private String tableName ;
5764
@@ -67,15 +74,7 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
6774
6875 private String masterName ;
6976
70- protected String [] fieldNames ;
71-
72- protected TypeInformation <?>[] fieldTypes ;
73-
74- protected List <String > primaryKeys ;
75-
76- protected int timeout ;
77-
78- protected long keyExpiredTime ;
77+ protected int keyExpiredTime ;
7978
8079 private JedisPool pool ;
8180
@@ -85,8 +84,13 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
8584
8685 private GenericObjectPoolConfig poolConfig ;
8786
88- private RedisOutputFormat (){
87+ private RedisOutputFormat () {
8988 }
89+
90+ public static RedisOutputFormatBuilder buildRedisOutputFormat () {
91+ return new RedisOutputFormatBuilder ();
92+ }
93+
9094 @ Override
9195 public void configure (Configuration parameters ) {
9296
@@ -98,15 +102,15 @@ public void open(int taskNumber, int numTasks) throws IOException {
98102 initMetric ();
99103 }
100104
101- private GenericObjectPoolConfig setPoolConfig (String maxTotal , String maxIdle , String minIdle ){
105+ private GenericObjectPoolConfig setPoolConfig (String maxTotal , String maxIdle , String minIdle ) {
102106 GenericObjectPoolConfig config = new GenericObjectPoolConfig ();
103- if (maxTotal != null ){
107+ if (maxTotal != null ) {
104108 config .setMaxTotal (Integer .parseInt (maxTotal ));
105109 }
106- if (maxIdle != null ){
110+ if (maxIdle != null ) {
107111 config .setMaxIdle (Integer .parseInt (maxIdle ));
108112 }
109- if (minIdle != null ){
113+ if (minIdle != null ) {
110114 config .setMinIdle (Integer .parseInt (minIdle ));
111115 }
112116 return config ;
@@ -123,31 +127,23 @@ private void establishConnection() {
123127 for (String ipPort : nodes ) {
124128 ipPorts .add (ipPort );
125129 String [] ipPortPair = StringUtils .split (ipPort , ":" );
126- addresses .add (new HostAndPort (ipPortPair [0 ].trim (), Integer .valueOf (ipPortPair [1 ].trim ())));
127- }
128- if (timeout == 0 ){
129- timeout = 10000 ;
130- }
131- if (database == null )
132- {
133- database = "0" ;
130+ addresses .add (new HostAndPort (ipPortPair [0 ].trim (), Integer .parseInt (ipPortPair [1 ].trim ())));
134131 }
135132
136- switch (redisType ){
137- //单机
138- case 1 :
133+ switch (RedisType .parse (redisType )) {
134+ case STANDALONE :
139135 pool = new JedisPool (poolConfig , firstIp , Integer .parseInt (firstPort ), timeout , password , Integer .parseInt (database ));
140136 jedis = pool .getResource ();
141137 break ;
142- //哨兵
143- case 2 :
138+ case SENTINEL :
144139 jedisSentinelPool = new JedisSentinelPool (masterName , ipPorts , poolConfig , timeout , password , Integer .parseInt (database ));
145140 jedis = jedisSentinelPool .getResource ();
146141 break ;
147- //集群
148- case 3 :
142+ case CLUSTER :
149143 jedis = new JedisCluster (addresses , timeout , timeout , 10 , password , poolConfig );
144+ break ;
150145 default :
146+ throw new RuntimeException ("unsupported redis type[ " + redisType + "]" );
151147 }
152148 }
153149
@@ -162,58 +158,32 @@ public void writeRecord(Tuple2 record) throws IOException {
162158 if (row .getArity () != fieldNames .length ) {
163159 return ;
164160 }
165-
166- HashMap <String , Integer > map = new HashMap <>(8 );
167- for (String primaryKey : primaryKeys ) {
168- for (int i = 0 ; i < fieldNames .length ; i ++) {
169- if (fieldNames [i ].equals (primaryKey )) {
170- map .put (primaryKey , i );
171- }
172- }
173- }
174-
175- List <String > kvList = new LinkedList <>();
176- for (String primaryKey : primaryKeys ){
177- StringBuilder primaryKv = new StringBuilder ();
178- int index = map .get (primaryKey ).intValue ();
179- primaryKv .append (primaryKey ).append (":" ).append (row .getField (index ));
180- kvList .add (primaryKv .toString ());
181- }
182-
183- String perKey = String .join (":" , kvList );
161+ Map <String , Object > refData = Maps .newHashMap ();
184162 for (int i = 0 ; i < fieldNames .length ; i ++) {
185- StringBuilder key = new StringBuilder ();
186- key .append (tableName ).append (":" ).append (perKey ).append (":" ).append (fieldNames [i ]);
187-
188- String value = "null" ;
189- Object field = row .getField (i );
190- if (field != null ) {
191- value = field .toString ();
192- }
193- saveKey (key .toString (), value );
163+ refData .put (fieldNames [i ], row .getField (i ));
194164 }
195-
196- if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ){
165+ save ( refData );
166+ if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
197167 LOG .info (record .toString ());
198168 }
199169 outRecords .inc ();
200170 }
201171
172+
202173 /**
203- * 1. save key and value .
204- * 2. set expired time for key when keyExpiredTime has been set .
205- * @param key
206- * @param value
174+ * 1. build key from map .
175+ * 2. save key and value .
176+ * 3. set expired time for key when keyExpiredTime has been set.
177+ * @param refData
207178 */
208- private void saveKey (String key , String value ) {
209- if (keyExpiredTime != 0L ) {
210- boolean keyExist = jedis .exists (key );
211- if (keyExist ) {
212- jedis .del (key );
179+ private synchronized void save (Map <String , Object > refData ) {
180+ String key = buildCacheKey (refData );
181+ try {
182+ refData .forEach ((field , value ) -> jedis .hset (key , field , String .valueOf (value )));
183+ } finally {
184+ if (keyExpiredTime != 0 ) {
185+ jedis .expire (key , keyExpiredTime );
213186 }
214- jedis .set (key , value , "NX" , "PX" , keyExpiredTime );
215- } else {
216- jedis .set (key , value );
217187 }
218188 }
219189
@@ -225,101 +195,108 @@ public void close() throws IOException {
225195 if (pool != null ) {
226196 pool .close ();
227197 }
228- if (jedis != null ){
229- if (jedis instanceof Closeable ){
198+ if (jedis != null ) {
199+ if (jedis instanceof Closeable ) {
230200 ((Closeable ) jedis ).close ();
231201 }
232202 }
233203
234204 }
235205
236- public static RedisOutputFormatBuilder buildRedisOutputFormat (){
237- return new RedisOutputFormatBuilder ();
206+ public String buildCacheKey (Map <String , Object > refData ) {
207+ StringBuilder keyBuilder = new StringBuilder (tableName );
208+ for (String primaryKey : primaryKeys ) {
209+ if (!refData .containsKey (primaryKey )) {
210+ return null ;
211+ }
212+ keyBuilder .append ("_" ).append (refData .get (primaryKey ));
213+ }
214+ return keyBuilder .toString ();
238215 }
239216
240- public static class RedisOutputFormatBuilder {
217+ public static class RedisOutputFormatBuilder {
241218 private final RedisOutputFormat redisOutputFormat ;
242219
243- protected RedisOutputFormatBuilder (){
220+ protected RedisOutputFormatBuilder () {
244221 this .redisOutputFormat = new RedisOutputFormat ();
245222 }
246223
247- public RedisOutputFormatBuilder setUrl (String url ){
224+ public RedisOutputFormatBuilder setUrl (String url ) {
248225 redisOutputFormat .url = url ;
249226 return this ;
250227 }
251228
252- public RedisOutputFormatBuilder setDatabase (String database ){
229+ public RedisOutputFormatBuilder setDatabase (String database ) {
253230 redisOutputFormat .database = database ;
254231 return this ;
255232 }
256233
257- public RedisOutputFormatBuilder setTableName (String tableName ){
234+ public RedisOutputFormatBuilder setTableName (String tableName ) {
258235 redisOutputFormat .tableName = tableName ;
259236 return this ;
260237 }
261238
262- public RedisOutputFormatBuilder setPassword (String password ){
239+ public RedisOutputFormatBuilder setPassword (String password ) {
263240 redisOutputFormat .password = password ;
264241 return this ;
265242 }
266243
267- public RedisOutputFormatBuilder setFieldNames (String [] fieldNames ){
244+ public RedisOutputFormatBuilder setFieldNames (String [] fieldNames ) {
268245 redisOutputFormat .fieldNames = fieldNames ;
269246 return this ;
270247 }
271248
272- public RedisOutputFormatBuilder setFieldTypes (TypeInformation <?>[] fieldTypes ){
249+ public RedisOutputFormatBuilder setFieldTypes (TypeInformation <?>[] fieldTypes ) {
273250 redisOutputFormat .fieldTypes = fieldTypes ;
274251 return this ;
275252 }
276253
277- public RedisOutputFormatBuilder setPrimaryKeys (List <String > primaryKeys ){
254+ public RedisOutputFormatBuilder setPrimaryKeys (List <String > primaryKeys ) {
278255 redisOutputFormat .primaryKeys = primaryKeys ;
279256 return this ;
280257 }
281258
282- public RedisOutputFormatBuilder setTimeout (int timeout ){
259+ public RedisOutputFormatBuilder setTimeout (int timeout ) {
283260 redisOutputFormat .timeout = timeout ;
284261 return this ;
285262 }
286263
287- public RedisOutputFormatBuilder setRedisType (int redisType ){
264+ public RedisOutputFormatBuilder setRedisType (int redisType ) {
288265 redisOutputFormat .redisType = redisType ;
289266 return this ;
290267 }
291268
292- public RedisOutputFormatBuilder setMaxTotal (String maxTotal ){
269+ public RedisOutputFormatBuilder setMaxTotal (String maxTotal ) {
293270 redisOutputFormat .maxTotal = maxTotal ;
294271 return this ;
295272 }
296273
297- public RedisOutputFormatBuilder setMaxIdle (String maxIdle ){
274+ public RedisOutputFormatBuilder setMaxIdle (String maxIdle ) {
298275 redisOutputFormat .maxIdle = maxIdle ;
299276 return this ;
300277 }
301278
302- public RedisOutputFormatBuilder setMinIdle (String minIdle ){
279+ public RedisOutputFormatBuilder setMinIdle (String minIdle ) {
303280 redisOutputFormat .minIdle = minIdle ;
304281 return this ;
305282 }
306283
307- public RedisOutputFormatBuilder setMasterName (String masterName ){
284+ public RedisOutputFormatBuilder setMasterName (String masterName ) {
308285 redisOutputFormat .masterName = masterName ;
309286 return this ;
310287 }
311288
312- public RedisOutputFormatBuilder setKeyExpiredTime (long keyExpiredTime ){
289+ public RedisOutputFormatBuilder setKeyExpiredTime (int keyExpiredTime ){
313290 redisOutputFormat .keyExpiredTime = keyExpiredTime ;
314291 return this ;
315292 }
316293
317- public RedisOutputFormat finish (){
318- if (redisOutputFormat .url == null ){
294+ public RedisOutputFormat finish () {
295+ if (redisOutputFormat .url == null ) {
319296 throw new IllegalArgumentException ("No URL supplied." );
320297 }
321298
322- if (redisOutputFormat .tableName == null ){
299+ if (redisOutputFormat .tableName == null ) {
323300 throw new IllegalArgumentException ("No tablename supplied." );
324301 }
325302
0 commit comments