@@ -132,13 +132,6 @@ export class RemoteStore implements TargetMetadataProvider {
132
132
133
133
private onlineStateTracker : OnlineStateTracker ;
134
134
135
- /**
136
- * A barrier to track unresolved operations that block the restart of the
137
- * write stream. This is used to remove writes from the mutation queue if the
138
- * initial removal attempt failed.
139
- */
140
- private writeStreamBarrier = 0 ;
141
-
142
135
constructor (
143
136
/**
144
137
* The local store, used to fill the write pipeline with outbound mutations.
@@ -449,12 +442,13 @@ export class RemoteStore implements TargetMetadataProvider {
449
442
450
443
/**
451
444
* Recovery logic for IndexedDB errors that takes the network offline until
452
- * IndexedDb probing succeeds. Retries are scheduled with backoff using
453
- * `enqueueRetryable()`.
445
+ * `op` succeeds. Retries are scheduled with backoff using
446
+ * `enqueueRetryable()`. If `op()` is not provided, IndexedDB access is
447
+ * validated via a generic operation.
454
448
*/
455
449
private async disableNetworkUntilRecovery (
456
450
e : FirestoreError ,
457
- op ?: ( ) => Promise < void >
451
+ op ?: ( ) => Promise < unknown >
458
452
) : Promise < void > {
459
453
if ( e . name === 'IndexedDbTransactionError' ) {
460
454
debugAssert (
@@ -467,17 +461,17 @@ export class RemoteStore implements TargetMetadataProvider {
467
461
await this . disableNetworkInternal ( ) ;
468
462
this . onlineStateTracker . set ( OnlineState . Offline ) ;
469
463
464
+ if ( ! op ) {
465
+ // Use a simple read operation to determine if IndexedDB recovered.
466
+ // Ideally, we would expose a health check directly on SimpleDb, but
467
+ // RemoteStore only has access to persistence through LocalStore.
468
+ op = ( ) => this . localStore . getLastRemoteSnapshotVersion ( ) ;
469
+ }
470
+
470
471
// Probe IndexedDB periodically and re-enable network
471
472
this . asyncQueue . enqueueRetryable ( async ( ) => {
472
473
logDebug ( LOG_TAG , 'Retrying IndexedDB access' ) ;
473
- if ( op ) {
474
- await op ( ) ;
475
- } else {
476
- // Issue a simple read operation to determine if IndexedDB recovered.
477
- // Ideally, we would expose a health check directly on SimpleDb, but
478
- // RemoteStore only has access to persistence through LocalStore.
479
- await this . localStore . getLastRemoteSnapshotVersion ( ) ;
480
- }
474
+ await op ! ( ) ;
481
475
this . indexedDbFailed = false ;
482
476
await this . enableNetworkInternal ( ) ;
483
477
} ) ;
@@ -580,31 +574,32 @@ export class RemoteStore implements TargetMetadataProvider {
580
574
* Starts the write stream if necessary.
581
575
*/
582
576
async fillWritePipeline ( ) : Promise < void > {
583
- try {
584
- while ( this . canAddToWritePipeline ( ) ) {
585
- const lastBatchIdRetrieved =
586
- this . writePipeline . length > 0
587
- ? this . writePipeline [ this . writePipeline . length - 1 ] . batchId
588
- : BATCHID_UNKNOWN ;
577
+ while ( this . canAddToWritePipeline ( ) ) {
578
+ const lastBatchIdRetrieved =
579
+ this . writePipeline . length > 0
580
+ ? this . writePipeline [ this . writePipeline . length - 1 ] . batchId
581
+ : BATCHID_UNKNOWN ;
589
582
583
+ try {
590
584
const batch = await this . localStore . nextMutationBatch (
591
585
lastBatchIdRetrieved
592
586
) ;
593
-
594
- if ( batch ) {
587
+ if ( batch !== null ) {
595
588
this . addToWritePipeline ( batch ) ;
596
589
} else {
597
590
break ;
598
591
}
592
+ } catch ( e ) {
593
+ await this . disableNetworkUntilRecovery ( e ) ;
599
594
}
595
+ }
600
596
601
- if ( this . shouldStartWriteStream ( ) ) {
602
- this . startWriteStream ( ) ;
603
- } else if ( this . writePipeline . length === 0 ) {
604
- this . writeStream . markIdle ( ) ;
605
- }
606
- } catch ( e ) {
607
- await this . disableNetworkUntilRecovery ( e ) ;
597
+ if ( this . writePipeline . length === 0 ) {
598
+ this . writeStream . markIdle ( ) ;
599
+ }
600
+
601
+ if ( this . shouldStartWriteStream ( ) ) {
602
+ this . startWriteStream ( ) ;
608
603
}
609
604
}
610
605
@@ -642,7 +637,6 @@ export class RemoteStore implements TargetMetadataProvider {
642
637
private shouldStartWriteStream ( ) : boolean {
643
638
return (
644
639
this . canUseNetwork ( ) &&
645
- this . writeStreamBarrier === 0 &&
646
640
! this . writeStream . isStarted ( ) &&
647
641
this . writePipeline . length > 0
648
642
) ;
@@ -660,11 +654,12 @@ export class RemoteStore implements TargetMetadataProvider {
660
654
this . writeStream . writeHandshake ( ) ;
661
655
}
662
656
663
- private async onWriteHandshakeComplete ( ) : Promise < void > {
657
+ private onWriteHandshakeComplete ( ) : Promise < void > {
664
658
// Send the write pipeline now that the stream is established.
665
659
for ( const batch of this . writePipeline ) {
666
660
this . writeStream . writeMutations ( batch . mutations ) ;
667
661
}
662
+ return Promise . resolve ( ) ;
668
663
}
669
664
670
665
private async onMutationResult (
@@ -681,14 +676,16 @@ export class RemoteStore implements TargetMetadataProvider {
681
676
const success = MutationBatchResult . from ( batch , commitVersion , results ) ;
682
677
try {
683
678
await this . syncEngine . applySuccessfulWrite ( success ) ;
684
- // It's possible that with the completion of this mutation another
685
- // slot has freed up.
686
- await this . fillWritePipeline ( ) ;
687
679
} catch ( e ) {
688
680
await this . disableNetworkUntilRecovery ( e , ( ) =>
689
681
this . syncEngine . applySuccessfulWrite ( success )
690
682
) ;
683
+ return ;
691
684
}
685
+
686
+ // It's possible that with the completion of this mutation another
687
+ // slot has freed up.
688
+ await this . fillWritePipeline ( ) ;
692
689
}
693
690
694
691
private async onWriteStreamClose ( error ?: FirestoreError ) : Promise < void > {
@@ -730,14 +727,16 @@ export class RemoteStore implements TargetMetadataProvider {
730
727
731
728
try {
732
729
await this . syncEngine . rejectFailedWrite ( batch . batchId , error ) ;
733
- // It's possible that with the completion of this mutation
734
- // another slot has freed up.
735
- await this . fillWritePipeline ( ) ;
736
730
} catch ( e ) {
737
731
await this . disableNetworkUntilRecovery ( e , ( ) =>
738
732
this . syncEngine . rejectFailedWrite ( batch . batchId , error )
739
733
) ;
734
+ return ;
740
735
}
736
+
737
+ // It's possible that with the completion of this mutation
738
+ // another slot has freed up.
739
+ await this . fillWritePipeline ( ) ;
741
740
} else {
742
741
// Transient error, just let the retry logic kick in.
743
742
}
0 commit comments