Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit d383241

Browse files
committed
refactor: Ensure cancelable generator cancels pending next calls.
1 parent 8a55a4c commit d383241

File tree

3 files changed

+48
-7
lines changed

3 files changed

+48
-7
lines changed

src/utils/PushQueue.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,14 +234,18 @@ export default class PushQueue<T> {
234234
}
235235

236236
async _cleanup() {
237+
// capture error and pending next promises
237238
const { error, nextQueue } = this
238239
this.finished = true
239240
this.error = undefined
240241
this.pending = 0
242+
// empty buffer then reassign
241243
this.buffer.length = 0
242244
this.buffer = []
245+
// reassign nextQueue, emptying would mutate value we captured
243246
this.nextQueue = []
244247
const doneValue = { value: undefined, done: true }
248+
// resolve all pending next promises
245249
while (nextQueue.length) {
246250
const p = nextQueue.shift()
247251
if (!p) { continue }

src/utils/iterators.js

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ export function CancelableGenerator(iterable, onFinally = () => {}, { timeout =
198198
return onDone
199199
}
200200

201+
let pendingNext = 0
202+
201203
async function* CancelableGeneratorFn() {
202204
// manually iterate
203205
iterator = iterable[Symbol.asyncIterator]()
@@ -206,7 +208,8 @@ export function CancelableGenerator(iterable, onFinally = () => {}, { timeout =
206208
yield* {
207209
// each next() races against cancel signal
208210
next: async (...args) => {
209-
cancelSignal.removeAllListeners()
211+
pendingNext += 1
212+
cancelSignal.setMaxListeners(pendingNext)
210213
// NOTE:
211214
// Very easy to create a memleak here.
212215
// Using a shared promise with Promise.race
@@ -226,7 +229,9 @@ export function CancelableGenerator(iterable, onFinally = () => {}, { timeout =
226229
iterator.next(...args),
227230
cancelPromise,
228231
]).finally(() => {
229-
cancelPromise.resolve({ value: undefined, done: true })
232+
pendingNext -= 1
233+
cancelSignal.setMaxListeners(pendingNext)
234+
cancelSignal.off('cancel', onCancel)
230235
})
231236
},
232237
async throw(err) {

test/unit/iterators.test.js

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@ import IteratorTest, { expected, MAX_ITEMS } from './IteratorTest'
88

99
const WAIT = 20
1010

11-
async function* generate(items = expected) {
12-
await wait(WAIT * 0.1)
11+
async function* generate(items = expected, waitTime = WAIT) {
12+
await wait(waitTime * 0.1)
1313
for await (const item of items) {
14-
await wait(WAIT * 0.1)
14+
console.log('next', item)
15+
await wait(waitTime * 0.1)
1516
yield item
16-
await wait(WAIT * 0.1)
17+
await wait(waitTime * 0.1)
1718
}
18-
await wait(WAIT * 0.1)
19+
await wait(waitTime * 0.1)
1920
}
2021

2122
async function* generateThrow(items = expected, { max = MAX_ITEMS, err = new Error('expected') }) {
@@ -669,6 +670,37 @@ describe('Iterator Utils', () => {
669670
}
670671
})
671672

673+
it('can handle errs when queued next calls', async () => {
674+
const done = Defer()
675+
const expectedError = new Error('expected')
676+
try {
677+
const itr = CancelableGenerator((async function* Gen() {
678+
yield* generate(expected, 1000)
679+
}()), onFinally, {
680+
timeout: WAIT,
681+
})
682+
683+
const tasks = expected.map(async () => itr.next())
684+
await wait(100)
685+
await itr.cancel(expectedError)
686+
const result = Promise.allSettled(tasks)
687+
// first is error
688+
expect(result[0]).toEqual({ status: 'rejected', reason: expectedError })
689+
// rest is undefined result
690+
// not sure what good behaviour should be in this case
691+
expect(result.slice(1)).toEqual(result.slice(1).map(() => ({
692+
status: 'fulfilled',
693+
value: {
694+
value: undefined,
695+
done: true
696+
}
697+
})))
698+
expect(itr.isCancelled()).toEqual(true)
699+
} finally {
700+
await done
701+
}
702+
}, 10000)
703+
672704
it('can handle queued next calls resolving out of order', async () => {
673705
const done = Defer()
674706
try {

0 commit comments

Comments
 (0)