Skip to content

Take WatchStream offline when IndexedDB is unavailable #3010

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 85 additions & 33 deletions packages/firestore/src/remote/remote_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ export class RemoteStore implements TargetMetadataProvider {

private isPrimary = false;

/**
* When set to `true`, the network was taken offline due to an IndexedDB
* failure. The state is flipped to `false` when access becomes available
* again.
*/
private indexedDbFailed = false;

private onlineStateTracker: OnlineStateTracker;

constructor(
Expand All @@ -132,7 +139,7 @@ export class RemoteStore implements TargetMetadataProvider {
private localStore: LocalStore,
/** The client-side proxy for interacting with the backend. */
private datastore: Datastore,
asyncQueue: AsyncQueue,
private asyncQueue: AsyncQueue,
onlineStateHandler: (onlineState: OnlineState) => void,
connectivityMonitor: ConnectivityMonitor
) {
Expand Down Expand Up @@ -184,9 +191,12 @@ export class RemoteStore implements TargetMetadataProvider {
}

/** Re-enables the network. Idempotent. */
async enableNetwork(): Promise<void> {
enableNetwork(): Promise<void> {
this.networkEnabled = true;
return this.enableNetworkInternal();
}

private async enableNetworkInternal(): Promise<void> {
if (this.canUseNetwork()) {
this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken();

Expand Down Expand Up @@ -339,7 +349,7 @@ export class RemoteStore implements TargetMetadataProvider {
}

canUseNetwork(): boolean {
return this.isPrimary && this.networkEnabled;
return !this.indexedDbFailed && this.isPrimary && this.networkEnabled;
}

private cleanUpWatchStreamState(): void {
Expand Down Expand Up @@ -391,7 +401,18 @@ export class RemoteStore implements TargetMetadataProvider {
) {
// There was an error on a target, don't wait for a consistent snapshot
// to raise events
return this.handleTargetError(watchChange);
try {
await this.handleTargetError(watchChange);
} catch (e) {
logDebug(
LOG_TAG,
'Failed to remove targets %s: %s ',
watchChange.targetIds.join(','),
e
);
await this.disableNetworkUntilRecovery(e);
}
return;
}

if (watchChange instanceof DocumentWatchChange) {
Expand All @@ -407,15 +428,52 @@ export class RemoteStore implements TargetMetadataProvider {
}

if (!snapshotVersion.isEqual(SnapshotVersion.min())) {
const lastRemoteSnapshotVersion = await this.localStore.getLastRemoteSnapshotVersion();
if (snapshotVersion.compareTo(lastRemoteSnapshotVersion) >= 0) {
// We have received a target change with a global snapshot if the snapshot
// version is not equal to SnapshotVersion.min().
await this.raiseWatchSnapshot(snapshotVersion);
try {
const lastRemoteSnapshotVersion = await this.localStore.getLastRemoteSnapshotVersion();
if (snapshotVersion.compareTo(lastRemoteSnapshotVersion) >= 0) {
// We have received a target change with a global snapshot if the snapshot
// version is not equal to SnapshotVersion.min().
await this.raiseWatchSnapshot(snapshotVersion);
}
} catch (e) {
logDebug(LOG_TAG, 'Failed to raise snapshot:', e);
await this.disableNetworkUntilRecovery(e);
}
}
}

/**
* Recovery logic for IndexedDB errors that takes the network offline until
* IndexedDB probing succeeds. Retries are scheduled with backoff using
* `enqueueRetryable()`.
*/
private async disableNetworkUntilRecovery(e: FirestoreError): Promise<void> {
if (e.name === 'IndexedDbTransactionError') {
debugAssert(
!this.indexedDbFailed,
'Unexpected network event when IndexedDB was marked failed.'
);
this.indexedDbFailed = true;

// Disable network and raise offline snapshots
await this.disableNetworkInternal();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`disableNetworkInternal` does not change the `onlineStateTracker`, but while the network is intentionally disabled, shouldn't we also mark ourselves offline?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will create some flicker, but it is the right thing to do. Updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to add the state transition here since we don't raise offline events when disableNetworkInternal is called from shutdown or for primary state transitions.

this.onlineStateTracker.set(OnlineState.Offline);

// Probe IndexedDB periodically and re-enable network
this.asyncQueue.enqueueRetryable(async () => {
logDebug(LOG_TAG, 'Retrying IndexedDB access');
// Issue a simple read operation to determine if IndexedDB recovered.
// Ideally, we would expose a health check directly on SimpleDb, but
// RemoteStore only has access to persistence through LocalStore.
await this.localStore.getLastRemoteSnapshotVersion();
this.indexedDbFailed = false;
await this.enableNetworkInternal();
});
} else {
throw e;
}
}

/**
* Takes a batch of changes from the Datastore, repackages them as a
* RemoteEvent, and passes that on to the listener, which is typically the
Expand Down Expand Up @@ -486,21 +544,19 @@ export class RemoteStore implements TargetMetadataProvider {
}

/** Handles an error on a target */
private handleTargetError(watchChange: WatchTargetChange): Promise<void> {
private async handleTargetError(
watchChange: WatchTargetChange
): Promise<void> {
debugAssert(!!watchChange.cause, 'Handling target error without a cause');
const error = watchChange.cause!;
let promiseChain = Promise.resolve();
watchChange.targetIds.forEach(targetId => {
promiseChain = promiseChain.then(async () => {
// A watched target might have been removed already.
if (this.listenTargets.has(targetId)) {
this.listenTargets.delete(targetId);
this.watchChangeAggregator!.removeTarget(targetId);
return this.syncEngine.rejectListen(targetId, error);
}
});
});
return promiseChain;
for (const targetId of watchChange.targetIds) {
// A watched target might have been removed already.
if (this.listenTargets.has(targetId)) {
await this.syncEngine.rejectListen(targetId, error);
this.listenTargets.delete(targetId);
this.watchChangeAggregator!.removeTarget(targetId);
}
}
}

/**
Expand Down Expand Up @@ -637,25 +693,21 @@ export class RemoteStore implements TargetMetadataProvider {
// If the write stream closed due to an error, invoke the error callbacks if
// there are pending writes.
if (error && this.writePipeline.length > 0) {
// A promise that is resolved after we processed the error
let errorHandling: Promise<void>;
if (this.writeStream.handshakeComplete) {
// This error affects the actual write.
errorHandling = this.handleWriteError(error!);
await this.handleWriteError(error!);
} else {
// If there was an error before the handshake has finished, it's
// possible that the server is unable to process the stream token
// we're sending. (Perhaps it's too old?)
errorHandling = this.handleHandshakeError(error!);
await this.handleHandshakeError(error!);
}

return errorHandling.then(() => {
// The write stream might have been started by refilling the write
// pipeline for failed writes
if (this.shouldStartWriteStream()) {
this.startWriteStream();
}
});
// The write stream might have been started by refilling the write
// pipeline for failed writes
if (this.shouldStartWriteStream()) {
this.startWriteStream();
}
}
// No pending writes, nothing to do
}
Expand Down
84 changes: 84 additions & 0 deletions packages/firestore/test/unit/specs/recovery_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { TimerId } from '../../../src/util/async_queue';
import { Query } from '../../../src/core/query';
import { Code } from '../../../src/util/error';
import { doc, path } from '../../util/helpers';
import { RpcError } from './spec_rpc_error';

describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
specTest(
Expand Down Expand Up @@ -196,4 +197,87 @@ describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
.watchAcksFull(query2, 1)
.expectEvents(query2, {});
});

// Spec: a global watch snapshot arrives while the database is failing. The
// test expects the client to drop its active targets and serve the query from
// cache, then to listen again with the previous resume token once the database
// has recovered and the async-queue retry timer has run.
specTest(
'Recovers when watch update cannot be persisted',
['durable-persistence'],
() => {
const query = Query.atPath(path('collection'));
const doc1 = doc('collection/key1', 1000, { foo: 'a' });
const doc2 = doc('collection/key2', 2000, { foo: 'b' });
return (
spec()
// Establish a synced listen that contains doc1.
.userListens(query)
.watchAcksFull(query, 1000, doc1)
.expectEvents(query, {
added: [doc1]
})
// Watch delivers doc2, but the database starts failing before the
// snapshot that would persist it is raised.
.watchSends({ affects: [query] }, doc2)
.failDatabase()
.watchSnapshots(1500)
// Handling the snapshot against the failed database takes us offline:
// no targets remain active (presumably what the argument-less
// `expectActiveTargets()` asserts) and the query is served from cache.
.expectActiveTargets()
.expectEvents(query, { fromCache: true })
// After recovery, running the retry timer is expected to bring the
// listen back using the resume token from the last persisted ack.
.recoverDatabase()
.runTimer(TimerId.AsyncQueueRetry)
.expectActiveTargets({ query, resumeToken: 'resume-token-1000' })
// doc2 is only surfaced once Watch re-sends it after the re-listen.
.watchAcksFull(query, 2000, doc2)
.expectEvents(query, {
added: [doc2]
})
);
}
);

// Spec: Watch rejects a target while the database is failing. The test expects
// the rejection not to be applied (both listens go to cache and are later
// resumed), and expects a rejection delivered after recovery to surface to the
// listener as an error.
specTest(
'Recovers when watch rejection cannot be persisted',
['durable-persistence'],
() => {
const doc1Query = Query.atPath(path('collection/key1'));
const doc2Query = Query.atPath(path('collection/key2'));
const doc1a = doc('collection/key1', 1000, { foo: 'a' });
const doc1b = doc('collection/key1', 4000, { foo: 'a', updated: true });
const doc2 = doc('collection/key2', 2000, { foo: 'b' });
return (
spec()
// Establish two independent, synced document listens.
.userListens(doc1Query)
.watchAcksFull(doc1Query, 1000, doc1a)
.expectEvents(doc1Query, {
added: [doc1a]
})
.userListens(doc2Query)
.watchAcksFull(doc2Query, 2000, doc2)
.expectEvents(doc2Query, {
added: [doc2]
})
// Watch rejects doc1Query while the database is failing.
.failDatabase()
.watchRemoves(
doc1Query,
new RpcError(Code.PERMISSION_DENIED, 'Simulated target error')
)
// Handling the rejection against the failed database takes us offline:
// no targets remain active (presumably what the argument-less
// `expectActiveTargets()` asserts) and both queries fall back to cache.
// Note that doc1Query does not receive an error event here.
.expectActiveTargets()
.expectEvents(doc1Query, { fromCache: true })
.expectEvents(doc2Query, { fromCache: true })
// After recovery, running the retry timer is expected to resume BOTH
// listens, including the one whose rejection could not be processed.
.recoverDatabase()
.runTimer(TimerId.AsyncQueueRetry)
.expectActiveTargets(
{ query: doc1Query, resumeToken: 'resume-token-1000' },
{ query: doc2Query, resumeToken: 'resume-token-2000' }
)
.watchAcksFull(doc1Query, 3000)
.expectEvents(doc1Query, {})
// With a working database, a rejection is delivered to the listener.
.watchRemoves(
doc2Query,
new RpcError(Code.PERMISSION_DENIED, 'Simulated target error')
)
.expectEvents(doc2Query, { errorCode: Code.PERMISSION_DENIED })
// The surviving listen keeps receiving regular updates.
.watchSends({ affects: [doc1Query] }, doc1b)
.watchSnapshots(4000)
.expectEvents(doc1Query, {
modified: [doc1b]
})
);
}
);
});