1+ import fetch from 'node-fetch'
2+ import { getAddress } from '@ethersproject/address'
3+ import { waitForCondition } from 'streamr-test-utils'
14import { getEndpointUrl } from '../utils'
25import authFetch from '../rest/authFetch'
36
4- import StorageNode from './StorageNode'
7+ import { StorageNode } from './StorageNode'
58import { StreamrClient } from '../StreamrClient'
69
710// TODO explicit types: e.g. we never provide both streamId and id, or both streamPartition and partition
@@ -221,6 +224,12 @@ export class Stream {
221224 }
222225
223226 async addToStorageNode ( address : string ) {
227+ // currently we support only one storage node
228+ // -> we can validate that the given address is that address
229+ // -> remove this comparison when we start to support multiple storage nodes
230+ if ( getAddress ( address ) !== this . _client . options . storageNode . address ) {
231+ throw new Error ( 'Unknown storage node: ' + address )
232+ }
224233 await authFetch (
225234 getEndpointUrl ( this . _client . options . restUrl , 'streams' , this . id , 'storageNodes' ) ,
226235 this . _client . session ,
@@ -231,6 +240,24 @@ export class Stream {
231240 } )
232241 } ,
233242 )
243+ // wait for propagation: the storage node sees the database change in E&E and
244+ // is ready to store the any stream data which we publish
245+ const TIMEOUT = 30 * 1000
246+ const POLL_INTERVAL = 500
247+ await waitForCondition ( ( ) => this . isStreamStoredInStorageNode ( this . id ) , TIMEOUT , POLL_INTERVAL ,
248+ ( ) => `Propagation timeout when adding stream to a storage node: ${ this . id } ` )
249+ }
250+
251+ private async isStreamStoredInStorageNode ( streamId : string ) {
252+ const url = `${ this . _client . options . storageNode . url } /api/v1/streams/${ encodeURIComponent ( streamId ) } /storage/partitions/0`
253+ const response = await fetch ( url )
254+ if ( response . status === 200 ) {
255+ return true
256+ }
257+ if ( response . status === 404 ) { // eslint-disable-line padding-line-between-statements
258+ return false
259+ }
260+ throw new Error ( `Unexpected response code ${ response . status } when fetching stream storage status` )
234261 }
235262
236263 async removeFromStorageNode ( address : string ) {
0 commit comments