diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index fbfee3762cb..9038896396e 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -31,6 +31,7 @@ import { Connection, Stream } from './connection'; import { JsonProtoSerializer } from './serializer'; import { WatchChange } from './watch_change'; import { isNullOrUndefined } from '../util/types'; +import { CancelablePromise } from '../util/promise'; const LOG_TAG = 'PersistentStream'; @@ -154,7 +155,7 @@ export abstract class PersistentStream< ListenerType extends PersistentStreamListener > { private state: PersistentStreamState; - private idle = false; + private inactivityTimerPromise: CancelablePromise | null = null; private stream: Stream | null = null; protected backoff: ExponentialBackoff; @@ -245,16 +246,25 @@ export abstract class PersistentStream< } /** - * Initializes the idle timer. If no write takes place within one minute, the - * WebChannel stream will be closed. + * Marks this stream as idle. If no further actions are performed on the + * stream for one minute, the stream will automatically close itself and + * notify the stream's onClose() handler with Status.OK. The stream will then + * be in a !isStarted() state, requiring the caller to start the stream again + * before further use. + * + * Only streams that are in state 'Open' can be marked idle, as all other + * states imply pending network operations. */ markIdle(): void { - this.idle = true; - this.queue - .schedule(() => { - return this.handleIdleCloseTimer(); - }, IDLE_TIMEOUT_MS) - .catch((err: FirestoreError) => { + // Starts the idle time if we are in state 'Open' and are not yet already + // running a timer (in which case the previous idle timeout still applies). + if (this.isOpen() && this.inactivityTimerPromise === null) { + this.inactivityTimerPromise = this.queue.scheduleWithDelay( + () => this.handleIdleCloseTimer(), + IDLE_TIMEOUT_MS + ); + + this.inactivityTimerPromise.catch((err: FirestoreError) => { // When the AsyncQueue gets drained during testing, pending Promises // (including these idle checks) will get rejected. We special-case // these cancelled idle checks to make sure that these specific Promise @@ -266,6 +276,7 @@ export abstract class PersistentStream< }` ); }); + } } /** Sends a message to the underlying stream. */ @@ -276,7 +287,7 @@ export abstract class PersistentStream< /** Called by the idle timer when the stream should close due to inactivity. */ private handleIdleCloseTimer(): Promise { - if (this.isOpen() && this.idle) { + if (this.isOpen()) { // When timing out an idle stream there's no reason to force the stream into backoff when // it restarts so set the stream state to Initial instead of Error. return this.close(PersistentStreamState.Initial); @@ -286,7 +297,10 @@ export abstract class PersistentStream< /** Marks the stream as active again. */ private cancelIdleCheck() { - this.idle = false; + if (this.inactivityTimerPromise) { + this.inactivityTimerPromise.cancel(); + this.inactivityTimerPromise = null; + } } /** diff --git a/packages/firestore/src/util/async_queue.ts b/packages/firestore/src/util/async_queue.ts index 4dce61ed40f..7fa76fa1902 100644 --- a/packages/firestore/src/util/async_queue.ts +++ b/packages/firestore/src/util/async_queue.ts @@ -17,15 +17,105 @@ import { assert, fail } from './assert'; import * as log from './log'; import { AnyDuringMigration, AnyJs } from './misc'; -import { Deferred } from './promise'; +import { Deferred, CancelablePromise } from './promise'; import { Code, FirestoreError } from './error'; -type DelayedOperation = { - // tslint:disable-next-line:no-any Accept any return type from setTimeout(). - handle: any; - op: () => Promise; - deferred: Deferred; -}; +// tslint:disable-next-line:no-any Accept any return type from setTimeout(). +type TimerHandle = any; + +/** + * Represents an operation scheduled to be run in the future on an AsyncQueue. + * + * It is created via DelayedOperation.createAndSchedule(). + * + * Supports cancellation (via cancel()) and early execution (via skipDelay()). + */ +class DelayedOperation implements CancelablePromise { + // handle for use with clearTimeout(), or null if the operation has been + // executed or canceled already. + private timerHandle: TimerHandle | null; + + private readonly deferred = new Deferred(); + + private constructor( + private asyncQueue: AsyncQueue, + private op: () => Promise + ) {} + + /** + * Creates and returns a DelayedOperation that has been scheduled to be + * executed on the provided asyncQueue after the provided delayMs. + */ + static createAndSchedule( + asyncQueue: AsyncQueue, + op: () => Promise, + delayMs: number + ): DelayedOperation { + const delayedOp = new DelayedOperation(asyncQueue, op); + delayedOp.start(delayMs); + return delayedOp; + } + + /** + * Starts the timer. This is called immediately after construction by + * createAndSchedule(). + */ + private start(delayMs: number): void { + this.timerHandle = setTimeout(() => this.handleDelayElapsed(), delayMs); + } + + /** + * Queues the operation to run immediately (if it hasn't already been run or + * canceled). + */ + skipDelay(): void { + return this.handleDelayElapsed(); + } + + /** + * Cancels the operation if it hasn't already been executed or canceled. The + * promise will be rejected. + * + * As long as the operation has not yet been run, calling cancel() provides a + * guarantee that the operation will not be run. + */ + cancel(reason?: string): void { + if (this.timerHandle !== null) { + this.clearTimeout(); + this.deferred.reject( + new FirestoreError( + Code.CANCELLED, + 'Operation cancelled' + (reason ? ': ' + reason : '') + ) + ); + } + } + + // Promise implementation. + readonly [Symbol.toStringTag]: 'Promise'; + then = this.deferred.promise.then.bind(this.deferred.promise); + catch = this.deferred.promise.catch.bind(this.deferred.promise); + + private handleDelayElapsed(): void { + this.asyncQueue.schedule(() => { + if (this.timerHandle !== null) { + this.clearTimeout(); + return this.op().then(result => { + return this.deferred.resolve(result); + }); + } else { + return Promise.resolve(); + } + }); + } + + private clearTimeout() { + if (this.timerHandle) { + clearTimeout(this.timerHandle); + this.timerHandle = null; + } + } +} export class AsyncQueue { // The last promise in the queue. @@ -33,17 +123,14 @@ export class AsyncQueue { // A list with timeout handles and their respective deferred promises. // Contains an entry for each operation that is queued to run in the future - // (i.e. it has a delay that has not yet elapsed). Prior to cleanup, this list - // may also contain entries that have already been run (in which case `handle` is - // null). + // (i.e. it has a delay that has not yet elapsed). private delayedOperations: Array> = []; // The number of operations that are queued to be run in the future (i.e. they - // have a delay that has not yet elapsed). Unlike `delayedOperations`, this - // is guaranteed to only contain operations that have not yet been run. - // - // Visible for testing. - delayedOperationsCount = 0; + // have a delay that has not yet elapsed). Used for testing. + get delayedOperationsCount() { + return this.delayedOperations.length; + } // visible for testing failure: Error; @@ -55,47 +142,10 @@ export class AsyncQueue { /** * Adds a new operation to the queue. Returns a promise that will be resolved * when the promise returned by the new operation is (with its value). - * - * Can optionally specify a delay (in milliseconds) to wait before queuing the - * operation. */ - schedule(op: () => Promise, delay?: number): Promise { - if (this.failure) { - fail( - 'AsyncQueue is already failed: ' + - (this.failure.stack || this.failure.message) - ); - } - - if ((delay || 0) > 0) { - this.delayedOperationsCount++; - const delayedOp: DelayedOperation = { - handle: null, - op, - deferred: new Deferred() - }; - delayedOp.handle = setTimeout(() => { - this.scheduleInternal(() => { - return delayedOp.op().then(result => { - delayedOp.deferred.resolve(result); - }); - }); - delayedOp.handle = null; - - this.delayedOperationsCount--; - if (this.delayedOperationsCount === 0) { - this.delayedOperations = []; - } - }, delay); - this.delayedOperations.push(delayedOp); - return delayedOp.deferred.promise; - } else { - return this.scheduleInternal(op); - } - } - - private scheduleInternal(op: () => Promise): Promise { - this.tail = this.tail.then(() => { + schedule(op: () => Promise): Promise { + this.verifyNotFailed(); + const newTail = this.tail.then(() => { this.operationInProgress = true; return op() .catch(error => { @@ -118,11 +168,45 @@ export class AsyncQueue { // and return the rejected Promise. throw error; }) - .then(() => { + .then(result => { this.operationInProgress = false; + return result; }); }); - return this.tail as AnyDuringMigration; + this.tail = newTail; + return newTail; + } + + /** + * Schedules an operation to be run on the AsyncQueue once the specified + * `delayMs` has elapsed. The returned DelayedOperationResult can be + * used to cancel the operation prior to its running. + */ + scheduleWithDelay( + op: () => Promise, + delayMs: number + ): CancelablePromise { + this.verifyNotFailed(); + + const delayedOp = DelayedOperation.createAndSchedule(this, op, delayMs); + this.delayedOperations.push(delayedOp); + + delayedOp.catch(err => {}).then(() => { + // NOTE: indexOf / slice are O(n), but delayedOperations is expected to be small. + const index = this.delayedOperations.indexOf(delayedOp); + assert(index >= 0, 'Delayed operation not found.'); + this.delayedOperations.slice(index, 1); + }); + return delayedOp; + } + + private verifyNotFailed(): void { + if (this.failure) { + fail( + 'AsyncQueue is already failed: ' + + (this.failure.stack || this.failure.message) + ); + } } /** @@ -143,26 +227,13 @@ export class AsyncQueue { * scheduled with a delay can be rejected or queued for immediate execution. */ drain(executeDelayedTasks: boolean): Promise { - this.delayedOperations.forEach(entry => { - if (entry.handle) { - clearTimeout(entry.handle); - if (executeDelayedTasks) { - this.scheduleInternal(entry.op).then( - entry.deferred.resolve, - entry.deferred.reject - ); - } else { - entry.deferred.reject( - new FirestoreError( - Code.CANCELLED, - 'Operation cancelled by shutdown' - ) - ); - } + this.delayedOperations.forEach(delayedOp => { + if (executeDelayedTasks) { + delayedOp.skipDelay(); + } else { + delayedOp.cancel('shutdown'); } }); - this.delayedOperations = []; - this.delayedOperationsCount = 0; return this.schedule(() => Promise.resolve()); } } diff --git a/packages/firestore/src/util/promise.ts b/packages/firestore/src/util/promise.ts index 368290a06bc..d47a9fd6ecf 100644 --- a/packages/firestore/src/util/promise.ts +++ b/packages/firestore/src/util/promise.ts @@ -24,6 +24,10 @@ export interface Rejecter { (reason?: Error): void; } +export interface CancelablePromise extends Promise { + cancel(): void; +} + export class Deferred { promise: Promise; resolve: Resolver;