3737
3838import java .io .Closeable ;
3939import java .io .IOException ;
40- import java .util .HashMap ;
4140import java .util .HashSet ;
42- import java .util .LinkedList ;
4341import java .util .List ;
42+ import java .util .Map ;
4443import java .util .Set ;
4544
4645/**
4746 * @author yanxi
4847 */
4948public class RedisOutputFormat extends AbstractDtRichOutputFormat <Tuple2 > {
5049 private static final Logger LOG = LoggerFactory .getLogger (RedisOutputFormat .class );
51-
50+ protected String [] fieldNames ;
51+ protected TypeInformation <?>[] fieldTypes ;
52+ protected List <String > primaryKeys ;
53+ protected int timeout = 10000 ;
5254 private String url ;
53-
54- private String database ;
55-
55+ private String database = "0" ;
5656 private String tableName ;
57-
5857 private String password ;
59-
6058 private int redisType ;
61-
6259 private String maxTotal ;
63-
6460 private String maxIdle ;
65-
6661 private String minIdle ;
67-
6862 private String masterName ;
69-
70- protected String [] fieldNames ;
71-
72- protected TypeInformation <?>[] fieldTypes ;
73-
74- protected List <String > primaryKeys ;
75-
76- protected int timeout ;
77-
7863 private JedisPool pool ;
7964
8065 private JedisCommands jedis ;
@@ -83,8 +68,13 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
8368
8469 private GenericObjectPoolConfig poolConfig ;
8570
86- private RedisOutputFormat (){
71+ private RedisOutputFormat () {
8772 }
73+
74+ public static RedisOutputFormatBuilder buildRedisOutputFormat () {
75+ return new RedisOutputFormatBuilder ();
76+ }
77+
8878 @ Override
8979 public void configure (Configuration parameters ) {
9080
@@ -96,15 +86,15 @@ public void open(int taskNumber, int numTasks) throws IOException {
9686 initMetric ();
9787 }
9888
99- private GenericObjectPoolConfig setPoolConfig (String maxTotal , String maxIdle , String minIdle ){
89+ private GenericObjectPoolConfig setPoolConfig (String maxTotal , String maxIdle , String minIdle ) {
10090 GenericObjectPoolConfig config = new GenericObjectPoolConfig ();
101- if (maxTotal != null ){
91+ if (maxTotal != null ) {
10292 config .setMaxTotal (Integer .parseInt (maxTotal ));
10393 }
104- if (maxIdle != null ){
94+ if (maxIdle != null ) {
10595 config .setMaxIdle (Integer .parseInt (maxIdle ));
10696 }
107- if (minIdle != null ){
97+ if (minIdle != null ) {
10898 config .setMinIdle (Integer .parseInt (minIdle ));
10999 }
110100 return config ;
@@ -121,31 +111,23 @@ private void establishConnection() {
121111 for (String ipPort : nodes ) {
122112 ipPorts .add (ipPort );
123113 String [] ipPortPair = StringUtils .split (ipPort , ":" );
124- addresses .add (new HostAndPort (ipPortPair [0 ].trim (), Integer .valueOf (ipPortPair [1 ].trim ())));
125- }
126- if (timeout == 0 ){
127- timeout = 10000 ;
128- }
129- if (database == null )
130- {
131- database = "0" ;
114+ addresses .add (new HostAndPort (ipPortPair [0 ].trim (), Integer .parseInt (ipPortPair [1 ].trim ())));
132115 }
133116
134- switch (redisType ){
135- //单机
136- case 1 :
117+ switch (RedisType .parse (redisType )) {
118+ case STANDALONE :
137119 pool = new JedisPool (poolConfig , firstIp , Integer .parseInt (firstPort ), timeout , password , Integer .parseInt (database ));
138120 jedis = pool .getResource ();
139121 break ;
140- //哨兵
141- case 2 :
122+ case SENTINEL :
142123 jedisSentinelPool = new JedisSentinelPool (masterName , ipPorts , poolConfig , timeout , password , Integer .parseInt (database ));
143124 jedis = jedisSentinelPool .getResource ();
144125 break ;
145- //集群
146- case 3 :
126+ case CLUSTER :
147127 jedis = new JedisCluster (addresses , timeout , timeout , 10 , password , poolConfig );
128+ break ;
148129 default :
130+ throw new RuntimeException ("unsupported redis type[ " + redisType + "]" );
149131 }
150132 }
151133
@@ -160,38 +142,14 @@ public void writeRecord(Tuple2 record) throws IOException {
160142 if (row .getArity () != fieldNames .length ) {
161143 return ;
162144 }
163-
164- HashMap <String , Integer > map = new HashMap <>(8 );
165- for (String primaryKey : primaryKeys ) {
166- for (int i = 0 ; i < fieldNames .length ; i ++) {
167- if (fieldNames [i ].equals (primaryKey )) {
168- map .put (primaryKey , i );
169- }
170- }
171- }
172-
173- List <String > kvList = new LinkedList <>();
174- for (String primaryKey : primaryKeys ){
175- StringBuilder primaryKv = new StringBuilder ();
176- int index = map .get (primaryKey ).intValue ();
177- primaryKv .append (primaryKey ).append (":" ).append (row .getField (index ));
178- kvList .add (primaryKv .toString ());
179- }
180-
181- String perKey = String .join (":" , kvList );
145+ Map <String , Object > refData = Maps .newHashMap ();
182146 for (int i = 0 ; i < fieldNames .length ; i ++) {
183- StringBuilder key = new StringBuilder ();
184- key .append (tableName ).append (":" ).append (perKey ).append (":" ).append (fieldNames [i ]);
185-
186- String value = "null" ;
187- Object field = row .getField (i );
188- if (field != null ) {
189- value = field .toString ();
190- }
191- jedis .set (key .toString (), value );
147+ refData .put (fieldNames [i ], row .getField (i ));
192148 }
149+ String redisKey = buildCacheKey (refData );
150+ refData .forEach ((key , value ) -> jedis .hset (redisKey , key , String .valueOf (value )));
193151
194- if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ){
152+ if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
195153 LOG .info (record .toString ());
196154 }
197155 outRecords .inc ();
@@ -205,96 +163,103 @@ public void close() throws IOException {
205163 if (pool != null ) {
206164 pool .close ();
207165 }
208- if (jedis != null ){
209- if (jedis instanceof Closeable ){
166+ if (jedis != null ) {
167+ if (jedis instanceof Closeable ) {
210168 ((Closeable ) jedis ).close ();
211169 }
212170 }
213171
214172 }
215173
216- public static RedisOutputFormatBuilder buildRedisOutputFormat (){
217- return new RedisOutputFormatBuilder ();
174+ public String buildCacheKey (Map <String , Object > refData ) {
175+ StringBuilder keyBuilder = new StringBuilder (tableName );
176+ for (String primaryKey : primaryKeys ) {
177+ if (!refData .containsKey (primaryKey )) {
178+ return null ;
179+ }
180+ keyBuilder .append ("_" ).append (refData .get (primaryKey ));
181+ }
182+ return keyBuilder .toString ();
218183 }
219184
220- public static class RedisOutputFormatBuilder {
185+ public static class RedisOutputFormatBuilder {
221186 private final RedisOutputFormat redisOutputFormat ;
222187
223- protected RedisOutputFormatBuilder (){
188+ protected RedisOutputFormatBuilder () {
224189 this .redisOutputFormat = new RedisOutputFormat ();
225190 }
226191
227- public RedisOutputFormatBuilder setUrl (String url ){
192+ public RedisOutputFormatBuilder setUrl (String url ) {
228193 redisOutputFormat .url = url ;
229194 return this ;
230195 }
231196
232- public RedisOutputFormatBuilder setDatabase (String database ){
197+ public RedisOutputFormatBuilder setDatabase (String database ) {
233198 redisOutputFormat .database = database ;
234199 return this ;
235200 }
236201
237- public RedisOutputFormatBuilder setTableName (String tableName ){
202+ public RedisOutputFormatBuilder setTableName (String tableName ) {
238203 redisOutputFormat .tableName = tableName ;
239204 return this ;
240205 }
241206
242- public RedisOutputFormatBuilder setPassword (String password ){
207+ public RedisOutputFormatBuilder setPassword (String password ) {
243208 redisOutputFormat .password = password ;
244209 return this ;
245210 }
246211
247- public RedisOutputFormatBuilder setFieldNames (String [] fieldNames ){
212+ public RedisOutputFormatBuilder setFieldNames (String [] fieldNames ) {
248213 redisOutputFormat .fieldNames = fieldNames ;
249214 return this ;
250215 }
251216
252- public RedisOutputFormatBuilder setFieldTypes (TypeInformation <?>[] fieldTypes ){
217+ public RedisOutputFormatBuilder setFieldTypes (TypeInformation <?>[] fieldTypes ) {
253218 redisOutputFormat .fieldTypes = fieldTypes ;
254219 return this ;
255220 }
256221
257- public RedisOutputFormatBuilder setPrimaryKeys (List <String > primaryKeys ){
222+ public RedisOutputFormatBuilder setPrimaryKeys (List <String > primaryKeys ) {
258223 redisOutputFormat .primaryKeys = primaryKeys ;
259224 return this ;
260225 }
261226
262- public RedisOutputFormatBuilder setTimeout (int timeout ){
227+ public RedisOutputFormatBuilder setTimeout (int timeout ) {
263228 redisOutputFormat .timeout = timeout ;
264229 return this ;
265230 }
266231
267- public RedisOutputFormatBuilder setRedisType (int redisType ){
232+ public RedisOutputFormatBuilder setRedisType (int redisType ) {
268233 redisOutputFormat .redisType = redisType ;
269234 return this ;
270235 }
271236
272- public RedisOutputFormatBuilder setMaxTotal (String maxTotal ){
237+ public RedisOutputFormatBuilder setMaxTotal (String maxTotal ) {
273238 redisOutputFormat .maxTotal = maxTotal ;
274239 return this ;
275240 }
276241
277- public RedisOutputFormatBuilder setMaxIdle (String maxIdle ){
242+ public RedisOutputFormatBuilder setMaxIdle (String maxIdle ) {
278243 redisOutputFormat .maxIdle = maxIdle ;
279244 return this ;
280245 }
281246
282- public RedisOutputFormatBuilder setMinIdle (String minIdle ){
247+ public RedisOutputFormatBuilder setMinIdle (String minIdle ) {
283248 redisOutputFormat .minIdle = minIdle ;
284249 return this ;
285250 }
286251
287- public RedisOutputFormatBuilder setMasterName (String masterName ){
252+ public RedisOutputFormatBuilder setMasterName (String masterName ) {
288253 redisOutputFormat .masterName = masterName ;
289254 return this ;
290255 }
291256
292- public RedisOutputFormat finish (){
293- if (redisOutputFormat .url == null ){
257+ public RedisOutputFormat finish () {
258+ if (redisOutputFormat .url == null ) {
294259 throw new IllegalArgumentException ("No URL supplied." );
295260 }
296261
297- if (redisOutputFormat .tableName == null ){
262+ if (redisOutputFormat .tableName == null ) {
298263 throw new IllegalArgumentException ("No tablename supplied." );
299264 }
300265
0 commit comments