Skip to content

Commit a46687f

Browse files
committed
Fixed an issue in serial version logic.
Fixed a case where multiple threads running parallel requests to an older server might result in unsupported protocol errors.
1 parent 3a02229 commit a46687f

File tree

3 files changed

+103
-17
lines changed

3 files changed

+103
-17
lines changed

driver/src/main/java/oracle/nosql/driver/http/Client.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public class Client {
162162
*/
163163
private ExecutorService threadPool;
164164

165-
private short serialVersion = DEFAULT_SERIAL_VERSION;
165+
private volatile short serialVersion = DEFAULT_SERIAL_VERSION;
166166

167167
/* for one-time messages */
168168
private final HashSet<String> oneTimeMessages;
@@ -489,7 +489,7 @@ public Result execute(Request kvRequest) {
489489
}
490490

491491
ResponseHandler responseHandler = null;
492-
492+
short serialVersionUsed = serialVersion;
493493
ByteBuf buffer = null;
494494
long networkLatency;
495495
try {
@@ -514,7 +514,7 @@ public Result execute(Request kvRequest) {
514514
*/
515515
kvRequest.setCheckRequestSize(false);
516516

517-
writeContent(buffer, kvRequest);
517+
serialVersionUsed = writeContent(buffer, kvRequest);
518518

519519
/*
520520
* If on-premise the authProvider will always be a
@@ -574,7 +574,7 @@ public Result execute(Request kvRequest) {
574574
int resSize = wireContent.readerIndex();
575575
networkLatency = System.currentTimeMillis() - networkLatency;
576576

577-
if (serialVersion < 3) {
577+
if (serialVersionUsed < 3) {
578578
/* so we can emit a one-time message if the app */
579579
/* tries to access modificationTime */
580580
if (res instanceof GetResult) {
@@ -690,7 +690,7 @@ public Result execute(Request kvRequest) {
690690
continue;
691691
} catch (UnsupportedProtocolException upe) {
692692
/* reduce protocol version and try again */
693-
if (decrementSerialVersion() == true) {
693+
if (decrementSerialVersion(serialVersionUsed) == true) {
694694
exception = upe;
695695
logInfo(logger, "Got unsupported protocol error " +
696696
"from server: decrementing serial version to " +
@@ -933,15 +933,17 @@ boolean timeoutRequest(long startTime,
933933
*
934934
* @throws IOException
935935
*/
936-
void writeContent(ByteBuf content, Request kvRequest)
936+
private short writeContent(ByteBuf content, Request kvRequest)
937937
throws IOException {
938938

939939
final NettyByteOutputStream bos = new NettyByteOutputStream(content);
940-
bos.writeShort(serialVersion);
940+
final short versionUsed = serialVersion;
941+
bos.writeShort(versionUsed);
941942
kvRequest.createSerializer(factory).
942943
serialize(kvRequest,
943-
serialVersion,
944+
versionUsed,
944945
bos);
946+
return versionUsed;
945947
}
946948

947949
/**
@@ -1244,7 +1246,10 @@ StatsControl getStatsControl() {
12441246
* @return true: version was decremented
12451247
* false: already at lowest version number.
12461248
*/
1247-
public boolean decrementSerialVersion() {
1249+
private synchronized boolean decrementSerialVersion(short versionUsed) {
1250+
if (serialVersion != versionUsed) {
1251+
return true;
1252+
}
12481253
if (serialVersion == V3) {
12491254
serialVersion = V2;
12501255
return true;
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*-
2+
* Copyright (c) 2011, 2022 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Universal Permissive License v 1.0 as shown at
5+
* https://oss.oracle.com/licenses/upl/
6+
*/
7+
8+
package oracle.nosql.driver;
9+
10+
import static org.junit.Assert.fail;
11+
12+
import java.util.concurrent.CountDownLatch;
13+
import java.util.concurrent.atomic.AtomicInteger;
14+
15+
import org.junit.Before;
16+
import org.junit.Test;
17+
18+
import oracle.nosql.driver.ops.ListTablesRequest;
19+
20+
public class InternalsTest extends ProxyTestBase {
21+
22+
@Override
23+
protected void perTestHandleConfig(NoSQLHandleConfig config) {
24+
config.setConnectionPoolSize(10);
25+
config.setNumThreads(8);
26+
}
27+
28+
@Before
29+
@Override
30+
/*
31+
* Override the default beforeTest() so we do not make
32+
* any requests at all to the server before the actual test
33+
* is run. The default runs a ListTables request.
34+
*/
35+
public void beforeTest() throws Exception {
36+
handle = getHandle(endpoint);
37+
/* do NOT list tables here */
38+
}
39+
40+
@Test
41+
public void serialSetupTest() {
42+
/*
43+
* Start N threads. Make them all run at the same time.
44+
* Each thread will issue requests. Verify that none of them
45+
* throw UnsupportedProtocolException. This verifies that the
46+
* logic inside Client properly manages setting the serial version
47+
* on the first request(s) that may happen in parallel.
48+
*
49+
* This test really only does anything if the server it's running
50+
* against is an older version. But it's ok to run with the current
51+
* server version anyway.
52+
*/
53+
CountDownLatch latch = new CountDownLatch(1);
54+
AtomicInteger errors = new AtomicInteger(0);
55+
final int numThreads = 5;
56+
Thread[] threads = new Thread[numThreads];
57+
for (int x=0; x<numThreads; x++) {
58+
threads[x] = new Thread(() -> {
59+
try { latch.await(); } catch (Exception e) {}
60+
runRequests(errors);
61+
});
62+
threads[x].start();
63+
}
64+
65+
latch.countDown();
66+
67+
/* wait for threads to finish */
68+
for(int x=0; x<numThreads; x++) {
69+
try { threads[x].join(); } catch (Exception e) {}
70+
}
71+
if (errors.get() > 0) {
72+
fail("Got " + errors.get() + " errors");
73+
}
74+
}
75+
76+
private void runRequests(AtomicInteger errors) {
77+
ListTablesRequest req = new ListTablesRequest()
78+
.setLimit(1);
79+
try {
80+
for (int x=0; x<5; x++) {
81+
handle.listTables(req);
82+
}
83+
} catch (Exception e) {
84+
System.out.println("Got exception: " + e);
85+
errors.incrementAndGet();
86+
}
87+
}
88+
}

driver/src/test/java/oracle/nosql/driver/ProxyTestBase.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -387,15 +387,13 @@ static private TableResult dropTableWithoutWait(NoSQLHandle nosqlHandle,
387387
}
388388

389389
protected NoSQLHandle getHandle(String ep) {
390-
391390
NoSQLHandleConfig config = new NoSQLHandleConfig(ep);
392391
serviceURL = config.getServiceURL();
393392
return setupHandle(config);
394393
}
395394

396395
/* Set configuration values for the handle */
397396
protected NoSQLHandle setupHandle(NoSQLHandleConfig config) {
398-
399397
/*
400398
* 5 retries, default retry algorithm
401399
*/
@@ -412,12 +410,7 @@ protected NoSQLHandle setupHandle(NoSQLHandleConfig config) {
412410

413411
NoSQLHandle h = getHandle(config);
414412

415-
/* this will set up the right protocol serial version */
416-
try {
417-
getTable("noop", h);
418-
} catch (Exception e) {
419-
/* ignore errors */
420-
}
413+
/* serial version will be set by default ListTables() in beforeTest */
421414

422415
return h;
423416
}

0 commit comments

Comments
 (0)