|
4 | 4 | import io.lettuce.core.RedisCommandExecutionException; |
5 | 5 | import io.lettuce.core.RedisURI; |
6 | 6 | import io.lettuce.core.api.StatefulRedisConnection; |
| 7 | +import io.lettuce.core.api.async.RedisAsyncCommands; |
7 | 8 | import io.lettuce.core.codec.ByteArrayCodec; |
8 | 9 | import io.lettuce.core.resource.ClientResources; |
9 | 10 | import software.amazon.jdbc.AwsWrapperProperty; |
@@ -174,23 +175,42 @@ public byte[] readFromCache(String key) { |
174 | 175 | } |
175 | 176 |
|
176 | 177 | public void writeToCache(String key, byte[] value, int expiry) { |
177 | | - boolean isBroken = false; |
178 | 178 | StatefulRedisConnection<byte[], byte[]> conn = null; |
179 | 179 | try { |
180 | 180 | initializeCacheConnectionIfNeeded(false); |
181 | 181 | // get a connection from the write connection pool |
182 | 182 | conn = writeConnectionPool.borrowObject(); |
183 | | - // TODO: make the write to the cache to be async. |
184 | | - conn.sync().setex(computeHashDigest(key.getBytes(StandardCharsets.UTF_8)), expiry, value); |
| 183 | + // Add support to make write to the cache to be async. |
| 184 | + RedisAsyncCommands<byte[], byte[]> asyncCommands = conn.async(); |
| 185 | + byte[] keyHash = computeHashDigest(key.getBytes(StandardCharsets.UTF_8)); |
| 186 | + |
| 187 | + StatefulRedisConnection<byte[], byte[]> finalConn = conn; |
| 188 | + asyncCommands.setex(keyHash, expiry, value) |
| 189 | + .whenComplete((result, exception) -> { |
| 190 | + if (exception != null) { |
| 191 | + LOGGER.warning("Failed to write to cache: " + exception.getMessage()); |
| 192 | + if (writeConnectionPool != null) { |
| 193 | + try { |
| 194 | + returnConnectionBackToPool(finalConn, true, false); |
| 195 | + } catch (Exception ex) { |
| 196 | + LOGGER.warning("Error returning broken write connection back to pool: " + ex.getMessage()); |
| 197 | + } |
| 198 | + } |
| 199 | + } else { |
| 200 | + if (writeConnectionPool != null) { |
| 201 | + try { |
| 202 | + returnConnectionBackToPool(finalConn, false, false); |
| 203 | + } catch (Exception ex) { |
| 204 | + LOGGER.warning("Error returning write connection back to pool: " + ex.getMessage()); |
| 205 | + } |
| 206 | + } |
| 207 | + } |
| 208 | + }); |
185 | 209 | } catch (Exception e) { |
186 | | - if (conn != null){ |
187 | | - isBroken = true; |
188 | | - } |
189 | 210 | LOGGER.warning("Failed to write to cache: " + e.getMessage()); |
190 | | - } finally { |
191 | 211 | if (conn != null && writeConnectionPool != null) { |
192 | 212 | try { |
193 | | - this.returnConnectionBackToPool(conn, isBroken, false); |
| 213 | + returnConnectionBackToPool(conn, true, false); |
194 | 214 | } catch (Exception ex) { |
195 | 215 | LOGGER.warning("Error closing write connection: " + ex.getMessage()); |
196 | 216 | } |
|
0 commit comments