Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit 0a98488

Browse files
committed
Prevent suppressing of rethrown error from subscription pipeline onError.
1 parent 9ae1578 commit 0a98488

File tree

1 file changed

+30
-54
lines changed

1 file changed

+30
-54
lines changed

src/subscribe/pipeline.js

Lines changed: 30 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -64,89 +64,65 @@ export default function MessagePipeline(client, opts = {}, onFinally = async ()
6464
...beforeSteps,
6565
// unpack stream message
6666
async function* getStreamMessage(src) {
67-
try {
68-
for await (const { streamMessage } of src) {
69-
yield streamMessage
70-
}
71-
} catch (err) {
72-
await onError(err)
67+
for await (const { streamMessage } of src) {
68+
yield streamMessage
7369
}
7470
},
7571
// order messages (fill gaps)
7672
orderingUtil,
7773
// validate
7874
async function* ValidateMessages(src) {
79-
try {
80-
for await (const streamMessage of src) {
81-
try {
82-
await validate(streamMessage)
83-
} catch (err) {
84-
ignoreMessages.add(streamMessage)
85-
await onError(err)
86-
}
87-
yield streamMessage
75+
for await (const streamMessage of src) {
76+
try {
77+
await validate(streamMessage)
78+
} catch (err) {
79+
ignoreMessages.add(streamMessage)
80+
await onError(err)
8881
}
89-
} catch (err) {
90-
await onError(err)
82+
yield streamMessage
9183
}
9284
},
9385
// decrypt
9486
async function* DecryptMessages(src) {
95-
try {
96-
yield* decrypt(src, async (err, streamMessage) => {
97-
ignoreMessages.add(streamMessage)
98-
await onError(err)
99-
})
100-
} catch (err) {
87+
yield* decrypt(src, async (err, streamMessage) => {
88+
ignoreMessages.add(streamMessage)
10189
await onError(err)
102-
}
90+
})
10391
},
10492
// parse content
10593
async function* ParseMessages(src) {
106-
try {
107-
for await (const streamMessage of src) {
108-
try {
109-
streamMessage.getParsedContent()
110-
} catch (err) {
111-
ignoreMessages.add(streamMessage)
112-
await onError(err)
113-
}
114-
yield streamMessage
94+
for await (const streamMessage of src) {
95+
try {
96+
streamMessage.getParsedContent()
97+
} catch (err) {
98+
ignoreMessages.add(streamMessage)
99+
await onError(err)
115100
}
116-
} catch (err) {
117-
await onError(err)
101+
yield streamMessage
118102
}
119103
},
120104
// re-order messages (ignore gaps)
121105
internalOrderingUtil,
122106
// ignore any failed messages
123107
async function* IgnoreMessages(src) {
124-
try {
125-
for await (const streamMessage of src) {
126-
if (ignoreMessages.has(streamMessage)) {
127-
continue
128-
}
129-
yield streamMessage
108+
for await (const streamMessage of src) {
109+
if (ignoreMessages.has(streamMessage)) {
110+
continue
130111
}
131-
} catch (err) {
132-
await onError(err)
112+
yield streamMessage
133113
}
134114
},
135115
// special handling for bye message
136116
async function* ByeMessageSpecialHandling(src) {
137-
try {
138-
for await (const orderedMessage of src) {
139-
yield orderedMessage
140-
try {
141-
if (orderedMessage.isByeMessage()) {
142-
break
143-
}
144-
} catch (err) {
145-
await onError(err)
117+
for await (const orderedMessage of src) {
118+
yield orderedMessage
119+
try {
120+
if (orderedMessage.isByeMessage()) {
121+
break
146122
}
123+
} catch (err) {
124+
await onError(err)
147125
}
148-
} catch (err) {
149-
await onError(err)
150126
}
151127
},
152128
// custom pipeline steps

0 commit comments

Comments
 (0)