Skip to content

Only run the provided delayed operation early #3101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/firestore/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Unreleased
- [fixed] Fixed an issue that could cause Firestore to temporarily go
offline when a Window visibility event occurred.
- [feature] Added support for calling `FirebaseFiresore.settings` with
`{ ignoreUndefinedProperties: true }`. When set, Firestore ignores
undefined properties inside objects rather than rejecting the API call.
Expand Down
2 changes: 1 addition & 1 deletion packages/firestore/src/local/index_free_query_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ export class IndexFreeQueryEngine implements QueryEngine {
if (getLogLevel() <= LogLevel.DEBUG) {
logDebug(
'IndexFreeQueryEngine',
'Using full collection scan to execute query: %s',
'Using full collection scan to execute query:',
query.toString()
);
}
Expand Down
5 changes: 2 additions & 3 deletions packages/firestore/src/local/indexeddb_persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ import { DocumentKey } from '../model/document_key';
import { Platform } from '../platform/platform';
import { JsonProtoSerializer } from '../remote/serializer';
import { debugAssert, fail } from '../util/assert';
import { AsyncQueue, TimerId } from '../util/async_queue';
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
import { Code, FirestoreError } from '../util/error';
import { logDebug, logError } from '../util/log';
import { CancelablePromise } from '../util/promise';
import {
decodeResourcePath,
EncodedResourcePath,
Expand Down Expand Up @@ -215,7 +214,7 @@ export class IndexedDbPersistence implements Persistence {
private documentVisibilityHandler: ((e?: Event) => void) | null = null;

/** The client metadata refresh task. */
private clientMetadataRefresher: CancelablePromise<void> | null = null;
private clientMetadataRefresher: DelayedOperation<void> | null = null;

/** The last time we garbage collected the client metadata object store. */
private lastGarbageCollectionTime = Number.NEGATIVE_INFINITY;
Expand Down
5 changes: 2 additions & 3 deletions packages/firestore/src/local/lru_garbage_collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
import { ListenSequence } from '../core/listen_sequence';
import { ListenSequenceNumber, TargetId } from '../core/types';
import { debugAssert } from '../util/assert';
import { AsyncQueue, TimerId } from '../util/async_queue';
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
import { getLogLevel, logDebug, LogLevel } from '../util/log';
import { primitiveComparator } from '../util/misc';
import { CancelablePromise } from '../util/promise';
import { SortedMap } from '../util/sorted_map';
import { SortedSet } from '../util/sorted_set';
import { ignoreIfPrimaryLeaseLoss, LocalStore } from './local_store';
Expand Down Expand Up @@ -222,7 +221,7 @@ const REGULAR_GC_DELAY_MS = 5 * 60 * 1000;
*/
export class LruScheduler implements GarbageCollectionScheduler {
private hasRun: boolean = false;
private gcTask: CancelablePromise<void> | null;
private gcTask: DelayedOperation<void> | null;

constructor(
private readonly garbageCollector: LruGarbageCollector,
Expand Down
13 changes: 10 additions & 3 deletions packages/firestore/src/remote/backoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

import { AsyncQueue, TimerId } from '../util/async_queue';
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
import { logDebug } from '../util/log';
import { CancelablePromise } from '../util/promise';

const LOG_TAG = 'ExponentialBackoff';

/**
Expand All @@ -42,7 +42,7 @@ const DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000;
*/
export class ExponentialBackoff {
private currentBaseMs: number = 0;
private timerPromise: CancelablePromise<void> | null = null;
private timerPromise: DelayedOperation<void> | null = null;
/** The last backoff attempt, as epoch milliseconds. */
private lastAttemptTime = Date.now();

Expand Down Expand Up @@ -149,6 +149,13 @@ export class ExponentialBackoff {
}
}

skipBackoff(): void {
if (this.timerPromise !== null) {
this.timerPromise.skipDelay();
this.timerPromise = null;
}
}

cancel(): void {
if (this.timerPromise !== null) {
this.timerPromise.cancel();
Expand Down
5 changes: 2 additions & 3 deletions packages/firestore/src/remote/online_state_tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

import { OnlineState } from '../core/types';
import { debugAssert } from '../util/assert';
import { AsyncQueue, TimerId } from '../util/async_queue';
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
import { FirestoreError } from '../util/error';
import { logError, logDebug } from '../util/log';
import { CancelablePromise } from '../util/promise';

const LOG_TAG = 'OnlineStateTracker';

Expand Down Expand Up @@ -64,7 +63,7 @@ export class OnlineStateTracker {
* transition from OnlineState.Unknown to OnlineState.Offline without waiting
* for the stream to actually fail (MAX_WATCH_STREAM_FAILURES times).
*/
private onlineStateTimer: CancelablePromise<void> | null = null;
private onlineStateTimer: DelayedOperation<void> | null = null;

/**
* Whether the client should log a warning message if it fails to connect to
Expand Down
5 changes: 2 additions & 3 deletions packages/firestore/src/remote/persistent_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import { TargetData } from '../local/target_data';
import { Mutation, MutationResult } from '../model/mutation';
import * as api from '../protos/firestore_proto_api';
import { hardAssert, debugAssert } from '../util/assert';
import { AsyncQueue, TimerId } from '../util/async_queue';
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
import { Code, FirestoreError } from '../util/error';
import { logError, logDebug } from '../util/log';

import { CancelablePromise } from '../util/promise';
import { isNullOrUndefined } from '../util/types';
import { ExponentialBackoff } from './backoff';
import { Connection, Stream } from './connection';
Expand Down Expand Up @@ -164,7 +163,7 @@ export abstract class PersistentStream<
*/
private closeCount = 0;

private idleTimer: CancelablePromise<void> | null = null;
private idleTimer: DelayedOperation<void> | null = null;
private stream: Stream<SendType, ReceiveType> | null = null;

protected backoff: ExponentialBackoff;
Expand Down
27 changes: 12 additions & 15 deletions packages/firestore/src/util/async_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import { debugAssert, fail } from './assert';
import { Code, FirestoreError } from './error';
import { logDebug, logError } from './log';
import { CancelablePromise, Deferred } from './promise';
import { Deferred } from './promise';
import { ExponentialBackoff } from '../remote/backoff';
import { PlatformSupport } from '../platform/platform';
import { isIndexedDbTransactionError } from '../local/simple_db';
Expand Down Expand Up @@ -86,8 +86,12 @@ export const enum TimerId {
* It is created via DelayedOperation.createAndSchedule().
*
* Supports cancellation (via cancel()) and early execution (via skipDelay()).
*
* Note: We implement `PromiseLike` instead of `Promise`, as the `Promise` type
* in newer versions of TypeScript defines `finally`, which is not available in
* IE.
*/
class DelayedOperation<T extends unknown> implements CancelablePromise<T> {
export class DelayedOperation<T extends unknown> implements PromiseLike<T> {
// handle for use with clearTimeout(), or null if the operation has been
// executed or canceled already.
private timerHandle: TimerHandle | null;
Expand Down Expand Up @@ -175,10 +179,7 @@ class DelayedOperation<T extends unknown> implements CancelablePromise<T> {
}
}

// Promise implementation.
readonly [Symbol.toStringTag]: 'Promise';
then = this.deferred.promise.then.bind(this.deferred.promise);
catch = this.deferred.promise.catch.bind(this.deferred.promise);

private handleDelayElapsed(): void {
this.asyncQueue.enqueueAndForget(() => {
Expand Down Expand Up @@ -234,10 +235,7 @@ export class AsyncQueue {
// Visibility handler that triggers an immediate retry of all retryable
// operations. Meant to speed up recovery when we regain file system access
// after page comes into foreground.
private visibilityHandler = (): void => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.runDelayedOperationsEarly(TimerId.AsyncQueueRetry);
};
private visibilityHandler = (): void => this.backoff.skipBackoff();

constructor() {
const window = PlatformSupport.getPlatform().window;
Expand Down Expand Up @@ -379,14 +377,14 @@ export class AsyncQueue {

/**
* Schedules an operation to be queued on the AsyncQueue once the specified
* `delayMs` has elapsed. The returned CancelablePromise can be used to cancel
* the operation prior to its running.
* `delayMs` has elapsed. The returned DelayedOperation can be used to cancel
* or fast-forward the operation prior to its running.
*/
enqueueAfterDelay<T extends unknown>(
timerId: TimerId,
delayMs: number,
op: () => Promise<T>
): CancelablePromise<T> {
): DelayedOperation<T> {
this.verifyNotFailed();

debugAssert(
Expand Down Expand Up @@ -466,11 +464,10 @@ export class AsyncQueue {
* For Tests: Runs some or all delayed operations early.
*
* @param lastTimerId Delayed operations up to and including this TimerId will
* be drained. Throws if no such operation exists. Pass TimerId.All to run
* all delayed operations.
* be drained. Pass TimerId.All to run all delayed operations.
* @returns a Promise that resolves once all operations have been run.
*/
runDelayedOperationsEarly(lastTimerId: TimerId): Promise<void> {
runAllDelayedOperationsUntil(lastTimerId: TimerId): Promise<void> {
// Note that draining may generate more delayed ops, so we do that first.
return this.drain().then(() => {
// Run ops in the same order they'd run if they ran naturally.
Expand Down
18 changes: 0 additions & 18 deletions packages/firestore/src/util/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,6 @@ export interface Rejecter {
(reason?: Error): void;
}

export interface CancelablePromise<T> {
// We are not extending Promise, since Node's Promise API require us to
// implement 'finally', which is not fully supported on Web.
then<TResult1 = T, TResult2 = never>(
onfulfilled?:
| ((value: T) => TResult1 | PromiseLike<TResult1>)
| undefined
| null,
onrejected?: // eslint-disable-next-line @typescript-eslint/no-explicit-any
((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined | null
): Promise<TResult1 | TResult2>;
catch<TResult = never>(
onrejected?: // eslint-disable-next-line @typescript-eslint/no-explicit-any
((reason: any) => TResult | PromiseLike<TResult>) | undefined | null
): Promise<T | TResult>;
cancel(): void;
}

export class Deferred<R> {
promise: Promise<R>;
// Assigned synchronously in constructor by Promise constructor callback.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ apiDescribe('Idle Timeout', (persistence: boolean) => {
return docRef
.set({ foo: 'bar' })
.then(() => {
return asyncQueue(db).runDelayedOperationsEarly(
return asyncQueue(db).runAllDelayedOperationsUntil(
TimerId.WriteStreamIdle
);
})
Expand All @@ -53,7 +53,7 @@ apiDescribe('Idle Timeout', (persistence: boolean) => {

return awaitOnlineSnapshot()
.then(() => {
return asyncQueue(db).runDelayedOperationsEarly(
return asyncQueue(db).runAllDelayedOperationsUntil(
TimerId.ListenStreamIdle
);
})
Expand Down
4 changes: 2 additions & 2 deletions packages/firestore/test/integration/remote/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ describe('Write Stream', () => {
expect(queue.containsDelayedOperation(TimerId.WriteStreamIdle)).to.be
.true;
return Promise.all([
queue.runDelayedOperationsEarly(TimerId.WriteStreamIdle),
queue.runAllDelayedOperationsUntil(TimerId.WriteStreamIdle),
streamListener.awaitCallback('close')
]);
})
Expand All @@ -229,7 +229,7 @@ describe('Write Stream', () => {
writeStream.writeMutations(SINGLE_MUTATION);
await streamListener.awaitCallback('mutationResult');

await queue.runDelayedOperationsEarly(TimerId.All);
await queue.runAllDelayedOperationsUntil(TimerId.All);
expect(writeStream.isOpen()).to.be.true;
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1185,7 +1185,7 @@ describe('IndexedDb: allowTabSynchronization', () => {
it('ignores intermittent IndexedDbTransactionError during lease refresh', async () => {
await withPersistence('clientA', async (db, _, queue) => {
db.injectFailures = ['updateClientMetadataAndTryBecomePrimary'];
await queue.runDelayedOperationsEarly(TimerId.ClientMetadataRefresh);
await queue.runAllDelayedOperationsUntil(TimerId.ClientMetadataRefresh);
await queue.enqueue(() => {
db.injectFailures = [];
return db.runTransaction('check success', 'readwrite-primary', () =>
Expand Down
10 changes: 6 additions & 4 deletions packages/firestore/test/unit/specs/spec_test_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ abstract class TestRunner {
TimerId.ListenStreamConnectionBackoff
)
) {
await this.queue.runDelayedOperationsEarly(
await this.queue.runAllDelayedOperationsUntil(
TimerId.ListenStreamConnectionBackoff
);
}
Expand Down Expand Up @@ -605,7 +605,7 @@ abstract class TestRunner {
);
// The watch stream should re-open if we have active listeners.
if (spec.runBackoffTimer && !this.queryListeners.isEmpty()) {
await this.queue.runDelayedOperationsEarly(
await this.queue.runAllDelayedOperationsUntil(
TimerId.ListenStreamConnectionBackoff
);
await this.connection.waitForWatchOpen();
Expand Down Expand Up @@ -656,7 +656,7 @@ abstract class TestRunner {
// not, then there won't be a matching item on the queue and
// runDelayedOperationsEarly() will throw.
const timerId = timer as TimerId;
await this.queue.runDelayedOperationsEarly(timerId);
await this.queue.runAllDelayedOperationsUntil(timerId);
}

private async doDisableNetwork(): Promise<void> {
Expand Down Expand Up @@ -706,7 +706,9 @@ abstract class TestRunner {

if (state.primary) {
await clearCurrentPrimaryLease();
await this.queue.runDelayedOperationsEarly(TimerId.ClientMetadataRefresh);
await this.queue.runAllDelayedOperationsUntil(
TimerId.ClientMetadataRefresh
);
}

return Promise.resolve();
Expand Down
12 changes: 6 additions & 6 deletions packages/firestore/test/unit/util/async_queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import * as chaiAsPromised from 'chai-as-promised';
import { expect, use } from 'chai';
import { AsyncQueue, TimerId } from '../../../src/util/async_queue';
import { Code } from '../../../src/util/error';
import { getLogLevel, setLogLevel, LogLevel } from '../../../src/util/log';
import { getLogLevel, LogLevel, setLogLevel } from '../../../src/util/log';
import { Deferred, Rejecter, Resolver } from '../../../src/util/promise';
import { fail } from '../../../src/util/assert';
import { IndexedDbTransactionError } from '../../../src/local/simple_db';
Expand Down Expand Up @@ -171,7 +171,7 @@ describe('AsyncQueue', () => {
err => expect(err.code === Code.CANCELLED)
);

await queue.runDelayedOperationsEarly(TimerId.All);
await queue.runAllDelayedOperationsUntil(TimerId.All);
expect(completedSteps).to.deep.equal([1]);
});

Expand All @@ -187,7 +187,7 @@ describe('AsyncQueue', () => {
queue.enqueueAfterDelay(timerId2, 10000, () => doStep(3));
queue.enqueueAndForget(() => doStep(2));

await queue.runDelayedOperationsEarly(TimerId.All);
await queue.runAllDelayedOperationsUntil(TimerId.All);
expect(completedSteps).to.deep.equal([1, 2, 3, 4]);
});

Expand All @@ -205,7 +205,7 @@ describe('AsyncQueue', () => {
queue.enqueueAfterDelay(timerId3, 15000, () => doStep(4));
queue.enqueueAndForget(() => doStep(2));

await queue.runDelayedOperationsEarly(timerId3);
await queue.runAllDelayedOperationsUntil(timerId3);
expect(completedSteps).to.deep.equal([1, 2, 3, 4]);
});

Expand All @@ -223,7 +223,7 @@ describe('AsyncQueue', () => {
);
}
});
await queue.runDelayedOperationsEarly(TimerId.AsyncQueueRetry);
await queue.runAllDelayedOperationsUntil(TimerId.AsyncQueueRetry);
expect(completedSteps).to.deep.equal([1, 1]);
});

Expand Down Expand Up @@ -279,7 +279,7 @@ describe('AsyncQueue', () => {
expect(completedSteps).to.deep.equal([1]);

// Fast forward all operations
await queue.runDelayedOperationsEarly(TimerId.AsyncQueueRetry);
await queue.runAllDelayedOperationsUntil(TimerId.AsyncQueueRetry);
expect(completedSteps).to.deep.equal([1, 1]);
});

Expand Down