Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit 42ac9f1

Browse files
committed
fix(pipeline): Refix memleak.
1 parent b70f17b commit 42ac9f1

File tree

1 file changed

+14
-23
lines changed

1 file changed

+14
-23
lines changed

src/utils/iterators.js

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,10 @@ const endGeneratorTimeout = pMemoize(async (gtr, error, timeout = 250) => {
140140

141141
/**
142142
* Creates a generator that can be cancelled and perform optional final cleanup.
143-
* const generator = CancelableGenerator(iterable, onFinally)
143+
* const [cancal, generator] = CancelableGenerator(iterable, onFinally)
144144
*/
145145

146-
export function CancelableGenerator(iterable, onFinally = async () => {}, { timeout = 250 } = {}) {
146+
export function CancelableGenerator(iterable, onFinally = () => {}, { timeout = 250 } = {}) {
147147
let cancelled = false
148148
let finalCalled = false
149149
let error
@@ -202,46 +202,36 @@ export function CancelableGenerator(iterable, onFinally = async () => {}, { time
202202
// manually iterate
203203
iterator = iterable[Symbol.asyncIterator]()
204204

205-
let onCancel
206-
207205
try {
208206
yield* {
209207
// here is the meat:
210208
// each next() races against cancel signal
211209
next: async (...args) => {
212-
if (onCancel) {
213-
cancelSignal.off('cancel', onCancel)
214-
}
210+
cancelSignal.removeAllListeners()
215211
const p = Defer()
216-
onCancel = (v) => {
212+
const onCancel = (v) => {
217213
if (v instanceof Error) {
218214
p.reject(v)
219215
} else {
220216
p.resolve({ value: undefined, done: true })
221217
}
222218
}
223219

224-
try {
225-
cancelSignal.once('cancel', onCancel)
226-
return await Promise.race([
227-
iterator.next(...args),
228-
p,
229-
])
230-
} finally {
231-
cancelSignal.off('cancel', onCancel)
232-
}
220+
cancelSignal.once('cancel', onCancel)
221+
return Promise.race([
222+
iterator.next(...args).finally(() => {
223+
cancelSignal.off('cancel', onCancel)
224+
}),
225+
p,
226+
])
233227
},
234228
async throw(err) {
235-
if (onCancel) {
236-
cancelSignal.off('cancel', onCancel)
237-
}
229+
cancelSignal.removeAllListeners()
238230
await endGeneratorTimeout(iterator, err, timeout)
239231
throw err
240232
},
241233
async return(v) {
242-
if (onCancel) {
243-
cancelSignal.off('cancel', onCancel)
244-
}
234+
cancelSignal.removeAllListeners()
245235
await endGeneratorTimeout(iterator, error, timeout)
246236
return {
247237
value: v,
@@ -253,6 +243,7 @@ export function CancelableGenerator(iterable, onFinally = async () => {}, { time
253243
},
254244
}
255245
} finally {
246+
cancelSignal.removeAllListeners()
256247
// try end iterator
257248
if (iterator) {
258249
await endGeneratorTimeout(iterator, error, timeout)

0 commit comments

Comments
 (0)