Skip to content

Commit 3dce20e

Browse files
Write stream retries
1 parent 2a6ee9c commit 3dce20e

File tree

3 files changed

+208
-50
lines changed

3 files changed

+208
-50
lines changed

packages/firestore/src/core/sync_engine.ts

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -530,18 +530,18 @@ export class SyncEngine implements RemoteSyncer {
530530

531531
const batchId = mutationBatchResult.batch.batchId;
532532

533-
// The local store may or may not be able to apply the write result and
534-
// raise events immediately (depending on whether the watcher is caught
535-
// up), so we raise user callbacks first so that they consistently happen
536-
// before listen events.
537-
this.processUserCallback(batchId, /*error=*/ null);
538-
539-
this.triggerPendingWritesCallbacks(batchId);
540-
541533
try {
542534
const changes = await this.localStore.acknowledgeBatch(
543535
mutationBatchResult
544536
);
537+
538+
// The local store may or may not be able to apply the write result and
539+
// raise events immediately (depending on whether the watcher is caught
540+
// up), so we raise user callbacks first so that they consistently happen
541+
// before listen events.
542+
this.processUserCallback(batchId, /*error=*/ null);
543+
this.triggerPendingWritesCallbacks(batchId);
544+
545545
this.sharedClientState.updateMutationState(batchId, 'acknowledged');
546546
await this.emitNewSnapsAndNotifyLocalStore(changes);
547547
} catch (error) {
@@ -555,16 +555,16 @@ export class SyncEngine implements RemoteSyncer {
555555
): Promise<void> {
556556
this.assertSubscribed('rejectFailedWrite()');
557557

558-
// The local store may or may not be able to apply the write result and
559-
// raise events immediately (depending on whether the watcher is caught up),
560-
// so we raise user callbacks first so that they consistently happen before
561-
// listen events.
562-
this.processUserCallback(batchId, error);
563-
564-
this.triggerPendingWritesCallbacks(batchId);
565-
566558
try {
567559
const changes = await this.localStore.rejectBatch(batchId);
560+
561+
// The local store may or may not be able to apply the write result and
562+
// raise events immediately (depending on whether the watcher is caught up),
563+
// so we raise user callbacks first so that they consistently happen before
564+
// listen events.
565+
this.processUserCallback(batchId, error);
566+
this.triggerPendingWritesCallbacks(batchId);
567+
568568
this.sharedClientState.updateMutationState(batchId, 'rejected', error);
569569
await this.emitNewSnapsAndNotifyLocalStore(changes);
570570
} catch (error) {

packages/firestore/src/remote/remote_store.ts

Lines changed: 82 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,21 @@ export class RemoteStore implements TargetMetadataProvider {
125125

126126
private onlineStateTracker: OnlineStateTracker;
127127

128+
/**
129+
* A barrier to track unresolved operations that block the restart of the
130+
* write stream. This is used to remove writes from the mutation queue if the
131+
* initial removal attempt failed.
132+
*/
133+
private writeStreamBarrier = 0;
134+
128135
constructor(
129136
/**
130137
* The local store, used to fill the write pipeline with outbound mutations.
131138
*/
132139
private localStore: LocalStore,
133140
/** The client-side proxy for interacting with the backend. */
134141
private datastore: Datastore,
135-
asyncQueue: AsyncQueue,
142+
private asyncQueue: AsyncQueue,
136143
onlineStateHandler: (onlineState: OnlineState) => void,
137144
connectivityMonitor: ConnectivityMonitor
138145
) {
@@ -184,9 +191,12 @@ export class RemoteStore implements TargetMetadataProvider {
184191
}
185192

186193
/** Re-enables the network. Idempotent. */
187-
async enableNetwork(): Promise<void> {
194+
enableNetwork(): Promise<void> {
188195
this.networkEnabled = true;
196+
return this.enableNetworkInternal();
197+
}
189198

199+
async enableNetworkInternal(): Promise<void> {
190200
if (this.canUseNetwork()) {
191201
if (this.shouldStartWatchStream()) {
192202
this.startWatchStream();
@@ -226,6 +236,29 @@ export class RemoteStore implements TargetMetadataProvider {
226236
this.cleanUpWatchStreamState();
227237
}
228238

239+
/**
240+
* Recovery logic for IndexedDB errors that takes the network offline until
241+
* `op` succeeds. Retries are scheduled with backoff using `enqueueRetryable()`.
242+
*/
243+
private async disableNetworkUntilRecovery(
244+
e: FirestoreError,
245+
op: () => Promise<void>
246+
): Promise<void> {
247+
if (e.name === 'IndexedDbTransactionError') {
248+
// Increment the write stream barrier to prevent out of band stream
249+
// restarts.
250+
++this.writeStreamBarrier;
251+
await this.disableNetworkInternal();
252+
this.asyncQueue.enqueueRetryable(async () => {
253+
await op();
254+
--this.writeStreamBarrier;
255+
await this.enableNetworkInternal();
256+
});
257+
} else {
258+
throw e;
259+
}
260+
}
261+
229262
async shutdown(): Promise<void> {
230263
logDebug(LOG_TAG, 'RemoteStore shutting down.');
231264
this.networkEnabled = false;
@@ -337,7 +370,9 @@ export class RemoteStore implements TargetMetadataProvider {
337370
}
338371

339372
canUseNetwork(): boolean {
340-
return this.isPrimary && this.networkEnabled;
373+
return (
374+
this.writeStreamBarrier === 0 && this.isPrimary && this.networkEnabled
375+
);
341376
}
342377

343378
private cleanUpWatchStreamState(): void {
@@ -510,27 +545,31 @@ export class RemoteStore implements TargetMetadataProvider {
510545
* Starts the write stream if necessary.
511546
*/
512547
async fillWritePipeline(): Promise<void> {
513-
if (this.canAddToWritePipeline()) {
514-
const lastBatchIdRetrieved =
515-
this.writePipeline.length > 0
516-
? this.writePipeline[this.writePipeline.length - 1].batchId
517-
: BATCHID_UNKNOWN;
518-
const batch = await this.localStore.nextMutationBatch(
519-
lastBatchIdRetrieved
520-
);
521-
522-
if (batch === null) {
523-
if (this.writePipeline.length === 0) {
524-
this.writeStream.markIdle();
548+
try {
549+
while (this.canAddToWritePipeline()) {
550+
const lastBatchIdRetrieved =
551+
this.writePipeline.length > 0
552+
? this.writePipeline[this.writePipeline.length - 1].batchId
553+
: BATCHID_UNKNOWN;
554+
555+
const batch = await this.localStore.nextMutationBatch(
556+
lastBatchIdRetrieved
557+
);
558+
559+
if (batch) {
560+
this.addToWritePipeline(batch);
561+
} else {
562+
break;
525563
}
526-
} else {
527-
this.addToWritePipeline(batch);
528-
await this.fillWritePipeline();
529564
}
530-
}
531565

532-
if (this.shouldStartWriteStream()) {
533-
this.startWriteStream();
566+
if (this.shouldStartWriteStream()) {
567+
this.startWriteStream();
568+
} else if (this.writePipeline.length === 0) {
569+
this.writeStream.markIdle();
570+
}
571+
} catch (e) {
572+
await this.disableNetworkUntilRecovery(e, () => Promise.resolve());
534573
}
535574
}
536575

@@ -568,6 +607,7 @@ export class RemoteStore implements TargetMetadataProvider {
568607
private shouldStartWriteStream(): boolean {
569608
return (
570609
this.canUseNetwork() &&
610+
this.writeStreamBarrier === 0 &&
571611
!this.writeStream.isStarted() &&
572612
this.writePipeline.length > 0
573613
);
@@ -592,7 +632,7 @@ export class RemoteStore implements TargetMetadataProvider {
592632
}
593633
}
594634

595-
private onMutationResult(
635+
private async onMutationResult(
596636
commitVersion: SnapshotVersion,
597637
results: MutationResult[]
598638
): Promise<void> {
@@ -604,11 +644,16 @@ export class RemoteStore implements TargetMetadataProvider {
604644
);
605645
const batch = this.writePipeline.shift()!;
606646
const success = MutationBatchResult.from(batch, commitVersion, results);
607-
return this.syncEngine.applySuccessfulWrite(success).then(() => {
647+
try {
648+
await this.syncEngine.applySuccessfulWrite(success);
608649
// It's possible that with the completion of this mutation another
609650
// slot has freed up.
610-
return this.fillWritePipeline();
611-
});
651+
await this.fillWritePipeline();
652+
} catch (e) {
653+
await this.disableNetworkUntilRecovery(e, () =>
654+
this.syncEngine.applySuccessfulWrite(success)
655+
);
656+
}
612657
}
613658

614659
private async onWriteStreamClose(error?: FirestoreError): Promise<void> {
@@ -621,8 +666,8 @@ export class RemoteStore implements TargetMetadataProvider {
621666
);
622667
}
623668

624-
// If the write stream closed after the write handshake completes, a write
625-
// operation failed and we fail the pending operation.
669+
// An error that occurs after the write handshake completes is an indication
670+
// that the write operation itself failed.
626671
if (error && this.writeStream.handshakeComplete) {
627672
// This error affects the actual write.
628673
await this.handleWriteError(error!);
@@ -648,13 +693,16 @@ export class RemoteStore implements TargetMetadataProvider {
648693
// restart.
649694
this.writeStream.inhibitBackoff();
650695

651-
return this.syncEngine
652-
.rejectFailedWrite(batch.batchId, error)
653-
.then(() => {
654-
// It's possible that with the completion of this mutation
655-
// another slot has freed up.
656-
return this.fillWritePipeline();
657-
});
696+
try {
697+
await this.syncEngine.rejectFailedWrite(batch.batchId, error);
698+
// It's possible that with the completion of this mutation
699+
// another slot has freed up.
700+
await this.fillWritePipeline();
701+
} catch (e) {
702+
await this.disableNetworkUntilRecovery(e, () =>
703+
this.syncEngine.rejectFailedWrite(batch.batchId, error)
704+
);
705+
}
658706
} else {
659707
// Transient error, just let the retry logic kick in.
660708
}

packages/firestore/test/unit/specs/recovery_spec.test.ts

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import { client, spec } from './spec_builder';
2020
import { TimerId } from '../../../src/util/async_queue';
2121
import { Query } from '../../../src/core/query';
2222
import { doc, path } from '../../util/helpers';
23+
import { Code } from '../../../src/util/error';
24+
import { RpcError } from './spec_rpc_error';
2325

2426
describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
2527
specTest(
@@ -162,4 +164,112 @@ describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
162164
.watchAcksFull(query, 2, doc1, doc3)
163165
.expectEvents(query, { metadata: [doc1, doc3] });
164166
});
167+
168+
specTest('Recovers when write acknowledgment cannot be persisted', [], () => {
169+
return spec()
170+
.userSets('collection/a', { v: 1 })
171+
.userSets('collection/b', { v: 2 })
172+
.userSets('collection/c', { v: 3 })
173+
.writeAcks('collection/a', 1)
174+
.failDatabase()
175+
.writeAcks('collection/b', 2, { expectUserCallback: false })
176+
.recoverDatabase()
177+
.runTimer(TimerId.AsyncQueueRetry)
178+
.expectUserCallbacks({ acknowledged: ['collection/b'] })
179+
.writeAcks('collection/c', 1);
180+
});
181+
182+
specTest('Recovers when write rejection cannot be persisted', [], () => {
183+
return spec()
184+
.userPatches('collection/a', { v: 1 })
185+
.userPatches('collection/a', { v: 2 })
186+
.userPatches('collection/c', { v: 3 })
187+
.failWrite(
188+
'collection/a',
189+
new RpcError(Code.FAILED_PRECONDITION, 'Simulated test error')
190+
)
191+
.failDatabase()
192+
.failWrite(
193+
'collection/b',
194+
new RpcError(Code.FAILED_PRECONDITION, 'Simulated test error'),
195+
{ expectUserCallback: false }
196+
)
197+
.recoverDatabase()
198+
.runTimer(TimerId.AsyncQueueRetry)
199+
.expectUserCallbacks({ rejected: ['collection/a'] })
200+
.failWrite(
201+
'collection/c',
202+
new RpcError(Code.FAILED_PRECONDITION, 'Simulated test error')
203+
);
204+
});
205+
206+
specTest(
207+
'Recovers when write acknowledgment cannot be persisted (with restart)',
208+
['durable-persistence'],
209+
() => {
210+
// This test verifies the current behavior of the client, which is not
211+
// ideal. Instead of resending the write to 'collection/b' (whose
212+
// rejection failed with an IndexedDB failure), the client should drop the
213+
// write.
214+
return spec()
215+
.userSets('collection/a', { v: 1 })
216+
.userSets('collection/b', { v: 2 })
217+
.userSets('collection/c', { v: 3 })
218+
.writeAcks('collection/a', 1)
219+
.failDatabase()
220+
.writeAcks('collection/b', 2, {
221+
expectUserCallback: false,
222+
keepInQueue: true
223+
})
224+
.restart()
225+
.expectNumOutstandingWrites(2)
226+
.writeAcks('collection/b', 2, { expectUserCallback: false })
227+
.writeAcks('collection/c', 3, { expectUserCallback: false });
228+
}
229+
);
230+
231+
specTest('Writes are pending until acknowledgement is persisted', [], () => {
232+
const query = Query.atPath(path('collection'));
233+
const doc1Local = doc(
234+
'collection/a',
235+
0,
236+
{ v: 1 },
237+
{ hasLocalMutations: true }
238+
);
239+
const doc1 = doc('collection/a', 1001, { v: 1 });
240+
const doc2Local = doc(
241+
'collection/b',
242+
0,
243+
{ v: 2 },
244+
{ hasLocalMutations: true }
245+
);
246+
const doc2 = doc('collection/b', 1002, { v: 2 });
247+
return (
248+
spec()
249+
.userListens(query)
250+
.watchAcksFull(query, 1000)
251+
.expectEvents(query, {})
252+
.userSets('collection/a', { v: 1 })
253+
.expectEvents(query, { added: [doc1Local], hasPendingWrites: true })
254+
.userSets('collection/b', { v: 2 })
255+
.expectEvents(query, { added: [doc2Local], hasPendingWrites: true })
256+
.failDatabase()
257+
.writeAcks('collection/a', 1, { expectUserCallback: false })
258+
// The write ack cannot be persisted and the client goes offline, which
259+
// clears all active targets, but doesn't raise a new snapshot since
260+
// the document is still marked `hasPendingWrites`.
261+
.expectActiveTargets()
262+
.recoverDatabase()
263+
.runTimer(TimerId.AsyncQueueRetry)
264+
// Client is back online
265+
.expectActiveTargets({ query, resumeToken: 'resume-token-1000' })
266+
.expectUserCallbacks({ acknowledged: ['collection/a'] })
267+
.watchAcksFull(query, 1001, doc1)
268+
.expectEvents(query, { metadata: [doc1], hasPendingWrites: true })
269+
.writeAcks('collection/b', 2)
270+
.watchSends({ affects: [query] }, doc2)
271+
.watchSnapshots(1002)
272+
.expectEvents(query, { metadata: [doc2] })
273+
);
274+
});
165275
});

0 commit comments

Comments
 (0)