Skip to content

Commit 56c176b

Browse files
Retry WebStorage operations
1 parent 5a63738 commit 56c176b

File tree

10 files changed

+370
-71
lines changed

10 files changed

+370
-71
lines changed

packages/firestore/src/local/shared_client_state.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,7 @@ export class WebStorageSharedClientState implements SharedClientState {
755755
return;
756756
}
757757

758-
this.queue.enqueueAndForget(async () => {
758+
this.queue.enqueueRetryable(async () => {
759759
if (!this.started) {
760760
this.earlyEvents.push(event);
761761
return;

packages/firestore/src/platform_node/grpc_connection.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
/**
32
* @license
43
* Copyright 2017 Google LLC

packages/firestore/src/remote/backoff.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -119,24 +119,28 @@ export class ExponentialBackoff {
119119
desiredDelayWithJitterMs - delaySoFarMs
120120
);
121121

122-
if (this.currentBaseMs > 0) {
122+
if (remainingDelayMs > 0) {
123123
logDebug(
124124
LOG_TAG,
125125
`Backing off for ${remainingDelayMs} ms ` +
126126
`(base delay: ${this.currentBaseMs} ms, ` +
127127
`delay with jitter: ${desiredDelayWithJitterMs} ms, ` +
128128
`last attempt: ${delaySoFarMs} ms ago)`
129129
);
130-
}
131-
132-
this.timerPromise = this.queue.enqueueAfterDelay(
133-
this.timerId,
134-
remainingDelayMs,
135-
() => {
130+
this.timerPromise = this.queue.enqueueAfterDelay(
131+
this.timerId,
132+
remainingDelayMs,
133+
() => {
134+
this.lastAttemptTime = Date.now();
135+
return op();
136+
}
137+
);
138+
} else {
139+
this.queue.enqueueAndForget(() => {
136140
this.lastAttemptTime = Date.now();
137141
return op();
138-
}
139-
);
142+
});
143+
}
140144

141145
// Apply backoff factor to determine next delay and ensure it is within
142146
// bounds.

packages/firestore/src/util/async_queue.ts

Lines changed: 74 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@
1717

1818
import { assert, fail } from './assert';
1919
import { Code, FirestoreError } from './error';
20-
import { logError } from './log';
20+
import { logDebug, logError } from './log';
2121
import { CancelablePromise, Deferred } from './promise';
22+
import { ExponentialBackoff } from '../remote/backoff';
23+
import { PlatformSupport } from '../platform/platform';
24+
25+
const LOG_TAG = 'AsyncQueue';
2226

2327
// Accept any return type from setTimeout().
2428
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -66,7 +70,13 @@ export const enum TimerId {
6670
* A timer used to retry transactions. Since there can be multiple concurrent
6771
* transactions, multiple of these may be in the queue at a given time.
6872
*/
69-
RetryTransaction = 'retry_transaction'
73+
RetryTransaction = 'retry_transaction',
74+
75+
/**
76+
* A timer used to retry operations scheduled via retryable AsyncQueue
77+
* operations.
78+
*/
79+
AsyncQueueRetry = 'async_queue_retry'
7080
}
7181

7282
/**
@@ -213,6 +223,29 @@ export class AsyncQueue {
213223
// List of TimerIds to fast-forward delays for.
214224
private timerIdsToSkip: TimerId[] = [];
215225

226+
// Backoff timer used for retryable operations
227+
private backoff = new ExponentialBackoff(this, TimerId.AsyncQueueRetry);
228+
229+
// If set, points to the first retryable operation. The first retryable
230+
// operation is the only operation that is retried with backoff. If this
231+
// operation suceeds, all other retryable operations are run right away.
232+
private firstRetryableOperation?: Promise<void>;
233+
234+
// Visibility handler that triggers an immediate retry of all retryable
235+
// operations. Meant to speed up recovery when we regain file system access
236+
// after page comes into foreground.
237+
private visibilityHandler = (): void => {
238+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
239+
this.runDelayedOperationsEarly(TimerId.AsyncQueueRetry);
240+
};
241+
242+
constructor() {
243+
const window = PlatformSupport.getPlatform().window;
244+
if (window) {
245+
window.addEventListener('visibilitychange', this.visibilityHandler);
246+
}
247+
}
248+
216249
// Is this AsyncQueue being shut down? If true, this instance will not enqueue
217250
// any new operations, Promises from enqueue requests will not resolve.
218251
get isShuttingDown(): boolean {
@@ -262,6 +295,10 @@ export class AsyncQueue {
262295
this.verifyNotFailed();
263296
if (!this._isShuttingDown) {
264297
this._isShuttingDown = true;
298+
const window = PlatformSupport.getPlatform().window;
299+
if (window) {
300+
window.removeEventListener('visibilitychange', this.visibilityHandler);
301+
}
265302
await this.enqueueEvenAfterShutdown(op);
266303
}
267304
}
@@ -279,6 +316,41 @@ export class AsyncQueue {
279316
return this.enqueueInternal(op);
280317
}
281318

319+
enqueueRetryable(op: () => Promise<void>): void {
320+
this.verifyNotFailed();
321+
322+
if (this._isShuttingDown) {
323+
return;
324+
}
325+
326+
if (this.firstRetryableOperation) {
327+
// If there is already a retryable operation, enqueue the current
328+
// operation right after. We want to successfully run the first retryable
329+
// operation before running all others.
330+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
331+
this.firstRetryableOperation.then(() => {
332+
this.enqueueRetryable(op);
333+
});
334+
} else {
335+
const deferred = new Deferred<void>();
336+
this.firstRetryableOperation = deferred.promise;
337+
338+
const retryingOp = async (): Promise<void> => {
339+
try {
340+
await op();
341+
deferred.resolve();
342+
this.backoff.reset();
343+
this.firstRetryableOperation = undefined;
344+
} catch (e) {
345+
logDebug(LOG_TAG, 'Retryable operation failed: ' + e.message);
346+
this.backoff.backoffAndRun(retryingOp);
347+
}
348+
};
349+
350+
this.backoff.backoffAndRun(retryingOp);
351+
}
352+
}
353+
282354
private enqueueInternal<T extends unknown>(op: () => Promise<T>): Promise<T> {
283355
const newTail = this.tail.then(() => {
284356
this.operationInProgress = true;
@@ -399,12 +471,6 @@ export class AsyncQueue {
399471
runDelayedOperationsEarly(lastTimerId: TimerId): Promise<void> {
400472
// Note that draining may generate more delayed ops, so we do that first.
401473
return this.drain().then(() => {
402-
assert(
403-
lastTimerId === TimerId.All ||
404-
this.containsDelayedOperation(lastTimerId),
405-
`Attempted to drain to missing operation ${lastTimerId}`
406-
);
407-
408474
// Run ops in the same order they'd run if they ran naturally.
409475
this.delayedOperations.sort((a, b) => a.targetTimeMs - b.targetTimeMs);
410476

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/**
2+
* @license
3+
* Copyright 2020 Google LLC
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
import { describeSpec, specTest } from './describe_spec';
19+
import { client } from './spec_builder';
20+
import { TimerId } from '../../../src/util/async_queue';
21+
import { Query } from '../../../src/core/query';
22+
import { path } from '../../util/helpers';
23+
24+
describeSpec(
25+
'Persistence Recovery',
26+
['durable-persistence', 'no-ios', 'no-android'],
27+
() => {
28+
specTest(
29+
'Write is acknowledged by primary client (with recovery)',
30+
['multi-client'],
31+
() => {
32+
return client(0)
33+
.expectPrimaryState(true)
34+
.client(1)
35+
.expectPrimaryState(false)
36+
.userSets('collection/a', { v: 1 })
37+
.failDatabase()
38+
.client(0)
39+
.writeAcks('collection/a', 1, { expectUserCallback: false })
40+
.client(1)
41+
.recoverDatabase()
42+
.runTimer(TimerId.AsyncQueueRetry)
43+
.expectUserCallbacks({
44+
acknowledged: ['collection/a']
45+
});
46+
}
47+
);
48+
49+
specTest(
50+
'Query raises events in secondary client (with recovery)',
51+
['multi-client'],
52+
() => {
53+
const query = Query.atPath(path('collection'));
54+
55+
return client(0)
56+
.expectPrimaryState(true)
57+
.client(1)
58+
.expectPrimaryState(false)
59+
.userListens(query)
60+
.failDatabase()
61+
.client(0)
62+
.expectListen(query)
63+
.watchAcksFull(query, 1000)
64+
.client(1)
65+
.recoverDatabase()
66+
.runTimer(TimerId.AsyncQueueRetry)
67+
.expectEvents(query, {});
68+
}
69+
);
70+
}
71+
);

packages/firestore/test/unit/specs/remote_store_spec.test.ts

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -84,36 +84,4 @@ describeSpec('Remote store:', [], () => {
8484
.expectEvents(query, { added: [doc1] })
8585
);
8686
});
87-
88-
// TODO(b/72313632): This test is web-only because the Android / iOS spec
89-
// tests exclude backoff entirely.
90-
specTest(
91-
'Handles user changes while offline (b/74749605).',
92-
['no-android', 'no-ios'],
93-
() => {
94-
const query = Query.atPath(path('collection'));
95-
return (
96-
spec()
97-
.userListens(query)
98-
99-
// close the stream (this should trigger retry with backoff; but don't
100-
// run it in an attempt to reproduce b/74749605).
101-
.watchStreamCloses(Code.UNAVAILABLE, { runBackoffTimer: false })
102-
.expectEvents(query, { fromCache: true })
103-
104-
// Because we didn't let the backoff timer run and restart the watch
105-
// stream, there will be no active targets.
106-
.expectActiveTargets()
107-
108-
// Change user (will shut down existing streams and start new ones).
109-
.changeUser('abc')
110-
// Our query should be sent to the new stream.
111-
.expectActiveTargets({ query, resumeToken: '' })
112-
113-
// Close the (newly-created) stream as if it too failed (should trigger
114-
// retry with backoff, potentially reproducing the crash in b/74749605).
115-
.watchStreamCloses(Code.UNAVAILABLE)
116-
);
117-
}
118-
);
11987
});

packages/firestore/test/unit/specs/spec_builder.ts

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,22 @@ export class SpecBuilder {
418418
return this;
419419
}
420420

421+
failDatabase(): this {
422+
this.nextStep();
423+
this.currentStep = {
424+
failDatabase: true
425+
};
426+
return this;
427+
}
428+
429+
recoverDatabase(): this {
430+
this.nextStep();
431+
this.currentStep = {
432+
failDatabase: false
433+
};
434+
return this;
435+
}
436+
421437
expectIsShutdown(): this {
422438
this.assertStep('Active target expectation requires previous step');
423439
const currentStep = this.currentStep!;
@@ -716,19 +732,14 @@ export class SpecBuilder {
716732
return this;
717733
}
718734

719-
watchStreamCloses(error: Code, opts?: { runBackoffTimer: boolean }): this {
720-
if (!opts) {
721-
opts = { runBackoffTimer: true };
722-
}
723-
735+
watchStreamCloses(error: Code): this {
724736
this.nextStep();
725737
this.currentStep = {
726738
watchStreamClose: {
727739
error: {
728740
code: mapRpcCodeFromCode(error),
729741
message: 'Simulated Backend Error'
730-
},
731-
runBackoffTimer: opts.runBackoffTimer
742+
}
732743
}
733744
};
734745
return this;

0 commit comments

Comments
 (0)