Skip to content

Commit f308580

Browse files
committed
Fix for RetryStats and rate limiting in complex queries.
Fixed problem with rate limiting in complex queries with sorting, where limits were counted twice resulting in 1/2 of expected throughput. Also fixed stats not being set properly into QueryResult.
1 parent e2e8874 commit f308580

File tree

9 files changed

+317
-83
lines changed

9 files changed

+317
-83
lines changed

driver/src/main/java/oracle/nosql/driver/http/Client.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,10 @@ public Result execute(Request kvRequest) {
325325
*/
326326
kvRequest.validate();
327327

328+
/* clear any retry stats that may exist on this request object */
329+
kvRequest.setRetryStats(null);
330+
kvRequest.setRateLimitDelayedMs(0);
331+
328332
if (kvRequest.isQueryRequest()) {
329333
QueryRequest qreq = (QueryRequest)kvRequest;
330334

@@ -377,9 +381,6 @@ public Result execute(Request kvRequest) {
377381

378382
Throwable exception = null;
379383

380-
/* clear any retry stats that may exist on this request object */
381-
kvRequest.setRetryStats(null);
382-
383384
/*
384385
* If the request doesn't set an explicit compartment, use
385386
* the config default if provided.
@@ -603,6 +604,12 @@ public Result execute(Request kvRequest) {
603604
updateRateLimiters(((TableResult)res).getTableName(), tl);
604605
}
605606

607+
/*
608+
* We may not have rate limiters yet because queries may
609+
* not have a tablename until after the first request.
610+
* So try to get rate limiters if we don't have them yet and
611+
* this is a QueryRequest.
612+
*/
606613
if (rateLimiterMap != null && readLimiter == null) {
607614
readLimiter = getQueryRateLimiter(kvRequest, true);
608615
}
@@ -829,9 +836,18 @@ private RateLimiter getQueryRateLimiter(Request request, boolean read) {
829836
}
830837

831838
if (read) {
832-
return rateLimiterMap.getReadLimiter(tableName);
839+
RateLimiter rl = rateLimiterMap.getReadLimiter(tableName);
840+
if (rl != null) {
841+
request.setReadRateLimiter(rl);
842+
}
843+
return rl;
844+
}
845+
846+
RateLimiter rl = rateLimiterMap.getWriteLimiter(tableName);
847+
if (rl != null) {
848+
request.setWriteRateLimiter(rl);
833849
}
834-
return rateLimiterMap.getWriteLimiter(tableName);
850+
return rl;
835851
}
836852

837853
/**

driver/src/main/java/oracle/nosql/driver/ops/QueryRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ public QueryRequest() {
114114
public QueryRequest copyInternal() {
115115

116116
QueryRequest internalReq = new QueryRequest();
117-
internalReq.timeoutMs = timeoutMs;
117+
super.copyTo(internalReq);
118+
118119
internalReq.traceLevel = traceLevel;
119120
internalReq.limit = limit;
120121
internalReq.maxReadKB = maxReadKB;
@@ -125,7 +126,6 @@ public QueryRequest copyInternal() {
125126
internalReq.preparedStatement = preparedStatement;
126127
internalReq.isInternal = true;
127128
internalReq.driver = driver;
128-
internalReq.tableName = tableName;
129129
return internalReq;
130130
}
131131

driver/src/main/java/oracle/nosql/driver/ops/QueryResult.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
import oracle.nosql.driver.Consistency;
1313
import oracle.nosql.driver.NoSQLHandle;
14-
import oracle.nosql.driver.RateLimiter;
1514
import oracle.nosql.driver.query.QueryDriver;
1615
import oracle.nosql.driver.values.MapValue;
1716

@@ -230,23 +229,6 @@ void compute() {
230229
QueryDriver driver = request.getDriver();
231230
driver.compute(this);
232231
isComputed = true;
233-
234-
/*
235-
* If the original request specified rate limiting, apply the
236-
* used read/write units to the limiter(s) here
237-
*/
238-
if (request != null) {
239-
RateLimiter readLimiter = request.getReadRateLimiter();
240-
if (readLimiter != null) {
241-
readLimiter.consumeUnitsUnconditionally(
242-
super.getReadUnitsInternal());
243-
}
244-
RateLimiter writeLimiter = request.getWriteRateLimiter();
245-
if (writeLimiter != null) {
246-
writeLimiter.consumeUnitsUnconditionally(
247-
super.getWriteUnitsInternal());
248-
}
249-
}
250232
}
251233

252234
/**

driver/src/main/java/oracle/nosql/driver/ops/Request.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,4 +423,20 @@ public int getRateLimitDelayedMs() {
423423
* @return the type name of the request
424424
*/
425425
public abstract String getTypeName();
426+
427+
/**
428+
* @hidden
429+
* Copy internal fields to another Request object.
430+
*/
431+
public void copyTo(Request other) {
432+
other.setTimeoutInternal(this.timeoutMs);
433+
other.setCheckRequestSize(this.checkRequestSize);
434+
other.setCompartmentInternal(this.compartment);
435+
other.setTableNameInternal(this.tableName);
436+
other.setStartTimeMs(this.startTimeMs);
437+
other.setRetryStats(this.retryStats);
438+
other.setReadRateLimiter(this.readRateLimiter);
439+
other.setWriteRateLimiter(this.writeRateLimiter);
440+
other.setRateLimitDelayedMs(this.rateLimitDelayedMs);
441+
}
426442
}

driver/src/main/java/oracle/nosql/driver/ops/RetryStats.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class RetryStats {
3636
* Internal use only.
3737
* Create a new retry stats object.
3838
*/
39-
RetryStats() {
39+
public RetryStats() {
4040
this.retries = 0;
4141
this.delayMs = 0;
4242
this.exceptionMap = new HashMap<Class<? extends Throwable>, Integer>();
@@ -117,6 +117,50 @@ public void clear() {
117117
exceptionMap.clear();
118118
}
119119

120+
public Map<Class<? extends Throwable>, Integer> getExceptionMap() {
121+
return exceptionMap;
122+
}
123+
124+
/**
125+
* @hidden
126+
* Internal use only.
127+
* Adds stats to the current object.
128+
*/
129+
public void addStats(RetryStats rs) {
130+
if (rs == null) {
131+
return;
132+
}
133+
delayMs += rs.getDelayMs();
134+
retries += rs.getRetries();
135+
Map<Class<? extends Throwable>, Integer> emap = rs.getExceptionMap();
136+
if (emap == null || emap.isEmpty()) {
137+
return;
138+
}
139+
for (Map.Entry<Class<? extends Throwable>, Integer> entry:
140+
emap.entrySet()) {
141+
int i = entry.getValue().intValue();
142+
Integer val = exceptionMap.get(entry.getKey());
143+
if (val != null) {
144+
i += val.intValue();
145+
}
146+
exceptionMap.put(entry.getKey(), i);
147+
}
148+
}
149+
150+
@Override
151+
public boolean equals(Object o) {
152+
if (!(o instanceof RetryStats)) {
153+
return false;
154+
}
155+
RetryStats rs = (RetryStats)o;
156+
if (retries != rs.retries ||
157+
delayMs != rs.delayMs ||
158+
exceptionMap.equals(rs.exceptionMap) == false) {
159+
return false;
160+
}
161+
return true;
162+
}
163+
120164
@Override
121165
public String toString() {
122166
StringBuilder sb = new StringBuilder();

driver/src/main/java/oracle/nosql/driver/query/QueryDriver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ private void setQueryResult(QueryResult result) {
217217
result.setReadKB(theRCB.getReadKB());
218218
result.setReadUnits(theRCB.getReadUnits());
219219
result.setWriteKB(theRCB.getWriteKB());
220+
result.setRateLimitDelayedMs(theRCB.getRateLimitDelayedMs());
221+
result.setRetryStats(theRCB.getRetryStats());
220222

221223
theResults = null;
222224
theRCB.resetKBConsumption();

driver/src/main/java/oracle/nosql/driver/query/ReceiveIter.java

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.List;
1717
import java.util.TreeSet;
1818

19+
import oracle.nosql.driver.NoSQLException;
1920
import oracle.nosql.driver.RetryableException;
2021
import oracle.nosql.driver.ops.QueryRequest;
2122
import oracle.nosql.driver.ops.QueryResult;
@@ -390,6 +391,36 @@ private boolean sortingNext(
390391
}
391392
}
392393

394+
/**
395+
* Execute a copy of a request.
396+
* After execution, copy specific fields from the request copy back
397+
* to the original request. This is needed because client.execute() may
398+
* set or update specific fields in the request, such as startTime,
399+
* retry stats, rate limiters, etc.
400+
*/
401+
private QueryResult execute(RuntimeControlBlock rcb,
402+
QueryRequest origRequest,
403+
QueryRequest reqCopy) {
404+
NoSQLException e = null;
405+
QueryResult result = null;
406+
try {
407+
result = (QueryResult)rcb.getClient().execute(reqCopy);
408+
} catch (NoSQLException qe) {
409+
e = qe;
410+
}
411+
/*
412+
* Copy values back to original request, even when the execute()
413+
* throws an error. This is because even in the error case the
414+
* execute() call may have set/updated these values.
415+
*/
416+
reqCopy.copyTo(origRequest);
417+
/* if the execute() call threw an error, throw it here */
418+
if (e != null) {
419+
throw e;
420+
}
421+
return result;
422+
}
423+
393424
/*
394425
* Make sure we receive (and cache) at least one result per partition
395426
* (except from partitions that do not contain any results at all).
@@ -405,15 +436,16 @@ private void initPartitionSort(
405436
* the partition whose id is specified in theContinuationKey and
406437
* from any other partition that is co-located with that partition.
407438
*/
408-
QueryRequest req = rcb.getRequest().copyInternal();
409-
req.setContKey(state.theContinuationKey);
439+
QueryRequest origRequest = rcb.getRequest();
440+
QueryRequest reqCopy = origRequest.copyInternal();
441+
reqCopy.setContKey(state.theContinuationKey);
410442

411443
if (rcb.getTraceLevel() >= 1) {
412444
rcb.trace("ReceiveIter : executing remote request for " +
413445
"sorting phase 1");
414446
}
415447

416-
QueryResult result = (QueryResult)rcb.getClient().execute(req);
448+
QueryResult result = execute(rcb, origRequest, reqCopy);
417449

418450
int numPids = result.getNumPids();
419451
List<MapValue> results = result.getResultsInternal();
@@ -423,6 +455,8 @@ private void initPartitionSort(
423455
rcb.tallyReadKB(result.getReadKB());
424456
rcb.tallyReadUnits(result.getReadUnits());
425457
rcb.tallyWriteKB(result.getWriteKB());
458+
rcb.tallyRateLimitDelayedMs(result.getRateLimitDelayedMs());
459+
rcb.tallyRetryStats(result.getRetryStats());
426460

427461
if (rcb.getTraceLevel() >= 1) {
428462
rcb.trace("ReceiveIter.initPartitionSort() : got result.\n" +
@@ -708,31 +742,32 @@ MapValue next() {
708742

709743
void fetch() {
710744

711-
QueryRequest req = theRCB.getRequest().copyInternal();
712-
req.setContKey(theContinuationKey);
713-
req.setShardId(theIsForShard ? theShardOrPartId : -1);
745+
QueryRequest origRequest = theRCB.getRequest();
746+
QueryRequest reqCopy = origRequest.copyInternal();
747+
reqCopy.setContKey(theContinuationKey);
748+
reqCopy.setShardId(theIsForShard ? theShardOrPartId : -1);
714749

715750
if (doesSort() && !theIsForShard) {
716751
theState.theMemoryConsumption -= theResultsSize;
717752
theRCB.decMemoryConsumption(theResultsSize);
718753
long numResults =
719-
((req.getMaxMemoryConsumption() - theState.theDupElimMemory) /
754+
((reqCopy.getMaxMemoryConsumption() - theState.theDupElimMemory) /
720755
((theState.theSortedScanners.size() + 1) *
721756
(theState.theTotalResultsSize /
722757
theState.theTotalNumResults)));
723758
if (numResults > 2048) {
724759
numResults = 2048;
725760
}
726-
req.setLimit((int)numResults);
761+
reqCopy.setLimit((int)numResults);
727762
}
728763

729764
if (theRCB.getTraceLevel() >= 1) {
730765
theRCB.trace("RemoteScanner : executing remote request. spid = " +
731766
theShardOrPartId);
732-
assert(req.hasDriver());
767+
assert(reqCopy.hasDriver());
733768
}
734769

735-
QueryResult result = (QueryResult)theRCB.getClient().execute(req);
770+
QueryResult result = execute(theRCB, origRequest, reqCopy);
736771

737772
theResults = result.getResultsInternal();
738773
theContinuationKey = result.getContinuationKey();
@@ -742,6 +777,8 @@ void fetch() {
742777
theRCB.tallyReadKB(result.getReadKB());
743778
theRCB.tallyReadUnits(result.getReadUnits());
744779
theRCB.tallyWriteKB(result.getWriteKB());
780+
theRCB.tallyRateLimitDelayedMs(result.getRateLimitDelayedMs());
781+
theRCB.tallyRetryStats(result.getRetryStats());
745782

746783
assert(result.reachedLimit() || !theMoreRemoteResults);
747784

driver/src/main/java/oracle/nosql/driver/query/RuntimeControlBlock.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import oracle.nosql.driver.Consistency;
1313
import oracle.nosql.driver.http.Client;
1414
import oracle.nosql.driver.ops.QueryRequest;
15+
import oracle.nosql.driver.ops.RetryStats;
1516
import oracle.nosql.driver.values.FieldValue;
1617

1718
/**
@@ -62,6 +63,12 @@ public class RuntimeControlBlock {
6263
private int theReadUnits;
6364
private int theWriteKB;
6465

66+
/*
67+
* Stats for retries and rate limiting during query batch execution
68+
*/
69+
private int theRateLimitDelayedMs;
70+
private RetryStats theRetryStats;
71+
6572
/*
6673
* The number of memory bytes consumed by the query at the client for
6774
* blocking operations (duplicate eleimination, sorting). Not applicable
@@ -179,6 +186,20 @@ public void setRegVal(int regId, FieldValue value) {
179186
theRegisters[regId] = value;
180187
}
181188

189+
public void tallyRateLimitDelayedMs(int ms) {
190+
theRateLimitDelayedMs += ms;
191+
}
192+
193+
public void tallyRetryStats(RetryStats rs) {
194+
if (rs == null) {
195+
return;
196+
}
197+
if (theRetryStats == null) {
198+
theRetryStats = new RetryStats();
199+
}
200+
theRetryStats.addStats(rs);
201+
}
202+
182203
public void tallyReadKB(int nkb) {
183204
theReadKB += nkb;
184205
}
@@ -203,12 +224,22 @@ public void resetKBConsumption() {
203224
theReadKB = 0;
204225
theReadUnits = 0;
205226
theWriteKB = 0;
227+
theRateLimitDelayedMs = 0;
228+
theRetryStats = null;
206229
}
207230

208231
public int getWriteKB() {
209232
return theWriteKB;
210233
}
211234

235+
public RetryStats getRetryStats() {
236+
return theRetryStats;
237+
}
238+
239+
public int getRateLimitDelayedMs() {
240+
return theRateLimitDelayedMs;
241+
}
242+
212243
public void setReachedLimit(boolean value) {
213244
theReachedLimit = value;
214245
}

0 commit comments

Comments
 (0)