@@ -116,8 +116,8 @@ export class RemoteStore {
116116
117117 private accumulatedWatchChanges : WatchChange [ ] = [ ] ;
118118
119- private watchStream : PersistentListenStream ;
120- private writeStream : PersistentWriteStream ;
119+ private watchStream : PersistentListenStream = null ;
120+ private writeStream : PersistentWriteStream = null ;
121121
122122 /**
123123 * The online state of the watch stream. The state is set to healthy if and
@@ -149,10 +149,7 @@ export class RemoteStore {
149149 * LocalStore, etc.
150150 */
151151 start ( ) : Promise < void > {
152- return this . setupStreams ( ) . then ( ( ) => {
153- // Resume any writes
154- return this . fillWritePipeline ( ) ;
155- } ) ;
152+ return this . enableNetwork ( ) ;
156153 }
157154
158155 private setOnlineStateToHealthy ( ) : void {
@@ -192,7 +189,26 @@ export class RemoteStore {
192189 }
193190 }
194191
195- private setupStreams ( ) : Promise < void > {
192+ private isNetworkEnabled ( ) : boolean {
193+ assert (
194+ ( this . watchStream == null ) == ( this . writeStream == null ) ,
195+ 'WatchStream and WriteStream should both be null or non-null'
196+ ) ;
197+ return this . watchStream != null ;
198+ }
199+
200+ /** Re-enables the network. Only to be called as the counterpart to disableNetwork(). */
201+ enableNetwork ( ) : Promise < void > {
202+ assert (
203+ this . watchStream == null ,
204+ 'enableNetwork() called with non-null watchStream.'
205+ ) ;
206+ assert (
207+ this . writeStream == null ,
208+ 'enableNetwork() called with non-null writeStream.'
209+ ) ;
210+
211+ // Create new streams (but note they're not started yet).
196212 this . watchStream = this . datastore . newPersistentWatchStream ( {
197213 onOpen : this . onWatchStreamOpen . bind ( this ) ,
198214 onClose : this . onWatchStreamClose . bind ( this ) ,
@@ -208,15 +224,38 @@ export class RemoteStore {
208224 // Load any saved stream token from persistent storage
209225 return this . localStore . getLastStreamToken ( ) . then ( token => {
210226 this . writeStream . lastStreamToken = token ;
227+
228+ if ( this . shouldStartWatchStream ( ) ) {
229+ this . startWatchStream ( ) ;
230+ }
231+
232+ this . updateAndBroadcastOnlineState ( OnlineState . Unknown ) ;
233+
234+ return this . fillWritePipeline ( ) ; // This may start the writeStream.
211235 } ) ;
212236 }
213237
214- shutdown ( ) : Promise < void > {
215- log . debug ( LOG_TAG , 'RemoteStore shutting down.' ) ;
216- this . cleanupWatchStreamState ( ) ;
217- this . writeStream . stop ( ) ;
238+ /** Temporarily disables the network. The network can be re-enabled using enableNetwork(). */
239+ disableNetwork ( ) : Promise < void > {
240+ this . updateAndBroadcastOnlineState ( OnlineState . Failed ) ;
241+
242+ // NOTE: We're guaranteed not to get any further events from these streams (not even a close
243+ // event).
218244 this . watchStream . stop ( ) ;
245+ this . writeStream . stop ( ) ;
246+
247+ this . cleanUpWatchStreamState ( ) ;
248+ this . cleanUpWriteStreamState ( ) ;
219249
250+ this . writeStream = null ;
251+ this . watchStream = null ;
252+
253+ return Promise . resolve ( ) ;
254+ }
255+
256+ shutdown ( ) : Promise < void > {
257+ log . debug ( LOG_TAG , 'RemoteStore shutting down.' ) ;
258+ this . disableNetwork ( ) ;
220259 return Promise . resolve ( undefined ) ;
221260 }
222261
@@ -228,11 +267,12 @@ export class RemoteStore {
228267 ) ;
229268 // Mark this as something the client is currently listening for.
230269 this . listenTargets [ queryData . targetId ] = queryData ;
231- if ( this . watchStream . isOpen ( ) ) {
232- this . sendWatchRequest ( queryData ) ;
233- } else if ( ! this . watchStream . isStarted ( ) ) {
270+
271+ if ( this . shouldStartWatchStream ( ) ) {
234272 // The listen will be sent in onWatchStreamOpen
235273 this . startWatchStream ( ) ;
274+ } else if ( this . isNetworkEnabled ( ) && this . watchStream . isOpen ( ) ) {
275+ this . sendWatchRequest ( queryData ) ;
236276 }
237277 }
238278
@@ -244,7 +284,7 @@ export class RemoteStore {
244284 ) ;
245285 const queryData = this . listenTargets [ targetId ] ;
246286 delete this . listenTargets [ targetId ] ;
247- if ( this . watchStream . isOpen ( ) ) {
287+ if ( this . isNetworkEnabled ( ) && this . watchStream . isOpen ( ) ) {
248288 this . sendUnwatchRequest ( targetId ) ;
249289 }
250290 }
@@ -279,10 +319,9 @@ export class RemoteStore {
279319 }
280320
281321 private startWatchStream ( ) : void {
282- assert ( ! this . watchStream . isStarted ( ) , "Can't restart started watch stream" ) ;
283322 assert (
284323 this . shouldStartWatchStream ( ) ,
285- 'Tried to start watch stream even though it should not be started '
324+ 'startWriteStream() called when shouldStartWatchStream() is false. '
286325 ) ;
287326 this . watchStream . start ( ) ;
288327 }
@@ -292,10 +331,14 @@ export class RemoteStore {
292331 * active targets trying to be listened too
293332 */
294333 private shouldStartWatchStream ( ) : boolean {
295- return ! objUtils . isEmpty ( this . listenTargets ) ;
334+ return (
335+ this . isNetworkEnabled ( ) &&
336+ ! this . watchStream . isStarted ( ) &&
337+ ! objUtils . isEmpty ( this . listenTargets )
338+ ) ;
296339 }
297340
298- private cleanupWatchStreamState ( ) : void {
341+ private cleanUpWatchStreamState ( ) : void {
299342 // If the connection is closed then we'll never get a snapshot version for
300343 // the accumulated changes and so we'll never be able to complete the batch.
301344 // When we start up again the server is going to resend these changes
@@ -314,7 +357,12 @@ export class RemoteStore {
314357 }
315358
316359 private onWatchStreamClose ( error : FirestoreError | null ) : Promise < void > {
317- this . cleanupWatchStreamState ( ) ;
360+ assert (
361+ this . isNetworkEnabled ( ) ,
362+ 'onWatchStreamClose() should only be called when the network is enabled'
363+ ) ;
364+
365+ this . cleanUpWatchStreamState ( ) ;
318366
319367 // If there was an error, retry the connection.
320368 if ( this . shouldStartWatchStream ( ) ) {
@@ -510,6 +558,11 @@ export class RemoteStore {
510558 return promiseChain ;
511559 }
512560
561+ cleanUpWriteStreamState ( ) {
562+ this . lastBatchSeen = BATCHID_UNKNOWN ;
563+ this . pendingWrites = [ ] ;
564+ }
565+
513566 /**
514567 * Notifies that there are new mutations to process in the queue. This is
515568 * typically called by SyncEngine after it has sent mutations to LocalStore.
@@ -543,7 +596,9 @@ export class RemoteStore {
543596 * writes complete the backend will be able to accept more.
544597 */
545598 canWriteMutations ( ) : boolean {
546- return this . pendingWrites . length < MAX_PENDING_WRITES ;
599+ return (
600+ this . isNetworkEnabled ( ) && this . pendingWrites . length < MAX_PENDING_WRITES
601+ ) ;
547602 }
548603
549604 // For testing
@@ -565,15 +620,26 @@ export class RemoteStore {
565620
566621 this . pendingWrites . push ( batch ) ;
567622
568- if ( ! this . writeStream . isStarted ( ) ) {
623+ if ( this . shouldStartWriteStream ( ) ) {
569624 this . startWriteStream ( ) ;
570- } else if ( this . writeStream . handshakeComplete ) {
625+ } else if ( this . isNetworkEnabled ( ) && this . writeStream . handshakeComplete ) {
571626 this . writeStream . writeMutations ( batch . mutations ) ;
572627 }
573628 }
574629
630+ private shouldStartWriteStream ( ) : boolean {
631+ return (
632+ this . isNetworkEnabled ( ) &&
633+ ! this . writeStream . isStarted ( ) &&
634+ this . pendingWrites . length > 0
635+ ) ;
636+ }
637+
575638 private startWriteStream ( ) : void {
576- assert ( ! this . writeStream . isStarted ( ) , "Can't restart started write stream" ) ;
639+ assert (
640+ this . shouldStartWriteStream ( ) ,
641+ 'startWriteStream() called when shouldStartWriteStream() is false.'
642+ ) ;
577643 this . writeStream . start ( ) ;
578644 }
579645
@@ -632,6 +698,11 @@ export class RemoteStore {
632698 }
633699
634700 private onWriteStreamClose ( error ?: FirestoreError ) : Promise < void > {
701+ assert (
702+ this . isNetworkEnabled ( ) ,
703+ 'onWriteStreamClose() should only be called when the network is enabled'
704+ ) ;
705+
635706 // Ignore close if there are no pending writes.
636707 if ( this . pendingWrites . length > 0 ) {
637708 assert (
@@ -653,7 +724,7 @@ export class RemoteStore {
653724 return errorHandling . then ( ( ) => {
654725 // The write stream might have been started by refilling the write
655726 // pipeline for failed writes
656- if ( this . pendingWrites . length > 0 && ! this . writeStream . isStarted ( ) ) {
727+ if ( this . shouldStartWriteStream ( ) ) {
657728 this . startWriteStream ( ) ;
658729 }
659730 } ) ;
@@ -713,33 +784,10 @@ export class RemoteStore {
713784 handleUserChange ( user : User ) : Promise < void > {
714785 log . debug ( LOG_TAG , 'RemoteStore changing users: uid=' , user . uid ) ;
715786
716- // Clear pending writes because those are per-user. Watched targets
717- // persist across users so don't clear those.
718- this . lastBatchSeen = BATCHID_UNKNOWN ;
719- this . pendingWrites = [ ] ;
720-
721- // Stop the streams. They promise not to call us back.
722- this . watchStream . stop ( ) ;
723- this . writeStream . stop ( ) ;
724-
725- this . cleanupWatchStreamState ( ) ;
726-
727- // Create new streams (but note they're not started yet).
728- return this . setupStreams ( )
729- . then ( ( ) => {
730- // If there are any watchedTargets, properly handle the stream
731- // restart now that RemoteStore is ready to handle them.
732- if ( this . shouldStartWatchStream ( ) ) {
733- this . startWatchStream ( ) ;
734- }
735-
736- // Resume any writes
737- return this . fillWritePipeline ( ) ;
738- } )
739- . then ( ( ) => {
740- // User change moves us back to the unknown state because we might
741- // not want to re-open the stream
742- this . setOnlineStateToUnknown ( ) ;
743- } ) ;
787+ // Tear down and re-create our network streams. This will ensure we get a fresh auth token
788+ // for the new user and re-fill the write pipeline with new mutations from the LocalStore
789+ // (since mutations are per-user).
790+ this . disableNetwork ( ) ;
791+ return this . enableNetwork ( ) ;
744792 }
745793}
0 commit comments