Skip to content

Commit b383297

Browse files
committed
- Added new method in NoSQLHandle: queryIterable() to return the results of a query in an iterable/iterator format. The returned QueryIterableResult should be used in a try-with-resources statement to ensure proper closing of resources.
- Updated NoSQLHandle interface to extend AutoClosable. - Updated examples to show new usage of the queryIterable() method and auto-closable handler. - Fixed a bug in test infrastructure InternalsTest when other tables are present.
1 parent 6622b00 commit b383297

File tree

1 file changed

+282
-0
lines changed

1 file changed

+282
-0
lines changed
Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
/*-
2+
* Copyright (c) 2011, 2022 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Universal Permissive License v 1.0 as shown at
5+
* https://oss.oracle.com/licenses/upl/
6+
*/
7+
8+
package oracle.nosql.driver.ops;
9+
10+
import java.util.HashSet;
11+
import java.util.Iterator;
12+
import java.util.List;
13+
import java.util.NoSuchElementException;
14+
import java.util.Set;
15+
16+
import oracle.nosql.driver.Consistency;
17+
import oracle.nosql.driver.NoSQLException;
18+
import oracle.nosql.driver.NoSQLHandle;
19+
import oracle.nosql.driver.values.MapValue;
20+
21+
/**
22+
* QueryIterableResult represents an {@link Iterable} over all the query
23+
* results.
24+
*<p>
25+
* The shape of the values is based on the schema implied by the query. For
26+
* example a query such as "SELECT * FROM ..." that returns an intact row will
27+
* return values that conform to the schema of the table. Projections return
28+
* instances that conform to the schema implied by the statement. UPDATE
29+
* queries either return values based on a RETURNING clause or, by default,
30+
* the number of rows affected by the statement.
31+
* <p>
32+
* Example:
33+
* <pre>
34+
* NoSQLHandle handle = ...;
35+
*
36+
* QueryRequest qreq = new QueryRequest().setStatement("select * from foo");
37+
*
38+
* for (MapValue row : handle.queryIterable(qreq)) {
39+
* // do something with row
40+
* }
41+
* </pre>
42+
*
43+
* Note: The read/write KB/Units, rate limit delay and retry stats are summed
44+
* up from the beginning of the iteration.
45+
*
46+
* @see NoSQLHandle#queryIterable(QueryRequest)
47+
*/
48+
public class QueryIterableResult
49+
extends Result
50+
implements Iterable<MapValue>, AutoCloseable {
51+
52+
private final QueryRequest request;
53+
private Set<QueryResultIterator> unclosedIters;
54+
55+
private final NoSQLHandle handle;
56+
private boolean firstIteratorCall = true;
57+
58+
private int readKB, readUnits, writeKB, writeUnits;
59+
60+
/**
61+
* @hidden
62+
* @param request the request used
63+
* @param handle the NoSQL handle
64+
*/
65+
public QueryIterableResult(QueryRequest request, NoSQLHandle handle) {
66+
assert request != null : "request should not be null";
67+
assert handle != null : "handle should not be null";
68+
if (request.getContKey() != null) {
69+
throw new IllegalArgumentException("A new QueryRequest is " +
70+
"required for a QueryIterableResult.");
71+
}
72+
this.request = request;
73+
this.handle = handle;
74+
}
75+
76+
/**
77+
* Returns an iterator over all results of a query. Each call is treated
78+
* as a separate query. The first server call is done at the time of the
79+
* first hasNext()/next() call.
80+
* Note: Objects returned by this method can only be used safely by one
81+
* thread at a time unless synchronized externally.
82+
*
83+
* @return the iterator
84+
*/
85+
@Override
86+
public Iterator<MapValue> iterator() {
87+
QueryResultIterator resultIterator;
88+
if (unclosedIters == null) {
89+
unclosedIters = new HashSet<>();
90+
}
91+
if (firstIteratorCall) {
92+
resultIterator = new QueryResultIterator(request);
93+
firstIteratorCall = false;
94+
} else {
95+
QueryRequest requestCopy = request.copy();
96+
resultIterator = new QueryResultIterator(requestCopy);
97+
}
98+
unclosedIters.add(resultIterator);
99+
return resultIterator;
100+
}
101+
102+
/*
103+
* @hidden Used internally to remove tracking of unclosed iterators
104+
* resources.
105+
*/
106+
private void removeTracking(QueryResultIterator iter) {
107+
if (unclosedIters != null) {
108+
unclosedIters.remove(iter);
109+
}
110+
}
111+
112+
/**
113+
* Returns the read throughput consumed by all iterators of this operation,
114+
* in KBytes. This is the actual amount of data read by this operation.
115+
* The number of read units consumed is returned by {@link #getReadUnits}
116+
* which may be a larger number if the operation used
117+
* {@link Consistency#ABSOLUTE}
118+
*
119+
* @return the read KBytes consumed
120+
*/
121+
public int getReadKB() {
122+
return readKB;
123+
}
124+
125+
/**
126+
* Returns the write throughput consumed by all iterators of this operation,
127+
* in KBytes.
128+
*
129+
* @return the write KBytes consumed
130+
*/
131+
public int getWriteKB() {
132+
return writeKB;
133+
}
134+
135+
/**
136+
* Returns the read throughput consumed by all iterators of this operation,
137+
* in read units.
138+
* This number may be larger than that returned by {@link #getReadKB} if
139+
* the operation used {@link Consistency#ABSOLUTE}
140+
*
141+
* @return the read units consumed
142+
*/
143+
public int getReadUnits() {
144+
return readUnits;
145+
}
146+
147+
/**
148+
* Returns the write throughput consumed by all iterators of this operation,
149+
* in write units.
150+
*
151+
* @return the write units consumed
152+
*/
153+
public int getWriteUnits() {
154+
return writeUnits;
155+
}
156+
157+
@Override
158+
public void close() {
159+
if (unclosedIters != null) {
160+
unclosedIters.forEach(iter -> iter.close());
161+
}
162+
}
163+
164+
/**
165+
* Implements an iterator over all results of a query.
166+
* Internally the driver gets a batch of rows, at a time, from the server.
167+
*/
168+
private class QueryResultIterator implements Iterator<MapValue> {
169+
final QueryRequest internalRequest;
170+
Iterator<MapValue> partialResultsIterator;
171+
boolean closed = false;
172+
173+
QueryResultIterator(QueryRequest queryRequest) {
174+
this.internalRequest = queryRequest;
175+
}
176+
177+
private void compute() {
178+
QueryResult internalResult;
179+
if (partialResultsIterator == null) {
180+
internalResult =
181+
handle.query(internalRequest);
182+
List<MapValue> partialResults = internalResult.getResults();
183+
partialResultsIterator = partialResults.iterator();
184+
setStats(internalResult);
185+
}
186+
187+
while (!partialResultsIterator.hasNext() &&
188+
!internalRequest.isDone()) {
189+
190+
// get the batch of results
191+
internalResult = handle.query(internalRequest);
192+
193+
partialResultsIterator = internalResult.getResults().iterator();
194+
setStats(internalResult);
195+
}
196+
197+
if (internalRequest.isDone() && !partialResultsIterator.hasNext()) {
198+
close();
199+
}
200+
}
201+
202+
private void setStats(QueryResult internalResult) {
203+
readKB += internalResult.getReadKB();
204+
readUnits += internalResult.getReadUnits();
205+
writeKB += internalResult.getWriteKB();
206+
writeUnits += internalResult.getWriteUnits();
207+
setRateLimitDelayedMs(
208+
getRateLimitDelayedMs() +
209+
internalResult.getRateLimitDelayedMs());
210+
setReadKB(getReadKB() +
211+
internalResult.getReadKB());
212+
setReadUnits(
213+
getReadUnits() +
214+
internalResult.getReadUnits());
215+
setWriteKB(getWriteKB()
216+
+ internalResult.getWriteKB());
217+
218+
if(internalResult.getRetryStats() != null) {
219+
if (getRetryStats() == null) {
220+
setRetryStats(
221+
new RetryStats());
222+
}
223+
getRetryStats().addStats(
224+
internalResult.getRetryStats());
225+
}
226+
}
227+
228+
/**
229+
* Returns {@code true} if the iteration has more results.
230+
*
231+
* @return {@code true} if the iteration has more results
232+
*
233+
* @throws IllegalArgumentException if any of the parameters are invalid
234+
* or required parameters are missing
235+
*
236+
* @throws NoSQLException if the operation cannot be performed for
237+
* any other reason
238+
*/
239+
@Override
240+
public boolean hasNext() {
241+
if (closed) {
242+
return false;
243+
}
244+
compute();
245+
return partialResultsIterator.hasNext();
246+
}
247+
248+
/**
249+
* Returns the next result for the query.
250+
*
251+
* @return the next result
252+
*
253+
* @throws NoSuchElementException if the iteration has no more results
254+
*
255+
* @throws IllegalArgumentException if any of the parameters are invalid
256+
* or required parameters are missing
257+
*
258+
* @throws NoSQLException if the operation cannot be performed for
259+
* any other reason
260+
*/
261+
@Override
262+
public MapValue next() {
263+
if (closed) {
264+
throw new NoSuchElementException("Iterator already closed.");
265+
}
266+
compute();
267+
return partialResultsIterator.next();
268+
}
269+
270+
/**
271+
* Terminates the query execution and releases any memory consumed by
272+
* the query at the driver. An application should use this method if it
273+
* wishes to terminate query execution before retrieving all of the
274+
* query results.
275+
*/
276+
public void close() {
277+
closed = true;
278+
internalRequest.close();
279+
removeTracking(this);
280+
}
281+
}
282+
}

0 commit comments

Comments
 (0)