Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit 8b3279b

Browse files
authored
NET-193: Storage config propagation (#227)
Modified addToStorageNode to wait until the information has been propagated to the Broker. At that point, we can be sure that any published data will be stored. To enable the propagation check, new client options were added: storageNode.address and storageNode.url. Updated also all relevant integration tests: if a test does a resend for some stream, that stream must have been added to a StorageNode.
1 parent 829591a commit 8b3279b

19 files changed

+76
-16
lines changed

package-lock.json

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@
117117
"pretty-bytes": "^5.6.0",
118118
"process": "^0.11.10",
119119
"sinon": "^9.2.4",
120-
"streamr-test-utils": "^1.3.1",
121120
"terser-webpack-plugin": "^5.1.1",
122121
"ts-jest": "^26.5.1",
123122
"ts-loader": "^8.0.17",
@@ -160,6 +159,7 @@
160159
"quick-lru": "^6.0.0",
161160
"readable-stream": "^3.6.0",
162161
"streamr-client-protocol": "^8.0.0-beta.2",
162+
"streamr-test-utils": "^1.3.1",
163163
"ts-toolbelt": "^9.3.12",
164164
"uuid": "^8.3.2",
165165
"webpack-node-externals": "^2.5.2",

src/Config.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ export type StrictStreamrClientOptions = {
7171
templateMainnetAddress: EthereumAddress
7272
templateSidechainAddress: EthereumAddress
7373
},
74+
storageNode: {
75+
address: EthereumAddress
76+
url: string
77+
}
7478
cache: {
7579
maxSize: number,
7680
maxAge: number
@@ -139,6 +143,10 @@ export const STREAM_CLIENT_DEFAULTS: StrictStreamrClientOptions = {
139143
templateMainnetAddress: '0x5FE790E3751dd775Cb92e9086Acd34a2adeB8C7b',
140144
templateSidechainAddress: '0xf1E9d6E254BeA3f0129018AcA1A50AEcb7D528be',
141145
},
146+
storageNode: {
147+
address: '0x31546eEA76F2B2b3C5cC06B1c93601dc35c9D916',
148+
url: 'https://corea1.streamr.network:8001'
149+
},
142150
cache: {
143151
maxSize: 10000,
144152
maxAge: 30 * 60 * 1000, // 30 minutes
@@ -160,7 +168,8 @@ export default function ClientConfig(opts: StreamrClientOptions = {}) {
160168
'dataUnion.factoryMainnetAddress',
161169
'dataUnion.factorySidechainAddress',
162170
'dataUnion.templateMainnetAddress',
163-
'dataUnion.templateSidechainAddress'
171+
'dataUnion.templateSidechainAddress',
172+
'storageNode.address'
164173
])
165174

166175
const options: StrictStreamrClientOptions = {
@@ -174,7 +183,7 @@ export default function ClientConfig(opts: StreamrClientOptions = {}) {
174183
...STREAM_CLIENT_DEFAULTS.cache,
175184
...opts.cache,
176185
}
177-
// NOTE: sidechain is not merged with the defaults
186+
// NOTE: sidechain and storageNode settings are not merged with the defaults
178187
}
179188

180189
const parts = options.url!.split('?')

src/rest/StreamEndpoints.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import debugFactory from 'debug'
77
import { getEndpointUrl } from '../utils'
88
import { validateOptions } from '../stream/utils'
99
import { Stream, StreamOperation, StreamProperties } from '../stream'
10-
import StreamPart from '../stream/StreamPart'
10+
import { StreamPart } from '../stream/StreamPart'
1111
import { isKeyExchangeStream } from '../stream/KeyExchange'
1212

1313
import authFetch, { ErrorCode, NotFoundError } from './authFetch'

src/stream/StorageNode.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { EthereumAddress } from '../types'
22

3-
export default class StorageNode {
3+
export class StorageNode {
44

55
private _address: EthereumAddress
66

src/stream/StreamPart.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
export default class StreamPart {
1+
export class StreamPart {
22

33
_streamId: string
44
_streamPartition: number

src/stream/index.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
import fetch from 'node-fetch'
2+
import { getAddress } from '@ethersproject/address'
3+
import { waitForCondition } from 'streamr-test-utils'
14
import { getEndpointUrl } from '../utils'
25
import authFetch from '../rest/authFetch'
36

4-
import StorageNode from './StorageNode'
7+
import { StorageNode } from './StorageNode'
58
import { StreamrClient } from '../StreamrClient'
69

710
// TODO explicit types: e.g. we never provide both streamId and id, or both streamPartition and partition
@@ -221,6 +224,12 @@ export class Stream {
221224
}
222225

223226
async addToStorageNode(address: string) {
227+
// currently we support only one storage node
228+
// -> we can validate that the given address is that address
229+
// -> remove this comparison when we start to support multiple storage nodes
230+
if (getAddress(address) !== this._client.options.storageNode.address) {
231+
throw new Error('Unknown storage node: ' + address)
232+
}
224233
await authFetch(
225234
getEndpointUrl(this._client.options.restUrl, 'streams', this.id, 'storageNodes'),
226235
this._client.session,
@@ -231,6 +240,24 @@ export class Stream {
231240
})
232241
},
233242
)
243+
// wait for propagation: the storage node sees the database change in E&E and
244+
// is ready to store the any stream data which we publish
245+
const TIMEOUT = 30 * 1000
246+
const POLL_INTERVAL = 500
247+
await waitForCondition(() => this.isStreamStoredInStorageNode(this.id), TIMEOUT, POLL_INTERVAL,
248+
() => `Propagation timeout when adding stream to a storage node: ${this.id}`)
249+
}
250+
251+
private async isStreamStoredInStorageNode(streamId: string) {
252+
const url = `${this._client.options.storageNode.url}/api/v1/streams/${encodeURIComponent(streamId)}/storage/partitions/0`
253+
const response = await fetch(url)
254+
if (response.status === 200) {
255+
return true
256+
}
257+
if (response.status === 404) { // eslint-disable-line padding-line-between-statements
258+
return false
259+
}
260+
throw new Error(`Unexpected response code ${response.status} when fetching stream storage status`)
234261
}
235262

236263
async removeFromStorageNode(address: string) {

test/integration/Encryption.test.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ describe('decryption', () => {
106106
requireEncryptedData: true,
107107
})
108108

109+
await stream.addToStorageNode(config.clientOptions.storageNode.address)
110+
109111
publishTestMessages = getPublishTestMessages(client, {
110112
stream
111113
})

test/integration/GapFill.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ describeRepeats('GapFill with resends', () => {
4747
requireSignedData: true,
4848
name: uid('stream')
4949
})
50+
await stream.addToStorageNode(config.clientOptions.storageNode.address)
5051

5152
client.debug('connecting before test <<')
5253
publishTestMessages = getPublishTestMessages(client, stream.id)

test/integration/MultipleClients.test.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ describeRepeats('PubSub with multiple clients', () => {
5050
stream = await mainClient.createStream({
5151
name: uid('stream')
5252
})
53+
await stream.addToStorageNode(config.clientOptions.storageNode.address)
5354
})
5455

5556
afterEach(async () => {

0 commit comments

Comments
 (0)