Skip to content

Commit 98a5038

Browse files
committed
Caching - minor fixes after rebase including doing null check for telemetry.
1 parent 51e0bd6 commit 98a5038

File tree

7 files changed

+257
-80
lines changed

7 files changed

+257
-80
lines changed

wrapper/src/main/java/software/amazon/jdbc/plugin/cache/CacheConnection.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
import java.security.NoSuchAlgorithmException;
1515
import java.time.Duration;
1616
import java.util.Properties;
17+
import java.util.concurrent.locks.ReentrantLock;
1718
import java.util.logging.Logger;
1819
import io.lettuce.core.support.ConnectionPoolSupport;
1920
import org.apache.commons.pool2.impl.GenericObjectPool;
2021
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
22+
import software.amazon.jdbc.PropertyDefinition;
2123
import software.amazon.jdbc.util.StringUtils;
2224

2325
// Abstraction layer on top of a connection to a remote cache server
@@ -36,8 +38,8 @@ public class CacheConnection {
3638
private static final int DEFAULT_POOL_MIN_IDLE = 0;
3739
private static final long DEFAULT_MAX_BORROW_WAIT_MS = 50;
3840

39-
private static final Object READ_LOCK = new Object();
40-
private static final Object WRITE_LOCK = new Object();
41+
private static final ReentrantLock READ_LOCK = new ReentrantLock();
42+
private static final ReentrantLock WRITE_LOCK = new ReentrantLock();
4143

4244
protected static final AwsWrapperProperty CACHE_RW_ENDPOINT_ADDR =
4345
new AwsWrapperProperty(
@@ -59,6 +61,10 @@ public class CacheConnection {
5961

6062
private final boolean useSSL;
6163

64+
static {
65+
PropertyDefinition.registerPluginProperties(CacheConnection.class);
66+
}
67+
6268
public CacheConnection(final Properties properties) {
6369
this.cacheRwServerAddr = CACHE_RW_ENDPOINT_ADDR.getString(properties);
6470
this.cacheRoServerAddr = CACHE_RO_ENDPOINT_ADDR.getString(properties);
@@ -84,11 +90,14 @@ private void initializeCacheConnectionIfNeeded(boolean isRead) {
8490
GenericObjectPool<StatefulRedisConnection<byte[], byte[]>> cacheConnectionPool =
8591
isRead ? readConnectionPool : writeConnectionPool;
8692
if (cacheConnectionPool == null) {
87-
Object lock = isRead ? READ_LOCK : WRITE_LOCK;
88-
synchronized (lock) {
93+
ReentrantLock connectionPoolLock = isRead ? READ_LOCK : WRITE_LOCK;
94+
connectionPoolLock.lock();
95+
try {
8996
if ((isRead && readConnectionPool == null) || (!isRead && writeConnectionPool == null)) {
9097
createConnectionPool(isRead);
9198
}
99+
} finally {
100+
connectionPoolLock.unlock();
92101
}
93102
}
94103
}
@@ -105,7 +114,7 @@ private void createConnectionPool(boolean isRead) {
105114
String[] hostnameAndPort = serverAddr.split(":");
106115
RedisURI redisUriCluster = RedisURI.Builder.redis(hostnameAndPort[0])
107116
.withPort(Integer.parseInt(hostnameAndPort[1]))
108-
.withSsl(useSSL).withVerifyPeer(false).build();
117+
.withSsl(useSSL).withVerifyPeer(false).withLibraryName("aws-jdbc-lettuce").build();
109118

110119
RedisClient client = RedisClient.create(resources, redisUriCluster);
111120
GenericObjectPool<StatefulRedisConnection<byte[], byte[]>> pool =
@@ -219,6 +228,7 @@ public void writeToCache(String key, byte[] value, int expiry) {
219228
asyncCommands.set(keyHash, value, SetArgs.Builder.ex(expiry))
220229
.whenComplete((result, exception) -> handleCompletedCacheWrite(finalConn, exception));
221230
} catch (Exception e) {
231+
// Failed to trigger the async write to the cache, return the cache connection to the pool as broken
222232
LOGGER.warning("Failed to write to cache: " + e.getMessage());
223233
if (conn != null && writeConnectionPool != null) {
224234
try {

wrapper/src/main/java/software/amazon/jdbc/plugin/cache/CachedResultSet.java

Lines changed: 83 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import java.util.Calendar;
4141
import java.util.GregorianCalendar;
4242

43-
@SuppressWarnings({"RedundantThrows", "checkstyle:OverloadMethodsDeclarationOrder"})
4443
public class CachedResultSet implements ResultSet {
4544

4645
public static class CachedRow {
@@ -65,8 +64,10 @@ public Object get(final String columnName) {
6564

6665
protected ArrayList<CachedRow> rows;
6766
protected int currentRow;
67+
protected boolean wasNullFlag;
6868
protected ResultSetMetaData metadata;
6969
protected static ObjectMapper mapper = new ObjectMapper();
70+
protected static boolean mapperInitialized = false;
7071
protected static final TimeZone defaultTimeZone = TimeZone.getDefault();
7172
private static final Calendar calendarWithUserTz = new GregorianCalendar();
7273

@@ -83,34 +84,45 @@ public CachedResultSet(final ResultSet resultSet) throws SQLException {
8384
rows.add(row);
8485
}
8586
currentRow = -1;
87+
initializeObjectMapper();
8688
}
8789

8890
public CachedResultSet(final List<Map<String, Object>> resultList) {
8991
rows = new ArrayList<>();
90-
CachedResultSetMetaData.Field[] fields = new CachedResultSetMetaData.Field[resultList.get(0).size()];
91-
boolean fieldsInitialized = false;
92-
for (Map<String, Object> rowMap : resultList) {
93-
final CachedRow row = new CachedRow();
94-
int i = 0;
95-
for (Map.Entry<String, Object> entry : rowMap.entrySet()) {
96-
String columnName = entry.getKey();
97-
if (!fieldsInitialized) {
98-
fields[i] = new CachedResultSetMetaData.Field(columnName, columnName);
92+
int numFields = resultList.isEmpty() ? 0 : resultList.get(0).size();
93+
CachedResultSetMetaData.Field[] fields = new CachedResultSetMetaData.Field[numFields];
94+
if (!resultList.isEmpty()) {
95+
boolean fieldsInitialized = false;
96+
for (Map<String, Object> rowMap : resultList) {
97+
final CachedRow row = new CachedRow();
98+
int i = 0;
99+
for (Map.Entry<String, Object> entry : rowMap.entrySet()) {
100+
String columnName = entry.getKey();
101+
if (!fieldsInitialized) {
102+
fields[i] = new CachedResultSetMetaData.Field(columnName, columnName);
103+
}
104+
row.put(++i, columnName, entry.getValue());
99105
}
100-
row.put(++i, columnName, entry.getValue());
106+
rows.add(row);
107+
fieldsInitialized = true;
101108
}
102-
rows.add(row);
103-
fieldsInitialized = true;
104109
}
105110
currentRow = -1;
106111
metadata = new CachedResultSetMetaData(fields);
112+
initializeObjectMapper();
107113
}
108114

109-
public String serializeIntoJsonString() throws SQLException {
115+
// Helper method to initialize the object mapper for serialization of objects
116+
private void initializeObjectMapper() {
117+
if (mapperInitialized) return;
118+
// For serialization of Date/LocalDateTime etc, set up the time module,
119+
// and use standard string format (i.e. ISO)
110120
mapper.registerModule(new JavaTimeModule());
111-
// Serialize Date/LocalDateTime etc. into standard string format (i.e. ISO)
112121
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
122+
mapperInitialized = true;
123+
}
113124

125+
public String serializeIntoJsonString() throws SQLException {
114126
List<Map<String, Object>> resultList = new ArrayList<>();
115127
ResultSetMetaData metaData = this.getMetaData();
116128
int columns = metaData.getColumnCount();
@@ -156,7 +168,10 @@ public void close() throws SQLException {
156168

157169
@Override
158170
public boolean wasNull() throws SQLException {
159-
throw new UnsupportedOperationException();
171+
if (isClosed()) {
172+
throw new SQLException("This result set is closed");
173+
}
174+
return this.wasNullFlag;
160175
}
161176

162177
// TODO: implement all the getXXX APIs.
@@ -476,12 +491,12 @@ public InputStream getBinaryStream(final String columnLabel) throws SQLException
476491

477492
@Override
478493
public SQLWarning getWarnings() throws SQLException {
479-
throw new UnsupportedOperationException();
494+
return null;
480495
}
481496

482497
@Override
483498
public void clearWarnings() throws SQLException {
484-
throw new UnsupportedOperationException();
499+
// no-op
485500
}
486501

487502
@Override
@@ -494,28 +509,34 @@ public ResultSetMetaData getMetaData() throws SQLException {
494509
return metadata;
495510
}
496511

497-
@Override
498-
public Object getObject(final int columnIndex) throws SQLException {
512+
private void checkCurrentRow() throws SQLException {
499513
if (this.currentRow < 0 || this.currentRow >= this.rows.size()) {
500-
return null; // out of boundaries
514+
throw new SQLException("The current row index " + this.currentRow + " is out of range.");
501515
}
516+
}
517+
518+
@Override
519+
public Object getObject(final int columnIndex) throws SQLException {
520+
checkCurrentRow();
502521
final CachedRow row = this.rows.get(this.currentRow);
503522
if (!row.columnByIndex.containsKey(columnIndex)) {
504-
return null; // column index out of boundaries
523+
throw new SQLException("The column index: " + columnIndex + " is out of range, number of columns: " + row.columnByIndex.size());
505524
}
506-
return row.columnByIndex.get(columnIndex);
525+
Object obj = row.columnByIndex.get(columnIndex);
526+
this.wasNullFlag = (obj == null);
527+
return obj;
507528
}
508529

509530
@Override
510531
public Object getObject(final String columnLabel) throws SQLException {
511-
if (this.currentRow < 0 || this.currentRow >= this.rows.size()) {
512-
return null; // out of boundaries
513-
}
532+
checkCurrentRow();
514533
final CachedRow row = this.rows.get(this.currentRow);
515534
if (!row.columnByName.containsKey(columnLabel)) {
516-
return null; // column name not found
535+
throw new SQLException("The column label: " + columnLabel + " is not found");
517536
}
518-
return row.columnByName.get(columnLabel);
537+
Object obj = row.columnByName.get(columnLabel);
538+
this.wasNullFlag = (obj == null);
539+
return obj;
519540
}
520541

521542
@Override
@@ -559,12 +580,12 @@ public boolean isAfterLast() throws SQLException {
559580

560581
@Override
561582
public boolean isFirst() throws SQLException {
562-
return this.currentRow == 0 && this.rows.size() > 0;
583+
return this.currentRow == 0 && !this.rows.isEmpty();
563584
}
564585

565586
@Override
566587
public boolean isLast() throws SQLException {
567-
return this.currentRow == (this.rows.size() - 1) && this.rows.size() > 0;
588+
return this.currentRow == (this.rows.size() - 1) && !this.rows.isEmpty();
568589
}
569590

570591
@Override
@@ -596,24 +617,49 @@ public int getRow() throws SQLException {
596617

597618
@Override
598619
public boolean absolute(final int row) throws SQLException {
599-
if (row > 0) {
600-
this.currentRow = row - 1;
620+
if (row == 0) {
621+
this.beforeFirst();
622+
return false;
601623
} else {
602-
this.currentRow = this.rows.size() + row;
624+
int rowsSize = this.rows.size();
625+
if (row < 0) {
626+
if (row < -rowsSize) {
627+
this.beforeFirst();
628+
return false;
629+
}
630+
this.currentRow = rowsSize + row;
631+
} else { // row > 0
632+
if (row > rowsSize) {
633+
this.afterLast();
634+
return false;
635+
}
636+
this.currentRow = row - 1;
637+
}
603638
}
604-
return this.currentRow >= 0 && this.currentRow < this.rows.size();
639+
return true;
605640
}
606641

607642
@Override
608643
public boolean relative(final int rows) throws SQLException {
609644
this.currentRow += rows;
610-
return this.currentRow >= 0 && this.currentRow < this.rows.size();
645+
if (this.currentRow < 0) {
646+
this.beforeFirst();
647+
return false;
648+
} else if (this.currentRow >= this.rows.size()) {
649+
this.afterLast();
650+
return false;
651+
}
652+
return true;
611653
}
612654

613655
@Override
614656
public boolean previous() throws SQLException {
657+
if (this.currentRow < 1) {
658+
this.beforeFirst();
659+
return false;
660+
}
615661
this.currentRow--;
616-
return this.currentRow >= 0 && this.currentRow < this.rows.size();
662+
return true;
617663
}
618664

619665
@Override
@@ -1054,7 +1100,7 @@ public int getHoldability() throws SQLException {
10541100

10551101
@Override
10561102
public boolean isClosed() throws SQLException {
1057-
return false;
1103+
return this.rows == null;
10581104
}
10591105

10601106
@Override

wrapper/src/main/java/software/amazon/jdbc/plugin/cache/DataRemoteCachePlugin.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,25 @@
2828
import java.util.Set;
2929
import java.util.logging.Logger;
3030
import software.amazon.jdbc.JdbcCallable;
31+
import software.amazon.jdbc.JdbcMethod;
3132
import software.amazon.jdbc.PluginService;
3233
import software.amazon.jdbc.PropertyDefinition;
3334
import software.amazon.jdbc.plugin.AbstractConnectionPlugin;
3435
import software.amazon.jdbc.util.Messages;
3536
import software.amazon.jdbc.util.StringUtils;
37+
import software.amazon.jdbc.util.WrapperUtils;
3638
import software.amazon.jdbc.util.telemetry.TelemetryCounter;
3739
import software.amazon.jdbc.util.telemetry.TelemetryFactory;
3840

3941
public class DataRemoteCachePlugin extends AbstractConnectionPlugin {
4042
private static final Logger LOGGER = Logger.getLogger(DataRemoteCachePlugin.class.getName());
4143
private static final Set<String> subscribedMethods = Collections.unmodifiableSet(new HashSet<>(
42-
Arrays.asList("Statement.executeQuery", "Statement.execute",
43-
"PreparedStatement.execute", "PreparedStatement.executeQuery",
44-
"CallableStatement.execute", "CallableStatement.executeQuery")));
44+
Arrays.asList(JdbcMethod.STATEMENT_EXECUTEQUERY.methodName,
45+
JdbcMethod.STATEMENT_EXECUTE.methodName,
46+
JdbcMethod.PREPAREDSTATEMENT_EXECUTE.methodName,
47+
JdbcMethod.PREPAREDSTATEMENT_EXECUTEQUERY.methodName,
48+
JdbcMethod.CALLABLESTATEMENT_EXECUTE.methodName,
49+
JdbcMethod.CALLABLESTATEMENT_EXECUTEQUERY.methodName)));
4550

4651
static {
4752
PropertyDefinition.registerPluginProperties(DataRemoteCachePlugin.class);
@@ -173,8 +178,6 @@ public <T, E extends Exception> T execute(
173178
final JdbcCallable<T, E> jdbcMethodFunc,
174179
final Object[] jdbcMethodArgs)
175180
throws E {
176-
totalCallsCounter.inc();
177-
178181
ResultSet result;
179182
boolean needToCache = false;
180183
final String sql = getQuery(jdbcMethodArgs);
@@ -196,42 +199,51 @@ public <T, E extends Exception> T execute(
196199
// Query result can be served from the cache if it has a configured TTL value, and it is
197200
// not executed in a transaction as a transaction typically need to return consistent results.
198201
if (!isInTransaction && (configuredQueryTtl != null)) {
202+
incrCounter(totalCallsCounter);
199203
result = fetchResultSetFromCache(mainQuery);
200204
if (result == null) {
201205
// Cache miss. Need to fetch result from the database
202206
needToCache = true;
203-
missCounter.inc();
207+
incrCounter(missCounter);
204208
LOGGER.finest("Got a cache miss for SQL: " + sql);
205209
} else {
206210
LOGGER.finest("Got a cache hit for SQL: " + sql);
207211
// Cache hit. Return the cached result
208-
hitCounter.inc();
212+
incrCounter(hitCounter);
209213
try {
210214
result.beforeFirst();
211215
} catch (final SQLException ex) {
212-
if (exceptionClass.isAssignableFrom(ex.getClass())) {
213-
throw exceptionClass.cast(ex);
214-
}
215-
throw new RuntimeException(ex);
216+
throw WrapperUtils.wrapExceptionIfNeeded(exceptionClass, ex);
216217
}
217218
return resultClass.cast(result);
218219
}
219220
}
220221

221222
result = (ResultSet) jdbcMethodFunc.call();
222223

224+
// We need to cache the query result if we got a cache miss for the query result,
225+
// or the query is cacheable and executed inside a transaction.
226+
if (isInTransaction && (configuredQueryTtl != null)) {
227+
needToCache = true;
228+
}
223229
if (needToCache) {
224230
try {
225231
result = cacheResultSet(mainQuery, result, configuredQueryTtl);
226232
} catch (final SQLException ex) {
227-
// ignore exception
228-
LOGGER.warning("Encountered SQLException when caching results: " + ex.getMessage());
233+
// Log and re-throw exception
234+
LOGGER.warning("Encountered SQLException when caching query results: " + ex.getMessage());
235+
throw WrapperUtils.wrapExceptionIfNeeded(exceptionClass, ex);
229236
}
230237
}
231238

232239
return resultClass.cast(result);
233240
}
234241

242+
private void incrCounter(TelemetryCounter counter) {
243+
if (counter == null) return;
244+
counter.inc();
245+
}
246+
235247
protected String getQuery(final Object[] jdbcMethodArgs) {
236248
// Get query from method argument
237249
if (jdbcMethodArgs != null && jdbcMethodArgs.length > 0 && jdbcMethodArgs[0] != null) {

0 commit comments

Comments
 (0)