Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit 86fcb83

Browse files
committed
fix(pipeline): Fix memleak in pipeline.
1 parent 1eaa55a commit 86fcb83

File tree

1 file changed

+35
-26
lines changed

1 file changed

+35
-26
lines changed

src/utils/iterators.js

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import Emitter from 'events'
12
import pMemoize from 'p-memoize'
23

34
import { Defer, pTimeout, allSettledValues, AggregatedError } from './index'
@@ -149,7 +150,7 @@ export function CancelableGenerator(iterable, onFinally = () => {}, { timeout =
149150
let pendingNextCount = 0
150151
let error
151152

152-
const onCancel = Defer()
153+
const cancelSignal = new Emitter()
153154
const onDone = Defer()
154155

155156
let iterator
@@ -179,15 +180,9 @@ export function CancelableGenerator(iterable, onFinally = () => {}, { timeout =
179180
}
180181

181182
if (error) {
182-
onCancel.reject(error)
183-
if (!started) {
184-
onCancel.catch(() => {})
185-
}
183+
cancelSignal.emit('cancel', error)
186184
} else {
187-
onCancel.resolve({
188-
value,
189-
done: true,
190-
})
185+
cancelSignal.emit('cancel', value)
191186
}
192187
}
193188

@@ -211,32 +206,46 @@ export function CancelableGenerator(iterable, onFinally = () => {}, { timeout =
211206
// manually iterate
212207
iterator = iterable[Symbol.asyncIterator]()
213208

214-
// keep track of pending calls to next()
215-
// so we can cancel early if nothing pending
216-
async function next(...args) {
217-
// use symbol instead of true so we can tell if called multiple times
218-
// see === comparison below
219-
pendingNextCount += 1
220-
try {
221-
return await iterator.next(...args)
222-
} finally {
223-
pendingNextCount = Math.max(0, pendingNextCount - 1) // eslint-disable-line require-atomic-updates
224-
}
225-
}
209+
let onCancel
226210

227211
try {
228212
yield* {
229213
// here is the meat:
230-
// each next() races against cancel promise
231-
next: async (...args) => Promise.race([
232-
next(...args),
233-
onCancel,
234-
]),
214+
// each next() races against cancel signal
215+
next: async (...args) => {
216+
if (onCancel) {
217+
cancelSignal.off('cancel', onCancel)
218+
}
219+
const p = Defer()
220+
onCancel = (v) => {
221+
if (v instanceof Error) {
222+
p.reject(v)
223+
} else {
224+
p.resolve()
225+
}
226+
}
227+
228+
try {
229+
cancelSignal.once('cancel', onCancel)
230+
return await Promise.race([
231+
iterator.next(...args),
232+
p,
233+
])
234+
} finally {
235+
cancelSignal.off('cancel', onCancel)
236+
}
237+
},
235238
async throw(err) {
239+
if (onCancel) {
240+
cancelSignal.off('cancel', onCancel)
241+
}
236242
await endGeneratorTimeout(iterator, err, timeout)
237243
throw err
238244
},
239245
async return(v) {
246+
if (onCancel) {
247+
cancelSignal.off('cancel', onCancel)
248+
}
240249
await endGeneratorTimeout(iterator, error, timeout)
241250
return {
242251
value: v,

0 commit comments

Comments
 (0)