@@ -4,12 +4,14 @@ import { StreamrClient } from '../../src/StreamrClient'
44import { MessageLayer } from 'streamr-client-protocol'
55import { Stream } from '../../src/stream'
66import { Subscription } from '../../src/subscribe'
7- import { fakePrivateKey } from '../utils'
7+ import { fakePrivateKey , addAfterFn } from '../utils'
88import Connection from '../../src/Connection'
99import prettyBytes from 'pretty-bytes'
1010
1111const TRAM_DEMO_STREAM = '7wa7APtlTq6EC5iTCBy6dw'
1212
13+ const log = Debug ( 'MessageQuantityTest' )
14+
1315function logMemory ( ) {
1416 const res = process . memoryUsage ( )
1517 return {
@@ -26,6 +28,7 @@ describe('no memleaks when processing a high quantity of large messages', () =>
2628 let stream : Stream
2729 let expectErrors = 0 // check no errors by default
2830 let onError = jest . fn ( )
31+ const afterFn = addAfterFn ( )
2932
3033 const createClient = ( opts = { } ) => new StreamrClient ( {
3134 autoConnect : false ,
@@ -51,8 +54,9 @@ describe('no memleaks when processing a high quantity of large messages', () =>
5154 } )
5255
5356 beforeEach ( ( ) => {
54- client . debug ( 'disabling logging for long tests' )
57+ client . debug ( 'disabling verbose client logging for long tests' )
5558 Debug . disable ( )
59+ Debug . enable ( 'MessageQuantityTest' )
5660 } )
5761
5862 afterEach ( ( ) => {
@@ -124,18 +128,18 @@ describe('no memleaks when processing a high quantity of large messages', () =>
124128 let count = 0
125129 let sub : Subscription
126130
127- const onMessage = ( maxMessages : number ) => ( msg : any , streamMessage : MessageLayer . StreamMessage ) => {
131+ const onMessage = ( maxMessages : number , maxMemoryUsage : number ) => ( msg : any , streamMessage : MessageLayer . StreamMessage ) => {
128132 totalBytes += Buffer . byteLength ( streamMessage . serializedContent , 'utf8' )
129133 if ( count % 1000 === 0 ) {
130134 const { rss } = process . memoryUsage ( )
131135 rssValues . push ( rss )
132- // eslint-disable-next-line no-console
133- console . info ( {
136+ log ( {
134137 msg,
135138 count,
136139 memory : logMemory ( ) ,
137140 total : prettyBytes ( totalBytes )
138141 } )
142+ expect ( rss ) . toBeLessThan ( maxMemoryUsage )
139143 }
140144
141145 if ( count === maxMessages ) {
@@ -151,8 +155,7 @@ describe('no memleaks when processing a high quantity of large messages', () =>
151155 const mean = rssValues . length ? rssValues . reduce ( ( a , b ) => a + b , 0 ) / rssValues . length : 0
152156 const median = rssValues . length ? rssValues [ Math . floor ( rssValues . length / 2 ) ] : 0
153157 const variance = rssValues . length ? Math . sqrt ( rssValues . reduce ( ( a , b ) => a + ( ( b - mean ) ** 2 ) , 0 ) / rssValues . length ) : 0
154- // eslint-disable-next-line no-console
155- console . info ( 'done' , {
158+ log ( 'done' , {
156159 max : prettyBytes ( max ) ,
157160 min : prettyBytes ( min ) ,
158161 mean : prettyBytes ( mean ) ,
@@ -177,39 +180,46 @@ describe('no memleaks when processing a high quantity of large messages', () =>
177180 }
178181 } )
179182
180- test . only ( 'realtime' , async ( ) => {
181- const MAX_MEMORY_USAGE = 2e+8 // 300MB
182- const MAX_MESSAGES = 6000
183- const MAX_TEST_TIME = 60000
184- sub = await client . subscribe ( {
185- stream : stream . id ,
186- } , onMessage ( MAX_MESSAGES ) )
187- const t = setTimeout ( ( ) => {
188- sub . unsubscribe ( )
189- } , MAX_TEST_TIME ) // run for 60s or 6k messages
190- await sub . onDone ( )
191- clearTimeout ( t )
192- validate ( MAX_MEMORY_USAGE )
193- } , 120000 )
183+ describe ( 'with realtime' , ( ) => {
184+ const MAX_MEMORY_USAGE = 2e+8 // 200MB
185+ // run for period or some number of messages, whichever comes first
186+ const MAX_TEST_TIME = 360000
187+ const MAX_MESSAGES = MAX_TEST_TIME / 10
194188
195- test ( 'resendSubscribe' , async ( ) => {
196- const MAX_MEMORY_USAGE = 2e+8 // 300MB
197- const MAX_MESSAGES = 6000
198- const MAX_TEST_TIME = 60000
199- sub = await client . subscribe ( {
200- stream : stream . id ,
201- resend : {
202- last : Math . floor ( MAX_MESSAGES / 2 ) ,
203- }
204- } , onMessage ( MAX_MESSAGES ) )
205- const t = setTimeout ( ( ) => {
206- sub . unsubscribe ( )
207- } , MAX_TEST_TIME ) // run for 60s or 6k messages
208- await sub . onDone ( )
209- clearTimeout ( t )
210- await sub . onDone ( )
211- validate ( MAX_MEMORY_USAGE )
212- } , 120000 )
189+ test ( 'just realtime' , async ( ) => {
190+ sub = await client . subscribe ( {
191+ stream : stream . id ,
192+ } , onMessage ( MAX_MESSAGES , MAX_MEMORY_USAGE ) )
193+ const t = setTimeout ( ( ) => {
194+ sub . unsubscribe ( )
195+ } , MAX_TEST_TIME )
196+ afterFn ( ( ) => {
197+ clearTimeout ( t )
198+ } )
199+ await sub . onDone ( )
200+ clearTimeout ( t )
201+ validate ( MAX_MEMORY_USAGE )
202+ } , MAX_TEST_TIME * 2 )
203+
204+ test ( 'resendSubscribe' , async ( ) => {
205+ sub = await client . subscribe ( {
206+ stream : stream . id ,
207+ resend : {
208+ last : Math . floor ( MAX_MESSAGES / 2 ) ,
209+ }
210+ } , onMessage ( MAX_MESSAGES , MAX_MEMORY_USAGE ) )
211+ const t = setTimeout ( ( ) => {
212+ sub . unsubscribe ( )
213+ } , MAX_TEST_TIME )
214+ afterFn ( ( ) => {
215+ clearTimeout ( t )
216+ } )
217+ await sub . onDone ( )
218+ clearTimeout ( t )
219+ await sub . onDone ( )
220+ validate ( MAX_MEMORY_USAGE )
221+ } , MAX_TEST_TIME * 2 )
222+ } )
213223
214224 test ( 'resend' , async ( ) => {
215225 const MAX_MEMORY_USAGE = 5e+8 // 500MB
0 commit comments