2323import com .dtstack .flink .sql .side .cache .CacheObj ;
2424import com .dtstack .flink .sql .side .redis .table .RedisSideReqRow ;
2525import com .dtstack .flink .sql .side .redis .table .RedisSideTableInfo ;
26- import io .lettuce .core .KeyValue ;
2726import io .lettuce .core .RedisClient ;
2827import io .lettuce .core .RedisFuture ;
2928import io .lettuce .core .api .StatefulRedisConnection ;
29+ import io .lettuce .core .api .async .RedisHashAsyncCommands ;
3030import io .lettuce .core .api .async .RedisKeyAsyncCommands ;
3131import io .lettuce .core .api .async .RedisStringAsyncCommands ;
3232import io .lettuce .core .cluster .RedisClusterClient ;
3333import io .lettuce .core .cluster .api .StatefulRedisClusterConnection ;
34+ import org .apache .commons .collections .MapUtils ;
35+ import org .apache .commons .lang .StringUtils ;
3436import org .apache .flink .api .java .typeutils .RowTypeInfo ;
35- import com .google .common .collect .Lists ;
3637import com .google .common .collect .Maps ;
3738import org .apache .flink .configuration .Configuration ;
3839import org .apache .flink .streaming .api .functions .async .ResultFuture ;
@@ -118,21 +119,21 @@ public Row fillData(Row input, Object sideInput) {
118119 @ Override
119120 public void asyncInvoke (Row input , ResultFuture <Row > resultFuture ) throws Exception {
120121 Row inputRow = Row .copy (input );
121- List <String > keyData = Lists . newLinkedList ();
122+ Map <String , Object > refData = Maps . newHashMap ();
122123 for (int i = 0 ; i < sideInfo .getEqualValIndex ().size (); i ++) {
123124 Integer conValIndex = sideInfo .getEqualValIndex ().get (i );
124125 Object equalObj = inputRow .getField (conValIndex );
125126 if (equalObj == null ){
126127 dealMissKey (inputRow , resultFuture );
127128 return ;
128129 }
129- String value = equalObj .toString ();
130- keyData .add (sideInfo .getEqualFieldList ().get (i ));
131- keyData .add (value );
130+ refData .put (sideInfo .getEqualFieldList ().get (i ), equalObj );
132131 }
133132
134- String key = buildCacheKey (keyData );
135-
133+ String key = buildCacheKey (refData );
134+ if (StringUtils .isBlank (key )){
135+ return ;
136+ }
136137 if (openCache ()){
137138 CacheObj val = getFromCache (key );
138139 if (val != null ){
@@ -154,44 +155,36 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
154155 }
155156 }
156157
157- Map <String , String > keyValue = Maps .newHashMap ();
158- List <String > value = async .keys (key + ":*" ).get ();
159- String [] values = value .toArray (new String [value .size ()]);
160- if (values .length == 0 ){
161- dealMissKey (inputRow , resultFuture );
162- } else {
163- RedisFuture <List <KeyValue <String , String >>> future = ((RedisStringAsyncCommands ) async ).mget (values );
164- future .thenAccept (new Consumer <List <KeyValue <String , String >>>() {
165- @ Override
166- public void accept (List <KeyValue <String , String >> keyValues ) {
167- if (keyValues .size () != 0 ) {
168- for (int i = 0 ; i < keyValues .size (); i ++) {
169- String [] splitKeys = keyValues .get (i ).getKey ().split (":" );
170- keyValue .put (splitKeys [1 ], splitKeys [2 ]);
171- keyValue .put (splitKeys [3 ], keyValues .get (i ).getValue ());
172- }
173- try {
174- Row row = fillData (inputRow , keyValue );
175- dealCacheData (key ,CacheObj .buildCacheObj (ECacheContentType .MultiLine , keyValue ));
176- resultFuture .complete (Collections .singleton (row ));
177- } catch (Exception e ) {
178- dealFillDataError (resultFuture , e , inputRow );
179- }
180- } else {
181- dealMissKey (inputRow , resultFuture );
182- dealCacheData (key ,CacheMissVal .getMissKeyObj ());
158+ RedisFuture <Map <String , String >> future = ((RedisHashAsyncCommands ) async ).hgetall (key );
159+ future .thenAccept (new Consumer <Map <String , String >>() {
160+ @ Override
161+ public void accept (Map <String , String > values ) {
162+ if (MapUtils .isNotEmpty (values )) {
163+ try {
164+ Row row = fillData (inputRow , values );
165+ dealCacheData (key ,CacheObj .buildCacheObj (ECacheContentType .MultiLine , values ));
166+ resultFuture .complete (Collections .singleton (row ));
167+ } catch (Exception e ) {
168+ dealFillDataError (resultFuture , e , inputRow );
183169 }
170+ } else {
171+ dealMissKey (inputRow , resultFuture );
172+ dealCacheData (key ,CacheMissVal .getMissKeyObj ());
184173 }
185- });
186- }
174+ }
175+ });
187176 }
188177
189- private String buildCacheKey (List <String > keyData ) {
190- String kv = String .join (":" , keyData );
191- String tableName = redisSideTableInfo .getTableName ();
192- StringBuilder preKey = new StringBuilder ();
193- preKey .append (tableName ).append (":" ).append (kv );
194- return preKey .toString ();
178+ private String buildCacheKey (Map <String , Object > refData ) {
179+ StringBuilder keyBuilder = new StringBuilder (redisSideTableInfo .getTableName ());
180+ List <String > primaryKeys = redisSideTableInfo .getPrimaryKeys ();
181+ for (String primaryKey : primaryKeys ){
182+ if (refData .containsKey (primaryKey )){
183+ keyBuilder .append ("_" ).append (refData .get (primaryKey ));
184+ }
185+ return null ;
186+ }
187+ return keyBuilder .toString ();
195188 }
196189
197190 @ Override
0 commit comments