Take WatchStream offline when IndexedDB is unavailable #3010

Merged
merged 7 commits into from
May 7, 2020
Changes from 2 commits
107 changes: 74 additions & 33 deletions packages/firestore/src/remote/remote_store.ts
@@ -123,6 +123,12 @@ export class RemoteStore implements TargetMetadataProvider {

private isPrimary = false;

/**
* A barrier that prevents the restart of the network streams. Incremented for
* each IndexedDB failure until access becomes available again.
*/
private indexedDbFailureBarrier = 0;

private onlineStateTracker: OnlineStateTracker;

constructor(
@@ -132,7 +138,7 @@ export class RemoteStore implements TargetMetadataProvider {
private localStore: LocalStore,
/** The client-side proxy for interacting with the backend. */
private datastore: Datastore,
asyncQueue: AsyncQueue,
private asyncQueue: AsyncQueue,
onlineStateHandler: (onlineState: OnlineState) => void,
connectivityMonitor: ConnectivityMonitor
) {
@@ -184,9 +190,12 @@ export class RemoteStore implements TargetMetadataProvider {
}

/** Re-enables the network. Idempotent. */
async enableNetwork(): Promise<void> {
enableNetwork(): Promise<void> {
this.networkEnabled = true;
return this.enableNetworkInternal();
}

private async enableNetworkInternal(): Promise<void> {
if (this.canUseNetwork()) {
this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken();

@@ -339,7 +348,11 @@ export class RemoteStore implements TargetMetadataProvider {
}

canUseNetwork(): boolean {
return this.isPrimary && this.networkEnabled;
return (
this.indexedDbFailureBarrier === 0 &&
this.isPrimary &&
this.networkEnabled
);
}

private cleanUpWatchStreamState(): void {
@@ -391,7 +404,13 @@ export class RemoteStore implements TargetMetadataProvider {
) {
// There was an error on a target, don't wait for a consistent snapshot
// to raise events
return this.handleTargetError(watchChange);
try {
await this.handleTargetError(watchChange);
} catch (e) {
logDebug(LOG_TAG, 'Failed to remove target: ' + e);
Contributor

Can we log the query associated with the target here?

Contributor Author

Unless we go out of our way, we don't have access to the query here. I logged the target ID instead. We should, however, already have this information from the Watch stream.

FWIW, I didn't add this logging to begin with since this is a debug log. The other logs were error logs in response to user behavior. They are always visible, and often without context.

await this.disableNetworkUntilRecovery(e);
}
return;
}

if (watchChange instanceof DocumentWatchChange) {
@@ -407,15 +426,43 @@ export class RemoteStore implements TargetMetadataProvider {
}

if (!snapshotVersion.isEqual(SnapshotVersion.min())) {
const lastRemoteSnapshotVersion = await this.localStore.getLastRemoteSnapshotVersion();
if (snapshotVersion.compareTo(lastRemoteSnapshotVersion) >= 0) {
// We have received a target change with a global snapshot if the snapshot
// version is not equal to SnapshotVersion.min().
await this.raiseWatchSnapshot(snapshotVersion);
try {
const lastRemoteSnapshotVersion = await this.localStore.getLastRemoteSnapshotVersion();
if (snapshotVersion.compareTo(lastRemoteSnapshotVersion) >= 0) {
// We have received a target change with a global snapshot if the snapshot
// version is not equal to SnapshotVersion.min().
await this.raiseWatchSnapshot(snapshotVersion);
}
} catch (e) {
logDebug(LOG_TAG, 'Failed to raise snapshot: ' + e);
Contributor

Is there any identifying information about the snapshot that we can log here?

Contributor Author

I am not sure if it will help, but I added the snapshot version.

Contributor

The snapshot version probably isn't that useful, but the target ID would be. Anything to help associate this message with its context.

If the targetId isn't available, going back to what you had before is probably better.

Contributor Author

I changed this back to the original message. The target IDs are not easily available:

  • DocumentWatchChange has updatedTargetIds and removedTargetIds
  • ExistenceFilterChange has targetId
  • WatchTargetChange has targetIds

If I understand the flow correctly, we should always be in a WatchTargetChange to raise a snapshot, but I don't want to bake this assumption into the code for better logging. I can invest some time here, but the targets are already in the Watch response that we log right before.
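
To make the point about target IDs concrete, here is a minimal sketch of what collecting them across the three change types might involve. The interfaces are simplified, hypothetical stand-ins: the `kind` tag and the `targetIdsOf` helper are assumptions for illustration and are not part of the SDK. Only the field names `updatedTargetIds`, `removedTargetIds`, `targetId`, and `targetIds` come from the list above.

```typescript
// Simplified, hypothetical shapes. Only the target ID field names are taken
// from the discussion above; the `kind` discriminant is an assumption.
interface DocumentChangeSketch {
  kind: 'document';
  updatedTargetIds: number[];
  removedTargetIds: number[];
}

interface ExistenceFilterSketch {
  kind: 'existence-filter';
  targetId: number;
}

interface TargetChangeSketch {
  kind: 'target';
  targetIds: number[];
}

type WatchChangeSketch =
  | DocumentChangeSketch
  | ExistenceFilterSketch
  | TargetChangeSketch;

/** Collects every target ID a change refers to, whatever its shape. */
function targetIdsOf(change: WatchChangeSketch): number[] {
  switch (change.kind) {
    case 'document':
      return change.updatedTargetIds.concat(change.removedTargetIds);
    case 'existence-filter':
      return [change.targetId];
    case 'target':
      return change.targetIds.slice();
  }
}
```

A helper like this would have to be kept in sync with every change type, which is one way to read the reluctance to add it purely for a debug log.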

await this.disableNetworkUntilRecovery(e);
}
}
}

/**
* Recovery logic for IndexedDB errors that takes the network offline until
* IndexedDb probing succeeds. Retries are scheduled with backoff using
* `enqueueRetryable()`.
*/
private async disableNetworkUntilRecovery(e: FirestoreError): Promise<void> {
if (e.name === 'IndexedDbTransactionError') {
++this.indexedDbFailureBarrier;
Contributor

This mechanism seems more complicated/fiddly than it needs to be.

If we get multiple watch change events in a row, this is going to bump indexedDbFailureBarrier multiple times and enqueue multiple retry tasks. When IndexedDB becomes available, each of them is going to succeed, decrement the count, and try to enable the network, but only the last call to enableNetworkInternal will succeed.

It seems like instead of keeping a count, we could instead keep a boolean like indexedDbFailed. Then in this method:

  • If not indexedDbFailed, set it and enqueue the retry task
  • If indexedDbFailed, just log the error but do nothing.
  • In the retry task, if getLastRemoteSnapshotVersion succeeds, flip indexedDbFailed back to false and enable the network.

Contributor Author

You say "complicated/fiddly", I say "advanced" :)

We should only ever get one of these events, since we go offline right away and discard all further messages from the stream. I realize that this is a good argument for making this a boolean, and I updated the code accordingly.
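
The boolean variant proposed in this thread might look roughly like the sketch below. It is a synchronous, hypothetical stand-in rather than the SDK's `RemoteStore`: the class name, `probe`, `runRetry`, and `retryCount` are assumptions made for illustration, and the real code would enqueue an async task with `enqueueRetryable()` instead of counting retries.

```typescript
// Hypothetical stand-in for the retry logic; not the SDK's RemoteStore.
class IndexedDbRecoveryGate {
  private indexedDbFailed = false;
  private scheduledRetries = 0;
  private readonly probe: () => void;

  /** `probe` performs a cheap read and throws while IndexedDB is unavailable. */
  constructor(probe: () => void) {
    this.probe = probe;
  }

  /** Mirrors the extra condition this change adds to canUseNetwork(). */
  canUseNetwork(): boolean {
    return !this.indexedDbFailed;
  }

  /** Number of retry tasks that would have been enqueued so far. */
  retryCount(): number {
    return this.scheduledRetries;
  }

  /** Called when a persistence operation fails. */
  onIndexedDbFailure(): void {
    if (this.indexedDbFailed) {
      // A retry is already pending; a real implementation would only log here.
      return;
    }
    this.indexedDbFailed = true;
    this.scheduledRetries++; // stands in for enqueueing a single retry task
  }

  /** One retry attempt; returns true once the probe succeeds. */
  runRetry(): boolean {
    if (!this.indexedDbFailed) {
      return false; // nothing to recover from
    }
    try {
      this.probe();
    } catch {
      return false; // still failing; the queue would retry with backoff
    }
    this.indexedDbFailed = false; // re-open the gate
    return true;
  }
}
```

Repeated failures while the flag is set schedule no additional retries, which is the simplification the reviewer was after.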

await this.disableNetworkInternal();
Contributor

disableNetworkInternal does not change the onlineStateTracker, but it seems like while we've intentionally disabled the network we should mark ourselves offline, no?

Contributor Author

That will create some flicker, but it is the right thing to do. Updated.

Contributor Author

I had to add the state transition here since we don't raise offline events when disableNetworkInternal is called from shutdown or for primary state transitions.
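
As a rough illustration of why the transition lives in the recovery path rather than in `disableNetworkInternal` itself, consider the sketch below. Everything in it is hypothetical (the class, the `stateLog` array, and the string state names are assumptions); it only shows the shape of the decision described in this thread.

```typescript
// Hypothetical sketch; names and state values are illustrative assumptions.
type OnlineStateSketch = 'Unknown' | 'Online' | 'Offline';

class NetworkStateSketch {
  /** Every state change that would be reported to listeners. */
  readonly stateLog: OnlineStateSketch[] = [];
  private streamsOpen = true;

  hasOpenStreams(): boolean {
    return this.streamsOpen;
  }

  /** Closes the streams only; callers decide whether to report a change. */
  private disableNetworkInternal(): void {
    this.streamsOpen = false;
  }

  /** Shutdown closes the streams without raising an offline event. */
  shutdown(): void {
    this.disableNetworkInternal();
  }

  /** The recovery path closes the streams and reports Offline explicitly. */
  disableNetworkUntilRecovery(): void {
    this.disableNetworkInternal();
    this.stateLog.push('Offline');
  }
}
```

Keeping the shared helper silent means that shutdown and primary-state transitions do not produce spurious offline events.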

this.asyncQueue.enqueueRetryable(async () => {
logDebug(LOG_TAG, 'Retrying IndexedDB access');
// Issue a simple read operation to determine if IndexedDB recovered.
// Ideally, we would expose a health check directly on SimpleDb, but
// RemoteStore only has access to persistence through LocalStore.
await this.localStore.getLastRemoteSnapshotVersion();
--this.indexedDbFailureBarrier;
await this.enableNetworkInternal();
});
} else {
throw e;
}
}

/**
* Takes a batch of changes from the Datastore, repackages them as a
* RemoteEvent, and passes that on to the listener, which is typically the
@@ -486,21 +533,19 @@ export class RemoteStore implements TargetMetadataProvider {
}

/** Handles an error on a target */
private handleTargetError(watchChange: WatchTargetChange): Promise<void> {
private async handleTargetError(
watchChange: WatchTargetChange
): Promise<void> {
debugAssert(!!watchChange.cause, 'Handling target error without a cause');
const error = watchChange.cause!;
let promiseChain = Promise.resolve();
watchChange.targetIds.forEach(targetId => {
promiseChain = promiseChain.then(async () => {
// A watched target might have been removed already.
if (this.listenTargets.has(targetId)) {
this.listenTargets.delete(targetId);
this.watchChangeAggregator!.removeTarget(targetId);
return this.syncEngine.rejectListen(targetId, error);
}
});
});
return promiseChain;
for (const targetId of watchChange.targetIds) {
// A watched target might have been removed already.
if (this.listenTargets.has(targetId)) {
await this.syncEngine.rejectListen(targetId, error);
this.listenTargets.delete(targetId);
this.watchChangeAggregator!.removeTarget(targetId);
}
}
}

/**
@@ -637,25 +682,21 @@ export class RemoteStore implements TargetMetadataProvider {
// If the write stream closed due to an error, invoke the error callbacks if
// there are pending writes.
if (error && this.writePipeline.length > 0) {
// A promise that is resolved after we processed the error
let errorHandling: Promise<void>;
if (this.writeStream.handshakeComplete) {
// This error affects the actual write.
errorHandling = this.handleWriteError(error!);
await this.handleWriteError(error!);
} else {
// If there was an error before the handshake has finished, it's
// possible that the server is unable to process the stream token
// we're sending. (Perhaps it's too old?)
errorHandling = this.handleHandshakeError(error!);
await this.handleHandshakeError(error!);
}

return errorHandling.then(() => {
// The write stream might have been started by refilling the write
// pipeline for failed writes
if (this.shouldStartWriteStream()) {
this.startWriteStream();
}
});
// The write stream might have been started by refilling the write
// pipeline for failed writes
if (this.shouldStartWriteStream()) {
this.startWriteStream();
}
}
// No pending writes, nothing to do
}
76 changes: 76 additions & 0 deletions packages/firestore/test/unit/specs/recovery_spec.test.ts
@@ -21,6 +21,7 @@ import { TimerId } from '../../../src/util/async_queue';
import { Query } from '../../../src/core/query';
import { Code } from '../../../src/util/error';
import { doc, path } from '../../util/helpers';
import { RpcError } from './spec_rpc_error';

describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
specTest(
@@ -196,4 +197,79 @@ describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
.watchAcksFull(query2, 1)
.expectEvents(query2, {});
});

specTest(
'Recovers when watch update cannot be persisted',
['durable-persistence'],
() => {
const query = Query.atPath(path('collection'));
const doc1 = doc('collection/key1', 1000, { foo: 'a' });
const doc2 = doc('collection/key2', 2000, { foo: 'b' });
return spec()
.userListens(query)
.watchAcksFull(query, 1000, doc1)
.expectEvents(query, {
added: [doc1]
})
.watchSends({ affects: [query] }, doc2)
.failDatabase()
.watchSnapshots(1500)
// `failDatabase()` causes us to go offline.
.expectActiveTargets()
.recoverDatabase()
.runTimer(TimerId.AsyncQueueRetry)
.expectActiveTargets({ query, resumeToken: 'resume-token-1000' })
.watchAcksFull(query, 2000, doc2)
.expectEvents(query, {
added: [doc2]
});
}
);

specTest(
'Recovers when watch rejection cannot be persisted',
['durable-persistence'],
() => {
const doc1Query = Query.atPath(path('collection/key1'));
const doc2Query = Query.atPath(path('collection/key2'));
const doc1a = doc('collection/key1', 1000, { foo: 'a' });
const doc1b = doc('collection/key1', 4000, { foo: 'a', updated: true });
const doc2 = doc('collection/key2', 2000, { foo: 'b' });
return spec()
.userListens(doc1Query)
.watchAcksFull(doc1Query, 1000, doc1a)
.expectEvents(doc1Query, {
added: [doc1a]
})
.userListens(doc2Query)
.watchAcksFull(doc2Query, 2000, doc2)
.expectEvents(doc2Query, {
added: [doc2]
})
.failDatabase()
.watchRemoves(
doc1Query,
new RpcError(Code.PERMISSION_DENIED, 'Simulated target error')
)
// `failDatabase()` causes us to go offline.
.expectActiveTargets()
.recoverDatabase()
.runTimer(TimerId.AsyncQueueRetry)
.expectActiveTargets(
{ query: doc1Query, resumeToken: 'resume-token-1000' },
{ query: doc2Query, resumeToken: 'resume-token-2000' }
)
.watchAcksFull(doc1Query, 3000)
.watchRemoves(
doc2Query,
new RpcError(Code.PERMISSION_DENIED, 'Simulated target error')
)
.expectEvents(doc2Query, { errorCode: Code.PERMISSION_DENIED })
.watchSends({ affects: [doc1Query] }, doc1b)
.watchSnapshots(4000)
.expectEvents(doc1Query, {
modified: [doc1b]
});
}
);
});
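
The rewritten `handleTargetError` in `remote_store.ts` calls `rejectListen` before it removes the target from `listenTargets`. One plausible reading, consistent with the second test expecting both targets to be active again after recovery, is that a failed rejection should leave the in-memory target state untouched. The sketch below shows that ordering with a synchronous, hypothetical stand-in (the class, its constructor, and `isListening` are assumptions; the real method awaits `syncEngine.rejectListen()`).

```typescript
// Hypothetical, synchronous stand-in for the ordering in handleTargetError.
class TargetRegistrySketch {
  private readonly listening: { [targetId: number]: boolean } = {};
  private readonly rejectListen: (targetId: number) => void;

  /** `rejectListen` may throw, e.g. while persistence is unavailable. */
  constructor(targetIds: number[], rejectListen: (targetId: number) => void) {
    this.rejectListen = rejectListen;
    for (const targetId of targetIds) {
      this.listening[targetId] = true;
    }
  }

  isListening(targetId: number): boolean {
    return this.listening[targetId] === true;
  }

  /** Rejects targets one at a time, forgetting each only after success. */
  handleTargetError(targetIds: number[]): void {
    for (const targetId of targetIds) {
      // A watched target might have been removed already.
      if (this.isListening(targetId)) {
        this.rejectListen(targetId); // fallible step first
        delete this.listening[targetId]; // local state changes only on success
      }
    }
  }
}
```

If the rejection throws, the target is still registered, so it can be listened to again once the network is re-enabled.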