1919package com .dtstack .flink .sql .side .redis ;
2020
2121import com .dtstack .flink .sql .side .*;
22+ import com .dtstack .flink .sql .side .redis .enums .RedisType ;
2223import com .dtstack .flink .sql .side .redis .table .RedisSideReqRow ;
2324import com .dtstack .flink .sql .side .redis .table .RedisSideTableInfo ;
2425import com .esotericsoftware .minlog .Log ;
2526import org .apache .calcite .sql .JoinType ;
27+ import org .apache .commons .collections .CollectionUtils ;
28+ import org .apache .commons .collections .MapUtils ;
29+ import org .apache .commons .lang .StringUtils ;
2630import org .apache .commons .pool2 .impl .GenericObjectPoolConfig ;
2731import org .apache .flink .api .java .typeutils .RowTypeInfo ;
2832import com .google .common .collect .Maps ;
@@ -85,13 +89,14 @@ protected void reloadCache() {
8589 }
8690
8791 cacheRef .set (newCache );
88- LOG .info ("----- Redis all cacheRef reload end:{}" , Calendar . getInstance ());
92+ LOG .info ("----- Redis all cacheRef reload end:{}" , newCache . size ());
8993 }
9094
9195 @ Override
9296 public void flatMap (Row row , Collector <Row > out ) throws Exception {
9397 Map <String , String > inputParams = Maps .newHashMap ();
94- for (Integer conValIndex : sideInfo .getEqualValIndex ()){
98+ for (int i = 0 ; i < sideInfo .getEqualValIndex ().size (); i ++) {
99+ Integer conValIndex = sideInfo .getEqualValIndex ().get (i );
95100 Object equalObj = row .getField (conValIndex );
96101 if (equalObj == null ){
97102 if (sideInfo .getJoinType () == JoinType .LEFT ){
@@ -100,107 +105,55 @@ public void flatMap(Row row, Collector<Row> out) throws Exception {
100105 }
101106 return ;
102107 }
103- String columnName = sideInfo .getEqualFieldList ().get (conValIndex );
104- inputParams .put (columnName , equalObj .toString ());
108+ inputParams .put (sideInfo .getEqualFieldList ().get (i ), equalObj .toString ());
109+ }
110+ String key = buildCacheKey (inputParams );
111+ if (StringUtils .isBlank (key )){
112+ return ;
105113 }
106- String key = buildKey (inputParams );
107114
108115 Map <String , String > cacheMap = cacheRef .get ().get (key );
109-
110- if (cacheMap == null ){
111- if (sideInfo .getJoinType () == JoinType .LEFT ){
112- Row data = fillData (row , null );
113- out .collect (data );
114- }else {
116+ if (MapUtils .isEmpty (cacheMap )){
117+ if (sideInfo .getJoinType () != JoinType .LEFT ){
115118 return ;
116119 }
117-
120+ Row data = fillData (row , null );
121+ out .collect (data );
118122 return ;
119- }
120123
124+ }
121125 Row newRow = fillData (row , cacheMap );
122126 out .collect (newRow );
123127 }
124128
125- private String buildKey (Map <String , String > inputParams ) {
126- String tableName = tableInfo .getTableName ();
127- StringBuilder key = new StringBuilder ();
128- for (int i =0 ; i <inputParams .size (); i ++){
129- key .append (tableName ).append (":" ).append (inputParams .keySet ().toArray ()[i ]).append (":" )
130- .append (inputParams .get (inputParams .keySet ().toArray ()[i ]));
129+ private String buildCacheKey (Map <String , String > refData ) {
130+ StringBuilder keyBuilder = new StringBuilder (tableInfo .getTableName ());
131+ List <String > primaryKeys = tableInfo .getPrimaryKeys ();
132+ for (String primaryKey : primaryKeys ){
133+ if (!refData .containsKey (primaryKey )){
134+ return null ;
135+ }
136+ keyBuilder .append ("_" ).append (refData .get (primaryKey ));
131137 }
132- return key .toString ();
138+ return keyBuilder .toString ();
133139 }
134140
141+
135142 private void loadData (Map <String , Map <String , String >> tmpCache ) throws SQLException {
136143 JedisCommands jedis = null ;
137-
138144 try {
139- for (int i =0 ; i <CONN_RETRY_NUM ; i ++){
140-
141- try {
142- jedis = getJedis (tableInfo );
143- break ;
144- }catch (Exception e ){
145- if (i == CONN_RETRY_NUM - 1 ){
146- throw new RuntimeException ("" , e );
147- }
148-
149- try {
150- String jedisInfo = "url:" + tableInfo .getUrl () + ",pwd:" + tableInfo .getPassword () + ",database:" + tableInfo .getDatabase ();
151- LOG .warn ("get conn fail, wait for 5 sec and try again, connInfo:" + jedisInfo );
152- Thread .sleep (5 * 1000 );
153- } catch (InterruptedException e1 ) {
154- LOG .error ("" , e1 );
155- }
156- }
145+ StringBuilder keyPattern = new StringBuilder (tableInfo .getTableName ());
146+ for (String key : tableInfo .getPrimaryKeys ()){
147+ keyPattern .append ("_" ).append ("*" );
148+ };
149+ jedis = getJedisWithRetry (CONN_RETRY_NUM );
150+ Set <String > keys = getRedisKeys (RedisType .parse (tableInfo .getRedisType ()), jedis , keyPattern .toString ());
151+ if (CollectionUtils .isEmpty (keys )){
152+ return ;
157153 }
158-
159- if (tableInfo .getRedisType () != 3 ){
160- String perKey = tableInfo .getTableName () + "*" ;
161- Set <String > keys = ((Jedis ) jedis ).keys (perKey );
162- List <String > newPerKeys = new LinkedList <>();
163- for (String key : keys ){
164- String [] splitKey = key .split (":" );
165- String newKey = splitKey [0 ] + ":" + splitKey [1 ] + ":" + splitKey [2 ];
166- newPerKeys .add (newKey );
167- }
168- List <String > list = newPerKeys .stream ().distinct ().collect (Collectors .toList ());
169- for (String key : list ){
170- Map <String , String > kv = Maps .newHashMap ();
171- String [] primaryKv = key .split (":" );
172- kv .put (primaryKv [1 ], primaryKv [2 ]);
173- String pattern = key + "*" ;
174- Set <String > realKeys = ((Jedis ) jedis ).keys (pattern );
175- for (String realKey : realKeys ){
176- kv .put (realKey .split (":" )[3 ], jedis .get (realKey ));
177- }
178- tmpCache .put (key , kv );
179- }
180- } else {
181- String perKey = tableInfo .getTableName () + "*" ;
182- Set <String > keys = keys ((JedisCluster ) jedis , perKey );
183- List <String > newPerKeys = new LinkedList <>();
184- for (String key : keys ){
185- String [] splitKey = key .split (":" );
186- String newKey = splitKey [0 ] + ":" + splitKey [1 ] + ":" + splitKey [2 ];
187- newPerKeys .add (newKey );
188- }
189- List <String > list = newPerKeys .stream ().distinct ().collect (Collectors .toList ());
190- for (String key : list ){
191- Map <String , String > kv = Maps .newHashMap ();
192- String [] primaryKv = key .split (":" );
193- kv .put (primaryKv [1 ], primaryKv [2 ]);
194- String pattern = key + "*" ;
195- Set <String > realKeys = keys ((JedisCluster ) jedis , pattern );
196- for (String realKey : realKeys ){
197- kv .put (realKey .split (":" )[3 ], jedis .get (realKey ));
198- }
199- tmpCache .put (key , kv );
200- }
154+ for (String key : keys ){
155+ tmpCache .put (key , jedis .hgetAll (key ));
201156 }
202-
203-
204157 } catch (Exception e ){
205158 LOG .error ("" , e );
206159 } finally {
@@ -223,7 +176,7 @@ private void loadData(Map<String, Map<String, String>> tmpCache) throws SQLExcep
223176 private JedisCommands getJedis (RedisSideTableInfo tableInfo ) {
224177 String url = tableInfo .getUrl ();
225178 String password = tableInfo .getPassword ();
226- String database = tableInfo .getDatabase ();
179+ String database = tableInfo .getDatabase () == null ? "0" : tableInfo . getDatabase () ;
227180 int timeout = tableInfo .getTimeout ();
228181 if (timeout == 0 ){
229182 timeout = 1000 ;
@@ -245,33 +198,58 @@ private JedisCommands getJedis(RedisSideTableInfo tableInfo) {
245198 }
246199 JedisCommands jedis = null ;
247200 GenericObjectPoolConfig poolConfig = setPoolConfig (tableInfo .getMaxTotal (), tableInfo .getMaxIdle (), tableInfo .getMinIdle ());
248- switch (tableInfo .getRedisType ()){
201+ switch (RedisType . parse ( tableInfo .getRedisType () )){
249202 //单机
250- case 1 :
203+ case STANDALONE :
251204 pool = new JedisPool (poolConfig , firstIp , Integer .parseInt (firstPort ), timeout , password , Integer .parseInt (database ));
252205 jedis = pool .getResource ();
253206 break ;
254207 //哨兵
255- case 2 :
208+ case SENTINEL :
256209 jedisSentinelPool = new JedisSentinelPool (tableInfo .getMasterName (), ipPorts , poolConfig , timeout , password , Integer .parseInt (database ));
257210 jedis = jedisSentinelPool .getResource ();
258211 break ;
259212 //集群
260- case 3 :
213+ case CLUSTER :
261214 jedis = new JedisCluster (addresses , timeout , timeout ,1 , poolConfig );
215+ default :
216+ break ;
262217 }
263218
264219 return jedis ;
265220 }
266221
267- private Set <String > keys (JedisCluster jedisCluster , String pattern ){
222+ private JedisCommands getJedisWithRetry (int retryNum ) {
223+ while (retryNum -- > 0 ){
224+ try {
225+ return getJedis (tableInfo );
226+ } catch (Exception e ) {
227+ if (retryNum <= 0 ){
228+ throw new RuntimeException ("getJedisWithRetry error" , e );
229+ }
230+ try {
231+ String jedisInfo = "url:" + tableInfo .getUrl () + ",pwd:" + tableInfo .getPassword () + ",database:" + tableInfo .getDatabase ();
232+ LOG .warn ("get conn fail, wait for 5 sec and try again, connInfo:" + jedisInfo );
233+ Thread .sleep (5 * 1000 );
234+ } catch (InterruptedException e1 ) {
235+ LOG .error ("" , e1 );
236+ }
237+ }
238+ }
239+ return null ;
240+ }
241+
242+ private Set <String > getRedisKeys (RedisType redisType , JedisCommands jedis , String keyPattern ){
243+ if (!redisType .equals (RedisType .CLUSTER )){
244+ return ((Jedis ) jedis ).keys (keyPattern );
245+ }
268246 Set <String > keys = new TreeSet <>();
269- Map <String , JedisPool > clusterNodes = jedisCluster .getClusterNodes ();
247+ Map <String , JedisPool > clusterNodes = (( JedisCluster ) jedis ) .getClusterNodes ();
270248 for (String k : clusterNodes .keySet ()){
271249 JedisPool jp = clusterNodes .get (k );
272250 Jedis connection = jp .getResource ();
273251 try {
274- keys .addAll (connection .keys (pattern ));
252+ keys .addAll (connection .keys (keyPattern ));
275253 } catch (Exception e ){
276254 LOG .error ("Getting keys error: {}" , e );
277255 } finally {
0 commit comments