@@ -443,10 +443,17 @@ export class RemoteStore implements TargetMetadataProvider {
443
443
444
444
/**
445
445
* Recovery logic for IndexedDB errors that takes the network offline until
446
- * IndexedDb probing succeeds. Retries are scheduled with backoff using
447
- * `enqueueRetryable()`.
446
+ * `op` succeeds. Retries are scheduled with backoff using
447
+ * `enqueueRetryable()`. If `op()` is not provided, IndexedDB access is
448
+ * validated via a generic operation.
449
+ *
450
+ * The returned Promise is resolved once the network is disabled and before
451
+ * any retry attempt.
448
452
*/
449
- private async disableNetworkUntilRecovery ( e : FirestoreError ) : Promise < void > {
453
+ private async disableNetworkUntilRecovery (
454
+ e : FirestoreError ,
455
+ op ?: ( ) => Promise < unknown >
456
+ ) : Promise < void > {
450
457
if ( isIndexedDbTransactionError ( e ) ) {
451
458
debugAssert (
452
459
! this . indexedDbFailed ,
@@ -458,13 +465,17 @@ export class RemoteStore implements TargetMetadataProvider {
458
465
await this . disableNetworkInternal ( ) ;
459
466
this . onlineStateTracker . set ( OnlineState . Offline ) ;
460
467
468
+ if ( ! op ) {
469
+ // Use a simple read operation to determine if IndexedDB recovered.
470
+ // Ideally, we would expose a health check directly on SimpleDb, but
471
+ // RemoteStore only has access to persistence through LocalStore.
472
+ op = ( ) => this . localStore . getLastRemoteSnapshotVersion ( ) ;
473
+ }
474
+
461
475
// Probe IndexedDB periodically and re-enable network
462
476
this . asyncQueue . enqueueRetryable ( async ( ) => {
463
477
logDebug ( LOG_TAG , 'Retrying IndexedDB access' ) ;
464
- // Issue a simple read operation to determine if IndexedDB recovered.
465
- // Ideally, we would expose a health check directly on SimpleDb, but
466
- // RemoteStore only has access to persistence through LocalStore.
467
- await this . localStore . getLastRemoteSnapshotVersion ( ) ;
478
+ await op ! ( ) ;
468
479
this . indexedDbFailed = false ;
469
480
await this . enableNetworkInternal ( ) ;
470
481
} ) ;
@@ -473,6 +484,14 @@ export class RemoteStore implements TargetMetadataProvider {
473
484
}
474
485
}
475
486
487
+ /**
488
+ * Executes `op`. If `op` fails, takes the network offline until `op`
489
+ * succeeds. Returns after the first attempt.
490
+ */
491
+ private executeWithRecovery ( op : ( ) => Promise < void > ) : Promise < void > {
492
+ return op ( ) . catch ( e => this . disableNetworkUntilRecovery ( e , op ) ) ;
493
+ }
494
+
476
495
/**
477
496
* Takes a batch of changes from the Datastore, repackages them as a
478
497
* RemoteEvent, and passes that on to the listener, which is typically the
@@ -567,22 +586,28 @@ export class RemoteStore implements TargetMetadataProvider {
567
586
* Starts the write stream if necessary.
568
587
*/
569
588
async fillWritePipeline ( ) : Promise < void > {
570
- if ( this . canAddToWritePipeline ( ) ) {
571
- const lastBatchIdRetrieved =
572
- this . writePipeline . length > 0
573
- ? this . writePipeline [ this . writePipeline . length - 1 ] . batchId
574
- : BATCHID_UNKNOWN ;
575
- const batch = await this . localStore . nextMutationBatch (
576
- lastBatchIdRetrieved
577
- ) ;
589
+ let lastBatchIdRetrieved =
590
+ this . writePipeline . length > 0
591
+ ? this . writePipeline [ this . writePipeline . length - 1 ] . batchId
592
+ : BATCHID_UNKNOWN ;
578
593
579
- if ( batch === null ) {
580
- if ( this . writePipeline . length === 0 ) {
581
- this . writeStream . markIdle ( ) ;
594
+ while ( this . canAddToWritePipeline ( ) ) {
595
+ try {
596
+ const batch = await this . localStore . nextMutationBatch (
597
+ lastBatchIdRetrieved
598
+ ) ;
599
+
600
+ if ( batch === null ) {
601
+ if ( this . writePipeline . length === 0 ) {
602
+ this . writeStream . markIdle ( ) ;
603
+ }
604
+ break ;
605
+ } else {
606
+ lastBatchIdRetrieved = batch . batchId ;
607
+ this . addToWritePipeline ( batch ) ;
582
608
}
583
- } else {
584
- this . addToWritePipeline ( batch ) ;
585
- await this . fillWritePipeline ( ) ;
609
+ } catch ( e ) {
610
+ await this . disableNetworkUntilRecovery ( e ) ;
586
611
}
587
612
}
588
613
@@ -649,7 +674,7 @@ export class RemoteStore implements TargetMetadataProvider {
649
674
}
650
675
}
651
676
652
- private onMutationResult (
677
+ private async onMutationResult (
653
678
commitVersion : SnapshotVersion ,
654
679
results : MutationResult [ ]
655
680
) : Promise < void > {
@@ -661,11 +686,14 @@ export class RemoteStore implements TargetMetadataProvider {
661
686
) ;
662
687
const batch = this . writePipeline . shift ( ) ! ;
663
688
const success = MutationBatchResult . from ( batch , commitVersion , results ) ;
664
- return this . syncEngine . applySuccessfulWrite ( success ) . then ( ( ) => {
665
- // It's possible that with the completion of this mutation another
666
- // slot has freed up.
667
- return this . fillWritePipeline ( ) ;
668
- } ) ;
689
+
690
+ await this . executeWithRecovery ( ( ) =>
691
+ this . syncEngine . applySuccessfulWrite ( success )
692
+ ) ;
693
+
694
+ // It's possible that with the completion of this mutation another
695
+ // slot has freed up.
696
+ await this . fillWritePipeline ( ) ;
669
697
}
670
698
671
699
private async onWriteStreamClose ( error ?: FirestoreError ) : Promise < void > {
@@ -705,13 +733,13 @@ export class RemoteStore implements TargetMetadataProvider {
705
733
// restart.
706
734
this . writeStream . inhibitBackoff ( ) ;
707
735
708
- return this . syncEngine
709
- . rejectFailedWrite ( batch . batchId , error )
710
- . then ( ( ) => {
711
- // It's possible that with the completion of this mutation
712
- // another slot has freed up.
713
- return this . fillWritePipeline ( ) ;
714
- } ) ;
736
+ await this . executeWithRecovery ( ( ) =>
737
+ this . syncEngine . rejectFailedWrite ( batch . batchId , error )
738
+ ) ;
739
+
740
+ // It's possible that with the completion of this mutation
741
+ // another slot has freed up.
742
+ await this . fillWritePipeline ( ) ;
715
743
} else {
716
744
// Transient error, just let the retry logic kick in.
717
745
}
0 commit comments