Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit 73019a5

Browse files
committed
test: Split single failing resendSubscribe test into own test file.
1 parent c1920fb commit 73019a5

File tree

2 files changed

+161
-66
lines changed

2 files changed

+161
-66
lines changed
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import { wait } from 'streamr-test-utils'
2+
3+
import {
4+
Msg,
5+
uid,
6+
collect,
7+
describeRepeats,
8+
fakePrivateKey,
9+
getWaitForStorage,
10+
getPublishTestMessages,
11+
} from '../utils'
12+
import { StreamrClient } from '../../src/StreamrClient'
13+
import Connection from '../../src/Connection'
14+
15+
import config from './config'
16+
import { Stream } from '../../src/stream'
17+
import { Subscriber } from '../../src/subscribe'
18+
19+
/* eslint-disable no-await-in-loop */
20+
21+
const WAIT_FOR_STORAGE_TIMEOUT = process.env.CI ? 12000 : 6000
22+
const MAX_MESSAGES = 5
23+
const ITERATIONS = 3
24+
25+
describeRepeats('sequential resend subscribe', () => {
26+
let expectErrors = 0 // check no errors by default
27+
let onError = jest.fn()
28+
let client: StreamrClient
29+
let stream: Stream
30+
let published: any[]
31+
let publishedRequests: any[]
32+
let publishTestMessages: ReturnType<typeof getPublishTestMessages>
33+
let waitForStorage: (...args: any[]) => Promise<void>
34+
let subscriber: Subscriber
35+
36+
const createClient = (opts = {}) => {
37+
const c = new StreamrClient({
38+
...config.clientOptions,
39+
auth: {
40+
privateKey: fakePrivateKey(),
41+
},
42+
// @ts-expect-error
43+
publishAutoDisconnectDelay: 1000,
44+
autoConnect: false,
45+
autoDisconnect: false,
46+
maxRetries: 2,
47+
...opts,
48+
})
49+
c.onError = jest.fn()
50+
c.on('error', onError)
51+
return c
52+
}
53+
54+
beforeAll(async () => {
55+
client = createClient()
56+
subscriber = client.subscriber
57+
58+
// eslint-disable-next-line require-atomic-updates
59+
client.debug('connecting before test >>')
60+
await Promise.all([
61+
client.connect(),
62+
client.session.getSessionToken(),
63+
])
64+
stream = await client.createStream({
65+
name: uid('stream')
66+
})
67+
await stream.addToStorageNode(config.clientOptions.storageNode.address)
68+
client.debug('connecting before test <<')
69+
70+
publishTestMessages = getPublishTestMessages(client, {
71+
stream,
72+
})
73+
74+
waitForStorage = getWaitForStorage(client, {
75+
stream,
76+
timeout: WAIT_FOR_STORAGE_TIMEOUT,
77+
})
78+
79+
await client.connect()
80+
const results = await publishTestMessages.raw(MAX_MESSAGES, {
81+
waitForLast: true,
82+
timestamp: 111111,
83+
})
84+
85+
published = results.map(([msg]: any) => msg)
86+
publishedRequests = results.map(([, req]: any) => req)
87+
}, WAIT_FOR_STORAGE_TIMEOUT * 2)
88+
89+
beforeEach(async () => {
90+
await client.connect()
91+
expectErrors = 0
92+
onError = jest.fn()
93+
})
94+
95+
afterEach(async () => {
96+
await wait(0)
97+
// ensure no unexpected errors
98+
expect(onError).toHaveBeenCalledTimes(expectErrors)
99+
if (client) {
100+
expect(client.onError).toHaveBeenCalledTimes(expectErrors)
101+
}
102+
})
103+
104+
afterEach(async () => {
105+
await client.connect()
106+
// ensure last message is in storage
107+
const lastRequest = publishedRequests[publishedRequests.length - 1]
108+
await waitForStorage(lastRequest)
109+
client.debug('was stored', lastRequest)
110+
})
111+
112+
afterEach(async () => {
113+
if (client) {
114+
client.debug('disconnecting after test')
115+
await client.disconnect()
116+
}
117+
118+
const openSockets = Connection.getOpen()
119+
if (openSockets !== 0) {
120+
await Connection.closeOpen()
121+
throw new Error(`sockets not closed: ${openSockets}`)
122+
}
123+
client.debug('\n\n\n\n')
124+
})
125+
126+
for (let i = 0; i < ITERATIONS; i++) {
127+
const id = (i + 1) * 111111
128+
// eslint-disable-next-line no-loop-func
129+
test(`test ${id}`, async () => {
130+
const debug = client.debug.extend(`check ${id}`)
131+
debug('check >')
132+
const sub = await subscriber.resendSubscribe({
133+
streamId: stream.id,
134+
last: published.length,
135+
})
136+
137+
const onResent = jest.fn()
138+
sub.on('resent', onResent)
139+
140+
const message = Msg()
141+
// eslint-disable-next-line no-await-in-loop
142+
debug('PUBLISH >')
143+
const req = await client.publish(stream.id, message, id) // should be realtime
144+
debug('PUBLISH <')
145+
published.push(message)
146+
publishedRequests.push(req)
147+
debug('COLLECT >')
148+
const receivedMsgs = await collect(sub, async ({ received }) => {
149+
if (received.length === published.length) {
150+
await sub.return()
151+
}
152+
})
153+
debug('COLLECT <')
154+
155+
const msgs = receivedMsgs
156+
expect(msgs).toHaveLength(published.length)
157+
expect(msgs).toEqual(published)
158+
client.debug('check <')
159+
}, 30000)
160+
}
161+
})

test/integration/SubscriberResends.test.ts

Lines changed: 0 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -501,70 +501,4 @@ describeRepeats('resends', () => {
501501
})
502502
})
503503
})
504-
505-
it.only('sequential resendSubscribe', async () => {
506-
await client.connect()
507-
const results = await publishTestMessages.raw(MAX_MESSAGES, {
508-
waitForLast: true,
509-
timestamp: 111111,
510-
})
511-
512-
published = results.map(([msg]: any) => msg)
513-
publishedRequests = results.map(([, req]: any) => req)
514-
515-
async function waitForLastMessage() {
516-
await client.connect()
517-
// ensure last message is in storage
518-
const lastRequest = publishedRequests[publishedRequests.length - 1]
519-
await waitForStorage(lastRequest)
520-
client.debug('was stored', lastRequest)
521-
}
522-
523-
async function check(id: number) {
524-
const debug = client.debug.extend(`check ${id}`)
525-
debug('check >')
526-
const sub = await subscriber.resendSubscribe({
527-
streamId: stream.id,
528-
last: published.length,
529-
})
530-
531-
const onResent = jest.fn()
532-
sub.on('resent', onResent)
533-
534-
const message = Msg()
535-
// eslint-disable-next-line no-await-in-loop
536-
debug('PUBLISH >')
537-
const req = await client.publish(stream.id, message, id) // should be realtime
538-
debug('PUBLISH <')
539-
published.push(message)
540-
publishedRequests.push(req)
541-
debug('COLLECT >')
542-
const receivedMsgs = await collect(sub, async ({ received }) => {
543-
if (received.length === published.length) {
544-
await sub.return()
545-
}
546-
})
547-
debug('COLLECT <')
548-
549-
const msgs = receivedMsgs
550-
expect(msgs).toHaveLength(published.length)
551-
expect(msgs).toEqual(published)
552-
client.debug('check <')
553-
}
554-
555-
await client.connect()
556-
await check(111111)
557-
await waitForLastMessage()
558-
await client.disconnect()
559-
560-
await client.connect()
561-
await check(222222)
562-
await waitForLastMessage()
563-
await client.disconnect()
564-
565-
await client.connect()
566-
await check(333333)
567-
await waitForLastMessage()
568-
await client.disconnect()
569-
}, 60000)
570504
})

0 commit comments

Comments
 (0)