Skip to content

Commit 1628f76

Browse files
Add skipDelay to backoff
1 parent ce49a32 commit 1628f76

12 files changed

+53
-67
lines changed

packages/firestore/src/local/indexeddb_persistence.ts

+2-3
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,9 @@ import { DocumentKey } from '../model/document_key';
2323
import { Platform } from '../platform/platform';
2424
import { JsonProtoSerializer } from '../remote/serializer';
2525
import { debugAssert, fail } from '../util/assert';
26-
import { AsyncQueue, TimerId } from '../util/async_queue';
26+
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
2727
import { Code, FirestoreError } from '../util/error';
2828
import { logDebug, logError } from '../util/log';
29-
import { CancelablePromise } from '../util/promise';
3029
import {
3130
decodeResourcePath,
3231
EncodedResourcePath,
@@ -215,7 +214,7 @@ export class IndexedDbPersistence implements Persistence {
215214
private documentVisibilityHandler: ((e?: Event) => void) | null = null;
216215

217216
/** The client metadata refresh task. */
218-
private clientMetadataRefresher: CancelablePromise<void> | null = null;
217+
private clientMetadataRefresher: DelayedOperation<void> | null = null;
219218

220219
/** The last time we garbage collected the client metadata object store. */
221220
private lastGarbageCollectionTime = Number.NEGATIVE_INFINITY;

packages/firestore/src/local/lru_garbage_collector.ts

+2-3
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@
1818
import { ListenSequence } from '../core/listen_sequence';
1919
import { ListenSequenceNumber, TargetId } from '../core/types';
2020
import { debugAssert } from '../util/assert';
21-
import { AsyncQueue, TimerId } from '../util/async_queue';
21+
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
2222
import { getLogLevel, logDebug, LogLevel } from '../util/log';
2323
import { primitiveComparator } from '../util/misc';
24-
import { CancelablePromise } from '../util/promise';
2524
import { SortedMap } from '../util/sorted_map';
2625
import { SortedSet } from '../util/sorted_set';
2726
import { ignoreIfPrimaryLeaseLoss, LocalStore } from './local_store';
@@ -222,7 +221,7 @@ const REGULAR_GC_DELAY_MS = 5 * 60 * 1000;
222221
*/
223222
export class LruScheduler implements GarbageCollectionScheduler {
224223
private hasRun: boolean = false;
225-
private gcTask: CancelablePromise<void> | null;
224+
private gcTask: DelayedOperation<void> | null;
226225

227226
constructor(
228227
private readonly garbageCollector: LruGarbageCollector,

packages/firestore/src/remote/backoff.ts

+10-3
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
* limitations under the License.
1616
*/
1717

18-
import { AsyncQueue, TimerId } from '../util/async_queue';
18+
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
1919
import { logDebug } from '../util/log';
20-
import { CancelablePromise } from '../util/promise';
20+
2121
const LOG_TAG = 'ExponentialBackoff';
2222

2323
/**
@@ -42,7 +42,7 @@ const DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000;
4242
*/
4343
export class ExponentialBackoff {
4444
private currentBaseMs: number = 0;
45-
private timerPromise: CancelablePromise<void> | null = null;
45+
private timerPromise: DelayedOperation<void> | null = null;
4646
/** The last backoff attempt, as epoch milliseconds. */
4747
private lastAttemptTime = Date.now();
4848

@@ -149,6 +149,13 @@ export class ExponentialBackoff {
149149
}
150150
}
151151

152+
skipBackoff(): void {
153+
if (this.timerPromise !== null) {
154+
this.timerPromise.skipDelay();
155+
this.timerPromise = null;
156+
}
157+
}
158+
152159
cancel(): void {
153160
if (this.timerPromise !== null) {
154161
this.timerPromise.cancel();

packages/firestore/src/remote/online_state_tracker.ts

+2-3
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717

1818
import { OnlineState } from '../core/types';
1919
import { debugAssert } from '../util/assert';
20-
import { AsyncQueue, TimerId } from '../util/async_queue';
20+
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
2121
import { FirestoreError } from '../util/error';
2222
import { logError, logDebug } from '../util/log';
23-
import { CancelablePromise } from '../util/promise';
2423

2524
const LOG_TAG = 'OnlineStateTracker';
2625

@@ -64,7 +63,7 @@ export class OnlineStateTracker {
6463
* transition from OnlineState.Unknown to OnlineState.Offline without waiting
6564
* for the stream to actually fail (MAX_WATCH_STREAM_FAILURES times).
6665
*/
67-
private onlineStateTimer: CancelablePromise<void> | null = null;
66+
private onlineStateTimer: DelayedOperation<void> | null = null;
6867

6968
/**
7069
* Whether the client should log a warning message if it fails to connect to

packages/firestore/src/remote/persistent_stream.ts

+2-3
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@ import { TargetData } from '../local/target_data';
2222
import { Mutation, MutationResult } from '../model/mutation';
2323
import * as api from '../protos/firestore_proto_api';
2424
import { hardAssert, debugAssert } from '../util/assert';
25-
import { AsyncQueue, TimerId } from '../util/async_queue';
25+
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
2626
import { Code, FirestoreError } from '../util/error';
2727
import { logError, logDebug } from '../util/log';
2828

29-
import { CancelablePromise } from '../util/promise';
3029
import { isNullOrUndefined } from '../util/types';
3130
import { ExponentialBackoff } from './backoff';
3231
import { Connection, Stream } from './connection';
@@ -164,7 +163,7 @@ export abstract class PersistentStream<
164163
*/
165164
private closeCount = 0;
166165

167-
private idleTimer: CancelablePromise<void> | null = null;
166+
private idleTimer: DelayedOperation<void> | null = null;
168167
private stream: Stream<SendType, ReceiveType> | null = null;
169168

170169
protected backoff: ExponentialBackoff;

packages/firestore/src/util/async_queue.ts

+17-18
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import { debugAssert, fail } from './assert';
1919
import { Code, FirestoreError } from './error';
2020
import { logDebug, logError } from './log';
21-
import { CancelablePromise, Deferred } from './promise';
21+
import { Deferred } from './promise';
2222
import { ExponentialBackoff } from '../remote/backoff';
2323
import { PlatformSupport } from '../platform/platform';
2424
import { isIndexedDbTransactionError } from '../local/simple_db';
@@ -86,8 +86,12 @@ export const enum TimerId {
8686
* It is created via DelayedOperation.createAndSchedule().
8787
*
8888
* Supports cancellation (via cancel()) and early execution (via skipDelay()).
89+
*
90+
* Note: We implement `PromiseLike` instead of `Promise`, as the `Promise` type
91+
* in newer versions of TypeScript defines `finally`, which is not available in
92+
* IE.
8993
*/
90-
class DelayedOperation<T extends unknown> implements CancelablePromise<T> {
94+
export class DelayedOperation<T extends unknown> implements PromiseLike<T> {
9195
// handle for use with clearTimeout(), or null if the operation has been
9296
// executed or canceled already.
9397
private timerHandle: TimerHandle | null;
@@ -175,10 +179,7 @@ class DelayedOperation<T extends unknown> implements CancelablePromise<T> {
175179
}
176180
}
177181

178-
// Promise implementation.
179-
readonly [Symbol.toStringTag]: 'Promise';
180182
then = this.deferred.promise.then.bind(this.deferred.promise);
181-
catch = this.deferred.promise.catch.bind(this.deferred.promise);
182183

183184
private handleDelayElapsed(): void {
184185
this.asyncQueue.enqueueAndForget(() => {
@@ -234,10 +235,7 @@ export class AsyncQueue {
234235
// Visibility handler that triggers an immediate retry of all retryable
235236
// operations. Meant to speed up recovery when we regain file system access
236237
// after page comes into foreground.
237-
private visibilityHandler = (): void => {
238-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
239-
this.runDelayedOperationsEarly(TimerId.AsyncQueueRetry);
240-
};
238+
private visibilityHandler = (): void => this.backoff.skipBackoff();
241239

242240
constructor() {
243241
const window = PlatformSupport.getPlatform().window;
@@ -379,14 +377,14 @@ export class AsyncQueue {
379377

380378
/**
381379
* Schedules an operation to be queued on the AsyncQueue once the specified
382-
* `delayMs` has elapsed. The returned CancelablePromise can be used to cancel
383-
* the operation prior to its running.
380+
* `delayMs` has elapsed. The returned DelayedOperation can be used to cancel
381+
* or fast-forward the operation prior to its running.
384382
*/
385383
enqueueAfterDelay<T extends unknown>(
386384
timerId: TimerId,
387385
delayMs: number,
388386
op: () => Promise<T>
389-
): CancelablePromise<T> {
387+
): DelayedOperation<T> {
390388
this.verifyNotFailed();
391389

392390
debugAssert(
@@ -463,21 +461,22 @@ export class AsyncQueue {
463461
}
464462

465463
/**
466-
* Runs some or all delayed operations early.
464+
* For Tests: Runs some or all delayed operations early.
467465
*
468-
* @param timerId Delayed operations to run. Pass TimerId.All to run all
469-
* delayed operations.
466+
* @param lastTimerId Delayed operations up to and including this TimerId will
467+
* be drained. Pass TimerId.All to run all delayed operations.
470468
* @returns a Promise that resolves once all operations have been run.
471469
*/
472-
runDelayedOperationsEarly(timerId: TimerId): Promise<void> {
470+
runAllDelayedOperationsUntil(lastTimerId: TimerId): Promise<void> {
473471
// Note that draining may generate more delayed ops, so we do that first.
474472
return this.drain().then(() => {
475473
// Run ops in the same order they'd run if they ran naturally.
476474
this.delayedOperations.sort((a, b) => a.targetTimeMs - b.targetTimeMs);
477475

478476
for (const op of this.delayedOperations) {
479-
if (timerId === TimerId.All || op.timerId === timerId) {
480-
op.skipDelay();
477+
op.skipDelay();
478+
if (lastTimerId !== TimerId.All && op.timerId === lastTimerId) {
479+
break;
481480
}
482481
}
483482

packages/firestore/src/util/promise.ts

-18
Original file line numberDiff line numberDiff line change
@@ -23,24 +23,6 @@ export interface Rejecter {
2323
(reason?: Error): void;
2424
}
2525

26-
export interface CancelablePromise<T> {
27-
// We are not extending Promise, since Node's Promise API require us to
28-
// implement 'finally', which is not fully supported on Web.
29-
then<TResult1 = T, TResult2 = never>(
30-
onfulfilled?:
31-
| ((value: T) => TResult1 | PromiseLike<TResult1>)
32-
| undefined
33-
| null,
34-
onrejected?: // eslint-disable-next-line @typescript-eslint/no-explicit-any
35-
((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined | null
36-
): Promise<TResult1 | TResult2>;
37-
catch<TResult = never>(
38-
onrejected?: // eslint-disable-next-line @typescript-eslint/no-explicit-any
39-
((reason: any) => TResult | PromiseLike<TResult>) | undefined | null
40-
): Promise<T | TResult>;
41-
cancel(): void;
42-
}
43-
4426
export class Deferred<R> {
4527
promise: Promise<R>;
4628
// Assigned synchronously in constructor by Promise constructor callback.

packages/firestore/test/integration/api_internal/idle_timeout.test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ apiDescribe('Idle Timeout', (persistence: boolean) => {
2727
return docRef
2828
.set({ foo: 'bar' })
2929
.then(() => {
30-
return asyncQueue(db).runDelayedOperationsEarly(
30+
return asyncQueue(db).runAllDelayedOperationsUntil(
3131
TimerId.WriteStreamIdle
3232
);
3333
})
@@ -53,7 +53,7 @@ apiDescribe('Idle Timeout', (persistence: boolean) => {
5353

5454
return awaitOnlineSnapshot()
5555
.then(() => {
56-
return asyncQueue(db).runDelayedOperationsEarly(
56+
return asyncQueue(db).runAllDelayedOperationsUntil(
5757
TimerId.ListenStreamIdle
5858
);
5959
})

packages/firestore/test/integration/remote/stream.test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ describe('Write Stream', () => {
206206
expect(queue.containsDelayedOperation(TimerId.WriteStreamIdle)).to.be
207207
.true;
208208
return Promise.all([
209-
queue.runDelayedOperationsEarly(TimerId.WriteStreamIdle),
209+
queue.runAllDelayedOperationsUntil(TimerId.WriteStreamIdle),
210210
streamListener.awaitCallback('close')
211211
]);
212212
})
@@ -229,7 +229,7 @@ describe('Write Stream', () => {
229229
writeStream.writeMutations(SINGLE_MUTATION);
230230
await streamListener.awaitCallback('mutationResult');
231231

232-
await queue.runDelayedOperationsEarly(TimerId.All);
232+
await queue.runAllDelayedOperationsUntil(TimerId.All);
233233
expect(writeStream.isOpen()).to.be.true;
234234
});
235235
});

packages/firestore/test/unit/local/indexeddb_persistence.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1185,7 +1185,7 @@ describe('IndexedDb: allowTabSynchronization', () => {
11851185
it('ignores intermittent IndexedDbTransactionError during lease refresh', async () => {
11861186
await withPersistence('clientA', async (db, _, queue) => {
11871187
db.injectFailures = ['updateClientMetadataAndTryBecomePrimary'];
1188-
await queue.runDelayedOperationsEarly(TimerId.ClientMetadataRefresh);
1188+
await queue.runAllDelayedOperationsUntil(TimerId.ClientMetadataRefresh);
11891189
await queue.enqueue(() => {
11901190
db.injectFailures = [];
11911191
return db.runTransaction('check success', 'readwrite-primary', () =>

packages/firestore/test/unit/specs/spec_test_runner.ts

+6-4
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ abstract class TestRunner {
380380
TimerId.ListenStreamConnectionBackoff
381381
)
382382
) {
383-
await this.queue.runDelayedOperationsEarly(
383+
await this.queue.runAllDelayedOperationsUntil(
384384
TimerId.ListenStreamConnectionBackoff
385385
);
386386
}
@@ -605,7 +605,7 @@ abstract class TestRunner {
605605
);
606606
// The watch stream should re-open if we have active listeners.
607607
if (spec.runBackoffTimer && !this.queryListeners.isEmpty()) {
608-
await this.queue.runDelayedOperationsEarly(
608+
await this.queue.runAllDelayedOperationsUntil(
609609
TimerId.ListenStreamConnectionBackoff
610610
);
611611
await this.connection.waitForWatchOpen();
@@ -656,7 +656,7 @@ abstract class TestRunner {
656656
// not, then there won't be a matching item on the queue and
657657
// runDelayedOperationsEarly() will throw.
658658
const timerId = timer as TimerId;
659-
await this.queue.runDelayedOperationsEarly(timerId);
659+
await this.queue.runAllDelayedOperationsUntil(timerId);
660660
}
661661

662662
private async doDisableNetwork(): Promise<void> {
@@ -706,7 +706,9 @@ abstract class TestRunner {
706706

707707
if (state.primary) {
708708
await clearCurrentPrimaryLease();
709-
await this.queue.runDelayedOperationsEarly(TimerId.ClientMetadataRefresh);
709+
await this.queue.runAllDelayedOperationsUntil(
710+
TimerId.ClientMetadataRefresh
711+
);
710712
}
711713

712714
return Promise.resolve();

packages/firestore/test/unit/util/async_queue.test.ts

+7-7
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ describe('AsyncQueue', () => {
171171
err => expect(err.code === Code.CANCELLED)
172172
);
173173

174-
await queue.runDelayedOperationsEarly(TimerId.All);
174+
await queue.runAllDelayedOperationsUntil(TimerId.All);
175175
expect(completedSteps).to.deep.equal([1]);
176176
});
177177

@@ -187,7 +187,7 @@ describe('AsyncQueue', () => {
187187
queue.enqueueAfterDelay(timerId2, 10000, () => doStep(3));
188188
queue.enqueueAndForget(() => doStep(2));
189189

190-
await queue.runDelayedOperationsEarly(TimerId.All);
190+
await queue.runAllDelayedOperationsUntil(TimerId.All);
191191
expect(completedSteps).to.deep.equal([1, 2, 3, 4]);
192192
});
193193

@@ -205,13 +205,13 @@ describe('AsyncQueue', () => {
205205
queue.enqueueAfterDelay(timerId3, 15000, () => doStep(4));
206206
queue.enqueueAndForget(() => doStep(2));
207207

208-
await queue.runDelayedOperationsEarly(timerId3);
208+
await queue.runAllDelayedOperationsUntil(timerId3);
209209
expect(completedSteps).to.deep.equal([1, 2, 4]);
210210

211-
await queue.runDelayedOperationsEarly(timerId2);
211+
await queue.runAllDelayedOperationsUntil(timerId2);
212212
expect(completedSteps).to.deep.equal([1, 2, 4, 3]);
213213

214-
await queue.runDelayedOperationsEarly(timerId1);
214+
await queue.runAllDelayedOperationsUntil(timerId1);
215215
expect(completedSteps).to.deep.equal([1, 2, 4, 3, 5]);
216216
});
217217

@@ -229,7 +229,7 @@ describe('AsyncQueue', () => {
229229
);
230230
}
231231
});
232-
await queue.runDelayedOperationsEarly(TimerId.AsyncQueueRetry);
232+
await queue.runAllDelayedOperationsUntil(TimerId.AsyncQueueRetry);
233233
expect(completedSteps).to.deep.equal([1, 1]);
234234
});
235235

@@ -285,7 +285,7 @@ describe('AsyncQueue', () => {
285285
expect(completedSteps).to.deep.equal([1]);
286286

287287
// Fast forward all operations
288-
await queue.runDelayedOperationsEarly(TimerId.AsyncQueueRetry);
288+
await queue.runAllDelayedOperationsUntil(TimerId.AsyncQueueRetry);
289289
expect(completedSteps).to.deep.equal([1, 1]);
290290
});
291291

0 commit comments

Comments
 (0)