From f86d8c97fd12e637a986a97235a56b380ef49301 Mon Sep 17 00:00:00 2001 From: wti806 <32399754+wti806@users.noreply.github.com> Date: Mon, 30 Jul 2018 15:03:11 -0700 Subject: [PATCH 1/4] Catch invalid provider id error (#1064) --- packages/auth/src/error_auth.js | 3 +++ packages/auth/src/rpchandler.js | 5 +++++ packages/auth/test/rpchandler_test.js | 24 ++++++++++++++++++++++++ 3 files changed, 32 insertions(+) diff --git a/packages/auth/src/error_auth.js b/packages/auth/src/error_auth.js index 251718ea235..e6ae679ff62 100644 --- a/packages/auth/src/error_auth.js +++ b/packages/auth/src/error_auth.js @@ -147,6 +147,7 @@ fireauth.authenum.Error = { INVALID_PASSWORD: 'wrong-password', INVALID_PERSISTENCE: 'invalid-persistence-type', INVALID_PHONE_NUMBER: 'invalid-phone-number', + INVALID_PROVIDER_ID: 'invalid-provider-id', INVALID_RECIPIENT_EMAIL: 'invalid-recipient-email', INVALID_SENDER: 'invalid-sender', INVALID_SESSION_INFO: 'invalid-verification-id', @@ -294,6 +295,8 @@ fireauth.AuthError.MESSAGES_[fireauth.authenum.Error.INVALID_PHONE_NUMBER] = 'phone number in a format that can be parsed into E.164 format. E.164 ' + 'phone numbers are written in the format [+][country code][subscriber ' + 'number including area code].'; +fireauth.AuthError.MESSAGES_[fireauth.authenum.Error.INVALID_PROVIDER_ID] = + 'The specified provider ID is invalid.'; fireauth.AuthError.MESSAGES_[fireauth.authenum.Error.INVALID_RECIPIENT_EMAIL] = 'The email corresponding to this action failed to send as the provided ' + 'recipient email address is invalid.'; diff --git a/packages/auth/src/rpchandler.js b/packages/auth/src/rpchandler.js index 65a5352e38f..14aa28f4a9d 100644 --- a/packages/auth/src/rpchandler.js +++ b/packages/auth/src/rpchandler.js @@ -207,6 +207,7 @@ fireauth.RpcHandler.ServerError = { INVALID_OOB_CODE: 'INVALID_OOB_CODE', INVALID_PASSWORD: 'INVALID_PASSWORD', INVALID_PHONE_NUMBER: 'INVALID_PHONE_NUMBER', + INVALID_PROVIDER_ID: 'INVALID_PROVIDER_ID', INVALID_RECIPIENT_EMAIL: 'INVALID_RECIPIENT_EMAIL', INVALID_SENDER: 'INVALID_SENDER', INVALID_SESSION_INFO: 'INVALID_SESSION_INFO', @@ -2244,6 +2245,10 @@ fireauth.RpcHandler.getDeveloperError_ = errorMap[fireauth.RpcHandler.ServerError.MISSING_OOB_CODE] = fireauth.authenum.Error.INTERNAL_ERROR; + // Get Auth URI errors: + errorMap[fireauth.RpcHandler.ServerError.INVALID_PROVIDER_ID] = + fireauth.authenum.Error.INVALID_PROVIDER_ID; + // Operations that require ID token in request: errorMap[fireauth.RpcHandler.ServerError.CREDENTIAL_TOO_OLD_LOGIN_AGAIN] = fireauth.authenum.Error.CREDENTIAL_TOO_OLD_LOGIN_AGAIN; diff --git a/packages/auth/test/rpchandler_test.js b/packages/auth/test/rpchandler_test.js index 08fa7837219..27adb1e2471 100644 --- a/packages/auth/test/rpchandler_test.js +++ b/packages/auth/test/rpchandler_test.js @@ -5268,6 +5268,30 @@ function testGetAuthUri_success() { } +/** + * Tests server side getAuthUri error. + */ +function testGetAuthUri_caughtServerError() { + var expectedUrl = 'https://www.googleapis.com/identitytoolkit/v3/relyin' + + 'gparty/createAuthUri?key=apiKey'; + var requestBody = { + 'providerId': 'abc.com', + 'continueUri': 'http://localhost/widget', + 'customParameter': {} + }; + var errorMap = {}; + // All related server errors for getAuthUri. + errorMap[fireauth.RpcHandler.ServerError.INVALID_PROVIDER_ID] = + fireauth.authenum.Error.INVALID_PROVIDER_ID; + + assertServerErrorsAreHandled(function() { + return rpcHandler.getAuthUri( + 'abc.com', + 'http://localhost/widget'); + }, errorMap, expectedUrl, requestBody); +} + + /** * Tests successful getAuthUri request with Google provider and sessionId. */ From a80a597b57c7ac3cb91cc197b72913b5afcc014f Mon Sep 17 00:00:00 2001 From: David East Date: Mon, 30 Jul 2018 16:42:44 -0600 Subject: [PATCH 2/4] RxFire: Api Change and documentation (#1066) * api changes and doc updates * fixes --- packages/rxfire/docs/storage.md | 172 ++++++++++++++++++ packages/rxfire/firestore/collection/index.ts | 6 +- 2 files changed, 175 insertions(+), 3 deletions(-) create mode 100644 packages/rxfire/docs/storage.md diff --git a/packages/rxfire/docs/storage.md b/packages/rxfire/docs/storage.md new file mode 100644 index 00000000000..0f99640ca27 --- /dev/null +++ b/packages/rxfire/docs/storage.md @@ -0,0 +1,172 @@ +# RxFire Storage + +## Task Observables + +### `fromTask()` +The `fromTask()` function creates an observable that emits progress changes. + +| | | +|-----------------|--------------------------------------------| +| **function** | `fromTask()` | +| **params** | `storage.UploadTask` | +| **import path** | `rxfire/storage` | +| **return** | `Observable` | + +#### TypeScript Example +```ts +import { fromTask } from 'rxfire/firestore'; +import * as firebase from 'firebase'; +import 'firebase/storage'; + +// Set up Firebase +const app = initializeApp({ /* config */ }); +const storage = app.storage(); +const davidRef = storage.ref('users/david.png'); + +// Upload a transparent 1x1 pixel image +const task = davidRef.putString('R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7', 'base64'); + +fromTask(task) + .subscribe(snap => { console.log(snap.bytesTransferred); }); +``` + +### `percentage()` +The `percentage()` function creates an observable that emits percentage of the uploaded bytes. + +| | | +|-----------------|--------------------------------------------| +| **function** | `fromTask()` | +| **params** | `storage.UploadTask` | +| **import path** | `rxfire/storage` | +| **return** | `Observable` | + +#### TypeScript Example +```ts +import { percentage } from 'rxfire/firestore'; +import * as firebase from 'firebase'; +import 'firebase/storage'; + +// Set up Firebase +const app = initializeApp({ /* config */ }); +const storage = app.storage(); +const davidRef = storage.ref('users/david.png'); + +// Upload a transparent 1x1 pixel image +const task = davidRef.putString('R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7', 'base64'); + +percentage(task) + .subscribe(uploadProgress => { console.log(uploadProgress); }); +``` + +## Reference Observables + +### `getDownloadURL()` +The `getDownloadURL()` function creates an observable that emits the URL of the file. + +| | | +|-----------------|------------------------------------------| +| **function** | `getDownloadURL()` | +| **params** | `storage.Reference` | +| **import path** | `rxfire/storage` | +| **return** | `Observable` | + +#### TypeScript Example +```ts +import { getDownloadURL } from 'rxfire/storage'; +import * as firebase from 'firebase'; +import 'firebase/storage'; + +// Set up Firebase +const app = initializeApp({ /* config */ }); +const storage = app.storage(); + +// Assume this exists +const davidRef = storage.ref('users/david.png'); + +getDownloadURL(davidRef) + .subscribe(url => { console.log(url) }); +``` + +### `getMetadata()` +The `getMetadata()` function creates an observable that emits the URL of the file's metadta. + +| | | +|-----------------|------------------------------------------| +| **function** | `getMetadata()` | +| **params** | `storage.Reference` | +| **import path** | `rxfire/storage` | +| **return** | `Observable` | + +#### TypeScript Example +```ts +import { getMetadata } from 'rxfire/storage'; +import * as firebase from 'firebase'; +import 'firebase/storage'; + +// Set up Firebase +const app = initializeApp({ /* config */ }); +const storage = app.storage(); + +// Assume this exists +const davidRef = storage.ref('users/david.png'); + +getMetadata(davidRef) + .subscribe(meta => { console.log(meta) }); +``` + +### `put()` +The `put()` function creates an observable that emits the upload progress of a file. + +| | | +|-----------------|------------------------------------------| +| **function** | `put()` | +| **params** | ref: `storage.Reference`, data: `any`, metadata?: `storage.UploadMetadata` | +| **import path** | `rxfire/storage` | +| **return** | `Observable` | + +#### TypeScript Example +```ts +import { put } from 'rxfire/storage'; +import * as firebase from 'firebase'; +import 'firebase/storage'; + +// Set up Firebase +const app = initializeApp({ /* config */ }); +const storage = app.storage(); +const dataRef = storage.ref('users/david.json'); + +const blob = new Blob( + [JSON.stringify({ name: 'david'}, null, 2)], + { type : 'application/json' } +); + +put(davidRef, blob, { type : 'application/json' }) + .subscribe(snap => { console.log(snap.bytesTransferred) }); +``` + +### `putString()` +The `putString()` function creates an observable that emits the upload progress of a file. + +| | | +|-----------------|------------------------------------------| +| **function** | `putString()` | +| **params** | ref: `storage.Reference`, data: `string`, metadata?: `storage.UploadMetadata` | +| **import path** | `rxfire/storage` | +| **return** | `Observable` | + +#### TypeScript Example +```ts +import { putString } from 'rxfire/storage'; +import * as firebase from 'firebase'; +import 'firebase/storage'; + +// Set up Firebase +const app = initializeApp({ /* config */ }); +const storage = app.storage(); +const davidRef = storage.ref('users/david.png'); + +const base64 = 'R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7'; + +putString(davidRef, base64, { type : 'application/json' }) + .subscribe(snap => { console.log(snap.bytesTransferred) }); +``` diff --git a/packages/rxfire/firestore/collection/index.ts b/packages/rxfire/firestore/collection/index.ts index 817de03722b..ecc647ad66c 100644 --- a/packages/rxfire/firestore/collection/index.ts +++ b/packages/rxfire/firestore/collection/index.ts @@ -117,7 +117,7 @@ function processDocumentChanges( * order of occurence. * @param query */ -export function docChanges( +export function collectionChanges( query: firestore.Query, events: firestore.DocumentChangeType[] = ALL_EVENTS ) { @@ -144,7 +144,7 @@ export function sortedChanges( query: firestore.Query, events?: firestore.DocumentChangeType[] ) { - return docChanges(query, events).pipe( + return collectionChanges(query, events).pipe( scan( ( current: firestore.DocumentChange[], @@ -163,7 +163,7 @@ export function auditTrail( query: firestore.Query, events?: firestore.DocumentChangeType[] ): Observable { - return docChanges(query, events).pipe( + return collectionChanges(query, events).pipe( scan((current, action) => [...current, ...action], []) ); } From f14ebc245f58832776ec9fcde307175fba84f54e Mon Sep 17 00:00:00 2001 From: Michael Lehenbauer Date: Mon, 30 Jul 2018 16:37:10 -0700 Subject: [PATCH 3/4] Refactor PersistentStream (no behavior changes). (#1041) This breaks out a number of changes I made as prep for b/80402781 (Continue retrying streams for 1 minute (idle delay)). PersistentStream changes: * Rather than providing a stream event listener to every call of start(), the stream listener is now provided once to the constructor and cannot be changed. * Streams can now be restarted indefinitely, even after a call to stop(). * PersistentStreamState.Stopped was removed and we just return to 'Initial' after a stop() call. * Added `closeCount` member to PersistentStream in order to avoid bleedthrough issues with auth and stream events once stop() has been called. * Calling stop() now triggers the onClose() event listener, which simplifies stream cleanup. * PersistentStreamState.Auth renamed to 'Starting' to better reflect that it encompasses both authentication and opening the stream. RemoteStore changes: * Creates streams once and just stop() / start()s them as necessary, never recreating them completely. * Added networkEnabled flag to track whether the network is enabled or not, since we no longer null out the streams. * Refactored disableNetwork() / enableNetwork() to remove stream re-creation. Misc: * Comment improvements including a state diagram on PersistentStream. * Fixed spec test shutdown to schedule via the AsyncQueue to fix sequencing order I ran into. --- packages/firestore/src/remote/datastore.ts | 15 +- .../firestore/src/remote/persistent_stream.ts | 256 ++++++++++-------- packages/firestore/src/remote/remote_store.ts | 203 +++++++------- .../test/integration/remote/stream.test.ts | 46 ++-- .../test/unit/specs/spec_test_runner.ts | 8 +- 5 files changed, 278 insertions(+), 250 deletions(-) diff --git a/packages/firestore/src/remote/datastore.ts b/packages/firestore/src/remote/datastore.ts index 6930764f38a..a958e8076a5 100644 --- a/packages/firestore/src/remote/datastore.ts +++ b/packages/firestore/src/remote/datastore.ts @@ -1,3 +1,4 @@ +import { WatchStreamListener, WriteStreamListener } from './persistent_stream'; /** * Copyright 2017 Google Inc. * @@ -54,21 +55,27 @@ export class Datastore { private serializer: JsonProtoSerializer ) {} - newPersistentWriteStream(): PersistentWriteStream { + newPersistentWriteStream( + listener: WriteStreamListener + ): PersistentWriteStream { return new PersistentWriteStream( this.queue, this.connection, this.credentials, - this.serializer + this.serializer, + listener ); } - newPersistentWatchStream(): PersistentListenStream { + newPersistentWatchStream( + listener: WatchStreamListener + ): PersistentListenStream { return new PersistentListenStream( this.queue, this.connection, this.credentials, - this.serializer + this.serializer, + listener ); } diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index bd2774e0cc0..8d20cd9195c 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -43,21 +43,38 @@ interface ListenRequest extends api.ListenRequest { export interface WriteRequest extends api.WriteRequest { database?: string; } - +/** + * PersistentStream can be in one of 5 states (each described in detail below) + * based on the following state transition diagram: + * + * start() called auth & connection succeeded + * INITIAL ----------------> STARTING -----------------------------> OPEN + * ^ | | + * | | error occurred | + * | \-----------------------------v-----/ + * | | + * backoff | | + * elapsed | start() called | + * \--- BACKOFF <---------------- ERROR + * + * [any state] --------------------------> INITIAL + * stop() called or + * idle timer expired + */ enum PersistentStreamState { /** - * The streaming RPC is not running and there's no error condition. + * The streaming RPC is not yet running and there's no error condition. * Calling `start` will start the stream immediately without backoff. * While in this state isStarted will return false. */ Initial, /** - * The stream is starting, and is waiting for an auth token to attach to - * the initial request. While in this state, isStarted will return - * true but isOpen will return false. + * The stream is starting, either waiting for an auth token or for the stream + * to successfully open. While in this state, isStarted will return true but + * isOpen will return false. */ - Auth, + Starting, /** * The streaming RPC is up and running. Requests and responses can flow @@ -68,22 +85,16 @@ enum PersistentStreamState { /** * The stream encountered an error. The next start attempt will back off. * While in this state isStarted() will return false. - * */ Error, /** * An in-between state after an error where the stream is waiting before - * re-starting. After - * waiting is complete, the stream will try to open. While in this - * state isStarted() will return YES but isOpen will return false. + * re-starting. After waiting is complete, the stream will try to open. + * While in this state isStarted() will return true but isOpen will return + * false. */ - Backoff, - - /** - * The stream has been explicitly stopped; no further events will be emitted. - */ - Stopped + Backoff } /** @@ -125,6 +136,7 @@ const IDLE_TIMEOUT_MS = 60 * 1000; * - Exponential backoff on failure * - Authentication via CredentialsProvider * - Dispatching all callbacks into the shared worker queue + * - Closing idle streams after 60 seconds of inactivity * * Subclasses of PersistentStream implement serialization of models to and * from the JSON representation of the protocol buffers for a specific @@ -153,20 +165,26 @@ export abstract class PersistentStream< ReceiveType, ListenerType extends PersistentStreamListener > { - private state: PersistentStreamState; + private state = PersistentStreamState.Initial; + /** + * A close count that's incremented every time the stream is closed; used by + * getCloseGuardedDispatcher() to invalidate callbacks that happen after + * close. + */ + private closeCount = 0; + private inactivityTimerPromise: CancelablePromise | null = null; private stream: Stream | null = null; protected backoff: ExponentialBackoff; - protected listener: ListenerType | null = null; - constructor( private queue: AsyncQueue, connectionTimerId: TimerId, private idleTimerId: TimerId, protected connection: Connection, - private credentialsProvider: CredentialsProvider + private credentialsProvider: CredentialsProvider, + protected listener: ListenerType ) { this.backoff = new ExponentialBackoff( queue, @@ -175,7 +193,6 @@ export abstract class PersistentStream< BACKOFF_FACTOR, BACKOFF_MAX_DELAY_MS ); - this.state = PersistentStreamState.Initial; } /** @@ -187,14 +204,14 @@ export abstract class PersistentStream< */ isStarted(): boolean { return ( - this.state === PersistentStreamState.Backoff || - this.state === PersistentStreamState.Auth || - this.state === PersistentStreamState.Open + this.state === PersistentStreamState.Starting || + this.state === PersistentStreamState.Open || + this.state === PersistentStreamState.Backoff ); } /** - * Returns true if the underlying RPC is open (the openHandler has been + * Returns true if the underlying RPC is open (the onOpen callback has been * called) and the stream is ready for outbound requests. */ isOpen(): boolean { @@ -206,16 +223,15 @@ export abstract class PersistentStream< * not immediately ready for use: onOpen will be invoked when the RPC is ready * for outbound requests, at which point isOpen will return true. * - * When start returns, isStarted will return true. + * When start returns, isStarted will return true. */ - start(listener: ListenerType): void { + start(): void { if (this.state === PersistentStreamState.Error) { - this.performBackoff(listener); + this.performBackoff(); return; } assert(this.state === PersistentStreamState.Initial, 'Already started'); - this.listener = listener; this.auth(); } @@ -227,7 +243,7 @@ export abstract class PersistentStream< */ stop(): void { if (this.isStarted()) { - this.close(PersistentStreamState.Stopped); + this.close(PersistentStreamState.Initial); } } @@ -299,8 +315,7 @@ export abstract class PersistentStream< * * sets internal stream state to 'finalState'; * * adjusts the backoff timer based on the error * - * A new stream can be opened by calling `start` unless `finalState` is set to - * `PersistentStreamState.Stopped`. + * A new stream can be opened by calling `start`. * * @param finalState the intended state of the stream after closing. * @param error the error the connection was closed with. @@ -309,18 +324,20 @@ export abstract class PersistentStream< finalState: PersistentStreamState, error?: FirestoreError ): Promise { + assert(this.isStarted(), 'Only started streams should be closed.'); assert( finalState === PersistentStreamState.Error || isNullOrUndefined(error), "Can't provide an error when not in an error state." ); - // The stream will be closed so we don't need our idle close timer anymore. + // Cancel any outstanding timers (they're guaranteed not to execute). this.cancelIdleCheck(); - - // Ensure we don't leave a pending backoff operation queued (in case close() - // was called while we were waiting to reconnect). this.backoff.cancel(); + // Invalidates any stream-related callbacks (e.g. from auth or the + // underlying stream), guaranteeing they won't execute. + this.closeCount++; + if (finalState !== PersistentStreamState.Error) { // If this is an intentional close ensure we don't delay our next connection attempt. this.backoff.reset(); @@ -347,16 +364,9 @@ export abstract class PersistentStream< // This state must be assigned before calling onClose() to allow the callback to // inhibit backoff or otherwise manipulate the state in its non-started state. this.state = finalState; - const listener = this.listener!; - // Clear the listener to avoid bleeding of events from the underlying streams. - this.listener = null; - - // If the caller explicitly requested a stream stop, don't notify them of a closing stream (it - // could trigger undesirable recovery logic, etc.). - if (finalState !== PersistentStreamState.Stopped) { - return listener.onClose(error); - } + // Notify the listener that the stream closed. + await this.listener.onClose(error); } /** @@ -386,98 +396,84 @@ export abstract class PersistentStream< 'Must be in initial state to auth' ); - this.state = PersistentStreamState.Auth; + this.state = PersistentStreamState.Starting; + + const dispatchIfNotClosed = this.getCloseGuardedDispatcher(this.closeCount); + + // TODO(mikelehen): Just use dispatchIfNotClosed, but see TODO below. + const closeCount = this.closeCount; this.credentialsProvider.getToken().then( token => { - // Normally we'd have to schedule the callback on the AsyncQueue. - // However, the following calls are safe to be called outside the - // AsyncQueue since they don't chain asynchronous calls - this.startStream(token); + // Stream can be stopped while waiting for authentication. + // TODO(mikelehen): We really should just use dispatchIfNotClosed + // and let this dispatch onto the queue, but that opened a spec test can + // of worms that I don't want to deal with in this PR. + if (this.closeCount === closeCount) { + // Normally we'd have to schedule the callback on the AsyncQueue. + // However, the following calls are safe to be called outside the + // AsyncQueue since they don't chain asynchronous calls + this.startStream(token); + } }, (error: Error) => { - this.queue.enqueue(async () => { - if (this.state !== PersistentStreamState.Stopped) { - // Stream can be stopped while waiting for authorization. - const rpcError = new FirestoreError( - Code.UNKNOWN, - 'Fetching auth token failed: ' + error.message - ); - return this.handleStreamClose(rpcError); - } + dispatchIfNotClosed(() => { + const rpcError = new FirestoreError( + Code.UNKNOWN, + 'Fetching auth token failed: ' + error.message + ); + return this.handleStreamClose(rpcError); }); } ); } private startStream(token: Token | null): void { - if (this.state === PersistentStreamState.Stopped) { - // Stream can be stopped while waiting for authorization. - return; - } - assert( - this.state === PersistentStreamState.Auth, - 'Trying to start stream in a non-auth state' + this.state === PersistentStreamState.Starting, + 'Trying to start stream in a non-starting state' ); - // Helper function to dispatch to AsyncQueue and make sure that any - // close will seem instantaneous and events are prevented from being - // raised after the close call - const dispatchIfStillActive = ( - stream: Stream, - fn: () => Promise - ) => { - this.queue.enqueue(async () => { - // Only raise events if the stream instance has not changed - if (this.stream === stream) { - return fn(); - } - }); - }; - // Only start stream if listener has not changed - if (this.listener !== null) { - const currentStream = this.startRpc(token); - this.stream = currentStream; - this.stream.onOpen(() => { - dispatchIfStillActive(currentStream, () => { - assert( - this.state === PersistentStreamState.Auth, - 'Expected stream to be in state auth, but was ' + this.state - ); - this.state = PersistentStreamState.Open; - return this.listener!.onOpen(); - }); + const dispatchIfNotClosed = this.getCloseGuardedDispatcher(this.closeCount); + + this.stream = this.startRpc(token); + this.stream.onOpen(() => { + dispatchIfNotClosed(() => { + assert( + this.state === PersistentStreamState.Starting, + 'Expected stream to be in state Starting, but was ' + this.state + ); + this.state = PersistentStreamState.Open; + return this.listener!.onOpen(); }); - this.stream.onClose((error: FirestoreError) => { - dispatchIfStillActive(currentStream, () => { - return this.handleStreamClose(error); - }); + }); + this.stream.onClose((error: FirestoreError) => { + dispatchIfNotClosed(() => { + return this.handleStreamClose(error); }); - this.stream.onMessage((msg: ReceiveType) => { - dispatchIfStillActive(currentStream, () => { - return this.onMessage(msg); - }); + }); + this.stream.onMessage((msg: ReceiveType) => { + dispatchIfNotClosed(() => { + return this.onMessage(msg); }); - } + }); } - private performBackoff(listener: ListenerType): void { + private performBackoff(): void { assert( this.state === PersistentStreamState.Error, - 'Should only perform backoff in an error case' + 'Should only perform backoff when in Error state' ); this.state = PersistentStreamState.Backoff; this.backoff.backoffAndRun(async () => { - if (this.state === PersistentStreamState.Stopped) { - // We should have canceled the backoff timer when the stream was - // closed, but just in case we make this a no-op. - return; - } + assert( + this.state === PersistentStreamState.Backoff, + 'Backoff elapsed but state is now: ' + this.state + ); this.state = PersistentStreamState.Initial; - this.start(listener); + this.start(); assert(this.isStarted(), 'PersistentStream should have started'); }); } @@ -495,6 +491,30 @@ export abstract class PersistentStream< // without a backoff accidentally, we set the stream to error in all cases. return this.close(PersistentStreamState.Error, error); } + + /** + * Returns a "dispatcher" function that dispatches operations onto the + * AsyncQueue but only runs them if closeCount remains unchanged. This allows + * us to turn auth / stream callbacks into no-ops if the stream is closed / + * re-opened, etc. + */ + private getCloseGuardedDispatcher( + startCloseCount: number + ): (fn: () => Promise) => void { + return (fn: () => Promise): void => { + this.queue.enqueue(() => { + if (this.closeCount === startCloseCount) { + return fn(); + } else { + log.debug( + LOG_TAG, + 'stream callback skipped by getCloseGuardedDispatcher.' + ); + return Promise.resolve(); + } + }); + }; + } } /** Listener for the PersistentWatchStream */ @@ -525,14 +545,16 @@ export class PersistentListenStream extends PersistentStream< queue: AsyncQueue, connection: Connection, credentials: CredentialsProvider, - private serializer: JsonProtoSerializer + private serializer: JsonProtoSerializer, + listener: WatchStreamListener ) { super( queue, TimerId.ListenStreamConnectionBackoff, TimerId.ListenStreamIdle, connection, - credentials + credentials, + listener ); } @@ -633,14 +655,16 @@ export class PersistentWriteStream extends PersistentStream< queue: AsyncQueue, connection: Connection, credentials: CredentialsProvider, - private serializer: JsonProtoSerializer + private serializer: JsonProtoSerializer, + listener: WriteStreamListener ) { super( queue, TimerId.WriteStreamConnectionBackoff, TimerId.WriteStreamIdle, connection, - credentials + credentials, + listener ); } @@ -663,9 +687,9 @@ export class PersistentWriteStream extends PersistentStream< } // Override of PersistentStream.start - start(listener: WriteStreamListener): void { + start(): void { this.handshakeComplete_ = false; - super.start(listener); + super.start(); } protected tearDown(): void { diff --git a/packages/firestore/src/remote/remote_store.ts b/packages/firestore/src/remote/remote_store.ts index ea24b364eeb..d54189a9ba9 100644 --- a/packages/firestore/src/remote/remote_store.ts +++ b/packages/firestore/src/remote/remote_store.ts @@ -107,8 +107,10 @@ export class RemoteStore implements TargetMetadataProvider { */ private listenTargets: { [targetId: number]: QueryData } = {}; - private watchStream: PersistentListenStream = null; - private writeStream: PersistentWriteStream = null; + private networkEnabled = false; + + private watchStream: PersistentListenStream; + private writeStream: PersistentWriteStream; private watchChangeAggregator: WatchChangeAggregator = null; private onlineStateTracker: OnlineStateTracker; @@ -127,6 +129,20 @@ export class RemoteStore implements TargetMetadataProvider { asyncQueue, onlineStateHandler ); + + // Create streams (but note they're not started yet). + this.watchStream = this.datastore.newPersistentWatchStream({ + onOpen: this.onWatchStreamOpen.bind(this), + onClose: this.onWatchStreamClose.bind(this), + onWatchChange: this.onWatchStreamChange.bind(this) + }); + + this.writeStream = this.datastore.newPersistentWriteStream({ + onOpen: this.onWriteStreamOpen.bind(this), + onClose: this.onWriteStreamClose.bind(this), + onHandshakeComplete: this.onWriteHandshakeComplete.bind(this), + onMutationResult: this.onMutationResult.bind(this) + }); } /** SyncEngine to notify of watch and write events. */ @@ -136,31 +152,15 @@ export class RemoteStore implements TargetMetadataProvider { * Starts up the remote store, creating streams, restoring state from * LocalStore, etc. */ - start(): Promise { - return this.enableNetwork(); - } - - private isNetworkEnabled(): boolean { - assert( - (this.watchStream == null) === (this.writeStream == null), - 'WatchStream and WriteStream should both be null or non-null' - ); - return this.watchStream != null; + async start(): Promise { + await this.enableNetwork(); } /** Re-enables the network. Idempotent. */ - enableNetwork(): Promise { - if (this.isNetworkEnabled()) { - return Promise.resolve(); - } - - // Create new streams (but note they're not started yet). - this.watchStream = this.datastore.newPersistentWatchStream(); - this.writeStream = this.datastore.newPersistentWriteStream(); - - // Load any saved stream token from persistent storage - return this.localStore.getLastStreamToken().then(token => { - this.writeStream.lastStreamToken = token; + async enableNetwork(): Promise { + if (!this.networkEnabled) { + this.networkEnabled = true; + this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken(); if (this.shouldStartWatchStream()) { this.startWatchStream(); @@ -168,8 +168,9 @@ export class RemoteStore implements TargetMetadataProvider { this.onlineStateTracker.set(OnlineState.Unknown); } - return this.fillWritePipeline(); // This may start the writeStream. - }); + // This will start the write stream if necessary. + await this.fillWritePipeline(); + } } /** @@ -177,41 +178,37 @@ export class RemoteStore implements TargetMetadataProvider { * enableNetwork(). */ async disableNetwork(): Promise { - this.disableNetworkInternal(); + await this.disableNetworkInternal(); + // Set the OnlineState to Offline so get()s return from cache, etc. this.onlineStateTracker.set(OnlineState.Offline); } - /** - * Disables the network, if it is currently enabled. - */ - private disableNetworkInternal(): void { - if (this.isNetworkEnabled()) { - // NOTE: We're guaranteed not to get any further events from these streams (not even a close - // event). - this.watchStream.stop(); - this.writeStream.stop(); + private async disableNetworkInternal(): Promise { + if (this.networkEnabled) { + this.networkEnabled = false; - this.cleanUpWatchStreamState(); + this.writeStream.stop(); + this.watchStream.stop(); - log.debug( - LOG_TAG, - 'Stopping write stream with ' + - this.writePipeline.length + - ' pending writes' - ); - // TODO(mikelehen): We only actually need to clear the write pipeline if - // this is being called as part of handleUserChange(). Consider reworking. - this.writePipeline = []; + if (this.writePipeline.length > 0) { + log.debug( + LOG_TAG, + `Stopping write stream with ${ + this.writePipeline.length + } pending writes` + ); + this.writePipeline = []; + } - this.writeStream = null; - this.watchStream = null; + this.cleanUpWatchStreamState(); } } shutdown(): Promise { log.debug(LOG_TAG, 'RemoteStore shutting down.'); this.disableNetworkInternal(); + // Set the OnlineState to Unknown (rather than Offline) to avoid potentially // triggering spurious listener events with cached data, etc. this.onlineStateTracker.set(OnlineState.Unknown); @@ -230,7 +227,7 @@ export class RemoteStore implements TargetMetadataProvider { if (this.shouldStartWatchStream()) { // The listen will be sent in onWatchStreamOpen this.startWatchStream(); - } else if (this.isNetworkEnabled() && this.watchStream.isOpen()) { + } else if (this.watchStream.isOpen()) { this.sendWatchRequest(queryData); } } @@ -242,7 +239,7 @@ export class RemoteStore implements TargetMetadataProvider { 'unlisten called without assigned target ID!' ); delete this.listenTargets[targetId]; - if (this.isNetworkEnabled() && this.watchStream.isOpen()) { + if (this.watchStream.isOpen()) { this.sendUnwatchRequest(targetId); if (objUtils.isEmpty(this.listenTargets)) { this.watchStream.markIdle(); @@ -282,15 +279,11 @@ export class RemoteStore implements TargetMetadataProvider { private startWatchStream(): void { assert( this.shouldStartWatchStream(), - 'startWriteStream() called when shouldStartWatchStream() is false.' + 'startWatchStream() called when shouldStartWatchStream() is false.' ); this.watchChangeAggregator = new WatchChangeAggregator(this); - this.watchStream.start({ - onOpen: this.onWatchStreamOpen.bind(this), - onClose: this.onWatchStreamClose.bind(this), - onWatchChange: this.onWatchStreamChange.bind(this) - }); + this.watchStream.start(); this.onlineStateTracker.handleWatchStreamStart(); } @@ -300,39 +293,43 @@ export class RemoteStore implements TargetMetadataProvider { */ private shouldStartWatchStream(): boolean { return ( - this.isNetworkEnabled() && + this.canUseNetwork() && !this.watchStream.isStarted() && !objUtils.isEmpty(this.listenTargets) ); } + private canUseNetwork(): boolean { + // TODO(mikelehen): This could take into account isPrimary when we merge + // with multitab. + return this.networkEnabled; + } + private cleanUpWatchStreamState(): void { this.watchChangeAggregator = null; } private async onWatchStreamOpen(): Promise { - // TODO(b/35852690): close the stream again (with some timeout?) if no watch - // targets are active objUtils.forEachNumber(this.listenTargets, (targetId, queryData) => { this.sendWatchRequest(queryData); }); } private async onWatchStreamClose(error?: FirestoreError): Promise { - assert( - this.isNetworkEnabled(), - 'onWatchStreamClose() should only be called when the network is enabled' - ); + if (error === undefined) { + // Graceful stop (due to stop() or idle timeout). Make sure that's + // desirable. + assert( + !this.shouldStartWatchStream(), + 'Watch stream was stopped gracefully while still needed.' + ); + } this.cleanUpWatchStreamState(); // If we still need the watch stream, retry the connection. if (this.shouldStartWatchStream()) { - // There should generally be an error if the watch stream was closed when - // it's still needed, but it's not quite worth asserting. - if (error) { - this.onlineStateTracker.handleWatchStreamFailure(error); - } + this.onlineStateTracker.handleWatchStreamFailure(error); this.startWatchStream(); } else { @@ -480,28 +477,32 @@ export class RemoteStore implements TargetMetadataProvider { this.writePipeline.length > 0 ? this.writePipeline[this.writePipeline.length - 1].batchId : BATCHID_UNKNOWN; - return this.localStore - .nextMutationBatch(lastBatchIdRetrieved) - .then(batch => { - if (batch === null) { - if (this.writePipeline.length === 0) { - this.writeStream.markIdle(); - } - } else { - this.addToWritePipeline(batch); - return this.fillWritePipeline(); - } - }); + const batch = await this.localStore.nextMutationBatch( + lastBatchIdRetrieved + ); + + if (batch === null) { + if (this.writePipeline.length === 0) { + this.writeStream.markIdle(); + } + } else { + this.addToWritePipeline(batch); + await this.fillWritePipeline(); + } + } + + if (this.shouldStartWriteStream()) { + this.startWriteStream(); } } /** - * Returns true if we can add to the write pipeline (i.e. it is not full and - * the network is enabled). + * Returns true if we can add to the write pipeline (i.e. the network is + * enabled and the write pipeline is not full). */ private canAddToWritePipeline(): boolean { return ( - this.isNetworkEnabled() && this.writePipeline.length < MAX_PENDING_WRITES + this.networkEnabled && this.writePipeline.length < MAX_PENDING_WRITES ); } @@ -512,8 +513,7 @@ export class RemoteStore implements TargetMetadataProvider { /** * Queues additional writes to be sent to the write stream, sending them - * immediately if the write stream is established, else starting the write - * stream if it is not yet started. + * immediately if the write stream is established. */ private addToWritePipeline(batch: MutationBatch): void { assert( @@ -522,16 +522,14 @@ export class RemoteStore implements TargetMetadataProvider { ); this.writePipeline.push(batch); - if (this.shouldStartWriteStream()) { - this.startWriteStream(); - } else if (this.isNetworkEnabled() && this.writeStream.handshakeComplete) { + if (this.writeStream.isOpen() && this.writeStream.handshakeComplete) { this.writeStream.writeMutations(batch.mutations); } } private shouldStartWriteStream(): boolean { return ( - this.isNetworkEnabled() && + this.canUseNetwork() && !this.writeStream.isStarted() && this.writePipeline.length > 0 ); @@ -542,12 +540,7 @@ export class RemoteStore implements TargetMetadataProvider { this.shouldStartWriteStream(), 'startWriteStream() called when shouldStartWriteStream() is false.' ); - this.writeStream.start({ - onOpen: this.onWriteStreamOpen.bind(this), - onClose: this.onWriteStreamClose.bind(this), - onHandshakeComplete: this.onWriteHandshakeComplete.bind(this), - onMutationResult: this.onMutationResult.bind(this) - }); + this.writeStream.start(); } private async onWriteStreamOpen(): Promise { @@ -591,10 +584,14 @@ export class RemoteStore implements TargetMetadataProvider { } private async onWriteStreamClose(error?: FirestoreError): Promise { - assert( - this.isNetworkEnabled(), - 'onWriteStreamClose() should only be called when the network is enabled' - ); + if (error === undefined) { + // Graceful stop (due to stop() or idle timeout). Make sure that's + // desirable. + assert( + !this.shouldStartWriteStream(), + 'Write stream was stopped gracefully while still needed.' + ); + } // If the write stream closed due to an error, invoke the error callbacks if // there are pending writes. @@ -667,18 +664,16 @@ export class RemoteStore implements TargetMetadataProvider { return new Transaction(this.datastore); } - handleUserChange(user: User): Promise { + async handleUserChange(user: User): Promise { log.debug(LOG_TAG, 'RemoteStore changing users: uid=', user.uid); - // If the network has been explicitly disabled, make sure we don't - // accidentally re-enable it. - if (this.isNetworkEnabled()) { + if (this.networkEnabled) { // Tear down and re-create our network streams. This will ensure we get a fresh auth token // for the new user and re-fill the write pipeline with new mutations from the LocalStore // (since mutations are per-user). this.disableNetworkInternal(); this.onlineStateTracker.set(OnlineState.Unknown); - return this.enableNetwork(); + await this.enableNetwork(); } } } diff --git a/packages/firestore/test/integration/remote/stream.test.ts b/packages/firestore/test/integration/remote/stream.test.ts index 24997b816ce..94d2c747dcd 100644 --- a/packages/firestore/test/integration/remote/stream.test.ts +++ b/packages/firestore/test/integration/remote/stream.test.ts @@ -134,20 +134,20 @@ describe('Watch Stream', () => { }); /** - * Verifies that the watch stream does not issue an onClose callback after a + * Verifies that the watch stream issues an onClose callback after a * call to stop(). */ it('can be stopped before handshake', () => { let watchStream: PersistentListenStream; return withTestDatastore(ds => { - watchStream = ds.newPersistentWatchStream(); - watchStream.start(streamListener); + watchStream = ds.newPersistentWatchStream(streamListener); + watchStream.start(); return streamListener.awaitCallback('open').then(() => { - // Stop must not call onClose because the full implementation of the callback could - // attempt to restart the stream in the event it had pending watches. watchStream.stop(); + + return streamListener.awaitCallback('close'); }); }); }); @@ -183,22 +183,20 @@ describe('Write Stream', () => { }); /** - * Verifies that the write stream does not issue an onClose callback after a - * call to stop(). + * Verifies that the write stream issues an onClose callback after a call to + * stop(). */ it('can be stopped before handshake', () => { let writeStream: PersistentWriteStream; return withTestDatastore(ds => { - writeStream = ds.newPersistentWriteStream(); - writeStream.start(streamListener); + writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener.awaitCallback('open'); }).then(() => { - // Don't start the handshake. - - // Stop must not call onClose because the full implementation of the callback could - // attempt to restart the stream in the event it had pending writes. writeStream.stop(); + + return streamListener.awaitCallback('close'); }); }); @@ -206,8 +204,8 @@ describe('Write Stream', () => { let writeStream: PersistentWriteStream; return withTestDatastore(ds => { - writeStream = ds.newPersistentWriteStream(); - writeStream.start(streamListener); + writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener.awaitCallback('open'); }) .then(() => { @@ -225,6 +223,8 @@ describe('Write Stream', () => { }) .then(() => { writeStream.stop(); + + return streamListener.awaitCallback('close'); }); }); @@ -232,8 +232,8 @@ describe('Write Stream', () => { const queue = new AsyncQueue(); return withTestDatastore(ds => { - const writeStream = ds.newPersistentWriteStream(); - writeStream.start(streamListener); + const writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener .awaitCallback('open') .then(() => { @@ -259,8 +259,8 @@ describe('Write Stream', () => { const queue = new AsyncQueue(); return withTestDatastore(ds => { - const writeStream = ds.newPersistentWriteStream(); - writeStream.start(streamListener); + const writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener .awaitCallback('open') .then(() => { @@ -288,8 +288,8 @@ describe('Write Stream', () => { return withTestDatastore( ds => { - const writeStream = ds.newPersistentWriteStream(); - writeStream.start(streamListener); + const writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener .awaitCallback('open') .then(() => { @@ -301,7 +301,7 @@ describe('Write Stream', () => { return streamListener.awaitCallback('close'); }) .then(() => { - writeStream.start(streamListener); + writeStream.start(); return streamListener.awaitCallback('open'); }) .then(() => { @@ -312,7 +312,7 @@ describe('Write Stream', () => { return streamListener.awaitCallback('close'); }) .then(() => { - writeStream.start(streamListener); + writeStream.start(); return streamListener.awaitCallback('open'); }) .then(() => { diff --git a/packages/firestore/test/unit/specs/spec_test_runner.ts b/packages/firestore/test/unit/specs/spec_test_runner.ts index fcfc6daad8a..6865c2339e0 100644 --- a/packages/firestore/test/unit/specs/spec_test_runner.ts +++ b/packages/firestore/test/unit/specs/spec_test_runner.ts @@ -424,9 +424,11 @@ abstract class TestRunner { protected abstract destroyPersistence(): Promise; async shutdown(): Promise { - await this.remoteStore.shutdown(); - await this.persistence.shutdown(/* deleteData= */ true); - await this.destroyPersistence(); + await this.queue.enqueue(async () => { + await this.remoteStore.shutdown(); + await this.persistence.shutdown(/* deleteData= */ true); + await this.destroyPersistence(); + }); } run(steps: SpecStep[]): Promise { From ee3bec1dc2ad98eb8c7bc92b61377b6c440750b1 Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Wed, 1 Aug 2018 10:44:02 -0700 Subject: [PATCH 4/4] Merging Persistent Stream refactor (#1069) * Merging PersistentStream refactor * [AUTOMATED]: Prettier Code Styling * Typo * Remove canUseNetwork state. (#1076) * Merging the latest merge into the previous merge (#1077) * Implement global resume token (#1052) * Add a spec test that shows correct global resume token handling * Minimum implementation to handle global resume tokens * Remove unused QueryView.resumeToken * Avoid persisting the resume token unless required * Persist the resume token on unlisten * Add a type parameter to Persistence (#1047) * Cherry pick sequence number starting point * Working on typed transactions * Start plumbing in sequence number * Back out sequence number changes * [AUTOMATED]: Prettier Code Styling * Fix tests * [AUTOMATED]: Prettier Code Styling * Fix lint * [AUTOMATED]: Prettier Code Styling * Uncomment line * MemoryPersistenceTransaction -> MemoryTransaction * [AUTOMATED]: Prettier Code Styling * Review updates * Style * Lint and style * Review feedback * [AUTOMATED]: Prettier Code Styling * Revert some unintentional import churn * Line 44 should definitely be empty * Checkpoint before adding helper function for stores * Use a helper for casting PersistenceTransaction to IndexedDbTransaction * [AUTOMATED]: Prettier Code Styling * Remove errant generic type * Lint * Fix typo * Port optimizations to LocalDocumentsView from iOS (#1055) * add a method to find batches affecting a set of keys (port of [1479](https://github.com/firebase/firebase-ios-sdk/pull/1479)); * use the newly-added method to avoid rereading batches when getting documents in `LocalDocumentsView` (port of [1505](https://github.com/firebase/firebase-ios-sdk/pull/1505)); * avoid rereading batches when searching for documents in a collection (port of [1533](https://github.com/firebase/firebase-ios-sdk/pull/1533)). Speedup was measured by running tests in browser and checking time spent writing 10 batches of 500 mutations each, and then querying the resulting 5K docs collection from cache in offline mode. For this case, the writing speedup is about 3x, and querying speedup is about 6x (see PR for more details). * Add a CHANGELOG entry for #1052 (#1071) * Add a CHANGELOG entry for #1052 * Add notes for #1055 * Rename idleTimer and fix comments. (#1068) * Merge (#1073) --- packages/firestore/CHANGELOG.md | 5 + packages/firestore/src/core/sync_engine.ts | 14 +- .../src/local/indexeddb_mutation_queue.ts | 122 +++++++++---- .../src/local/indexeddb_persistence.ts | 89 ++++++---- .../src/local/indexeddb_query_cache.ts | 16 +- .../local/indexeddb_remote_document_cache.ts | 13 +- .../src/local/local_documents_view.ts | 160 +++++++----------- packages/firestore/src/local/local_store.ts | 82 ++++++++- .../src/local/memory_mutation_queue.ts | 32 +++- .../firestore/src/local/memory_persistence.ts | 9 +- .../firestore/src/local/mutation_queue.ts | 24 ++- packages/firestore/src/local/persistence.ts | 3 +- packages/firestore/src/local/simple_db.ts | 26 +-- packages/firestore/src/model/path.ts | 14 ++ packages/firestore/src/remote/datastore.ts | 3 +- .../firestore/src/remote/persistent_stream.ts | 60 +++---- packages/firestore/src/remote/remote_store.ts | 112 ++++-------- packages/firestore/src/remote/watch_change.ts | 18 +- .../local/eager_garbage_collector.test.ts | 2 +- .../test/unit/local/mutation_queue.test.ts | 133 +++++++++------ .../test/unit/local/query_cache.test.ts | 33 ++-- .../unit/local/remote_document_cache.test.ts | 40 ++--- .../remote_document_change_buffer.test.ts | 3 +- .../test/unit/local/test_mutation_queue.ts | 17 +- .../test/unit/specs/limbo_spec.test.ts | 118 ++++++------- .../test/unit/specs/listen_spec.test.ts | 84 +++++++++ .../test/unit/specs/spec_test_runner.ts | 13 +- .../test/unit/specs/write_spec.test.ts | 26 +++ 28 files changed, 752 insertions(+), 519 deletions(-) diff --git a/packages/firestore/CHANGELOG.md b/packages/firestore/CHANGELOG.md index ca0b5555545..a9180db70fa 100644 --- a/packages/firestore/CHANGELOG.md +++ b/packages/firestore/CHANGELOG.md @@ -1,4 +1,9 @@ # Unreleased +- [changed] Improved how Firestore handles idle queries to reduce the cost of + re-listening within 30 minutes. +- [changed] Improved offline performance with many outstanding writes. + +# 0.6.0 - [fixed] Fixed an issue where queries returned fewer results than they should, caused by documents that were cached as deleted when they should not have been (firebase/firebase-ios-sdk#1548). Because some cache data is cleared, diff --git a/packages/firestore/src/core/sync_engine.ts b/packages/firestore/src/core/sync_engine.ts index 212cd5cb86b..6319a2e85f5 100644 --- a/packages/firestore/src/core/sync_engine.ts +++ b/packages/firestore/src/core/sync_engine.ts @@ -50,7 +50,6 @@ import { MutationBatchState, OnlineState, OnlineStateSource, - ProtoByteString, TargetId } from './types'; import { @@ -88,12 +87,6 @@ class QueryView { * stream to identify this query. */ public targetId: TargetId, - /** - * An identifier from the datastore backend that indicates the last state - * of the results that was received. This can be used to indicate where - * to continue receiving new doc changes for the query. - */ - public resumeToken: ProtoByteString, /** * The view is responsible for computing the final merged truth of what * docs are in the query. It gets notified of local and remote changes, @@ -274,12 +267,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { 'applyChanges for new view should always return a snapshot' ); - const data = new QueryView( - query, - queryData.targetId, - queryData.resumeToken, - view - ); + const data = new QueryView(query, queryData.targetId, view); this.queryViewsByQuery.set(query, data); this.queryViewsByTarget[queryData.targetId] = data; return viewChange.snapshot!; diff --git a/packages/firestore/src/local/indexeddb_mutation_queue.ts b/packages/firestore/src/local/indexeddb_mutation_queue.ts index 5cf51246c27..1b0c902b22c 100644 --- a/packages/firestore/src/local/indexeddb_mutation_queue.ts +++ b/packages/firestore/src/local/indexeddb_mutation_queue.ts @@ -18,6 +18,7 @@ import { Timestamp } from '../api/timestamp'; import { User } from '../auth/user'; import { Query } from '../core/query'; import { BatchId, ProtoByteString } from '../core/types'; +import { DocumentKeySet } from '../model/collections'; import { DocumentKey } from '../model/document_key'; import { Mutation } from '../model/mutation'; import { BATCHID_UNKNOWN, MutationBatch } from '../model/mutation_batch'; @@ -40,8 +41,8 @@ import { LocalSerializer } from './local_serializer'; import { MutationQueue } from './mutation_queue'; import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; -import { SimpleDb, SimpleDbStore } from './simple_db'; -import { DocumentKeySet } from '../model/collections'; +import { SimpleDbStore } from './simple_db'; +import { IndexedDbPersistence } from './indexeddb_persistence'; /** A mutation queue for a specific user, backed by IndexedDB. */ export class IndexedDbMutationQueue implements MutationQueue { @@ -342,6 +343,50 @@ export class IndexedDbMutationQueue implements MutationQueue { .next(() => results); } + getAllMutationBatchesAffectingDocumentKeys( + transaction: PersistenceTransaction, + documentKeys: DocumentKeySet + ): PersistencePromise { + let uniqueBatchIDs = new SortedSet(primitiveComparator); + + const promises: Array> = []; + documentKeys.forEach(documentKey => { + const indexStart = DbDocumentMutation.prefixForPath( + this.userId, + documentKey.path + ); + const range = IDBKeyRange.lowerBound(indexStart); + + const promise = documentMutationsStore(transaction).iterate( + { range }, + (indexKey, _, control) => { + const [userID, encodedPath, batchID] = indexKey; + + // Only consider rows matching exactly the specific key of + // interest. Note that because we order by path first, and we + // order terminators before path separators, we'll encounter all + // the index rows for documentKey contiguously. In particular, all + // the rows for documentKey will occur before any rows for + // documents nested in a subcollection beneath documentKey so we + // can stop as soon as we hit any such row. + const path = EncodedResourcePath.decode(encodedPath); + if (userID !== this.userId || !documentKey.path.isEqual(path)) { + control.done(); + return; + } + + uniqueBatchIDs = uniqueBatchIDs.add(batchID); + } + ); + + promises.push(promise); + }); + + return PersistencePromise.waitFor(promises).next(() => + this.lookupMutationBatches(transaction, uniqueBatchIDs) + ); + } + getAllMutationBatchesAffectingQuery( transaction: PersistenceTransaction, query: Query @@ -393,34 +438,39 @@ export class IndexedDbMutationQueue implements MutationQueue { } uniqueBatchIDs = uniqueBatchIDs.add(batchID); }) - .next(() => { - const results: MutationBatch[] = []; - const promises: Array> = []; - // TODO(rockwood): Implement this using iterate. - uniqueBatchIDs.forEach(batchId => { - promises.push( - mutationsStore(transaction) - .get(batchId) - .next(mutation => { - if (!mutation) { - fail( - 'Dangling document-mutation reference found, ' + - 'which points to ' + - batchId - ); - } - assert( - mutation.userId === this.userId, - `Unexpected user '${ - mutation.userId - }' for mutation batch ${batchId}` - ); - results.push(this.serializer.fromDbMutationBatch(mutation!)); - }) - ); - }); - return PersistencePromise.waitFor(promises).next(() => results); - }); + .next(() => this.lookupMutationBatches(transaction, uniqueBatchIDs)); + } + + private lookupMutationBatches( + transaction: PersistenceTransaction, + batchIDs: SortedSet + ): PersistencePromise { + const results: MutationBatch[] = []; + const promises: Array> = []; + // TODO(rockwood): Implement this using iterate. + batchIDs.forEach(batchId => { + promises.push( + mutationsStore(transaction) + .get(batchId) + .next(mutation => { + if (mutation === null) { + fail( + 'Dangling document-mutation reference found, ' + + 'which points to ' + + batchId + ); + } + assert( + mutation.userId === this.userId, + `Unexpected user '${ + mutation.userId + }' for mutation batch ${batchId}` + ); + results.push(this.serializer.fromDbMutationBatch(mutation!)); + }) + ); + }); + return PersistencePromise.waitFor(promises).next(() => results); } removeMutationBatches( @@ -567,7 +617,7 @@ function convertStreamToken(token: ProtoByteString): string { function mutationsStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbMutationBatch.store ); @@ -579,10 +629,10 @@ function mutationsStore( function documentMutationsStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( - txn, - DbDocumentMutation.store - ); + return IndexedDbPersistence.getStore< + DbDocumentMutationKey, + DbDocumentMutation + >(txn, DbDocumentMutation.store); } /** @@ -591,7 +641,7 @@ function documentMutationsStore( function mutationQueuesStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbMutationQueue.store ); diff --git a/packages/firestore/src/local/indexeddb_persistence.ts b/packages/firestore/src/local/indexeddb_persistence.ts index a60d3dc883d..830df9b30ab 100644 --- a/packages/firestore/src/local/indexeddb_persistence.ts +++ b/packages/firestore/src/local/indexeddb_persistence.ts @@ -17,7 +17,7 @@ import { User } from '../auth/user'; import { DatabaseInfo } from '../core/database_info'; import { JsonProtoSerializer } from '../remote/serializer'; -import { assert } from '../util/assert'; +import { assert, fail } from '../util/assert'; import { Code, FirestoreError } from '../util/error'; import * as log from '../util/log'; @@ -83,6 +83,12 @@ const UNSUPPORTED_PLATFORM_ERROR_MSG = // firestore_zombie__ const ZOMBIED_CLIENTS_KEY_PREFIX = 'firestore_zombie'; +export class IndexedDbTransaction extends PersistenceTransaction { + constructor(readonly simpleDbTransaction: SimpleDbTransaction) { + super(); + } +} + /** * An IndexedDB-backed instance of Persistence. Data is stored persistently * across sessions. @@ -115,6 +121,17 @@ const ZOMBIED_CLIENTS_KEY_PREFIX = 'firestore_zombie'; * TODO(multitab): Update this comment with multi-tab changes. */ export class IndexedDbPersistence implements Persistence { + static getStore( + txn: PersistenceTransaction, + store: string + ): SimpleDbStore { + if (txn instanceof IndexedDbTransaction) { + return SimpleDb.getStore(txn.simpleDbTransaction, store); + } else { + fail('IndexedDbPersistence must use instances of IndexedDbTransaction'); + } + } + /** * The name of the main (and currently only) IndexedDB database. this name is * appended to the prefix provided to the IndexedDbPersistence constructor. @@ -470,7 +487,7 @@ export class IndexedDbPersistence implements Persistence { action: string, requirePrimaryLease: boolean, transactionOperation: ( - transaction: PersistenceTransaction + transaction: IndexedDbTransaction ) => PersistencePromise ): Promise { // TODO(multitab): Consider removing `requirePrimaryLease` and exposing @@ -483,39 +500,47 @@ export class IndexedDbPersistence implements Persistence { // Do all transactions as readwrite against all object stores, since we // are the only reader/writer. - return this.simpleDb.runTransaction('readwrite', ALL_STORES, txn => { - if (requirePrimaryLease) { - // While we merely verify that we have (or can acquire) the lease - // immediately, we wait to extend the primary lease until after - // executing transactionOperation(). This ensures that even if the - // transactionOperation takes a long time, we'll use a recent - // leaseTimestampMs in the extended (or newly acquired) lease. - return this.canActAsPrimary(txn) - .next(canActAsPrimary => { - if (!canActAsPrimary) { - // TODO(multitab): Handle this gracefully and transition back to - // secondary state. - log.error( - `Failed to obtain primary lease for action '${action}'.` + return this.simpleDb.runTransaction( + 'readwrite', + ALL_STORES, + simpleDbTxn => { + if (requirePrimaryLease) { + // While we merely verify that we have (or can acquire) the lease + // immediately, we wait to extend the primary lease until after + // executing transactionOperation(). This ensures that even if the + // transactionOperation takes a long time, we'll use a recent + // leaseTimestampMs in the extended (or newly acquired) lease. + return this.canActAsPrimary(simpleDbTxn) + .next(canActAsPrimary => { + if (!canActAsPrimary) { + // TODO(multitab): Handle this gracefully and transition back to + // secondary state. + log.error( + `Failed to obtain primary lease for action '${action}'.` + ); + this.isPrimary = false; + this.queue.enqueue(() => this.primaryStateListener(false)); + throw new FirestoreError( + Code.FAILED_PRECONDITION, + PRIMARY_LEASE_LOST_ERROR_MSG + ); + } + return transactionOperation( + new IndexedDbTransaction(simpleDbTxn) ); - this.isPrimary = false; - this.queue.enqueue(() => this.primaryStateListener(false)); - throw new FirestoreError( - Code.FAILED_PRECONDITION, - PRIMARY_LEASE_LOST_ERROR_MSG + }) + .next(result => { + return this.acquireOrExtendPrimaryLease(simpleDbTxn).next( + () => result ); - } - return transactionOperation(txn); - }) - .next(result => { - return this.acquireOrExtendPrimaryLease(txn).next(() => result); - }); - } else { - return this.verifyAllowTabSynchronization(txn).next(() => - transactionOperation(txn) - ); + }); + } else { + return this.verifyAllowTabSynchronization(simpleDbTxn).next(() => + transactionOperation(new IndexedDbTransaction(simpleDbTxn)) + ); + } } - }); + ); } /** diff --git a/packages/firestore/src/local/indexeddb_query_cache.ts b/packages/firestore/src/local/indexeddb_query_cache.ts index 79e107ad067..e268a7680a9 100644 --- a/packages/firestore/src/local/indexeddb_query_cache.ts +++ b/packages/firestore/src/local/indexeddb_query_cache.ts @@ -38,8 +38,9 @@ import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; import { QueryCache } from './query_cache'; import { QueryData } from './query_data'; -import { SimpleDb, SimpleDbStore } from './simple_db'; import { TargetIdGenerator } from '../core/target_id_generator'; +import { SimpleDbStore } from './simple_db'; +import { IndexedDbPersistence } from './indexeddb_persistence'; export class IndexedDbQueryCache implements QueryCache { constructor(private serializer: LocalSerializer) {} @@ -221,7 +222,7 @@ export class IndexedDbQueryCache implements QueryCache { targetId: TargetId ): PersistencePromise { // PORTING NOTE: The reverse index (documentsTargets) is maintained by - // Indexeddb. + // IndexedDb. const promises: Array> = []; const store = documentTargetStore(txn); keys.forEach(key => { @@ -316,6 +317,8 @@ export class IndexedDbQueryCache implements QueryCache { this.garbageCollector = gc; } + // TODO(gsoltis): we can let the compiler assert that txn !== null if we + // drop null from the type bounds on txn. containsKey( txn: PersistenceTransaction | null, key: DocumentKey @@ -369,7 +372,10 @@ export class IndexedDbQueryCache implements QueryCache { function targetsStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore(txn, DbTarget.store); + return IndexedDbPersistence.getStore( + txn, + DbTarget.store + ); } /** @@ -378,7 +384,7 @@ function targetsStore( function globalTargetStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbTargetGlobal.store ); @@ -390,7 +396,7 @@ function globalTargetStore( function documentTargetStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbTargetDocument.store ); diff --git a/packages/firestore/src/local/indexeddb_remote_document_cache.ts b/packages/firestore/src/local/indexeddb_remote_document_cache.ts index ef4bebdea80..53a4340c1b1 100644 --- a/packages/firestore/src/local/indexeddb_remote_document_cache.ts +++ b/packages/firestore/src/local/indexeddb_remote_document_cache.ts @@ -31,13 +31,14 @@ import { DbRemoteDocumentChanges, DbRemoteDocumentChangesKey } from './indexeddb_schema'; +import { IndexedDbPersistence } from './indexeddb_persistence'; import { LocalSerializer } from './local_serializer'; import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; import { RemoteDocumentCache } from './remote_document_cache'; -import { SimpleDb, SimpleDbStore } from './simple_db'; import { SnapshotVersion } from '../core/snapshot_version'; import { assert } from '../util/assert'; +import { SimpleDbStore } from './simple_db'; export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache { /** The last id read by `getNewDocumentChanges()`. */ @@ -191,7 +192,7 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache { function remoteDocumentsStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbRemoteDocument.store ); @@ -204,10 +205,10 @@ function remoteDocumentsStore( function documentChangesStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( - txn, - DbRemoteDocumentChanges.store - ); + return IndexedDbPersistence.getStore< + DbRemoteDocumentChangesKey, + DbRemoteDocumentChanges + >(txn, DbRemoteDocumentChanges.store); } function dbKey(docKey: DocumentKey): DbRemoteDocumentKey { diff --git a/packages/firestore/src/local/local_documents_view.ts b/packages/firestore/src/local/local_documents_view.ts index 6466e8a26c2..78f33d9e050 100644 --- a/packages/firestore/src/local/local_documents_view.ts +++ b/packages/firestore/src/local/local_documents_view.ts @@ -17,7 +17,6 @@ import { Query } from '../core/query'; import { SnapshotVersion } from '../core/snapshot_version'; import { - documentKeySet, DocumentKeySet, DocumentMap, documentMap, @@ -26,6 +25,7 @@ import { } from '../model/collections'; import { Document, MaybeDocument, NoDocument } from '../model/document'; import { DocumentKey } from '../model/document_key'; +import { MutationBatch } from '../model/mutation_batch'; import { ResourcePath } from '../model/path'; import { fail } from '../util/assert'; @@ -56,11 +56,23 @@ export class LocalDocumentsView { transaction: PersistenceTransaction, key: DocumentKey ): PersistencePromise { - return this.remoteDocumentCache - .getEntry(transaction, key) - .next(remoteDoc => { - return this.computeLocalDocument(transaction, key, remoteDoc); - }); + return this.mutationQueue + .getAllMutationBatchesAffectingDocumentKey(transaction, key) + .next(batches => this.getDocumentInternal(transaction, key, batches)); + } + + /** Internal version of `getDocument` that allows reusing batches. */ + private getDocumentInternal( + transaction: PersistenceTransaction, + key: DocumentKey, + inBatches: MutationBatch[] + ): PersistencePromise { + return this.remoteDocumentCache.getEntry(transaction, key).next(doc => { + for (const batch of inBatches) { + doc = batch.applyToLocalView(key, doc); + } + return doc; + }); } /** @@ -73,20 +85,29 @@ export class LocalDocumentsView { transaction: PersistenceTransaction, keys: DocumentKeySet ): PersistencePromise { - const promises = [] as Array>; - let results = maybeDocumentMap(); - keys.forEach(key => { - promises.push( - this.getDocument(transaction, key).next(maybeDoc => { - // TODO(http://b/32275378): Don't conflate missing / deleted. - if (!maybeDoc) { - maybeDoc = new NoDocument(key, SnapshotVersion.forDeletedDoc()); - } - results = results.insert(key, maybeDoc); - }) - ); - }); - return PersistencePromise.waitFor(promises).next(() => results); + return this.mutationQueue + .getAllMutationBatchesAffectingDocumentKeys(transaction, keys) + .next(batches => { + const promises = [] as Array>; + let results = maybeDocumentMap(); + keys.forEach(key => { + promises.push( + this.getDocumentInternal(transaction, key, batches).next( + maybeDoc => { + // TODO(http://b/32275378): Don't conflate missing / deleted. + if (!maybeDoc) { + maybeDoc = new NoDocument( + key, + SnapshotVersion.forDeletedDoc() + ); + } + results = results.insert(key, maybeDoc); + } + ) + ); + }); + return PersistencePromise.waitFor(promises).next(() => results); + }); } /** @@ -126,48 +147,40 @@ export class LocalDocumentsView { query: Query ): PersistencePromise { // Query the remote documents and overlay mutations. - // TODO(mikelehen): There may be significant overlap between the mutations - // affecting these remote documents and the - // getAllMutationBatchesAffectingQuery() mutations. Consider optimizing. let results: DocumentMap; return this.remoteDocumentCache .getDocumentsMatchingQuery(transaction, query) .next(queryResults => { - return this.computeLocalDocuments(transaction, queryResults); - }) - .next(promisedResults => { - results = promisedResults; - // Now use the mutation queue to discover any other documents that may - // match the query after applying mutations. + results = queryResults; return this.mutationQueue.getAllMutationBatchesAffectingQuery( transaction, query ); }) .next(matchingMutationBatches => { - let matchingKeys = documentKeySet(); for (const batch of matchingMutationBatches) { for (const mutation of batch.mutations) { - // TODO(mikelehen): PERF: Check if this mutation actually - // affects the query to reduce work. - if (!results.get(mutation.key)) { - matchingKeys = matchingKeys.add(mutation.key); + const key = mutation.key; + // Only process documents belonging to the collection. + if (!query.path.isImmediateParentOf(key.path)) { + continue; + } + + const baseDoc = results.get(key); + const mutatedDoc = mutation.applyToLocalView( + baseDoc, + baseDoc, + batch.localWriteTime + ); + if (!mutatedDoc || mutatedDoc instanceof NoDocument) { + results = results.remove(key); + } else if (mutatedDoc instanceof Document) { + results = results.insert(key, mutatedDoc); + } else { + fail('Unknown MaybeDocument: ' + mutatedDoc); } } } - - // Now add in the results for the matchingKeys. - const promises = [] as Array>; - matchingKeys.forEach(key => { - promises.push( - this.getDocument(transaction, key).next(doc => { - if (doc instanceof Document) { - results = results.insert(doc.key, doc); - } - }) - ); - }); - return PersistencePromise.waitFor(promises); }) .next(() => { // Finally, filter out any documents that don't actually match @@ -181,57 +194,4 @@ export class LocalDocumentsView { return results; }); } - - /** - * Takes a remote document and applies local mutations to generate the local - * view of the document. - * @param transaction The transaction in which to perform any persistence - * operations. - * @param documentKey The key of the document (necessary when remoteDocument - * is null). - * @param document The base remote document to apply mutations to or null. - */ - private computeLocalDocument( - transaction: PersistenceTransaction, - documentKey: DocumentKey, - document: MaybeDocument | null - ): PersistencePromise { - return this.mutationQueue - .getAllMutationBatchesAffectingDocumentKey(transaction, documentKey) - .next(batches => { - for (const batch of batches) { - document = batch.applyToLocalView(documentKey, document); - } - return document; - }); - } - - /** - * Takes a set of remote documents and applies local mutations to generate the - * local view of the documents. - * @param transaction The transaction in which to perform any persistence - * operations. - * @param documents The base remote documents to apply mutations to. - * @return The local view of the documents. - */ - private computeLocalDocuments( - transaction: PersistenceTransaction, - documents: DocumentMap - ): PersistencePromise { - const promises = [] as Array>; - documents.forEach((key, doc) => { - promises.push( - this.computeLocalDocument(transaction, key, doc).next(mutatedDoc => { - if (mutatedDoc instanceof Document) { - documents = documents.insert(mutatedDoc.key, mutatedDoc); - } else if (mutatedDoc instanceof NoDocument) { - documents = documents.remove(mutatedDoc.key); - } else { - fail('Unknown MaybeDocument: ' + mutatedDoc); - } - }) - ); - }); - return PersistencePromise.waitFor(promises).next(() => documents); - } } diff --git a/packages/firestore/src/local/local_store.ts b/packages/firestore/src/local/local_store.ts index 340c6bfb804..b258d9eb4ac 100644 --- a/packages/firestore/src/local/local_store.ts +++ b/packages/firestore/src/local/local_store.ts @@ -118,6 +118,15 @@ export interface UserChangeResult { * unrecoverable error (should be caught / reported by the async_queue). */ export class LocalStore { + /** + * The maximum time to leave a resume token buffered without writing it out. + * This value is arbitrary: it's long enough to avoid several writes + * (possibly indefinitely if updates come more frequently than this) but + * short enough that restarting after crashing will still have a pretty + * recent resume token. + */ + private static readonly RESUME_TOKEN_MAX_AGE_MICROS = 5 * 60 * 1e6; + /** * The set of all mutations that have been sent but not yet been applied to * the backend. @@ -543,12 +552,18 @@ export class LocalStore { // any preexisting value. const resumeToken = change.resumeToken; if (resumeToken.length > 0) { + const oldQueryData = queryData; queryData = queryData.copy({ resumeToken, snapshotVersion: remoteEvent.snapshotVersion }); this.targetIds[targetId] = queryData; - promises.push(this.queryCache.updateQueryData(txn, queryData)); + + if ( + LocalStore.shouldPersistQueryData(oldQueryData, queryData, change) + ) { + promises.push(this.queryCache.updateQueryData(txn, queryData)); + } } } ); @@ -630,6 +645,50 @@ export class LocalStore { }); } + /** + * Returns true if the newQueryData should be persisted during an update of + * an active target. QueryData should always be persisted when a target is + * being released and should not call this function. + * + * While the target is active, QueryData updates can be omitted when nothing + * about the target has changed except metadata like the resume token or + * snapshot version. Occasionally it's worth the extra write to prevent these + * values from getting too stale after a crash, but this doesn't have to be + * too frequent. + */ + private static shouldPersistQueryData( + oldQueryData: QueryData, + newQueryData: QueryData, + change: TargetChange + ): boolean { + // Avoid clearing any existing value + if (newQueryData.resumeToken.length === 0) return false; + + // Any resume token is interesting if there isn't one already. + if (oldQueryData.resumeToken.length === 0) return true; + + // Don't allow resume token changes to be buffered indefinitely. This + // allows us to be reasonably up-to-date after a crash and avoids needing + // to loop over all active queries on shutdown. Especially in the browser + // we may not get time to do anything interesting while the current tab is + // closing. + const timeDelta = + newQueryData.snapshotVersion.toMicroseconds() - + oldQueryData.snapshotVersion.toMicroseconds(); + if (timeDelta >= this.RESUME_TOKEN_MAX_AGE_MICROS) return true; + + // Otherwise if the only thing that has changed about a target is its resume + // token it's not worth persisting. Note that the RemoteStore keeps an + // in-memory view of the currently active targets which includes the current + // resume token, so stream failure or user changes will still use an + // up-to-date resume token regardless of what we do here. + const changes = + change.addedDocuments.size + + change.modifiedDocuments.size + + change.removedDocuments.size; + return changes > 0; + } + /** * Notify local store of the changed views to locally pin documents. */ @@ -732,10 +791,21 @@ export class LocalStore { queryData != null, 'Tried to release nonexistent query: ' + query ); - this.localViewReferences.removeReferencesForId(queryData!.targetId); - delete this.targetIds[queryData!.targetId]; + const targetId = queryData!.targetId; + const cachedQueryData = this.targetIds[targetId]; + + this.localViewReferences.removeReferencesForId(targetId); + delete this.targetIds[targetId]; if (!keepPersistedQueryData && this.garbageCollector.isEager) { return this.queryCache.removeQueryData(txn, queryData!); + } else if ( + cachedQueryData.snapshotVersion > queryData!.snapshotVersion + ) { + // If we've been avoiding persisting the resumeToken (see + // shouldPersistQueryData for conditions and rationale) we need to + // persist the token now because there will no longer be an + // in-memory version to fall back on. + return this.queryCache.updateQueryData(txn, cachedQueryData); } else { return PersistencePromise.resolve(); } @@ -748,12 +818,8 @@ export class LocalStore { this.remoteDocuments ); return this.releaseHeldBatchResults(txn, documentBuffer).next( - () => { - documentBuffer.apply(txn); - } + () => documentBuffer.apply(txn) ); - } else { - return PersistencePromise.resolve(); } }); } diff --git a/packages/firestore/src/local/memory_mutation_queue.ts b/packages/firestore/src/local/memory_mutation_queue.ts index d87876a86e1..ba5af955576 100644 --- a/packages/firestore/src/local/memory_mutation_queue.ts +++ b/packages/firestore/src/local/memory_mutation_queue.ts @@ -17,6 +17,7 @@ import { Timestamp } from '../api/timestamp'; import { Query } from '../core/query'; import { BatchId, ProtoByteString } from '../core/types'; +import { DocumentKeySet } from '../model/collections'; import { DocumentKey } from '../model/document_key'; import { Mutation } from '../model/mutation'; import { BATCHID_UNKNOWN, MutationBatch } from '../model/mutation_batch'; @@ -30,7 +31,6 @@ import { MutationQueue } from './mutation_queue'; import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; import { DocReference } from './reference_set'; -import { DocumentKeySet } from '../model/collections'; export class MemoryMutationQueue implements MutationQueue { /** @@ -249,6 +249,28 @@ export class MemoryMutationQueue implements MutationQueue { return PersistencePromise.resolve(result); } + getAllMutationBatchesAffectingDocumentKeys( + transaction: PersistenceTransaction, + documentKeys: DocumentKeySet + ): PersistencePromise { + let uniqueBatchIDs = new SortedSet(primitiveComparator); + + documentKeys.forEach(documentKey => { + const start = new DocReference(documentKey, 0); + const end = new DocReference(documentKey, Number.POSITIVE_INFINITY); + this.batchesByDocumentKey.forEachInRange([start, end], ref => { + assert( + documentKey.isEqual(ref.key), + "For each key, should only iterate over a single key's batches" + ); + + uniqueBatchIDs = uniqueBatchIDs.add(ref.targetOrBatchId); + }); + }); + + return PersistencePromise.resolve(this.findMutationBatches(uniqueBatchIDs)); + } + getAllMutationBatchesAffectingQuery( transaction: PersistenceTransaction, query: Query @@ -290,16 +312,20 @@ export class MemoryMutationQueue implements MutationQueue { } }, start); + return PersistencePromise.resolve(this.findMutationBatches(uniqueBatchIDs)); + } + + private findMutationBatches(batchIDs: SortedSet): MutationBatch[] { // Construct an array of matching batches, sorted by batchID to ensure that // multiple mutations affecting the same document key are applied in order. const result: MutationBatch[] = []; - uniqueBatchIDs.forEach(batchId => { + batchIDs.forEach(batchId => { const batch = this.findMutationBatch(batchId); if (batch !== null) { result.push(batch); } }); - return PersistencePromise.resolve(result); + return result; } removeMutationBatches( diff --git a/packages/firestore/src/local/memory_persistence.ts b/packages/firestore/src/local/memory_persistence.ts index e17e8536d4d..f5d69e54b4c 100644 --- a/packages/firestore/src/local/memory_persistence.ts +++ b/packages/firestore/src/local/memory_persistence.ts @@ -109,9 +109,12 @@ export class MemoryPersistence implements Persistence { ) => PersistencePromise ): Promise { debug(LOG_TAG, 'Starting transaction:', action); - return transactionOperation(new MemoryPersistenceTransaction()).toPromise(); + return transactionOperation(new MemoryTransaction()).toPromise(); } } -/** Dummy class since memory persistence doesn't actually use transactions. */ -class MemoryPersistenceTransaction implements PersistenceTransaction {} +/** + * Memory persistence is not actually transactional, but future implementations + * may have transaction-scoped state. + */ +export class MemoryTransaction implements PersistenceTransaction {} diff --git a/packages/firestore/src/local/mutation_queue.ts b/packages/firestore/src/local/mutation_queue.ts index a26ac14f1a2..4db5e60fa31 100644 --- a/packages/firestore/src/local/mutation_queue.ts +++ b/packages/firestore/src/local/mutation_queue.ts @@ -17,6 +17,7 @@ import { Timestamp } from '../api/timestamp'; import { Query } from '../core/query'; import { BatchId, ProtoByteString } from '../core/types'; +import { DocumentKeySet } from '../model/collections'; import { DocumentKey } from '../model/document_key'; import { Mutation } from '../model/mutation'; import { MutationBatch } from '../model/mutation_batch'; @@ -150,23 +151,44 @@ export interface MutationQueue extends GarbageSource { * document key, so when looping through the batch you'll need to check that * the mutation itself matches the key. * + * Batches are guaranteed to be in sorted order. + * * Note that because of this requirement implementations are free to return * mutation batches that don't contain the document key at all if it's * convenient. */ // TODO(mcg): This should really return an enumerator - // also for b/32992024, all backing stores should really index by document key getAllMutationBatchesAffectingDocumentKey( transaction: PersistenceTransaction, documentKey: DocumentKey ): PersistencePromise; + /** + * Finds all mutation batches that could possibly affect the given set of + * document keys. Not all mutations in a batch will necessarily affect each + * key, so when looping through the batch you'll need to check that the + * mutation itself matches the key. + * + * Batches are guaranteed to be in sorted order. + * + * Note that because of this requirement implementations are free to return + * mutation batches that don't contain any of the document keys at all if it's + * convenient. + */ + // TODO(mcg): This should really return an enumerator + getAllMutationBatchesAffectingDocumentKeys( + transaction: PersistenceTransaction, + documentKeys: DocumentKeySet + ): PersistencePromise; + /** * Finds all mutation batches that could affect the results for the given * query. Not all mutations in a batch will necessarily affect the query, so * when looping through the batch you'll need to check that the mutation * itself matches the query. * + * Batches are guaranteed to be in sorted order. + * * Note that because of this requirement implementations are free to return * mutation batches that don't match the query at all if it's convenient. * diff --git a/packages/firestore/src/local/persistence.ts b/packages/firestore/src/local/persistence.ts index 7a9cedc17d6..0f77a0ecdbe 100644 --- a/packages/firestore/src/local/persistence.ts +++ b/packages/firestore/src/local/persistence.ts @@ -17,7 +17,6 @@ import { User } from '../auth/user'; import { MutationQueue } from './mutation_queue'; -import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; import { QueryCache } from './query_cache'; import { RemoteDocumentCache } from './remote_document_cache'; @@ -30,7 +29,7 @@ import { ClientId } from './shared_client_state'; * pass it to your callback. You then pass it to any method that operates * on persistence. */ -export interface PersistenceTransaction {} +export abstract class PersistenceTransaction {} /** * Callback type for primary state notifications. This callback can be diff --git a/packages/firestore/src/local/simple_db.ts b/packages/firestore/src/local/simple_db.ts index 91646c3eab2..e429b736341 100644 --- a/packages/firestore/src/local/simple_db.ts +++ b/packages/firestore/src/local/simple_db.ts @@ -14,14 +14,12 @@ * limitations under the License. */ -import { assert, fail } from '../util/assert'; +import { assert } from '../util/assert'; import { debug } from '../util/log'; import { AnyDuringMigration } from '../util/misc'; - import { PersistencePromise } from './persistence_promise'; import { SCHEMA_VERSION } from './indexeddb_schema'; import { Deferred } from '../util/promise'; -import { PersistenceTransaction } from './persistence'; import { Code, FirestoreError } from '../util/error'; const LOG_TAG = 'SimpleDb'; @@ -150,14 +148,10 @@ export class SimpleDb { /** Helper to get a typed SimpleDbStore from a transaction. */ static getStore( - txn: PersistenceTransaction, + txn: SimpleDbTransaction, store: string ): SimpleDbStore { - if (txn instanceof SimpleDbTransaction) { - return txn.store(store); - } else { - return fail('Invalid transaction object provided!'); - } + return txn.store(store); } constructor(private db: IDBDatabase) {} @@ -547,25 +541,19 @@ export class SimpleDbStore { } private cursor(options: IterateOptions): IDBRequest { - let direction = 'next'; + let direction: IDBCursorDirection = 'next'; if (options.reverse) { direction = 'prev'; } if (options.index) { const index = this.store.index(options.index); if (options.keysOnly) { - return index.openKeyCursor( - options.range, - direction as AnyDuringMigration - ); + return index.openKeyCursor(options.range, direction); } else { - return index.openCursor(options.range, direction as AnyDuringMigration); + return index.openCursor(options.range, direction); } } else { - return this.store.openCursor( - options.range, - direction as AnyDuringMigration - ); + return this.store.openCursor(options.range, direction); } } } diff --git a/packages/firestore/src/model/path.ts b/packages/firestore/src/model/path.ts index 4226ca7d1b2..6ca8271a7f4 100644 --- a/packages/firestore/src/model/path.ts +++ b/packages/firestore/src/model/path.ts @@ -143,6 +143,20 @@ export abstract class Path { return true; } + isImmediateParentOf(potentialChild: this): boolean { + if (this.length + 1 !== potentialChild.length) { + return false; + } + + for (let i = 0; i < this.length; i++) { + if (this.get(i) !== potentialChild.get(i)) { + return false; + } + } + + return true; + } + forEach(fn: (segment: string) => void): void { for (let i = this.offset, end = this.limit(); i < end; i++) { fn(this.segments[i]); diff --git a/packages/firestore/src/remote/datastore.ts b/packages/firestore/src/remote/datastore.ts index a958e8076a5..73a2fee80a1 100644 --- a/packages/firestore/src/remote/datastore.ts +++ b/packages/firestore/src/remote/datastore.ts @@ -1,4 +1,3 @@ -import { WatchStreamListener, WriteStreamListener } from './persistent_stream'; /** * Copyright 2017 Google Inc. * @@ -24,7 +23,7 @@ import { Mutation, MutationResult } from '../model/mutation'; import { assert } from '../util/assert'; import { Code, FirestoreError } from '../util/error'; import { AsyncQueue } from '../util/async_queue'; - +import { WatchStreamListener, WriteStreamListener } from './persistent_stream'; import { Connection } from './connection'; import { PersistentListenStream, diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index 8d20cd9195c..05139841220 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -64,21 +64,21 @@ export interface WriteRequest extends api.WriteRequest { enum PersistentStreamState { /** * The streaming RPC is not yet running and there's no error condition. - * Calling `start` will start the stream immediately without backoff. - * While in this state isStarted will return false. + * Calling start() will start the stream immediately without backoff. + * While in this state isStarted() will return false. */ Initial, /** * The stream is starting, either waiting for an auth token or for the stream - * to successfully open. While in this state, isStarted will return true but - * isOpen will return false. + * to successfully open. While in this state, isStarted() will return true but + * isOpen() will return false. */ Starting, /** * The streaming RPC is up and running. Requests and responses can flow - * freely. Both isStarted and isOpen will return true. + * freely. Both isStarted() and isOpen() will return true. */ Open, @@ -91,7 +91,7 @@ enum PersistentStreamState { /** * An in-between state after an error where the stream is waiting before * re-starting. After waiting is complete, the stream will try to open. - * While in this state isStarted() will return true but isOpen will return + * While in this state isStarted() will return true but isOpen() will return * false. */ Backoff @@ -144,12 +144,12 @@ const IDLE_TIMEOUT_MS = 60 * 1000; * * ## Starting and Stopping * - * Streaming RPCs are stateful and need to be `start`ed before messages can - * be sent and received. The PersistentStream will call the onOpen function + * Streaming RPCs are stateful and need to be start()ed before messages can + * be sent and received. The PersistentStream will call the onOpen() function * of the listener once the stream is ready to accept requests. * - * Should a `start` fail, PersistentStream will call the registered - * onClose with a FirestoreError indicating what went wrong. + * Should a start() fail, PersistentStream will call the registered onClose() + * listener with a FirestoreError indicating what went wrong. * * A PersistentStream can be started and stopped repeatedly. * @@ -173,7 +173,7 @@ export abstract class PersistentStream< */ private closeCount = 0; - private inactivityTimerPromise: CancelablePromise | null = null; + private idleTimer: CancelablePromise | null = null; private stream: Stream | null = null; protected backoff: ExponentialBackoff; @@ -196,10 +196,10 @@ export abstract class PersistentStream< } /** - * Returns true if `start` has been called and no error has occurred. True + * Returns true if start() has been called and no error has occurred. True * indicates the stream is open or in the process of opening (which * encompasses respecting backoff, getting auth tokens, and starting the - * actual RPC). Use `isOpen` to determine if the stream is open and ready for + * actual RPC). Use isOpen() to determine if the stream is open and ready for * outbound requests. */ isStarted(): boolean { @@ -211,7 +211,7 @@ export abstract class PersistentStream< } /** - * Returns true if the underlying RPC is open (the onOpen callback has been + * Returns true if the underlying RPC is open (the onOpen() listener has been * called) and the stream is ready for outbound requests. */ isOpen(): boolean { @@ -219,11 +219,11 @@ export abstract class PersistentStream< } /** - * Starts the RPC. Only allowed if isStarted returns false. The stream is - * not immediately ready for use: onOpen will be invoked when the RPC is ready - * for outbound requests, at which point isOpen will return true. + * Starts the RPC. Only allowed if isStarted() returns false. The stream is + * not immediately ready for use: onOpen() will be invoked when the RPC is + * ready for outbound requests, at which point isOpen() will return true. * - * When start returns, isStarted will return true. + * When start returns, isStarted() will return true. */ start(): void { if (this.state === PersistentStreamState.Error) { @@ -237,9 +237,9 @@ export abstract class PersistentStream< /** * Stops the RPC. This call is idempotent and allowed regardless of the - * current isStarted state. + * current isStarted() state. * - * When stop returns, isStarted and isOpen will both return false. + * When stop returns, isStarted() and isOpen() will both return false. */ stop(): void { if (this.isStarted()) { @@ -252,7 +252,7 @@ export abstract class PersistentStream< * start it. If the error warrants an immediate restart of the stream, the * sender can use this to indicate that the receiver should not back off. * - * Each error will call the onClose function. That function can decide to + * Each error will call the onClose() listener. That function can decide to * inhibit backoff if required. */ inhibitBackoff(): void { @@ -275,8 +275,8 @@ export abstract class PersistentStream< markIdle(): void { // Starts the idle time if we are in state 'Open' and are not yet already // running a timer (in which case the previous idle timeout still applies). - if (this.isOpen() && this.inactivityTimerPromise === null) { - this.inactivityTimerPromise = this.queue.enqueueAfterDelay( + if (this.isOpen() && this.idleTimer === null) { + this.idleTimer = this.queue.enqueueAfterDelay( this.idleTimerId, IDLE_TIMEOUT_MS, () => this.handleIdleCloseTimer() @@ -301,9 +301,9 @@ export abstract class PersistentStream< /** Marks the stream as active again. */ private cancelIdleCheck(): void { - if (this.inactivityTimerPromise) { - this.inactivityTimerPromise.cancel(); - this.inactivityTimerPromise = null; + if (this.idleTimer) { + this.idleTimer.cancel(); + this.idleTimer = null; } } @@ -315,7 +315,7 @@ export abstract class PersistentStream< * * sets internal stream state to 'finalState'; * * adjusts the backoff timer based on the error * - * A new stream can be opened by calling `start`. + * A new stream can be opened by calling start(). * * @param finalState the intended state of the stream after closing. * @param error the error the connection was closed with. @@ -532,9 +532,9 @@ export interface WatchStreamListener extends PersistentStreamListener { /** * A PersistentStream that implements the Listen RPC. * - * Once the Listen stream has called the openHandler, any number of listen and - * unlisten calls calls can be sent to control what changes will be sent from - * the server for ListenResponses. + * Once the Listen stream has called the onOpen() listener, any number of + * listen() and unlisten() calls can be made to control what changes will be + * sent from the server for ListenResponses. */ export class PersistentListenStream extends PersistentStream< api.ListenRequest, diff --git a/packages/firestore/src/remote/remote_store.ts b/packages/firestore/src/remote/remote_store.ts index 3c69bb0a7e6..59cc291f41c 100644 --- a/packages/firestore/src/remote/remote_store.ts +++ b/packages/firestore/src/remote/remote_store.ts @@ -76,9 +76,6 @@ const MAX_PENDING_WRITES = 10; * - pulling pending mutations from LocalStore and sending them to Datastore. * - retrying mutations that failed because of network problems. * - acking mutations to the SyncEngine once they are accepted or rejected. - * - * RemoteStore always starts out offline. A call to `enableNetwork()` - * initializes the network connection. */ export class RemoteStore implements TargetMetadataProvider { /** @@ -111,19 +108,16 @@ export class RemoteStore implements TargetMetadataProvider { */ private listenTargets: { [targetId: number]: QueryData } = {}; - private networkEnabled = false; - private watchStream: PersistentListenStream; private writeStream: PersistentWriteStream; private watchChangeAggregator: WatchChangeAggregator = null; /** * Set to true by enableNetwork() and false by disableNetwork() and indicates - * the user-preferred network state. A network connection is only established - * if `networkAllowed` is true, the client is primary and there are - * outstanding mutations or active listens. + * the user-preferred network state. */ - private networkAllowed = true; + private networkEnabled = false; + private isPrimary = false; private onlineStateTracker: OnlineStateTracker; @@ -165,60 +159,25 @@ export class RemoteStore implements TargetMetadataProvider { * Starts up the remote store, creating streams, restoring state from * LocalStore, etc. */ -<<<<<<< HEAD start(): Promise { - // Start is a no-op for RemoteStore. - return Promise.resolve(); - } - - private isNetworkEnabled(): boolean { - assert( - (this.watchStream == null) === (this.writeStream == null), - 'WatchStream and WriteStream should both be null or non-null' - ); - return this.watchStream != null; -======= - async start(): Promise { - await this.enableNetwork(); ->>>>>>> master + return this.enableNetwork(); } /** Re-enables the network. Idempotent. */ async enableNetwork(): Promise { -<<<<<<< HEAD - this.networkAllowed = true; + this.networkEnabled = true; - if (this.isPrimary) { - if (this.isNetworkEnabled()) { - return; - } - - // Create new streams (but note they're not started yet). - this.watchStream = this.datastore.newPersistentWatchStream(); - this.writeStream = this.datastore.newPersistentWriteStream(); -======= - if (!this.networkEnabled) { - this.networkEnabled = true; + if (this.canUseNetwork()) { this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken(); ->>>>>>> master - - // Load any saved stream token from persistent storage - return this.localStore.getLastStreamToken().then(token => { - this.writeStream.lastStreamToken = token; -<<<<<<< HEAD - if (this.shouldStartWatchStream()) { - this.startWatchStream(); - } else { - this.onlineStateTracker.set(OnlineState.Unknown); - } + if (this.shouldStartWatchStream()) { + this.startWatchStream(); + } else { + this.onlineStateTracker.set(OnlineState.Unknown); + } - return this.fillWritePipeline(); // This may start the writeStream. - }); -======= // This will start the write stream if necessary. await this.fillWritePipeline(); ->>>>>>> master } } @@ -227,41 +186,31 @@ export class RemoteStore implements TargetMetadataProvider { * enableNetwork(). */ async disableNetwork(): Promise { -<<<<<<< HEAD - this.networkAllowed = false; - + this.networkEnabled = false; this.disableNetworkInternal(); -======= - await this.disableNetworkInternal(); ->>>>>>> master // Set the OnlineState to Offline so get()s return from cache, etc. this.onlineStateTracker.set(OnlineState.Offline); } - private async disableNetworkInternal(): Promise { - if (this.networkEnabled) { - this.networkEnabled = false; - - this.writeStream.stop(); - this.watchStream.stop(); - - if (this.writePipeline.length > 0) { - log.debug( - LOG_TAG, - `Stopping write stream with ${ - this.writePipeline.length - } pending writes` - ); - this.writePipeline = []; - } + private disableNetworkInternal(): void { + this.writeStream.stop(); + this.watchStream.stop(); - this.cleanUpWatchStreamState(); + if (this.writePipeline.length > 0) { + log.debug( + LOG_TAG, + `Stopping write stream with ${this.writePipeline.length} pending writes` + ); + this.writePipeline = []; } + + this.cleanUpWatchStreamState(); } shutdown(): Promise { log.debug(LOG_TAG, 'RemoteStore shutting down.'); + this.networkEnabled = false; this.disableNetworkInternal(); // Set the OnlineState to Unknown (rather than Offline) to avoid potentially @@ -355,9 +304,7 @@ export class RemoteStore implements TargetMetadataProvider { } private canUseNetwork(): boolean { - // TODO(mikelehen): This could take into account isPrimary when we merge - // with multitab. - return this.networkEnabled; + return this.isPrimary && this.networkEnabled; } private cleanUpWatchStreamState(): void { @@ -555,7 +502,7 @@ export class RemoteStore implements TargetMetadataProvider { */ private canAddToWritePipeline(): boolean { return ( - this.networkEnabled && this.writePipeline.length < MAX_PENDING_WRITES + this.canUseNetwork() && this.writePipeline.length < MAX_PENDING_WRITES ); } @@ -739,10 +686,11 @@ export class RemoteStore implements TargetMetadataProvider { async handleUserChange(user: User): Promise { log.debug(LOG_TAG, 'RemoteStore changing users: uid=', user.uid); - if (this.networkEnabled) { + if (this.canUseNetwork()) { // Tear down and re-create our network streams. This will ensure we get a fresh auth token // for the new user and re-fill the write pipeline with new mutations from the LocalStore // (since mutations are per-user). + this.networkEnabled = false; this.disableNetworkInternal(); this.onlineStateTracker.set(OnlineState.Unknown); await this.enableNetwork(); @@ -755,9 +703,9 @@ export class RemoteStore implements TargetMetadataProvider { async applyPrimaryState(isPrimary: boolean): Promise { this.isPrimary = isPrimary; - if (isPrimary && this.networkAllowed) { + if (isPrimary && this.networkEnabled) { await this.enableNetwork(); - } else if (!isPrimary && this.isNetworkEnabled()) { + } else if (!isPrimary) { this.disableNetworkInternal(); this.onlineStateTracker.set(OnlineState.Unknown); } diff --git a/packages/firestore/src/remote/watch_change.ts b/packages/firestore/src/remote/watch_change.ts index faeef82abe5..8ccebed2c69 100644 --- a/packages/firestore/src/remote/watch_change.ts +++ b/packages/firestore/src/remote/watch_change.ts @@ -297,7 +297,7 @@ export class WatchChangeAggregator { /** Processes and adds the WatchTargetChange to the current set of changes. */ handleTargetChange(targetChange: WatchTargetChange): void { - targetChange.targetIds.forEach(targetId => { + this.forEachTarget(targetChange, targetId => { const targetState = this.ensureTargetState(targetId); switch (targetChange.state) { case WatchTargetChangeState.NoChange: @@ -352,6 +352,22 @@ export class WatchChangeAggregator { }); } + /** + * Iterates over all targetIds that the watch change applies to: either the + * targetIds explicitly listed in the change or the targetIds of all currently + * active targets. + */ + forEachTarget( + targetChange: WatchTargetChange, + fn: (targetId: TargetId) => void + ): void { + if (targetChange.targetIds.length > 0) { + targetChange.targetIds.forEach(fn); + } else { + objUtils.forEachNumber(this.targetStates, fn); + } + } + /** * Handles existence filters and synthesizes deletes for filter mismatches. * Targets that are invalidated by filter mismatches are added to diff --git a/packages/firestore/test/unit/local/eager_garbage_collector.test.ts b/packages/firestore/test/unit/local/eager_garbage_collector.test.ts index f12a579b217..af5d748a2be 100644 --- a/packages/firestore/test/unit/local/eager_garbage_collector.test.ts +++ b/packages/firestore/test/unit/local/eager_garbage_collector.test.ts @@ -65,7 +65,7 @@ describe('EagerGarbageCollector', () => { expect(referenceSet.isEmpty()).to.equal(false); referenceSet.removeReferencesForId(2); - return gc.collectGarbage(true).toPromise(); + return gc.collectGarbage(null).toPromise(); }) .then(garbage => { expectSetToEqual(garbage, [key3]); diff --git a/packages/firestore/test/unit/local/mutation_queue.test.ts b/packages/firestore/test/unit/local/mutation_queue.test.ts index afd672d9245..08c4e4abd55 100644 --- a/packages/firestore/test/unit/local/mutation_queue.test.ts +++ b/packages/firestore/test/unit/local/mutation_queue.test.ts @@ -20,6 +20,7 @@ import { Query } from '../../../src/core/query'; import { EagerGarbageCollector } from '../../../src/local/eager_garbage_collector'; import { IndexedDbPersistence } from '../../../src/local/indexeddb_persistence'; import { Persistence } from '../../../src/local/persistence'; +import { documentKeySet } from '../../../src/model/collections'; import { BATCHID_UNKNOWN, MutationBatch @@ -40,7 +41,6 @@ import { addEqualityMatcher } from '../../util/equality_matcher'; let persistence: Persistence; let mutationQueue: TestMutationQueue; - describe('MemoryMutationQueue', () => { beforeEach(() => { return persistenceHelpers.testMemoryPersistence().then(p => { @@ -73,7 +73,7 @@ describe('IndexedDbMutationQueue', () => { function genericMutationQueueTests(): void { addEqualityMatcher(); - beforeEach(() => { + beforeEach(async () => { mutationQueue = new TestMutationQueue( persistence, persistence.getMutationQueue(new User('user')) @@ -83,6 +83,58 @@ function genericMutationQueueTests(): void { afterEach(() => persistence.shutdown(/* deleteData= */ true)); + /** + * Creates a new MutationBatch with the next batch ID and a set of dummy + * mutations. + */ + function addMutationBatch(key?: string): Promise { + let keyStr = key; + if (keyStr === undefined) { + keyStr = 'foo/bar'; + } + const mutation = setMutation(keyStr, { a: 1 }); + return mutationQueue.addMutationBatch([mutation]); + } + + /** + * Creates an array of batches containing count dummy MutationBatches. Each + * has a different batchID. + */ + async function createBatches(count: number): Promise { + const batches = []; + for (let i = 0; i < count; i++) { + const batch = await addMutationBatch(); + batches.push(batch); + } + return batches; + } + + /** + * Removes entries from from the given a batches and returns them. + * + * @param holes An array of indexes in the batches array; in increasing order. + * Indexes are relative to the original state of the batches array, not any + * intermediate state that might occur. + * @param batches The array to mutate, removing entries from it. + * @return A new array containing all the entries that were removed from + * batches. + */ + async function makeHolesInBatches( + holes: number[], + batches: MutationBatch[] + ): Promise { + const removed = []; + for (let i = 0; i < holes.length; i++) { + const index = holes[i] - i; + const batch = batches[index]; + await mutationQueue.removeMutationBatches([batch]); + + batches.splice(index, 1); + removed.push(batch); + } + return removed; + } + it('can count batches', async () => { expect(await mutationQueue.countBatches()).to.equal(0); expect(await mutationQueue.checkEmpty()).to.equal(true); @@ -329,7 +381,30 @@ function genericMutationQueueTests(): void { const matches = await mutationQueue.getAllMutationBatchesAffectingDocumentKey( key('foo/bar') ); - expect(matches.length).to.deep.equal(expected.length); + expectEqualArrays(matches, expected); + }); + + it('can getAllMutationBatchesAffectingDocumentKeys()', async () => { + const mutations = [ + setMutation('fob/bar', { a: 1 }), + setMutation('foo/bar', { a: 1 }), + patchMutation('foo/bar', { b: 1 }), + setMutation('foo/bar/suffix/key', { a: 1 }), + setMutation('foo/baz', { a: 1 }), + setMutation('food/bar', { a: 1 }) + ]; + // Store all the mutations. + const batches: MutationBatch[] = []; + for (const mutation of mutations) { + const batch = await mutationQueue.addMutationBatch([mutation]); + batches.push(batch); + } + const expected = [batches[1], batches[2], batches[4]]; + const matches = await mutationQueue.getAllMutationBatchesAffectingDocumentKeys( + documentKeySet() + .add(key('foo/bar')) + .add(key('foo/baz')) + ); expectEqualArrays(matches, expected); }); @@ -498,55 +573,3 @@ function genericMutationQueueTests(): void { expect(await mutationQueue.checkEmpty()).to.equal(true); }); } - -/** - * Creates a new MutationBatch with the next batch ID and a set of dummy - * mutations. - */ -function addMutationBatch(key?: string): Promise { - let keyStr = key; - if (keyStr === undefined) { - keyStr = 'foo/bar'; - } - const mutation = setMutation(keyStr, { a: 1 }); - return mutationQueue.addMutationBatch([mutation]); -} - -/** - * Creates an array of batches containing count dummy MutationBatches. Each - * has a different batchID. - */ -async function createBatches(count: number): Promise { - const batches = []; - for (let i = 0; i < count; i++) { - const batch = await addMutationBatch(); - batches.push(batch); - } - return batches; -} - -/** - * Removes entries from from the given a batches and returns them. - * - * @param holes An array of indexes in the batches array; in increasing order. - * Indexes are relative to the original state of the batches array, not any - * intermediate state that might occur. - * @param batches The array to mutate, removing entries from it. - * @return A new array containing all the entries that were removed from - * batches. - */ -async function makeHolesInBatches( - holes: number[], - batches: MutationBatch[] -): Promise { - const removed = []; - for (let i = 0; i < holes.length; i++) { - const index = holes[i] - i; - const batch = batches[index]; - await mutationQueue.removeMutationBatches([batch]); - - batches.splice(index, 1); - removed.push(batch); - } - return removed; -} diff --git a/packages/firestore/test/unit/local/query_cache.test.ts b/packages/firestore/test/unit/local/query_cache.test.ts index 2a3ede52eba..bb4f4f0da03 100644 --- a/packages/firestore/test/unit/local/query_cache.test.ts +++ b/packages/firestore/test/unit/local/query_cache.test.ts @@ -35,17 +35,8 @@ import * as persistenceHelpers from './persistence_test_helpers'; import { TestGarbageCollector } from './test_garbage_collector'; import { TestQueryCache } from './test_query_cache'; -let persistence: Persistence; -let cache: TestQueryCache; - describe('MemoryQueryCache', () => { - beforeEach(() => { - return persistenceHelpers.testMemoryPersistence().then(p => { - persistence = p; - }); - }); - - genericQueryCacheTests(); + genericQueryCacheTests(persistenceHelpers.testMemoryPersistence); }); describe('IndexedDbQueryCache', () => { @@ -54,22 +45,22 @@ describe('IndexedDbQueryCache', () => { return; } - beforeEach(() => { - return persistenceHelpers.testIndexedDbPersistence().then(p => { - persistence = p; - }); + let persistencePromise: Promise; + beforeEach(async () => { + persistencePromise = persistenceHelpers.testIndexedDbPersistence(); }); - afterEach(() => persistence.shutdown(/* deleteData= */ true)); - - genericQueryCacheTests(); + genericQueryCacheTests(() => persistencePromise); }); /** * Defines the set of tests to run against both query cache implementations. */ -function genericQueryCacheTests(): void { +function genericQueryCacheTests( + persistencePromise: () => Promise +): void { addEqualityMatcher(); + let cache: TestQueryCache; const QUERY_ROOMS = Query.atPath(path('rooms')); const QUERY_HALLS = Query.atPath(path('halls')); @@ -98,11 +89,17 @@ function genericQueryCacheTests(): void { ); } + let persistence: Persistence; beforeEach(async () => { + persistence = await persistencePromise(); cache = new TestQueryCache(persistence, persistence.getQueryCache()); await cache.start(); }); + afterEach(async () => { + persistence.shutdown(/* deleteData= */ true); + }); + it('returns null for query not in cache', () => { return cache.getQueryData(QUERY_ROOMS).then(queryData => { expect(queryData).to.equal(null); diff --git a/packages/firestore/test/unit/local/remote_document_cache.test.ts b/packages/firestore/test/unit/local/remote_document_cache.test.ts index 59491cc0bac..bc19e796233 100644 --- a/packages/firestore/test/unit/local/remote_document_cache.test.ts +++ b/packages/firestore/test/unit/local/remote_document_cache.test.ts @@ -32,17 +32,8 @@ import * as persistenceHelpers from './persistence_test_helpers'; import { TestRemoteDocumentCache } from './test_remote_document_cache'; import { MaybeDocumentMap } from '../../../src/model/collections'; -let persistence: Persistence; -let cache: TestRemoteDocumentCache; - describe('MemoryRemoteDocumentCache', () => { - beforeEach(() => { - return persistenceHelpers.testMemoryPersistence().then(p => { - persistence = p; - }); - }); - - genericRemoteDocumentCacheTests(); + genericRemoteDocumentCacheTests(persistenceHelpers.testMemoryPersistence); }); describe('IndexedDbRemoteDocumentCache', () => { @@ -51,31 +42,27 @@ describe('IndexedDbRemoteDocumentCache', () => { return; } - beforeEach(() => { - // We turn on `synchronizeTabs` to test the document change log. - return persistenceHelpers - .testIndexedDbPersistence(/* synchronizeTabs= */ true) - .then(p => { - persistence = p; - }); - }); - - afterEach(() => persistence.shutdown(/* deleteData= */ true)); - - genericRemoteDocumentCacheTests(); + genericRemoteDocumentCacheTests(() => + persistenceHelpers.testIndexedDbPersistence(/* synchronizeTabs= */ true) + ); }); /** * Defines the set of tests to run against both remote document cache * implementations. */ -function genericRemoteDocumentCacheTests(): void { +function genericRemoteDocumentCacheTests( + persistencePromise: () => Promise +): void { // Helpers for use throughout tests. const DOC_PATH = 'a/b'; const LONG_DOC_PATH = 'a/b/c/d/e/f'; const DOC_DATA = { a: 1, b: 2 }; const VERSION = 42; + let persistence: Persistence; + let cache: TestRemoteDocumentCache; + function setAndReadDocument(doc: MaybeDocument): Promise { return cache .addEntries([doc]) @@ -105,15 +92,16 @@ function genericRemoteDocumentCacheTests(): void { }); } - beforeEach(() => { + beforeEach(async () => { + persistence = await persistencePromise(); cache = new TestRemoteDocumentCache( persistence, persistence.getRemoteDocumentCache() ); - - return cache.start(); }); + afterEach(() => persistence.shutdown(/* deleteData= */ true)); + it('returns null for document not in cache', () => { return cache.getEntry(key(DOC_PATH)).then(doc => { expect(doc).to.equal(null); diff --git a/packages/firestore/test/unit/local/remote_document_change_buffer.test.ts b/packages/firestore/test/unit/local/remote_document_change_buffer.test.ts index e19af434519..ad95e86620c 100644 --- a/packages/firestore/test/unit/local/remote_document_change_buffer.test.ts +++ b/packages/firestore/test/unit/local/remote_document_change_buffer.test.ts @@ -16,7 +16,6 @@ import { expect } from 'chai'; import { IndexedDbPersistence } from '../../../src/local/indexeddb_persistence'; -import { Persistence } from '../../../src/local/persistence'; import { RemoteDocumentChangeBuffer } from '../../../src/local/remote_document_change_buffer'; import { deletedDoc, doc, expectEqual, key } from '../../util/helpers'; @@ -24,7 +23,7 @@ import { testIndexedDbPersistence } from './persistence_test_helpers'; import { TestRemoteDocumentCache } from './test_remote_document_cache'; import { TestRemoteDocumentChangeBuffer } from './test_remote_document_change_buffer'; -let persistence: Persistence; +let persistence: IndexedDbPersistence; let cache: TestRemoteDocumentCache; let buffer: TestRemoteDocumentChangeBuffer; const INITIAL_DOC = doc('coll/a', 42, { test: 'data' }); diff --git a/packages/firestore/test/unit/local/test_mutation_queue.ts b/packages/firestore/test/unit/local/test_mutation_queue.ts index 12ca96614ea..a3094dff5d0 100644 --- a/packages/firestore/test/unit/local/test_mutation_queue.ts +++ b/packages/firestore/test/unit/local/test_mutation_queue.ts @@ -127,7 +127,7 @@ export class TestMutationQueue { ): Promise { return this.persistence.runTransaction( 'getAllMutationBatchesThroughBatchId', - true, + true, txn => { return this.queue.getAllMutationBatchesThroughBatchId(txn, batchId); } @@ -149,6 +149,21 @@ export class TestMutationQueue { ); } + getAllMutationBatchesAffectingDocumentKeys( + documentKeys: DocumentKeySet + ): Promise { + return this.persistence.runTransaction( + 'getAllMutationBatchesAffectingDocumentKeys', + true, + txn => { + return this.queue.getAllMutationBatchesAffectingDocumentKeys( + txn, + documentKeys + ); + } + ); + } + getAllMutationBatchesAffectingQuery(query: Query): Promise { return this.persistence.runTransaction( 'getAllMutationBatchesAffectingQuery', diff --git a/packages/firestore/test/unit/specs/limbo_spec.test.ts b/packages/firestore/test/unit/specs/limbo_spec.test.ts index 90f53cc6a6a..6ba4f233137 100644 --- a/packages/firestore/test/unit/specs/limbo_spec.test.ts +++ b/packages/firestore/test/unit/specs/limbo_spec.test.ts @@ -362,34 +362,30 @@ describeSpec('Limbo Documents:', [], () => { const docB = doc('collection/b', 1001, { key: 'b' }); const deletedDocB = deletedDoc('collection/b', 1005); - return ( - client(0, false) - .expectPrimaryState(true) - .client(1) - .userListens(query) - .client(0) - .expectListen(query) - .watchAcksFull(query, 1002, docA, docB) - .client(1) - .expectEvents(query, { added: [docA, docB] }) - .client(0) - .watchRemovesDoc(docB.key, query) - .watchSnapshots(1003) - .expectLimboDocs(docB.key) - .shutdown() - .client(1) - .expectEvents(query, { fromCache: true }) - .runTimer(TimerId.ClientMetadataRefresh) - .expectPrimaryState(true) - // TODO(37254270): This should be 'resume-token-1003' from the last - // global snapshot. - .expectListen(query, 'resume-token-1002') - .watchAcksFull(query, 1004) - .expectLimboDocs(docB.key) - .ackLimbo(1005, deletedDocB) - .expectLimboDocs() - .expectEvents(query, { removed: [docB] }) - ); + return client(0, false) + .expectPrimaryState(true) + .client(1) + .userListens(query) + .client(0) + .expectListen(query) + .watchAcksFull(query, 1 * 1e6, docA, docB) + .client(1) + .expectEvents(query, { added: [docA, docB] }) + .client(0) + .watchRemovesDoc(docB.key, query) + .watchSnapshots(2 * 1e6) + .expectLimboDocs(docB.key) + .shutdown() + .client(1) + .expectEvents(query, { fromCache: true }) + .runTimer(TimerId.ClientMetadataRefresh) + .expectPrimaryState(true) + .expectListen(query, 'resume-token-1000000') + .watchAcksFull(query, 3 * 1e6) + .expectLimboDocs(docB.key) + .ackLimbo(4 * 1e6, deletedDocB) + .expectLimboDocs() + .expectEvents(query, { removed: [docB] }); } ); @@ -404,41 +400,37 @@ describeSpec('Limbo Documents:', [], () => { const deletedDocB = deletedDoc('collection/b', 1006); const deletedDocC = deletedDoc('collection/c', 1008); - return ( - client(0, false) - .expectPrimaryState(true) - .userListens(query) - .watchAcksFull(query, 1002, docA, docB, docC) - .expectEvents(query, { added: [docA, docB, docC] }) - .watchRemovesDoc(docB.key, query) - .watchRemovesDoc(docC.key, query) - .watchSnapshots(1003) - .expectEvents(query, { fromCache: true }) - .expectLimboDocs(docB.key, docC.key) - .client(1) - .stealPrimaryLease() - .client(0) - .runTimer(TimerId.ClientMetadataRefresh) - .expectPrimaryState(false) - .expectLimboDocs() - .client(1) - // TODO(37254270): This should be 'resume-token-1003' from the last - // global snapshot. - .expectListen(query, 'resume-token-1002') - .watchAcksFull(query, 1005) - .expectLimboDocs(docB.key, docC.key) - .ackLimbo(1006, deletedDocB) - .expectLimboDocs(docC.key) - .client(0) - .expectEvents(query, { removed: [docB], fromCache: true }) - .stealPrimaryLease() - .expectListen(query, 'resume-token-1005') - .watchAcksFull(query, 1007) - .expectLimboDocs(docC.key) - .ackLimbo(1007, deletedDocC) - .expectLimboDocs() - .expectEvents(query, { removed: [docC] }) - ); + return client(0, false) + .expectPrimaryState(true) + .userListens(query) + .watchAcksFull(query, 1 * 1e6, docA, docB, docC) + .expectEvents(query, { added: [docA, docB, docC] }) + .watchRemovesDoc(docB.key, query) + .watchRemovesDoc(docC.key, query) + .watchSnapshots(2 * 1e6) + .expectEvents(query, { fromCache: true }) + .expectLimboDocs(docB.key, docC.key) + .client(1) + .stealPrimaryLease() + .client(0) + .runTimer(TimerId.ClientMetadataRefresh) + .expectPrimaryState(false) + .expectLimboDocs() + .client(1) + .expectListen(query, 'resume-token-1000000') + .watchAcksFull(query, 3 * 1e6) + .expectLimboDocs(docB.key, docC.key) + .ackLimbo(3 * 1e6, deletedDocB) + .expectLimboDocs(docC.key) + .client(0) + .expectEvents(query, { removed: [docB], fromCache: true }) + .stealPrimaryLease() + .expectListen(query, 'resume-token-1000000') + .watchAcksFull(query, 5 * 1e6) + .expectLimboDocs(docC.key) + .ackLimbo(6 * 1e6, deletedDocC) + .expectLimboDocs() + .expectEvents(query, { removed: [docC] }); } ); }); diff --git a/packages/firestore/test/unit/specs/listen_spec.test.ts b/packages/firestore/test/unit/specs/listen_spec.test.ts index 4f12ac1940c..a1864515284 100644 --- a/packages/firestore/test/unit/specs/listen_spec.test.ts +++ b/packages/firestore/test/unit/specs/listen_spec.test.ts @@ -544,6 +544,90 @@ describeSpec('Listens:', [], () => { .expectEvents(query, {}); }); + specTest('Persists global resume tokens on unlisten', [], () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 1000, { key: 'a' }); + + return ( + spec() + .withGCEnabled(false) + .userListens(query) + .watchAcksFull(query, 1000, docA) + .expectEvents(query, { added: [docA] }) + + // Some time later, watch sends an updated resume token and the user stops + // listening. + .watchSnapshots(2000, [], 'resume-token-2000') + .userUnlistens(query) + .watchRemoves(query) + + .userListens(query, 'resume-token-2000') + .expectEvents(query, { added: [docA], fromCache: true }) + .watchAcks(query) + .watchCurrents(query, 'resume-token-3000') + .watchSnapshots(3000) + .expectEvents(query, { fromCache: false }) + ); + }); + + specTest('Omits global resume tokens for a short while', [], () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 1000, { key: 'a' }); + + return ( + spec() + .withGCEnabled(false) + .userListens(query) + .watchAcksFull(query, 1000, docA) + .expectEvents(query, { added: [docA] }) + + // One millisecond later, watch sends an updated resume token but the + // user doesn't manage to unlisten before restart. + .watchSnapshots(2000, [], 'resume-token-2000') + .restart() + + .userListens(query, 'resume-token-1000') + .expectEvents(query, { added: [docA], fromCache: true }) + .watchAcks(query) + .watchCurrents(query, 'resume-token-3000') + .watchSnapshots(3000) + .expectEvents(query, { fromCache: false }) + ); + }); + + specTest( + 'Persists global resume tokens if the snapshot is old enough', + [], + () => { + const initialVersion = 1000; + const minutesLater = 5 * 60 * 1e6 + initialVersion; + const evenLater = 1000 + minutesLater; + + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', initialVersion, { key: 'a' }); + + return ( + spec() + .withGCEnabled(false) + .userListens(query) + .watchAcksFull(query, initialVersion, docA) + .expectEvents(query, { added: [docA] }) + + // 5 minutes later, watch sends an updated resume token but the user + // doesn't manage to unlisten before restart. + .watchSnapshots(minutesLater, [], 'resume-token-minutes-later') + .restart() + + .userListens(query, 'resume-token-minutes-later') + .expectEvents(query, { added: [docA], fromCache: true }) + .watchAcks(query) + .watchCurrents(query, 'resume-token-even-later') + .watchSnapshots(evenLater) + .expectEvents(query, { fromCache: false }) + ); + } + ); + specTest('Query is executed by primary client', ['multi-client'], () => { const query = Query.atPath(path('collection')); const docA = doc('collection/a', 1000, { key: 'a' }); diff --git a/packages/firestore/test/unit/specs/spec_test_runner.ts b/packages/firestore/test/unit/specs/spec_test_runner.ts index 4f4ecee4b28..ca1a0217d00 100644 --- a/packages/firestore/test/unit/specs/spec_test_runner.ts +++ b/packages/firestore/test/unit/specs/spec_test_runner.ts @@ -494,17 +494,11 @@ abstract class TestRunner { } async shutdown(): Promise { -<<<<<<< HEAD - if (this.started) { - await this.doShutdown(); - } -======= await this.queue.enqueue(async () => { - await this.remoteStore.shutdown(); - await this.persistence.shutdown(/* deleteData= */ true); - await this.destroyPersistence(); + if (this.started) { + await this.doShutdown(); + } }); ->>>>>>> master } /** Runs a single SpecStep on this runner. */ @@ -1152,7 +1146,6 @@ class MemoryTestRunner extends TestRunner { */ class IndexedDbTestRunner extends TestRunner { static TEST_DB_NAME = 'firestore/[DEFAULT]/specs'; - protected getSharedClientState(): SharedClientState { return new WebStorageSharedClientState( this.queue, diff --git a/packages/firestore/test/unit/specs/write_spec.test.ts b/packages/firestore/test/unit/specs/write_spec.test.ts index 11db44ba686..62ff28a955e 100644 --- a/packages/firestore/test/unit/specs/write_spec.test.ts +++ b/packages/firestore/test/unit/specs/write_spec.test.ts @@ -1245,4 +1245,30 @@ describeSpec('Writes:', [], () => { ); } ); + + specTest( + 'Mutation are not sent twice after primary failover', + ['multi-client'], + () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 0, { k: 'a' }); + const docB = doc('collection/b', 0, { k: 'b' }); + + return client(0) + .expectPrimaryState(true) + .userSets('collection/a', { k: 'a' }) + .userSets('collection/b', { k: 'b' }) + .client(1) + .stealPrimaryLease() + .writeAcks('collection/a', 1000, { expectUserCallback: false }) + .client(0) + .expectUserCallbacks({ + acknowledged: ['collection/a'] + }) + .stealPrimaryLease() + .writeAcks('collection/b', 2000) + .userListens(query) + .expectEvents(query, { added: [docA, docB], fromCache: true }); + } + ); });