Skip to content

Take WriteStream offline when IndexedDB is unavailable #2995

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions packages/firestore/src/core/sync_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -525,18 +525,18 @@ export class SyncEngine implements RemoteSyncer {

const batchId = mutationBatchResult.batch.batchId;

// The local store may or may not be able to apply the write result and
// raise events immediately (depending on whether the watcher is caught
// up), so we raise user callbacks first so that they consistently happen
// before listen events.
this.processUserCallback(batchId, /*error=*/ null);

this.triggerPendingWritesCallbacks(batchId);

try {
const changes = await this.localStore.acknowledgeBatch(
mutationBatchResult
);

// The local store may or may not be able to apply the write result and
// raise events immediately (depending on whether the watcher is caught
// up), so we raise user callbacks first so that they consistently happen
// before listen events.
this.processUserCallback(batchId, /*error=*/ null);
this.triggerPendingWritesCallbacks(batchId);

this.sharedClientState.updateMutationState(batchId, 'acknowledged');
await this.emitNewSnapsAndNotifyLocalStore(changes);
} catch (error) {
Expand All @@ -550,16 +550,16 @@ export class SyncEngine implements RemoteSyncer {
): Promise<void> {
this.assertSubscribed('rejectFailedWrite()');

// The local store may or may not be able to apply the write result and
// raise events immediately (depending on whether the watcher is caught up),
// so we raise user callbacks first so that they consistently happen before
// listen events.
this.processUserCallback(batchId, error);

this.triggerPendingWritesCallbacks(batchId);

try {
const changes = await this.localStore.rejectBatch(batchId);

// The local store may or may not be able to apply the write result and
// raise events immediately (depending on whether the watcher is caught up),
// so we raise user callbacks first so that they consistently happen before
// listen events.
this.processUserCallback(batchId, error);
this.triggerPendingWritesCallbacks(batchId);

this.sharedClientState.updateMutationState(batchId, 'rejected', error);
await this.emitNewSnapsAndNotifyLocalStore(changes);
} catch (error) {
Expand Down
95 changes: 61 additions & 34 deletions packages/firestore/src/remote/remote_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -443,10 +443,17 @@ export class RemoteStore implements TargetMetadataProvider {

/**
* Recovery logic for IndexedDB errors that takes the network offline until
* IndexedDb probing succeeds. Retries are scheduled with backoff using
* `enqueueRetryable()`.
* `op` succeeds. Retries are scheduled with backoff using
* `enqueueRetryable()`. If `op()` is not provided, IndexedDB access is
* validated via a generic operation.
*
* The returned Promise is resolved once the network is disabled and before
* any retry attempt.
*/
private async disableNetworkUntilRecovery(e: FirestoreError): Promise<void> {
private async disableNetworkUntilRecovery(
e: FirestoreError,
op?: () => Promise<unknown>
): Promise<void> {
if (isIndexedDbTransactionError(e)) {
debugAssert(
!this.indexedDbFailed,
Expand All @@ -458,13 +465,17 @@ export class RemoteStore implements TargetMetadataProvider {
await this.disableNetworkInternal();
this.onlineStateTracker.set(OnlineState.Offline);

if (!op) {
// Use a simple read operation to determine if IndexedDB recovered.
// Ideally, we would expose a health check directly on SimpleDb, but
// RemoteStore only has access to persistence through LocalStore.
op = () => this.localStore.getLastRemoteSnapshotVersion();
}

// Probe IndexedDB periodically and re-enable network
this.asyncQueue.enqueueRetryable(async () => {
logDebug(LOG_TAG, 'Retrying IndexedDB access');
// Issue a simple read operation to determine if IndexedDB recovered.
// Ideally, we would expose a health check directly on SimpleDb, but
// RemoteStore only has access to persistence through LocalStore.
await this.localStore.getLastRemoteSnapshotVersion();
await op!();
this.indexedDbFailed = false;
await this.enableNetworkInternal();
});
Expand Down Expand Up @@ -567,22 +578,28 @@ export class RemoteStore implements TargetMetadataProvider {
* Starts the write stream if necessary.
*/
async fillWritePipeline(): Promise<void> {
if (this.canAddToWritePipeline()) {
const lastBatchIdRetrieved =
this.writePipeline.length > 0
? this.writePipeline[this.writePipeline.length - 1].batchId
: BATCHID_UNKNOWN;
const batch = await this.localStore.nextMutationBatch(
lastBatchIdRetrieved
);
let lastBatchIdRetrieved =
this.writePipeline.length > 0
? this.writePipeline[this.writePipeline.length - 1].batchId
: BATCHID_UNKNOWN;

while (this.canAddToWritePipeline()) {
try {
const batch = await this.localStore.nextMutationBatch(
lastBatchIdRetrieved
);

if (batch === null) {
if (this.writePipeline.length === 0) {
this.writeStream.markIdle();
if (batch === null) {
if (this.writePipeline.length === 0) {
this.writeStream.markIdle();
}
break;
} else {
lastBatchIdRetrieved = batch.batchId;
this.addToWritePipeline(batch);
}
} else {
this.addToWritePipeline(batch);
await this.fillWritePipeline();
} catch (e) {
await this.disableNetworkUntilRecovery(e);
}
}

Expand Down Expand Up @@ -649,7 +666,7 @@ export class RemoteStore implements TargetMetadataProvider {
}
}

private onMutationResult(
private async onMutationResult(
commitVersion: SnapshotVersion,
results: MutationResult[]
): Promise<void> {
Expand All @@ -661,11 +678,17 @@ export class RemoteStore implements TargetMetadataProvider {
);
const batch = this.writePipeline.shift()!;
const success = MutationBatchResult.from(batch, commitVersion, results);
return this.syncEngine.applySuccessfulWrite(success).then(() => {
// It's possible that with the completion of this mutation another
// slot has freed up.
return this.fillWritePipeline();
});
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It almost seems like this try/catch blurb could be its own method that take the operation to perform, tries it, and then passes it to disableNetworkUntilRecovery.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a small wrapper. It is only used twice, but the wrapper itself is one one line, which is kind of nice.

await this.syncEngine.applySuccessfulWrite(success);
} catch (e) {
await this.disableNetworkUntilRecovery(e, () =>
this.syncEngine.applySuccessfulWrite(success)
);
}

// It's possible that with the completion of this mutation another
// slot has freed up.
await this.fillWritePipeline();
}

private async onWriteStreamClose(error?: FirestoreError): Promise<void> {
Expand Down Expand Up @@ -705,13 +728,17 @@ export class RemoteStore implements TargetMetadataProvider {
// restart.
this.writeStream.inhibitBackoff();

return this.syncEngine
.rejectFailedWrite(batch.batchId, error)
.then(() => {
// It's possible that with the completion of this mutation
// another slot has freed up.
return this.fillWritePipeline();
});
try {
await this.syncEngine.rejectFailedWrite(batch.batchId, error);
} catch (e) {
await this.disableNetworkUntilRecovery(e, () =>
this.syncEngine.rejectFailedWrite(batch.batchId, error)
);
}

// It's possible that with the completion of this mutation
// another slot has freed up.
await this.fillWritePipeline();
} else {
// Transient error, just let the retry logic kick in.
}
Expand Down
109 changes: 109 additions & 0 deletions packages/firestore/test/unit/specs/recovery_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,115 @@ describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
.expectEvents(query, { metadata: [doc1, doc3] });
});

specTest('Recovers when write acknowledgment cannot be persisted', [], () => {
return spec()
.userSets('collection/a', { v: 1 })
.userSets('collection/b', { v: 2 })
.userSets('collection/c', { v: 3 })
.writeAcks('collection/a', 1)
.failDatabaseTransactions('Acknowledge batch')
.writeAcks('collection/b', 2, { expectUserCallback: false })
.recoverDatabase()
.runTimer(TimerId.AsyncQueueRetry)
.expectUserCallbacks({ acknowledged: ['collection/b'] })
.writeAcks('collection/c', 1);
});

specTest('Recovers when write rejection cannot be persisted', [], () => {
return spec()
.userPatches('collection/a', { v: 1 })
.userPatches('collection/a', { v: 2 })
.userPatches('collection/c', { v: 3 })
.failWrite(
'collection/a',
new RpcError(Code.FAILED_PRECONDITION, 'Simulated test error')
)
.failDatabaseTransactions('Reject batch')
.failWrite(
'collection/b',
new RpcError(Code.FAILED_PRECONDITION, 'Simulated test error'),
{ expectUserCallback: false }
)
.recoverDatabase()
.runTimer(TimerId.AsyncQueueRetry)
.expectUserCallbacks({ rejected: ['collection/a'] })
.failWrite(
'collection/c',
new RpcError(Code.FAILED_PRECONDITION, 'Simulated test error')
);
});

specTest(
'Recovers when write acknowledgment cannot be persisted (with restart)',
['durable-persistence'],
() => {
// This test verifies the current behavior of the client, which is not
// ideal. Instead of resending the write to 'collection/b' (whose
// rejection failed with an IndexedDB failure), the client should drop the
// write.
return spec()
.userSets('collection/a', { v: 1 })
.userSets('collection/b', { v: 2 })
.userSets('collection/c', { v: 3 })
.writeAcks('collection/a', 1)
.failDatabaseTransactions('Acknowledge batch')
.writeAcks('collection/b', 2, {
expectUserCallback: false,
keepInQueue: true
})
.restart()
.expectNumOutstandingWrites(2)
.writeAcks('collection/b', 2, { expectUserCallback: false })
.writeAcks('collection/c', 3, { expectUserCallback: false });
}
);

specTest('Writes are pending until acknowledgement is persisted', [], () => {
const query = Query.atPath(path('collection'));
const doc1Local = doc(
'collection/a',
0,
{ v: 1 },
{ hasLocalMutations: true }
);
const doc1 = doc('collection/a', 1001, { v: 1 });
const doc2Local = doc(
'collection/b',
0,
{ v: 2 },
{ hasLocalMutations: true }
);
const doc2 = doc('collection/b', 1002, { v: 2 });
return (
spec()
.userListens(query)
.watchAcksFull(query, 1000)
.expectEvents(query, {})
.userSets('collection/a', { v: 1 })
.expectEvents(query, { added: [doc1Local], hasPendingWrites: true })
.userSets('collection/b', { v: 2 })
.expectEvents(query, { added: [doc2Local], hasPendingWrites: true })
.failDatabaseTransactions('Acknowledge batch')
.writeAcks('collection/a', 1, { expectUserCallback: false })
// The write ack cannot be persisted and the client goes offline, which
// clears all active targets, but doesn't raise a new snapshot since
// the document is still marked `hasPendingWrites`.
.expectEvents(query, { fromCache: true, hasPendingWrites: true })
.expectActiveTargets()
.recoverDatabase()
.runTimer(TimerId.AsyncQueueRetry)
// Client is back online
.expectActiveTargets({ query, resumeToken: 'resume-token-1000' })
.expectUserCallbacks({ acknowledged: ['collection/a'] })
.watchAcksFull(query, 1001, doc1)
.expectEvents(query, { metadata: [doc1], hasPendingWrites: true })
.writeAcks('collection/b', 2)
.watchSends({ affects: [query] }, doc2)
.watchSnapshots(1002)
.expectEvents(query, { metadata: [doc2] })
);
});

specTest(
'Surfaces local documents if notifyLocalViewChanges fails',
[],
Expand Down