Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit 6ee58f9

Browse files
committed
test(subscribe, iterators): Fix error handling.
1 parent 41fe830 commit 6ee58f9

File tree

3 files changed

+21
-12
lines changed

3 files changed

+21
-12
lines changed

src/subscribe/index.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,15 @@ import MessagePipeline from './pipeline'
1010
import Validator from './Validator'
1111
import messageStream from './messageStream'
1212
import resendStream from './resendStream'
13-
import { MaybeAsync, Todo } from '../types'
13+
import { Todo, MaybeAsync } from '../types'
1414
import StreamrClient, { StreamPartDefinition, SubscribeOptions } from '..'
1515

16+
async function defaultOnFinally(err?: Error) {
17+
if (err) {
18+
throw err
19+
}
20+
}
21+
1622
/**
1723
* @category Important
1824
*/
@@ -39,7 +45,7 @@ export class Subscription extends Emitter {
3945
/** @internal */
4046
iterated?: Todo
4147

42-
constructor(client: StreamrClient, opts: Todo, onFinally = async () => {}) {
48+
constructor(client: StreamrClient, opts: Todo, onFinally = defaultOnFinally) {
4349
super()
4450
this.client = client
4551
this.options = validateOptions(opts)
@@ -400,7 +406,7 @@ class Subscriptions {
400406
this.subSessions = new Map()
401407
}
402408

403-
async add(opts: StreamPartDefinition, onFinally: MaybeAsync<(err?: any) => void> = async () => {}) {
409+
async add(opts: StreamPartDefinition, onFinally: MaybeAsync<(err?: any) => void> = defaultOnFinally) {
404410
const options = validateOptions(opts)
405411
const { key } = options
406412

src/subscribe/pipeline.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ async function collect(src) {
2222
* Subscription message processing pipeline
2323
*/
2424

25-
export default function MessagePipeline(client, opts = {}, onFinally = async () => {}) {
25+
export default function MessagePipeline(client, opts = {}, onFinally = async (err) => { if (err) { throw err } }) {
2626
const options = validateOptions(opts)
2727
const { key, afterSteps = [], beforeSteps = [] } = options
2828
const id = counterId('MessagePipeline') + key
@@ -128,7 +128,7 @@ export default function MessagePipeline(client, opts = {}, onFinally = async ()
128128
// custom pipeline steps
129129
...afterSteps
130130
], async (err, ...args) => {
131-
// await msgStream.cancel(err)
131+
await msgStream.cancel(err)
132132
try {
133133
if (err) {
134134
await onError(err)

src/utils/iterators.js

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -351,14 +351,17 @@ export function pipeline(iterables = [], onFinally = defaultOnFinally, { end, ..
351351
firstSrc = v
352352
}
353353

354-
const last = iterables.reduce((_prev, next, index) => {
355-
let prev
356-
let nextIterable
354+
iterables.forEach((nextIterable) => {
355+
if (nextIterable.cancel) {
356+
cancelFns.add(nextIterable)
357+
}
358+
})
357359

360+
const last = iterables.reduce((_prev, next, index) => {
358361
const it = CancelableGenerator((async function* Gen() {
359-
prev = index === 0 ? firstSrc : _prev
362+
const prev = index === 0 ? firstSrc : _prev
360363
// take first "prev" from outer iterator, if one exists
361-
nextIterable = typeof next === 'function' ? next(prev) : next
364+
const nextIterable = typeof next === 'function' ? next(prev) : next
362365

363366
if (prev && nextIterable[isPipeline]) {
364367
nextIterable.setFirstSource(prev)
@@ -372,7 +375,6 @@ export function pipeline(iterables = [], onFinally = defaultOnFinally, { end, ..
372375
prev.id = prev.id || 'inter-' + nextIterable.id
373376
nextIterable.from(prev, { end })
374377
}
375-
376378
yield* nextIterable
377379
}()), async (err) => {
378380
if (!error && err && error !== err) {
@@ -384,7 +386,8 @@ export function pipeline(iterables = [], onFinally = defaultOnFinally, { end, ..
384386
return it
385387
}, undefined)
386388

387-
pipelineValue = iteratorFinally(last, async () => {
389+
pipelineValue = iteratorFinally(last, async (err) => {
390+
error = AggregatedError.from(error, err)
388391
if (!cancelled) {
389392
await cancelAll(error)
390393
}

0 commit comments

Comments
 (0)