Skip to content

Commit 379796b

Browse files
Merge 50d4701 into 0d6f14e
2 parents 0d6f14e + 50d4701 commit 379796b

File tree

3 files changed

+197
-49
lines changed

3 files changed

+197
-49
lines changed

packages/firestore/src/core/sync_engine.ts

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -525,18 +525,18 @@ export class SyncEngine implements RemoteSyncer {
525525

526526
const batchId = mutationBatchResult.batch.batchId;
527527

528-
// The local store may or may not be able to apply the write result and
529-
// raise events immediately (depending on whether the watcher is caught
530-
// up), so we raise user callbacks first so that they consistently happen
531-
// before listen events.
532-
this.processUserCallback(batchId, /*error=*/ null);
533-
534-
this.triggerPendingWritesCallbacks(batchId);
535-
536528
try {
537529
const changes = await this.localStore.acknowledgeBatch(
538530
mutationBatchResult
539531
);
532+
533+
// The local store may or may not be able to apply the write result and
534+
// raise events immediately (depending on whether the watcher is caught
535+
// up), so we raise user callbacks first so that they consistently happen
536+
// before listen events.
537+
this.processUserCallback(batchId, /*error=*/ null);
538+
this.triggerPendingWritesCallbacks(batchId);
539+
540540
this.sharedClientState.updateMutationState(batchId, 'acknowledged');
541541
await this.emitNewSnapsAndNotifyLocalStore(changes);
542542
} catch (error) {
@@ -550,16 +550,16 @@ export class SyncEngine implements RemoteSyncer {
550550
): Promise<void> {
551551
this.assertSubscribed('rejectFailedWrite()');
552552

553-
// The local store may or may not be able to apply the write result and
554-
// raise events immediately (depending on whether the watcher is caught up),
555-
// so we raise user callbacks first so that they consistently happen before
556-
// listen events.
557-
this.processUserCallback(batchId, error);
558-
559-
this.triggerPendingWritesCallbacks(batchId);
560-
561553
try {
562554
const changes = await this.localStore.rejectBatch(batchId);
555+
556+
// The local store may or may not be able to apply the write result and
557+
// raise events immediately (depending on whether the watcher is caught up),
558+
// so we raise user callbacks first so that they consistently happen before
559+
// listen events.
560+
this.processUserCallback(batchId, error);
561+
this.triggerPendingWritesCallbacks(batchId);
562+
563563
this.sharedClientState.updateMutationState(batchId, 'rejected', error);
564564
await this.emitNewSnapsAndNotifyLocalStore(changes);
565565
} catch (error) {

packages/firestore/src/remote/remote_store.ts

Lines changed: 72 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,11 @@ export class RemoteStore implements TargetMetadataProvider {
199199

200200
private async enableNetworkInternal(): Promise<void> {
201201
if (this.canUseNetwork()) {
202-
this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken();
202+
try {
203+
this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken();
204+
} catch (e) {
205+
return this.disableNetworkUntilRecovery(e);
206+
}
203207

204208
if (this.shouldStartWatchStream()) {
205209
this.startWatchStream();
@@ -445,10 +449,14 @@ export class RemoteStore implements TargetMetadataProvider {
445449

446450
/**
447451
* Recovery logic for IndexedDB errors that takes the network offline until
448-
* IndexedDb probing succeeds. Retries are scheduled with backoff using
449-
* `enqueueRetryable()`.
452+
* `op` succeeds. Retries are scheduled with backoff using
453+
* `enqueueRetryable()`. If `op()` is not provided, IndexedDB access is
454+
* validated via a generic operation.
450455
*/
451-
private async disableNetworkUntilRecovery(e: FirestoreError): Promise<void> {
456+
private async disableNetworkUntilRecovery(
457+
e: FirestoreError,
458+
op?: () => Promise<unknown>
459+
): Promise<void> {
452460
if (isIndexedDbTransactionError(e)) {
453461
debugAssert(
454462
!this.indexedDbFailed,
@@ -460,13 +468,17 @@ export class RemoteStore implements TargetMetadataProvider {
460468
await this.disableNetworkInternal();
461469
this.onlineStateTracker.set(OnlineState.Offline);
462470

471+
if (!op) {
472+
// Use a simple read operation to determine if IndexedDB recovered.
473+
// Ideally, we would expose a health check directly on SimpleDb, but
474+
// RemoteStore only has access to persistence through LocalStore.
475+
op = () => this.localStore.getLastRemoteSnapshotVersion();
476+
}
477+
463478
// Probe IndexedDB periodically and re-enable network
464479
this.asyncQueue.enqueueRetryable(async () => {
465480
logDebug(LOG_TAG, 'Retrying IndexedDB access');
466-
// Issue a simple read operation to determine if IndexedDB recovered.
467-
// Ideally, we would expose a health check directly on SimpleDb, but
468-
// RemoteStore only has access to persistence through LocalStore.
469-
await this.localStore.getLastRemoteSnapshotVersion();
481+
await op!();
470482
this.indexedDbFailed = false;
471483
await this.enableNetworkInternal();
472484
});
@@ -569,25 +581,30 @@ export class RemoteStore implements TargetMetadataProvider {
569581
* Starts the write stream if necessary.
570582
*/
571583
async fillWritePipeline(): Promise<void> {
572-
if (this.canAddToWritePipeline()) {
584+
while (this.canAddToWritePipeline()) {
573585
const lastBatchIdRetrieved =
574586
this.writePipeline.length > 0
575587
? this.writePipeline[this.writePipeline.length - 1].batchId
576588
: BATCHID_UNKNOWN;
577-
const batch = await this.localStore.nextMutationBatch(
578-
lastBatchIdRetrieved
579-
);
580589

581-
if (batch === null) {
582-
if (this.writePipeline.length === 0) {
583-
this.writeStream.markIdle();
590+
try {
591+
const batch = await this.localStore.nextMutationBatch(
592+
lastBatchIdRetrieved
593+
);
594+
if (batch !== null) {
595+
this.addToWritePipeline(batch);
596+
} else {
597+
break;
584598
}
585-
} else {
586-
this.addToWritePipeline(batch);
587-
await this.fillWritePipeline();
599+
} catch (e) {
600+
await this.disableNetworkUntilRecovery(e);
588601
}
589602
}
590603

604+
if (this.writePipeline.length === 0) {
605+
this.writeStream.markIdle();
606+
}
607+
591608
if (this.shouldStartWriteStream()) {
592609
this.startWriteStream();
593610
}
@@ -654,10 +671,15 @@ export class RemoteStore implements TargetMetadataProvider {
654671
this.writeStream.writeMutations(batch.mutations);
655672
}
656673
})
657-
.catch(ignoreIfPrimaryLeaseLoss);
674+
.catch(ignoreIfPrimaryLeaseLoss)
675+
.catch(e =>
676+
this.disableNetworkUntilRecovery(e, () =>
677+
this.localStore.setLastStreamToken(this.writeStream.lastStreamToken)
678+
)
679+
);
658680
}
659681

660-
private onMutationResult(
682+
private async onMutationResult(
661683
commitVersion: SnapshotVersion,
662684
results: MutationResult[]
663685
): Promise<void> {
@@ -674,11 +696,18 @@ export class RemoteStore implements TargetMetadataProvider {
674696
results,
675697
this.writeStream.lastStreamToken
676698
);
677-
return this.syncEngine.applySuccessfulWrite(success).then(() => {
678-
// It's possible that with the completion of this mutation another
679-
// slot has freed up.
680-
return this.fillWritePipeline();
681-
});
699+
try {
700+
await this.syncEngine.applySuccessfulWrite(success);
701+
} catch (e) {
702+
await this.disableNetworkUntilRecovery(e, () =>
703+
this.syncEngine.applySuccessfulWrite(success)
704+
);
705+
return;
706+
}
707+
708+
// It's possible that with the completion of this mutation another
709+
// slot has freed up.
710+
await this.fillWritePipeline();
682711
}
683712

684713
private async onWriteStreamClose(error?: FirestoreError): Promise<void> {
@@ -727,7 +756,12 @@ export class RemoteStore implements TargetMetadataProvider {
727756

728757
return this.localStore
729758
.setLastStreamToken(ByteString.EMPTY_BYTE_STRING)
730-
.catch(ignoreIfPrimaryLeaseLoss);
759+
.catch(ignoreIfPrimaryLeaseLoss)
760+
.catch(e =>
761+
this.disableNetworkUntilRecovery(e, () =>
762+
this.localStore.setLastStreamToken(ByteString.EMPTY_BYTE_STRING)
763+
)
764+
);
731765
} else {
732766
// Some other error, don't reset stream token. Our stream logic will
733767
// just retry with exponential backoff.
@@ -747,13 +781,18 @@ export class RemoteStore implements TargetMetadataProvider {
747781
// restart.
748782
this.writeStream.inhibitBackoff();
749783

750-
return this.syncEngine
751-
.rejectFailedWrite(batch.batchId, error)
752-
.then(() => {
753-
// It's possible that with the completion of this mutation
754-
// another slot has freed up.
755-
return this.fillWritePipeline();
756-
});
784+
try {
785+
await this.syncEngine.rejectFailedWrite(batch.batchId, error);
786+
} catch (e) {
787+
await this.disableNetworkUntilRecovery(e, () =>
788+
this.syncEngine.rejectFailedWrite(batch.batchId, error)
789+
);
790+
return;
791+
}
792+
793+
// It's possible that with the completion of this mutation
794+
// another slot has freed up.
795+
await this.fillWritePipeline();
757796
} else {
758797
// Transient error, just let the retry logic kick in.
759798
}

packages/firestore/test/unit/specs/recovery_spec.test.ts

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,115 @@ describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
336336
.expectEvents(query, { metadata: [doc1, doc3] });
337337
});
338338

339+
specTest('Recovers when write acknowledgment cannot be persisted', [], () => {
340+
return spec()
341+
.userSets('collection/a', { v: 1 })
342+
.userSets('collection/b', { v: 2 })
343+
.userSets('collection/c', { v: 3 })
344+
.writeAcks('collection/a', 1)
345+
.failDatabaseTransactions('Acknowledge batch')
346+
.writeAcks('collection/b', 2, { expectUserCallback: false })
347+
.recoverDatabase()
348+
.runTimer(TimerId.AsyncQueueRetry)
349+
.expectUserCallbacks({ acknowledged: ['collection/b'] })
350+
.writeAcks('collection/c', 1);
351+
});
352+
353+
specTest('Recovers when write rejection cannot be persisted', [], () => {
354+
return spec()
355+
.userPatches('collection/a', { v: 1 })
356+
.userPatches('collection/a', { v: 2 })
357+
.userPatches('collection/c', { v: 3 })
358+
.failWrite(
359+
'collection/a',
360+
new RpcError(Code.FAILED_PRECONDITION, 'Simulated test error')
361+
)
362+
.failDatabaseTransactions('Reject batch')
363+
.failWrite(
364+
'collection/b',
365+
new RpcError(Code.FAILED_PRECONDITION, 'Simulated test error'),
366+
{ expectUserCallback: false }
367+
)
368+
.recoverDatabase()
369+
.runTimer(TimerId.AsyncQueueRetry)
370+
.expectUserCallbacks({ rejected: ['collection/a'] })
371+
.failWrite(
372+
'collection/c',
373+
new RpcError(Code.FAILED_PRECONDITION, 'Simulated test error')
374+
);
375+
});
376+
377+
specTest(
378+
'Recovers when write acknowledgment cannot be persisted (with restart)',
379+
['durable-persistence'],
380+
() => {
381+
// This test verifies the current behavior of the client, which is not
382+
// ideal. Instead of resending the write to 'collection/b' (whose
383+
// rejection failed with an IndexedDB failure), the client should drop the
384+
// write.
385+
return spec()
386+
.userSets('collection/a', { v: 1 })
387+
.userSets('collection/b', { v: 2 })
388+
.userSets('collection/c', { v: 3 })
389+
.writeAcks('collection/a', 1)
390+
.failDatabaseTransactions('Acknowledge batch')
391+
.writeAcks('collection/b', 2, {
392+
expectUserCallback: false,
393+
keepInQueue: true
394+
})
395+
.restart()
396+
.expectNumOutstandingWrites(2)
397+
.writeAcks('collection/b', 2, { expectUserCallback: false })
398+
.writeAcks('collection/c', 3, { expectUserCallback: false });
399+
}
400+
);
401+
402+
specTest('Writes are pending until acknowledgement is persisted', [], () => {
403+
const query = Query.atPath(path('collection'));
404+
const doc1Local = doc(
405+
'collection/a',
406+
0,
407+
{ v: 1 },
408+
{ hasLocalMutations: true }
409+
);
410+
const doc1 = doc('collection/a', 1001, { v: 1 });
411+
const doc2Local = doc(
412+
'collection/b',
413+
0,
414+
{ v: 2 },
415+
{ hasLocalMutations: true }
416+
);
417+
const doc2 = doc('collection/b', 1002, { v: 2 });
418+
return (
419+
spec()
420+
.userListens(query)
421+
.watchAcksFull(query, 1000)
422+
.expectEvents(query, {})
423+
.userSets('collection/a', { v: 1 })
424+
.expectEvents(query, { added: [doc1Local], hasPendingWrites: true })
425+
.userSets('collection/b', { v: 2 })
426+
.expectEvents(query, { added: [doc2Local], hasPendingWrites: true })
427+
.failDatabaseTransactions('Acknowledge batch')
428+
.writeAcks('collection/a', 1, { expectUserCallback: false })
429+
// The write ack cannot be persisted and the client goes offline, which
430+
// clears all active targets, but doesn't raise a new snapshot since
431+
// the document is still marked `hasPendingWrites`.
432+
.expectEvents(query, { fromCache: true, hasPendingWrites: true })
433+
.expectActiveTargets()
434+
.recoverDatabase()
435+
.runTimer(TimerId.AsyncQueueRetry)
436+
// Client is back online
437+
.expectActiveTargets({ query, resumeToken: 'resume-token-1000' })
438+
.expectUserCallbacks({ acknowledged: ['collection/a'] })
439+
.watchAcksFull(query, 1001, doc1)
440+
.expectEvents(query, { metadata: [doc1], hasPendingWrites: true })
441+
.writeAcks('collection/b', 2)
442+
.watchSends({ affects: [query] }, doc2)
443+
.watchSnapshots(1002)
444+
.expectEvents(query, { metadata: [doc2] })
445+
);
446+
});
447+
339448
specTest(
340449
'Surfaces local documents if notifyLocalViewChanges fails',
341450
[],

0 commit comments

Comments
 (0)