Skip to content

Commit 6b6b3b2

Browse files
committed
chore(nodejs): grow the buffer if data does not fit in it
1 parent 5f050a7 commit 6b6b3b2

File tree

4 files changed

+105
-68
lines changed

4 files changed

+105
-68
lines changed

notes.md

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@
22
> ./scripts/generateCerts.sh . questdbPwd123
33
44
### TODO:
5-
- The client should grow the buffer and call flush() immediately instead of throwing on insufficient buffer size,
6-
like other clients do.
7-
Alternatively, the sender could be growing the buffer on row writes and shrinking it on a subsequent flush() call.
8-
The bufferSize parameter in the constructor can also be optional if this is done.
9-
- Config object for Sender
5+
- ~~The client should grow the buffer instead of throwing on insufficient buffer size, like other clients do~~
6+
- If the buffer had to be extended, shrink it back to original size on a subsequent flush() call?
7+
- Config object for Sender, make bufferSize optional by setting a default
108
- ~~Github actions to run 'npm test' on each commit and run 'npm publish' when version is bumped in package.json~~
119
- ~~Provide a builder style API:
1210
builder.table('tab').symbol('symName', any).intColumn('intCol', 34).timestampColumn('tsCol', 23232323).atNow();~~

src/sender.js

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ class Sender {
2626
/** @private */ bufferSize;
2727
/** @private */ buffer;
2828
/** @private */ position;
29+
/** @private */ endOfLastRow;
2930
/** @private */ hasTable;
3031
/** @private */ hasSymbols;
3132
/** @private */ hasColumns;
@@ -39,18 +40,23 @@ class Sender {
3940
constructor(bufferSize, jwk = undefined) {
4041
this.jwk = jwk;
4142
this.resize(bufferSize);
43+
this.reset();
4244
}
4345

4446
/**
45-
* Reinitializes the buffer of the sender. <br>
47+
* Extends the size of the sender's buffer. <br>
4648
* Can be used to increase the size of buffer if overflown.
49+
* The buffer's content is copied into the new buffer.
4750
*
4851
* @param {number} bufferSize - New size of the buffer used by the sender, provided in bytes.
4952
*/
5053
resize(bufferSize) {
5154
this.bufferSize = bufferSize;
52-
this.buffer = Buffer.alloc(this.bufferSize + 1, 0, 'utf8');
53-
this.reset();
55+
const newBuffer = Buffer.alloc(this.bufferSize + 1, 0, 'utf8');
56+
if (this.buffer) {
57+
this.buffer.copy(newBuffer);
58+
}
59+
this.buffer = newBuffer;
5460
}
5561

5662
/**
@@ -136,30 +142,32 @@ class Sender {
136142
}
137143

138144
/**
139-
* Sends the buffer's content to the database and clears the buffer.
145+
* Sends the buffer's content to the database and compacts the buffer.
146+
* If the last row is not finished it stays in the sender's buffer.
147+
*
148+
* @return {boolean} Returns true if there was data in the buffer to send.
140149
*/
141150
async flush() {
142-
const data = this.toBuffer();
151+
const data = this.toBuffer(this.endOfLastRow);
152+
if (!data) {
153+
return false;
154+
}
143155
return new Promise((resolve, reject) => {
144156
this.socket.write(data, err => {
145-
this.reset();
146-
err ? reject(err) : resolve();
157+
compact(this);
158+
err ? reject(err) : resolve(true);
147159
});
148160
});
149161
}
150162

151163
/**
152164
* @ignore
153-
* @return {Buffer} Returns a cropped buffer ready to send to the server.
165+
* @return {Buffer} Returns a cropped buffer ready to send to the server or null if there is nothing to send.
154166
*/
155-
toBuffer() {
156-
if (this.hasTable) {
157-
throw new Error("The buffer's content is invalid, row needs to be closed by calling at() or atNow()");
158-
}
159-
if (this.position < 1) {
160-
throw new Error("The buffer is empty");
161-
}
162-
return this.buffer.subarray(0, this.position);
167+
toBuffer(pos = this.position) {
168+
return pos > 0
169+
? this.buffer.subarray(0, pos)
170+
: null;
163171
}
164172

165173
/**
@@ -176,6 +184,7 @@ class Sender {
176184
throw new Error("Table name has already been set");
177185
}
178186
validateTableName(table);
187+
checkCapacity(this, [table]);
179188
writeEscaped(this, table);
180189
this.hasTable = true;
181190
return this;
@@ -195,11 +204,13 @@ class Sender {
195204
if (!this.hasTable || this.hasColumns) {
196205
throw new Error("Symbol can be added only after table name is set and before any column added");
197206
}
207+
const valueStr = value.toString();
208+
checkCapacity(this, [name, valueStr], 2 + name.length + valueStr.length);
198209
write(this, ',');
199210
validateColumnName(name);
200211
writeEscaped(this, name);
201212
write(this, '=');
202-
writeEscaped(this, value.toString());
213+
writeEscaped(this, valueStr);
203214
this.hasSymbols = true;
204215
return this;
205216
}
@@ -213,6 +224,7 @@ class Sender {
213224
*/
214225
stringColumn(name, value) {
215226
writeColumn(this, name, value, () => {
227+
checkCapacity(this, [value], 2 + value.length);
216228
write(this, '"');
217229
writeEscaped(this, value, true);
218230
write(this, '"');
@@ -229,6 +241,7 @@ class Sender {
229241
*/
230242
booleanColumn(name, value) {
231243
writeColumn(this, name, value, () => {
244+
checkCapacity(this, [], 1);
232245
write(this, value ? 't' : 'f');
233246
}, "boolean");
234247
return this;
@@ -243,7 +256,9 @@ class Sender {
243256
*/
244257
floatColumn(name, value) {
245258
writeColumn(this, name, value, () => {
246-
write(this, value.toString());
259+
const valueStr = value.toString();
260+
checkCapacity(this, [valueStr], valueStr.length);
261+
write(this, valueStr);
247262
}, "number");
248263
return this;
249264
}
@@ -260,7 +275,9 @@ class Sender {
260275
throw new Error(`Value must be an integer, received ${value}`);
261276
}
262277
writeColumn(this, name, value, () => {
263-
write(this, value.toString());
278+
const valueStr = value.toString();
279+
checkCapacity(this, [valueStr], 1 + valueStr.length);
280+
write(this, valueStr);
264281
write(this, 'i');
265282
}, "number");
266283
return this;
@@ -278,7 +295,9 @@ class Sender {
278295
throw new Error(`Value must be an integer, received ${value}`);
279296
}
280297
writeColumn(this, name, value, () => {
281-
write(this, value.toString());
298+
const valueStr = value.toString();
299+
checkCapacity(this, [valueStr], 1 + valueStr.length);
300+
write(this, valueStr);
282301
write(this, 't');
283302
}, "number");
284303
return this;
@@ -297,8 +316,10 @@ class Sender {
297316
throw new Error(`The designated timestamp must be of type string, received ${typeof timestamp}`);
298317
}
299318
validateDesignatedTimestamp(timestamp);
319+
const timestampStr = timestamp.toString();
320+
checkCapacity(this, [], 2 + timestampStr.length);
300321
write(this, ' ');
301-
write(this, timestamp.toString());
322+
write(this, timestampStr);
302323
write(this, '\n');
303324
startNewRow(this);
304325
}
@@ -311,6 +332,7 @@ class Sender {
311332
if (!this.hasSymbols && !this.hasColumns) {
312333
throw new Error("The row must have a symbol or column set before it is closed");
313334
}
335+
checkCapacity(this, [], 1);
314336
write(this, '\n');
315337
startNewRow(this);
316338
}
@@ -338,11 +360,34 @@ async function authenticate(sender, challenge) {
338360
}
339361

340362
function startNewRow(sender) {
363+
sender.endOfLastRow = sender.position;
341364
sender.hasTable = false;
342365
sender.hasSymbols = false;
343366
sender.hasColumns = false;
344367
}
345368

369+
function checkCapacity(sender, data, base = 0) {
370+
let length = base;
371+
for (const str of data) {
372+
length += Buffer.byteLength(str, 'utf8');
373+
}
374+
if (sender.position + length > sender.bufferSize) {
375+
let newSize = sender.bufferSize;
376+
do {
377+
newSize += sender.bufferSize;
378+
} while(sender.position + length > newSize);
379+
sender.resize(newSize);
380+
}
381+
}
382+
383+
function compact(sender) {
384+
if (sender.endOfLastRow > 0) {
385+
sender.buffer.copy(sender.buffer, 0, sender.endOfLastRow, sender.position);
386+
sender.position = sender.position - sender.endOfLastRow;
387+
sender.endOfLastRow = 0;
388+
}
389+
}
390+
346391
function writeColumn(sender, name, value, writeValue, valueType) {
347392
if (typeof name !== "string") {
348393
throw new Error(`Column name must be a string, received ${typeof name}`);
@@ -353,6 +398,7 @@ function writeColumn(sender, name, value, writeValue, valueType) {
353398
if (!sender.hasTable) {
354399
throw new Error("Column can be set only after table name is set");
355400
}
401+
checkCapacity(sender, [name], 2 + name.length);
356402
write(sender, sender.hasColumns ? ',' : ' ');
357403
validateColumnName(name);
358404
writeEscaped(sender, name);

src/validation.js

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -132,20 +132,8 @@ function validateDesignatedTimestamp(timestamp) {
132132
}
133133
for (let i = 0; i < len; i++) {
134134
let ch = timestamp[i];
135-
switch (ch) {
136-
case '0':
137-
case '1':
138-
case '2':
139-
case '3':
140-
case '4':
141-
case '5':
142-
case '6':
143-
case '7':
144-
case '8':
145-
case '9':
146-
break;
147-
default:
148-
throw new Error(`Invalid character in designated timestamp: ${ch}`);
135+
if (ch < '0' || ch > '9') {
136+
throw new Error(`Invalid character in designated timestamp: ${ch}`);
149137
}
150138
}
151139
}

test/sender.test.js

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -335,20 +335,21 @@ describe('Sender message builder test suite (anything not covered in client inte
335335
).toThrow("Symbol can be added only after table name is set and before any column added");
336336
});
337337

338-
it('throws exception if preparing an empty buffer for send', function () {
338+
it('returns null if preparing an empty buffer for send', function () {
339339
const sender = new Sender(1024);
340-
expect(
341-
() => sender.toBuffer()
342-
).toThrow("The buffer is empty");
340+
expect(sender.toBuffer()).toBe(null);
343341
});
344342

345-
it('throws exception if preparing a buffer with an unclosed row for send', function () {
343+
it('ignores unfinished rows when preparing a buffer for send', function () {
346344
const sender = new Sender(1024);
345+
sender.table("tableName")
346+
.symbol("name", "value")
347+
.at("1234567890");
348+
sender.table("tableName")
349+
.symbol("name", "value2");
347350
expect(
348-
() => sender.table("tableName")
349-
.symbol("name", "value")
350-
.toBuffer()
351-
).toThrow("The buffer's content is invalid, row needs to be closed by calling at() or atNow()");
351+
sender.toBuffer(sender.endOfLastRow).toString()
352+
).toBe("tableName,name=value 1234567890\n");
352353
});
353354

354355
it('throws exception if a float is passed as integer field', function () {
@@ -394,31 +395,35 @@ describe('Sender message builder test suite (anything not covered in client inte
394395
).toThrow("The row must have a symbol or column set before it is closed");
395396
});
396397

397-
it('throws exception if buffer overflows', function () {
398-
const sender = new Sender(16);
399-
expect(
400-
() => sender.table("tableName")
401-
.intColumn("intField", 123)
402-
).toThrow("Buffer overflow [position=17, bufferSize=16]");
403-
});
404-
405-
it('can fix buffer overflows by calling resize()', function () {
406-
const sender = new Sender(16);
407-
expect(
408-
() => sender.table("tableName")
409-
.floatColumn("floatField", 123.34)
410-
).toThrow("Buffer overflow [position=17, bufferSize=16]");
398+
it('extends the size of the buffer if data does not fit', function () {
399+
const sender = new Sender(8);
400+
expect(sender.bufferSize).toBe(8);
401+
expect(sender.position).toBe(0);
402+
sender.table("tableName");
403+
expect(sender.bufferSize).toBe(16);
404+
expect(sender.position).toBe("tableName".length);
405+
sender.intColumn("intField", 123);
406+
expect(sender.bufferSize).toBe(32);
407+
expect(sender.position).toBe("tableName intField=123i".length);
408+
sender.atNow();
409+
expect(sender.bufferSize).toBe(32);
410+
expect(sender.position).toBe("tableName intField=123i\n".length);
411+
expect(sender.toBuffer().toString()).toBe(
412+
"tableName intField=123i\n"
413+
);
411414

412-
sender.resize(1024);
413-
sender.table("tableName")
414-
.floatColumn("floatField", 123.34)
415+
sender.table("table2")
416+
.intColumn("intField", 125)
417+
.stringColumn("strField", "test")
415418
.atNow();
419+
expect(sender.bufferSize).toBe(64);
420+
expect(sender.position).toBe("tableName intField=123i\ntable2 intField=125i,strField=\"test\"\n".length);
416421
expect(sender.toBuffer().toString()).toBe(
417-
"tableName floatField=123.34\n"
422+
"tableName intField=123i\ntable2 intField=125i,strField=\"test\"\n"
418423
);
419424
});
420425

421-
it('is possible to reuse the buffer by calling reset()', function () {
426+
it('is possible to clear the buffer by calling reset()', function () {
422427
const sender = new Sender(1024);
423428
sender.table("tableName")
424429
.booleanColumn("boolCol", true)

0 commit comments

Comments
 (0)