Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit 08abe11

Browse files
authored
Merge pull request #236 from streamr-dev/memleak2
NET-229: Fix Memleak(s)
2 parents 1c16d42 + e461654 commit 08abe11

File tree

10 files changed

+421
-72
lines changed

10 files changed

+421
-72
lines changed

.github/workflows/test-build.yml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ on:
3131
# Be sure to update both workflow files if you edit any env or trigger config
3232

3333
jobs:
34+
init:
35+
runs-on: ubuntu-latest
36+
steps:
37+
- name: Cancel Previous Runs
38+
uses: styfle/cancel-workflow-action@0.8.0
39+
with:
40+
access_token: ${{ github.token }}
3441
build:
3542
name: Run build using Node 14.x
3643
runs-on: ubuntu-latest
@@ -188,6 +195,6 @@ jobs:
188195
name: run-client-testing
189196
with:
190197
max_attempts: 2
191-
timeout_minutes: 8
198+
timeout_minutes: 15
192199
retry_on: error
193-
command: cd streamr-client-testing && java -jar build/libs/client_testing-1.0-SNAPSHOT.jar -s $TEST_NAME -c config/$CONFIG_NAME.conf -n $NUM_MESSAGES
200+
command: cd streamr-client-testing && DEBUG='' java -jar build/libs/client_testing-1.0-SNAPSHOT.jar -s $TEST_NAME -c config/$CONFIG_NAME.conf -n $NUM_MESSAGES

.github/workflows/test-code.yml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ on:
3131
# Be sure to update both workflow files if you edit any env or trigger config
3232

3333
jobs:
34+
init:
35+
runs-on: ubuntu-latest
36+
steps:
37+
- name: Cancel Previous Runs
38+
uses: styfle/cancel-workflow-action@0.8.0
39+
with:
40+
access_token: ${{ github.token }}
3441
lint:
3542
name: Run linter using Node 14.x
3643
runs-on: ubuntu-latest
@@ -133,3 +140,25 @@ jobs:
133140
retry_on: error
134141
command: npm run test-flakey || echo "::warning::Flakey Tests Failed"
135142

143+
memory:
144+
name: Memory Tests using Node ${{ matrix.node-version }}
145+
runs-on: ubuntu-latest
146+
strategy:
147+
fail-fast: false
148+
matrix:
149+
node-version: [12.x, 14.x]
150+
151+
steps:
152+
- uses: actions/checkout@v2
153+
- uses: actions/setup-node@v2
154+
with:
155+
node-version: ${{ matrix.node-version }}
156+
- name: npm ci
157+
run: npm ci
158+
- uses: nick-invision/retry@v2
159+
name: Run Test
160+
with:
161+
max_attempts: 2
162+
timeout_minutes: 20
163+
retry_on: error
164+
command: npm run test-memory

package-lock.json

Lines changed: 7 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "streamr-client",
3-
"version": "5.1.0",
3+
"version": "5.2.0-beta",
44
"description": "JavaScript client library for Streamr",
55
"private": true,
66
"repository": {
@@ -48,6 +48,7 @@
4848
"test-integration-no-resend": "jest --forceExit --testTimeout=10000 --testPathIgnorePatterns='resend|Resend' --testNamePattern='^((?!(resend|Resend|resent|Resent|gap|Gap)).)*$' test/integration/*.test.*",
4949
"test-integration-resend": "jest --forceExit --testTimeout=15000 --testNamePattern='(resend|Resend|resent|Resent)' test/integration/*.test.*",
5050
"test-integration-dataunions": "jest --forceExit --testTimeout=15000 --runInBand test/integration/dataunion",
51+
"test-memory": "node --gc-global --predictable-gc-schedule node_modules/.bin/jest test/memory",
5152
"test-flakey": "jest --forceExit test/flakey/*",
5253
"test-browser": "node ./test/browser/server.js & node node_modules/nightwatch/bin/nightwatch ./test/browser/browser.js && pkill -f server.js",
5354
"install-example": "cd examples/webpack && npm ci",
@@ -113,6 +114,7 @@
113114
"jest-circus": "^26.6.3",
114115
"lodash-webpack-plugin": "^0.11.6",
115116
"nightwatch": "^1.5.1",
117+
"pretty-bytes": "^5.6.0",
116118
"process": "^0.11.10",
117119
"sinon": "^9.2.4",
118120
"streamr-test-utils": "^1.3.1",

src/subscribe/Validator.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ export default function Validator(client, opts) {
4141
},
4242
verify: CacheAsyncFn(SigningUtil.verify.bind(SigningUtil), {
4343
...client.options.cache,
44-
cachePromiseRejection: false,
44+
// forcibly use small cache otherwise keeps n serialized messages in memory
45+
maxSize: 100,
46+
maxAge: 10000,
47+
cachePromiseRejection: true,
4548
cacheKey: (args) => args.join('|'),
4649
})
4750
})

src/utils/PushQueue.ts

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { pOrderedResolve, Defer, pTimeout } from './index'
1+
import { Defer, pTimeout } from './index'
22

33
async function endGenerator(gtr: AsyncGenerator, error?: Error) {
44
return error
@@ -127,20 +127,11 @@ export default class PushQueue<T> {
127127
}
128128

129129
static transform<TT, U>(src: AnyIterable<TT>, fn: (value: TT) => U, opts = {}) {
130-
const buffer = new PushQueue<TT>([], opts)
131-
const orderedFn = pOrderedResolve(fn) // push must be run in sequence
130+
const buffer = new PushQueue<U>([], opts)
132131
;(async () => { // eslint-disable-line semi-style
133-
const tasks = []
134132
for await (const value of src) {
135-
// run in parallel
136-
const task = orderedFn(value).then(() => (
137-
buffer.push(value)
138-
)).catch((err) => {
139-
buffer.throw(err)
140-
})
141-
tasks.push(task)
133+
buffer.push(fn(value))
142134
}
143-
await Promise.all(tasks)
144135
if (buffer.autoEnd) {
145136
buffer.end()
146137
}
@@ -169,7 +160,7 @@ export default class PushQueue<T> {
169160
}
170161

171162
if (end) {
172-
await this.end()
163+
this.end()
173164
}
174165

175166
return Promise.resolve()
@@ -235,8 +226,6 @@ export default class PushQueue<T> {
235226
// for next()
236227
this.error = err
237228
}
238-
239-
await this._cleanup()
240229
}
241230

242231
get length() {
@@ -245,18 +234,30 @@ export default class PushQueue<T> {
245234
}
246235

247236
async _cleanup() {
237+
// capture error and pending next promises
238+
const { error, nextQueue } = this
248239
this.finished = true
249-
const { error } = this
250-
for (const p of this.nextQueue) {
240+
this.error = undefined
241+
this.pending = 0
242+
// empty buffer then reassign
243+
this.buffer.length = 0
244+
this.buffer = []
245+
// reassign nextQueue, emptying would mutate value we captured
246+
this.nextQueue = []
247+
const doneValue = { value: undefined, done: true }
248+
// resolve all pending next promises
249+
while (nextQueue.length) {
250+
const p = nextQueue.shift()
251+
if (!p) { continue }
252+
251253
if (error) {
252254
p.reject(error)
253255
} else {
254-
p.resolve(undefined)
256+
p.resolve(doneValue)
255257
}
256258
}
257-
this.pending = 0
258-
this.buffer.length = 0
259-
return this.onEnd(this.error)
259+
260+
return this.onEnd(error)
260261
}
261262

262263
push(...values: (T | null)[]) {
@@ -340,7 +341,6 @@ export default class PushQueue<T> {
340341

341342
const deferred = Defer<T>()
342343
this.nextQueue.push(deferred)
343-
344344
deferred.catch(() => {}) // prevent unhandledrejection
345345
const value = await deferred
346346

@@ -389,6 +389,7 @@ export default class PushQueue<T> {
389389
try {
390390
yield* this.iterator
391391
} finally {
392+
this._cleanup()
392393
this.finished = true
393394
if (this.signal) {
394395
this.signal.removeEventListener('abort', this.onAbort)

src/utils/index.ts

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -220,13 +220,32 @@ export function CacheFn(fn: Parameters<typeof mem>[0], {
220220
type PromiseResolve = L.Compulsory<Parameters<Promise<any>['then']>>[0]
221221
type PromiseReject = L.Compulsory<Parameters<Promise<any>['then']>>[1]
222222

223-
export function Defer<T>(executor: (...args: Parameters<Promise<T>['then']>) => void = () => {}) {
224-
let resolve: PromiseResolve = () => {}
225-
let reject: PromiseReject = () => {}
223+
const noop = () => {}
224+
225+
export function Defer<T>(executor: (...args: Parameters<Promise<T>['then']>) => void = noop) {
226+
let resolveFn: PromiseResolve | undefined
227+
let rejectFn: PromiseResolve | undefined
228+
const resolve: PromiseReject = (value) => {
229+
if (resolveFn) {
230+
const r = resolveFn
231+
resolveFn = undefined
232+
rejectFn = undefined
233+
r(value)
234+
}
235+
}
236+
const reject: PromiseReject = (error) => {
237+
if (rejectFn) {
238+
const r = rejectFn
239+
resolveFn = undefined
240+
rejectFn = undefined
241+
r(error)
242+
}
243+
}
244+
226245
// eslint-disable-next-line promise/param-names
227246
const p = new Promise((_resolve, _reject) => {
228-
resolve = _resolve
229-
reject = _reject
247+
resolveFn = _resolve
248+
rejectFn = _reject
230249
executor(resolve, reject)
231250
})
232251
p.catch(() => {}) // prevent unhandledrejection
@@ -295,21 +314,26 @@ export function LimitAsyncFnByKey<KeyType>(limit = 1) {
295314
// clean up if no more active entries (if not cleared)
296315
pending.delete(id)
297316
}
298-
queueOnEmptyTasks.delete(id)
317+
318+
if (queueOnEmptyTasks.has(id)) {
319+
queueOnEmptyTasks.get(id).resolve(undefined)
320+
queueOnEmptyTasks.delete(id)
321+
}
322+
299323
onQueueEmpty.resolve()
300324
}
301325
}
302326
}
303327

304328
f.getOnQueueEmpty = async (id: KeyType) => {
305-
return queueOnEmptyTasks.get(id) || pending.set(id, Defer()).get(id)
329+
return queueOnEmptyTasks.get(id) || queueOnEmptyTasks.set(id, Defer()).get(id)
306330
}
307331

308332
f.clear = () => {
309333
// note: does not cancel promises
310334
pending.forEach((p) => p.clearQueue())
311335
pending.clear()
312-
queueOnEmptyTasks.forEach((p) => p.resolve())
336+
queueOnEmptyTasks.forEach((p) => p.resolve(undefined))
313337
queueOnEmptyTasks.clear()
314338
}
315339
return f

0 commit comments

Comments
 (0)