Skip to content

Commit 5f61009

Browse files
authored
Fix serialization problem using heap based netty ByteBuf
The serialization code assumed that a non-direct Netty ByteBuf has a single contiguous byte array backing it and this is not always the case. The optimization for this case has been removed.
1 parent b3f5759 commit 5f61009

File tree

4 files changed

+105
-93
lines changed

4 files changed

+105
-93
lines changed

driver/src/main/java/oracle/nosql/driver/util/NettyByteInputStream.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,11 @@ public byte[] array() {
5858

5959
@Override
6060
public void setOffset(int offset) {
61-
buffer.readerIndex(offset);
61+
try {
62+
buffer.readerIndex(offset);
63+
} catch (IndexOutOfBoundsException e) {
64+
throw new IllegalArgumentException(e);
65+
}
6266
}
6367

6468
@Override

driver/src/main/java/oracle/nosql/driver/util/PackedInteger.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,20 @@ public static long readSortedLong(byte[] buf, int off) {
178178
*/
179179
public static int getReadSortedIntLength(byte[] buf, int off) {
180180

181-
/* The first byte of the buf stores the length of the value part. */
182-
int b1 = buf[off] & 0xff;
181+
return getReadSortedIntLength(buf[off]);
182+
}
183+
184+
/**
185+
* Returns the number of bytes that would be read by {@link
186+
* #readSortedInt} based on the initial byte of the buffer.
187+
*
188+
* @param value the byte containing the length
189+
*
190+
* @return the number of bytes that would be read.
191+
*/
192+
public static int getReadSortedIntLength(byte value) {
193+
194+
int b1 = value & 0xff;
183195
if (b1 < 0x08) {
184196
return 1 + 0x08 - b1;
185197
}

driver/src/main/java/oracle/nosql/driver/util/SerializationUtil.java

Lines changed: 8 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -35,24 +35,9 @@ public class SerializationUtil {
3535
*/
3636
public static int readPackedInt(ByteInputStream in) throws IOException {
3737

38-
if (in.isDirect()) {
39-
return readPackedIntDirect(in);
40-
}
41-
in.ensureCapacity(1);
42-
final int offset = in.getOffset();
43-
final byte[] array = in.array();
44-
final int len = PackedInteger.getReadSortedIntLength(array, offset);
45-
/* move the offset past the integer; this also ensures length */
46-
in.skip(len);
47-
return PackedInteger.readSortedInt(array, offset);
48-
}
49-
50-
private static int readPackedIntDirect(ByteInputStream in)
51-
throws IOException {
52-
5338
final byte[] bytes = new byte[PackedInteger.MAX_LENGTH];
5439
in.readFully(bytes, 0, 1);
55-
final int len = PackedInteger.getReadSortedIntLength(bytes, 0);
40+
final int len = PackedInteger.getReadSortedIntLength(bytes[0]);
5641
try {
5742
in.readFully(bytes, 1, len - 1);
5843
} catch (IndexOutOfBoundsException e) {
@@ -71,23 +56,8 @@ private static int readPackedIntDirect(ByteInputStream in)
7156
public static int skipPackedInt(ByteInputStream in)
7257
throws IOException {
7358

74-
if (in.isDirect()) {
75-
return skipPackedIntDirect(in);
76-
}
77-
in.ensureCapacity(1);
78-
final int offset = in.getOffset();
79-
final byte[] array = in.array();
80-
final int len = PackedInteger.getReadSortedIntLength(array, offset);
81-
/* move the offset past the integer; this also ensures length */
82-
in.skip(len);
83-
return len;
84-
}
85-
86-
private static int skipPackedIntDirect(ByteInputStream in)
87-
throws IOException {
88-
8959
byte b = in.readByte();
90-
int len = PackedInteger.getReadSortedIntLength(new byte[]{b}, 0);
60+
int len = PackedInteger.getReadSortedIntLength(b);
9161
if (len > 1) {
9262
in.skip(len - 1);
9363
}
@@ -105,20 +75,6 @@ private static int skipPackedIntDirect(ByteInputStream in)
10575
public static int writePackedInt(ByteOutputStream out, int value)
10676
throws IOException {
10777

108-
if (out.isDirect()) {
109-
return writePackedIntDirect(out, value);
110-
}
111-
final int len = PackedInteger.getWriteSortedIntLength(value);
112-
out.ensureCapacity(len);
113-
final int offset = out.getOffset();
114-
final byte[] array = out.array();
115-
out.skip(len);
116-
return PackedInteger.writeSortedInt(array, offset, value);
117-
}
118-
119-
public static int writePackedIntDirect(ByteOutputStream out, int value)
120-
throws IOException {
121-
12278
final byte[] buf = new byte[PackedInteger.MAX_LENGTH];
12379
final int offset = PackedInteger.writeSortedInt(buf, 0, value);
12480
out.write(buf, 0, offset);
@@ -134,25 +90,10 @@ public static int writePackedIntDirect(ByteOutputStream out, int value)
13490
*/
13591
public static long readPackedLong(ByteInputStream in) throws IOException {
13692

137-
if (in.isDirect()) {
138-
return readPackedLongDirect(in);
139-
}
140-
141-
in.ensureCapacity(1);
142-
final int offset = in.getOffset();
143-
final byte[] array = in.array();
144-
final int len = PackedInteger.getReadSortedLongLength(array, offset);
145-
/* move the offset past the integer; this also ensures length */
146-
in.skip(len);
147-
return PackedInteger.readSortedLong(array, offset);
148-
}
149-
150-
public static long readPackedLongDirect(ByteInputStream in)
151-
throws IOException {
152-
15393
final byte[] bytes = new byte[PackedInteger.MAX_LONG_LENGTH];
15494
in.readFully(bytes, 0, 1);
155-
final int len = PackedInteger.getReadSortedLongLength(bytes, 0);
95+
/* long and int store length the same way */
96+
final int len = PackedInteger.getReadSortedIntLength(bytes[0]);
15697
try {
15798
in.readFully(bytes, 1, len - 1);
15899
} catch (IndexOutOfBoundsException e) {
@@ -184,21 +125,6 @@ public static int skipPackedLong(ByteInputStream in) throws IOException {
184125
public static int writePackedLong(ByteOutputStream out, long value)
185126
throws IOException {
186127

187-
if (out.isDirect()) {
188-
return writePackedLongDirect(out, value);
189-
}
190-
191-
final int len = PackedInteger.getWriteSortedLongLength(value);
192-
out.ensureCapacity(len);
193-
final int offset = out.getOffset();
194-
final byte[] array = out.array();
195-
out.skip(len);
196-
return PackedInteger.writeSortedLong(array, offset, value);
197-
}
198-
199-
public static int writePackedLongDirect(ByteOutputStream out, long value)
200-
throws IOException {
201-
202128
final byte[] buf = new byte[PackedInteger.MAX_LONG_LENGTH];
203129
final int offset = PackedInteger.writeSortedLong(buf, 0, value);
204130
out.write(buf, 0, offset);
@@ -285,18 +211,10 @@ private static String readStdUTF8String(ByteInputStream in)
285211
return EMPTY_STRING;
286212
}
287213

288-
if (in.isDirect()) {
289-
final byte[] bytes = new byte[length];
290-
in.readFully(bytes);
291-
return StandardCharsets.UTF_8.decode(
292-
ByteBuffer.wrap(bytes)).toString();
293-
} else {
294-
final byte[] bytes = in.array();
295-
int offset = in.getOffset();
296-
in.skip(length);
297-
return StandardCharsets.UTF_8.decode(
298-
ByteBuffer.wrap(bytes, offset, length)).toString();
299-
}
214+
final byte[] bytes = new byte[length];
215+
in.readFully(bytes);
216+
return StandardCharsets.UTF_8.decode(
217+
ByteBuffer.wrap(bytes)).toString();
300218
}
301219

302220
/**

driver/src/main/java/oracle/nosql/driver/values/MapWalker.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,46 @@ public String getCurrentName() {
7474
return currentName;
7575
}
7676

77+
/**
78+
* Returns the current index into the map
79+
* @return the index
80+
*/
81+
public int getCurrentIndex() {
82+
return currentIndex;
83+
}
84+
85+
/**
86+
* Returns the current offset of the underlying ByteInputStream
87+
*
88+
* @return the offset
89+
*/
90+
public int getStreamOffset() {
91+
return bis.getOffset();
92+
}
93+
94+
/**
95+
* Resets current index and stream state, allowing a MapWalker to
96+
* be reset to an earlier state. The current name will not be set
97+
* correctly until a {@link MapWalker#next} operation is performed.
98+
* <p>
99+
* This operation can be used in conjunction with
100+
* {@link MapWalker#getCurrentIndex} and {@link MapWalker#getStreamOffset}.
101+
* The stream should be positioned at the <b>beginning</b> of a map
102+
* entry, e.g. at the name, ready for the {@link MapWalker#next} call to be
103+
* made. The caller acquires the current index and offset and can then
104+
* manipulate the MapWalker as needed, after which is can call this method
105+
* with the saved offset and index to reset the state. The positioning of
106+
* the stream isn't enforced, but the caller must know the state so that
107+
* if it continues to use the MapWalker after reset it does not fail.
108+
*
109+
* @param index index into the map
110+
* @param streamOffset offset to use into the input stream
111+
*/
112+
public void reset(int index, int streamOffset) {
113+
currentIndex = index;
114+
bis.setOffset(streamOffset);
115+
}
116+
77117
/**
78118
* Returns the number of elements in the map
79119
* @return the number of elements
@@ -123,4 +163,42 @@ public void next() throws IOException {
123163
public void skip() throws IOException {
124164
Nson.generateEventsFromNson(null, bis, true);
125165
}
166+
167+
/**
168+
* Searches the map looking for a matching field, optionally
169+
* case-sensitive or case-insensitive. If
170+
* the field is found the method returns true and the stream is positioned
171+
* at the field's value. If the field does not exist the method returns
172+
* false and the stream is at the end of the map. In both cases it is
173+
* likely that the caller will want to reset the stream in the walker using
174+
* the {@link MapWalker#reset} method, using information saved before
175+
* this call because this call does not preserve the current location in
176+
* the stream.
177+
* <p>
178+
* When this call is made the stream must be positioned at the start of a
179+
* field, on the field name, or it will fail with an unpredictable
180+
* exception.
181+
* <p>
182+
* The field name is a single component and only the current map will be
183+
* searched. No navigation is performed.
184+
*
185+
* @param fieldName the field to look for
186+
* @param caseSensitive set to true for a case-sensitive search
187+
* @return true if the field is found, false otherwise
188+
*/
189+
public boolean find(String fieldName,
190+
boolean caseSensitive) throws IOException {
191+
while (hasNext()) {
192+
next();
193+
boolean found = (caseSensitive ?
194+
fieldName.equals(currentName) :
195+
fieldName.equalsIgnoreCase(currentName));
196+
if (found) {
197+
return true;
198+
} else {
199+
skip();
200+
}
201+
}
202+
return false;
203+
}
126204
}

0 commit comments

Comments
 (0)