Skip to content

Commit f9c6dde

Browse files
Take WatchStream offline when IndexedDB is unavailable (#3010)
1 parent 631a0ec commit f9c6dde

File tree

2 files changed

+169
-33
lines changed

2 files changed

+169
-33
lines changed

packages/firestore/src/remote/remote_store.ts

+85-33
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ export class RemoteStore implements TargetMetadataProvider {
123123

124124
private isPrimary = false;
125125

126+
/**
127+
* When set to `true`, the network was taken offline due to an IndexedDB
128+
* failure. The state is flipped to `false` when access becomes available
129+
* again.
130+
*/
131+
private indexedDbFailed = false;
132+
126133
private onlineStateTracker: OnlineStateTracker;
127134

128135
constructor(
@@ -132,7 +139,7 @@ export class RemoteStore implements TargetMetadataProvider {
132139
private localStore: LocalStore,
133140
/** The client-side proxy for interacting with the backend. */
134141
private datastore: Datastore,
135-
asyncQueue: AsyncQueue,
142+
private asyncQueue: AsyncQueue,
136143
onlineStateHandler: (onlineState: OnlineState) => void,
137144
connectivityMonitor: ConnectivityMonitor
138145
) {
@@ -184,9 +191,12 @@ export class RemoteStore implements TargetMetadataProvider {
184191
}
185192

186193
/** Re-enables the network. Idempotent. */
187-
async enableNetwork(): Promise<void> {
194+
enableNetwork(): Promise<void> {
188195
this.networkEnabled = true;
196+
return this.enableNetworkInternal();
197+
}
189198

199+
private async enableNetworkInternal(): Promise<void> {
190200
if (this.canUseNetwork()) {
191201
this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken();
192202

@@ -339,7 +349,7 @@ export class RemoteStore implements TargetMetadataProvider {
339349
}
340350

341351
canUseNetwork(): boolean {
342-
return this.isPrimary && this.networkEnabled;
352+
return !this.indexedDbFailed && this.isPrimary && this.networkEnabled;
343353
}
344354

345355
private cleanUpWatchStreamState(): void {
@@ -391,7 +401,18 @@ export class RemoteStore implements TargetMetadataProvider {
391401
) {
392402
// There was an error on a target, don't wait for a consistent snapshot
393403
// to raise events
394-
return this.handleTargetError(watchChange);
404+
try {
405+
await this.handleTargetError(watchChange);
406+
} catch (e) {
407+
logDebug(
408+
LOG_TAG,
409+
'Failed to remove targets %s: %s ',
410+
watchChange.targetIds.join(','),
411+
e
412+
);
413+
await this.disableNetworkUntilRecovery(e);
414+
}
415+
return;
395416
}
396417

397418
if (watchChange instanceof DocumentWatchChange) {
@@ -407,15 +428,52 @@ export class RemoteStore implements TargetMetadataProvider {
407428
}
408429

409430
if (!snapshotVersion.isEqual(SnapshotVersion.min())) {
410-
const lastRemoteSnapshotVersion = await this.localStore.getLastRemoteSnapshotVersion();
411-
if (snapshotVersion.compareTo(lastRemoteSnapshotVersion) >= 0) {
412-
// We have received a target change with a global snapshot if the snapshot
413-
// version is not equal to SnapshotVersion.min().
414-
await this.raiseWatchSnapshot(snapshotVersion);
431+
try {
432+
const lastRemoteSnapshotVersion = await this.localStore.getLastRemoteSnapshotVersion();
433+
if (snapshotVersion.compareTo(lastRemoteSnapshotVersion) >= 0) {
434+
// We have received a target change with a global snapshot if the snapshot
435+
// version is not equal to SnapshotVersion.min().
436+
await this.raiseWatchSnapshot(snapshotVersion);
437+
}
438+
} catch (e) {
439+
logDebug(LOG_TAG, 'Failed to raise snapshot:', e);
440+
await this.disableNetworkUntilRecovery(e);
415441
}
416442
}
417443
}
418444

445+
/**
446+
* Recovery logic for IndexedDB errors that takes the network offline until
447+
* IndexedDb probing succeeds. Retries are scheduled with backoff using
448+
* `enqueueRetryable()`.
449+
*/
450+
private async disableNetworkUntilRecovery(e: FirestoreError): Promise<void> {
451+
if (e.name === 'IndexedDbTransactionError') {
452+
debugAssert(
453+
!this.indexedDbFailed,
454+
'Unexpected network event when IndexedDB was marked failed.'
455+
);
456+
this.indexedDbFailed = true;
457+
458+
// Disable network and raise offline snapshots
459+
await this.disableNetworkInternal();
460+
this.onlineStateTracker.set(OnlineState.Offline);
461+
462+
// Probe IndexedDB periodically and re-enable network
463+
this.asyncQueue.enqueueRetryable(async () => {
464+
logDebug(LOG_TAG, 'Retrying IndexedDB access');
465+
// Issue a simple read operation to determine if IndexedDB recovered.
466+
// Ideally, we would expose a health check directly on SimpleDb, but
467+
// RemoteStore only has access to persistence through LocalStore.
468+
await this.localStore.getLastRemoteSnapshotVersion();
469+
this.indexedDbFailed = false;
470+
await this.enableNetworkInternal();
471+
});
472+
} else {
473+
throw e;
474+
}
475+
}
476+
419477
/**
420478
* Takes a batch of changes from the Datastore, repackages them as a
421479
* RemoteEvent, and passes that on to the listener, which is typically the
@@ -486,21 +544,19 @@ export class RemoteStore implements TargetMetadataProvider {
486544
}
487545

488546
/** Handles an error on a target */
489-
private handleTargetError(watchChange: WatchTargetChange): Promise<void> {
547+
private async handleTargetError(
548+
watchChange: WatchTargetChange
549+
): Promise<void> {
490550
debugAssert(!!watchChange.cause, 'Handling target error without a cause');
491551
const error = watchChange.cause!;
492-
let promiseChain = Promise.resolve();
493-
watchChange.targetIds.forEach(targetId => {
494-
promiseChain = promiseChain.then(async () => {
495-
// A watched target might have been removed already.
496-
if (this.listenTargets.has(targetId)) {
497-
this.listenTargets.delete(targetId);
498-
this.watchChangeAggregator!.removeTarget(targetId);
499-
return this.syncEngine.rejectListen(targetId, error);
500-
}
501-
});
502-
});
503-
return promiseChain;
552+
for (const targetId of watchChange.targetIds) {
553+
// A watched target might have been removed already.
554+
if (this.listenTargets.has(targetId)) {
555+
await this.syncEngine.rejectListen(targetId, error);
556+
this.listenTargets.delete(targetId);
557+
this.watchChangeAggregator!.removeTarget(targetId);
558+
}
559+
}
504560
}
505561

506562
/**
@@ -637,25 +693,21 @@ export class RemoteStore implements TargetMetadataProvider {
637693
// If the write stream closed due to an error, invoke the error callbacks if
638694
// there are pending writes.
639695
if (error && this.writePipeline.length > 0) {
640-
// A promise that is resolved after we processed the error
641-
let errorHandling: Promise<void>;
642696
if (this.writeStream.handshakeComplete) {
643697
// This error affects the actual write.
644-
errorHandling = this.handleWriteError(error!);
698+
await this.handleWriteError(error!);
645699
} else {
646700
// If there was an error before the handshake has finished, it's
647701
// possible that the server is unable to process the stream token
648702
// we're sending. (Perhaps it's too old?)
649-
errorHandling = this.handleHandshakeError(error!);
703+
await this.handleHandshakeError(error!);
650704
}
651705

652-
return errorHandling.then(() => {
653-
// The write stream might have been started by refilling the write
654-
// pipeline for failed writes
655-
if (this.shouldStartWriteStream()) {
656-
this.startWriteStream();
657-
}
658-
});
706+
// The write stream might have been started by refilling the write
707+
// pipeline for failed writes
708+
if (this.shouldStartWriteStream()) {
709+
this.startWriteStream();
710+
}
659711
}
660712
// No pending writes, nothing to do
661713
}

packages/firestore/test/unit/specs/recovery_spec.test.ts

+84
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { TimerId } from '../../../src/util/async_queue';
2121
import { Query } from '../../../src/core/query';
2222
import { Code } from '../../../src/util/error';
2323
import { doc, path } from '../../util/helpers';
24+
import { RpcError } from './spec_rpc_error';
2425

2526
describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
2627
specTest(
@@ -196,4 +197,87 @@ describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
196197
.watchAcksFull(query2, 1)
197198
.expectEvents(query2, {});
198199
});
200+
201+
specTest(
202+
'Recovers when watch update cannot be persisted',
203+
['durable-persistence'],
204+
() => {
205+
const query = Query.atPath(path('collection'));
206+
const doc1 = doc('collection/key1', 1000, { foo: 'a' });
207+
const doc2 = doc('collection/key2', 2000, { foo: 'b' });
208+
return (
209+
spec()
210+
.userListens(query)
211+
.watchAcksFull(query, 1000, doc1)
212+
.expectEvents(query, {
213+
added: [doc1]
214+
})
215+
.watchSends({ affects: [query] }, doc2)
216+
.failDatabase()
217+
.watchSnapshots(1500)
218+
// `failDatabase()` causes us to go offline.
219+
.expectActiveTargets()
220+
.expectEvents(query, { fromCache: true })
221+
.recoverDatabase()
222+
.runTimer(TimerId.AsyncQueueRetry)
223+
.expectActiveTargets({ query, resumeToken: 'resume-token-1000' })
224+
.watchAcksFull(query, 2000, doc2)
225+
.expectEvents(query, {
226+
added: [doc2]
227+
})
228+
);
229+
}
230+
);
231+
232+
specTest(
233+
'Recovers when watch rejection cannot be persisted',
234+
['durable-persistence'],
235+
() => {
236+
const doc1Query = Query.atPath(path('collection/key1'));
237+
const doc2Query = Query.atPath(path('collection/key2'));
238+
const doc1a = doc('collection/key1', 1000, { foo: 'a' });
239+
const doc1b = doc('collection/key1', 4000, { foo: 'a', updated: true });
240+
const doc2 = doc('collection/key2', 2000, { foo: 'b' });
241+
return (
242+
spec()
243+
.userListens(doc1Query)
244+
.watchAcksFull(doc1Query, 1000, doc1a)
245+
.expectEvents(doc1Query, {
246+
added: [doc1a]
247+
})
248+
.userListens(doc2Query)
249+
.watchAcksFull(doc2Query, 2000, doc2)
250+
.expectEvents(doc2Query, {
251+
added: [doc2]
252+
})
253+
.failDatabase()
254+
.watchRemoves(
255+
doc1Query,
256+
new RpcError(Code.PERMISSION_DENIED, 'Simulated target error')
257+
)
258+
// `failDatabase()` causes us to go offline.
259+
.expectActiveTargets()
260+
.expectEvents(doc1Query, { fromCache: true })
261+
.expectEvents(doc2Query, { fromCache: true })
262+
.recoverDatabase()
263+
.runTimer(TimerId.AsyncQueueRetry)
264+
.expectActiveTargets(
265+
{ query: doc1Query, resumeToken: 'resume-token-1000' },
266+
{ query: doc2Query, resumeToken: 'resume-token-2000' }
267+
)
268+
.watchAcksFull(doc1Query, 3000)
269+
.expectEvents(doc1Query, {})
270+
.watchRemoves(
271+
doc2Query,
272+
new RpcError(Code.PERMISSION_DENIED, 'Simulated target error')
273+
)
274+
.expectEvents(doc2Query, { errorCode: Code.PERMISSION_DENIED })
275+
.watchSends({ affects: [doc1Query] }, doc1b)
276+
.watchSnapshots(4000)
277+
.expectEvents(doc1Query, {
278+
modified: [doc1b]
279+
})
280+
);
281+
}
282+
);
199283
});

0 commit comments

Comments
 (0)