6767import oracle .nosql .driver .StatsControl ;
6868import oracle .nosql .driver .TableNotFoundException ;
6969import oracle .nosql .driver .UnsupportedProtocolException ;
70+ import oracle .nosql .driver .UnsupportedQueryVersionException ;
7071import oracle .nosql .driver .WriteThrottlingException ;
7172import oracle .nosql .driver .httpclient .HttpClient ;
7273import oracle .nosql .driver .httpclient .ResponseHandler ;
7980import oracle .nosql .driver .ops .GetRequest ;
8081import oracle .nosql .driver .ops .GetResult ;
8182import oracle .nosql .driver .ops .GetTableRequest ;
83+ import oracle .nosql .driver .ops .PrepareRequest ;
8284import oracle .nosql .driver .ops .PutRequest ;
8385import oracle .nosql .driver .ops .QueryRequest ;
8486import oracle .nosql .driver .ops .QueryResult ;
9193import oracle .nosql .driver .ops .WriteResult ;
9294import oracle .nosql .driver .ops .serde .BinaryProtocol ;
9395import oracle .nosql .driver .ops .serde .BinarySerializerFactory ;
96+ import oracle .nosql .driver .ops .serde .Serializer ;
9497import oracle .nosql .driver .ops .serde .SerializerFactory ;
9598import oracle .nosql .driver .ops .serde .nson .NsonSerializerFactory ;
9699import oracle .nosql .driver .query .QueryDriver ;
100+ import oracle .nosql .driver .query .TopologyInfo ;
97101import oracle .nosql .driver .util .ByteInputStream ;
98102import oracle .nosql .driver .util .HttpConstants ;
99103import oracle .nosql .driver .util .NettyByteInputStream ;
@@ -180,6 +184,9 @@ public class Client {
180184
181185 private volatile short serialVersion = DEFAULT_SERIAL_VERSION ;
182186
187+ /* separate version for query compatibility */
188+ private volatile short queryVersion = QueryDriver .QUERY_VERSION ;
189+
183190 /* for one-time messages */
184191 private final HashSet <String > oneTimeMessages ;
185192
@@ -207,6 +214,8 @@ public class Client {
207214 /* for keeping track of SDKs usage */
208215 private String userAgent ;
209216
217+ private volatile TopologyInfo topology ;
218+
210219 public Client (Logger logger ,
211220 NoSQLHandleConfig httpConfig ) {
212221
@@ -376,9 +385,16 @@ public Result execute(Request kvRequest) {
376385 kvRequest .setRetryStats (null );
377386 kvRequest .setRateLimitDelayedMs (0 );
378387
388+ /* kvRequest.isQueryRequest() returns true if kvRequest is a
389+ * non-internal QueryRequest */
379390 if (kvRequest .isQueryRequest ()) {
391+
380392 QueryRequest qreq = (QueryRequest )kvRequest ;
381393
394+ /* Set the topo seq num in the request, if it has not been set
395+ * already */
396+ kvRequest .setTopoSeqNum (getTopoSeqNum ());
397+
382398 statsControl .observeQuery (qreq );
383399
384400 /*
@@ -406,7 +422,6 @@ public Result execute(Request kvRequest) {
406422 trace ("QueryRequest has no QueryDriver, but is prepared" , 2 );
407423 QueryDriver driver = new QueryDriver (qreq );
408424 driver .setClient (this );
409- driver .setTopologyInfo (qreq .topologyInfo ());
410425 return new QueryResult (qreq , false );
411426 }
412427
@@ -422,6 +437,7 @@ public Result execute(Request kvRequest) {
422437 * QueryResult.
423438 */
424439 trace ("QueryRequest has no QueryDriver and is not prepared" , 2 );
440+ qreq .incBatchCounter ();
425441 }
426442
427443 int timeoutMs = kvRequest .getTimeoutInternal ();
@@ -557,6 +573,7 @@ public Result execute(Request kvRequest) {
557573
558574 ResponseHandler responseHandler = null ;
559575 short serialVersionUsed = serialVersion ;
576+ short queryVersionUsed = queryVersion ;
560577 ByteBuf buffer = null ;
561578 try {
562579 /*
@@ -588,6 +605,13 @@ public Result execute(Request kvRequest) {
588605 */
589606 kvRequest .setCheckRequestSize (false );
590607
608+ /* Set the topo seq num in the request, if it has not been set
609+ * already */
610+ if (!(kvRequest instanceof QueryRequest ) ||
611+ kvRequest .isQueryRequest ()) {
612+ kvRequest .setTopoSeqNum (getTopoSeqNum ());
613+ }
614+
591615 /*
592616 * Temporarily change the timeout in the request object so
593617 * the serialized timeout sent to the server is correct for
@@ -596,7 +620,8 @@ public Result execute(Request kvRequest) {
596620 * processing (retry delays, etc) work correctly.
597621 */
598622 kvRequest .setTimeoutInternal (thisIterationTimeoutMs );
599- serialVersionUsed = writeContent (buffer , kvRequest );
623+ serialVersionUsed = writeContent (buffer , kvRequest ,
624+ queryVersionUsed );
600625 kvRequest .setTimeoutInternal (timeoutMs );
601626
602627 /*
@@ -690,13 +715,16 @@ public Result execute(Request kvRequest) {
690715 Result res = processResponse (responseHandler .getStatus (),
691716 responseHandler .getHeaders (),
692717 wireContent ,
693- kvRequest );
718+ kvRequest ,
719+ serialVersionUsed ,
720+ queryVersionUsed );
694721 rateDelayedMs += getRateDelayedFromHeader (
695722 responseHandler .getHeaders ());
696723 int resSize = wireContent .readerIndex ();
697724 long networkLatency =
698725 (System .nanoTime () - latencyNanos ) / 1_000_000 ;
699726
727+ setTopology (res .getTopology ());
700728
701729 if (serialVersionUsed < 3 ) {
702730 /* so we can emit a one-time message if the app */
@@ -708,6 +736,11 @@ public Result execute(Request kvRequest) {
708736 }
709737 }
710738
739+ if (res instanceof QueryResult && kvRequest .isQueryRequest ()) {
740+ QueryRequest qreq = (QueryRequest )kvRequest ;
741+ qreq .addQueryTraces (((QueryResult )res ).getQueryTraces ());
742+ }
743+
711744 if (res instanceof TableResult && rateLimiterMap != null ) {
712745 /* update rate limiter settings for table */
713746 TableLimits tl = ((TableResult )res ).getTableLimits ();
@@ -843,8 +876,17 @@ public Result execute(Request kvRequest) {
843876 kvRequest .incrementRetries ();
844877 exception = re ;
845878 continue ;
879+ } catch (UnsupportedQueryVersionException uqve ) {
880+ /* decrement query version and try again */
881+ if (decrementQueryVersion (queryVersionUsed ) == true ) {
882+ logFine (logger , "Got unsupported query version error " +
883+ "from server: decrementing query version to " +
884+ queryVersion + " and trying again." );
885+ continue ;
886+ }
887+ throw uqve ;
846888 } catch (UnsupportedProtocolException upe ) {
847- /* reduce protocol version and try again */
889+ /* decrement protocol version and try again */
848890 if (decrementSerialVersion (serialVersionUsed ) == true ) {
849891 /* Don't set this exception: it's misleading */
850892 /* exception = upe; */
@@ -1133,16 +1175,25 @@ boolean timeoutRequest(long startNanos,
11331175 *
11341176 * @throws IOException
11351177 */
1136- private short writeContent (ByteBuf content , Request kvRequest )
1178+ private short writeContent (ByteBuf content , Request kvRequest ,
1179+ short queryVersion )
11371180 throws IOException {
11381181
11391182 final NettyByteOutputStream bos = new NettyByteOutputStream (content );
11401183 final short versionUsed = serialVersion ;
11411184 SerializerFactory factory = chooseFactory (kvRequest );
11421185 factory .writeSerialVersion (versionUsed , bos );
1143- kvRequest .createSerializer (factory ).serialize (kvRequest ,
1144- versionUsed ,
1145- bos );
1186+ if (kvRequest instanceof QueryRequest ||
1187+ kvRequest instanceof PrepareRequest ) {
1188+ kvRequest .createSerializer (factory ).serialize (kvRequest ,
1189+ versionUsed ,
1190+ queryVersion ,
1191+ bos );
1192+ } else {
1193+ kvRequest .createSerializer (factory ).serialize (kvRequest ,
1194+ versionUsed ,
1195+ bos );
1196+ }
11461197 return versionUsed ;
11471198 }
11481199
@@ -1157,7 +1208,9 @@ private short writeContent(ByteBuf content, Request kvRequest)
11571208 final Result processResponse (HttpResponseStatus status ,
11581209 HttpHeaders headers ,
11591210 ByteBuf content ,
1160- Request kvRequest ) {
1211+ Request kvRequest ,
1212+ short serialVersionUsed ,
1213+ short queryVersionUsed ) {
11611214
11621215 if (!HttpResponseStatus .OK .equals (status )) {
11631216 processNotOKResponse (status , content );
@@ -1170,7 +1223,8 @@ final Result processResponse(HttpResponseStatus status,
11701223 setSessionCookie (headers );
11711224
11721225 try (ByteInputStream bis = new NettyByteInputStream (content )) {
1173- return processOKResponse (bis , kvRequest );
1226+ return processOKResponse (bis , kvRequest , serialVersionUsed ,
1227+ queryVersionUsed );
11741228 }
11751229 }
11761230
@@ -1181,17 +1235,26 @@ final Result processResponse(HttpResponseStatus status,
11811235 *
11821236 * @throws NoSQLException if the stream could not be read for some reason
11831237 */
1184- Result processOKResponse (ByteInputStream in , Request kvRequest ) {
1238+ Result processOKResponse (ByteInputStream in , Request kvRequest ,
1239+ short serialVersionUsed , short queryVersionUsed ) {
11851240 try {
11861241 SerializerFactory factory = chooseFactory (kvRequest );
11871242 int code = factory .readErrorCode (in );
11881243 /* note: this will always be zero in V4 */
11891244 if (code == 0 ) {
1190- Result res =
1191- kvRequest .createDeserializer (factory ).
1192- deserialize (kvRequest ,
1193- in ,
1194- serialVersion );
1245+ Result res ;
1246+ Serializer ser = kvRequest .createDeserializer (factory );
1247+ if (kvRequest instanceof QueryRequest ||
1248+ kvRequest instanceof PrepareRequest ) {
1249+ res = ser .deserialize (kvRequest ,
1250+ in ,
1251+ serialVersionUsed ,
1252+ queryVersionUsed );
1253+ } else {
1254+ res = ser .deserialize (kvRequest ,
1255+ in ,
1256+ serialVersionUsed );
1257+ }
11951258
11961259 if (kvRequest .isQueryRequest ()) {
11971260 QueryRequest qreq = (QueryRequest )kvRequest ;
@@ -1531,6 +1594,24 @@ private synchronized boolean decrementSerialVersion(short versionUsed) {
15311594 return false ;
15321595 }
15331596
1597+ /**
1598+ * @hidden
1599+ *
1600+ * Try to decrement the query protocol version.
1601+ * @return true: version was decremented
1602+ * false: already at lowest version number.
1603+ */
1604+ private synchronized boolean decrementQueryVersion (short versionUsed ) {
1605+ if (queryVersion != versionUsed ) {
1606+ return true ;
1607+ }
1608+ if (queryVersion == QueryDriver .QUERY_V4 ) {
1609+ queryVersion = QueryDriver .QUERY_V3 ;
1610+ return true ;
1611+ }
1612+ return false ;
1613+ }
1614+
15341615 /**
15351616 * @hidden
15361617 * For testing use
@@ -1789,4 +1870,24 @@ private byte[] getBodyBytes(ByteBuf buffer) {
17891870 public void setDefaultNamespace (String ns ) {
17901871 config .setDefaultNamespace (ns );
17911872 }
1873+
1874+ public TopologyInfo getTopology () {
1875+ return topology ;
1876+ }
1877+
1878+ private synchronized int getTopoSeqNum () {
1879+ return (topology == null ? -1 : topology .getSeqNum ());
1880+ }
1881+
1882+ private synchronized void setTopology (TopologyInfo topo ) {
1883+
1884+ if (topo == null ) {
1885+ return ;
1886+ }
1887+
1888+ if (topology == null || topology .getSeqNum () < topo .getSeqNum ()) {
1889+ topology = topo ;
1890+ trace ("New topology: " + topo , 1 );
1891+ }
1892+ }
17921893}
0 commit comments