From b78bbba3891760620367a7caf4d30c8b7b6b5d72 Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Fri, 28 Aug 2020 18:56:30 +0200 Subject: [PATCH] Schedule everything on the AsyncQueue --- packages/firestore/exp/src/api/components.ts | 14 +- packages/firestore/exp/src/api/database.ts | 118 ++--- packages/firestore/exp/src/api/reference.ts | 286 +++++++----- packages/firestore/exp/src/api/transaction.ts | 23 +- packages/firestore/exp/src/api/write_batch.ts | 10 +- packages/firestore/lite/src/api/database.ts | 4 +- .../firestore/src/core/firestore_client.ts | 415 ++++++++---------- 7 files changed, 430 insertions(+), 440 deletions(-) diff --git a/packages/firestore/exp/src/api/components.ts b/packages/firestore/exp/src/api/components.ts index 11409f63806..b592c558991 100644 --- a/packages/firestore/exp/src/api/components.ts +++ b/packages/firestore/exp/src/api/components.ts @@ -17,7 +17,6 @@ import { Firestore } from './database'; import { PersistenceSettings } from '../../../src/core/firestore_client'; -import { Code, FirestoreError } from '../../../src/util/error'; import { MemoryOfflineComponentProvider, OfflineComponentProvider, @@ -109,7 +108,7 @@ export async function setOnlineComponentProvider( function getOfflineComponentProvider( firestore: Firestore ): Promise { - verifyNotTerminated(firestore); + firestore._queue.verifyOperationInProgress(); if (!offlineComponentProviders.has(firestore)) { logDebug(LOG_TAG, 'Using default OfflineComponentProvider'); @@ -126,7 +125,7 @@ function getOfflineComponentProvider( function getOnlineComponentProvider( firestore: Firestore ): Promise { - verifyNotTerminated(firestore); + firestore._queue.verifyOperationInProgress(); if (!onlineComponentProviders.has(firestore)) { logDebug(LOG_TAG, 'Using default OnlineComponentProvider'); @@ -136,15 +135,6 @@ function getOnlineComponentProvider( return onlineComponentProviders.get(firestore)!; } -function verifyNotTerminated(firestore: Firestore): void { - if (firestore._terminated) { - throw new FirestoreError( - Code.FAILED_PRECONDITION, - 'The client has already been terminated.' - ); - } -} - // Note: These functions cannot be `async` since we want to throw an exception // when Firestore is terminated (via `getOnlineComponentProvider()`). diff --git a/packages/firestore/exp/src/api/database.ts b/packages/firestore/exp/src/api/database.ts index fece5ab481a..e3ed82ce6d4 100644 --- a/packages/firestore/exp/src/api/database.ts +++ b/packages/firestore/exp/src/api/database.ts @@ -22,11 +22,7 @@ import { _FirebaseService, FirebaseApp } from '@firebase/app-types-exp'; import { Provider } from '@firebase/component'; import { FirebaseAuthInternalName } from '@firebase/auth-interop-types'; -import { - enqueueNetworkEnabled, - enqueueWaitForPendingWrites, - MAX_CONCURRENT_LIMBO_RESOLUTIONS -} from '../../../src/core/firestore_client'; +import { MAX_CONCURRENT_LIMBO_RESOLUTIONS } from '../../../src/core/firestore_client'; import { AsyncQueue, wrapInUserErrorIfRecoverable @@ -62,6 +58,7 @@ import { AutoId } from '../../../src/util/misc'; import { User } from '../../../src/auth/user'; import { CredentialChangeListener } from '../../../src/api/credentials'; import { logDebug } from '../../../src/util/log'; +import { registerPendingWritesCallback } from '../../../src/core/sync_engine'; const LOG_TAG = 'Firestore'; @@ -157,6 +154,15 @@ export class Firestore }); return deferred.promise; } + + _verifyNotTerminated(): void { + if (this._terminated) { + throw new FirestoreError( + Code.FAILED_PRECONDITION, + 'The client has already been terminated.' + ); + } + } } export function initializeFirestore( @@ -199,17 +205,19 @@ export function enableIndexedDbPersistence( // `getOnlineComponentProvider()` const settings = firestoreImpl._getSettings(); - // TODO(firestoreexp): Add forceOwningTab - return setOfflineComponentProvider( - firestoreImpl, - { - durable: true, - synchronizeTabs: false, - cacheSizeBytes: - settings.cacheSizeBytes || LruParams.DEFAULT_CACHE_SIZE_BYTES, - forceOwningTab: false - }, - new IndexedDbOfflineComponentProvider() + return firestoreImpl._queue.enqueue(() => + // TODO(firestoreexp): Add forceOwningTab + setOfflineComponentProvider( + firestoreImpl, + { + durable: true, + synchronizeTabs: false, + cacheSizeBytes: + settings.cacheSizeBytes || LruParams.DEFAULT_CACHE_SIZE_BYTES, + forceOwningTab: false + }, + new IndexedDbOfflineComponentProvider() + ) ); } @@ -229,19 +237,20 @@ export function enableMultiTabIndexedDbPersistence( const offlineComponentProvider = new MultiTabOfflineComponentProvider( onlineComponentProvider ); - return setOfflineComponentProvider( - firestoreImpl, - { - durable: true, - synchronizeTabs: true, - cacheSizeBytes: - settings.cacheSizeBytes || LruParams.DEFAULT_CACHE_SIZE_BYTES, - forceOwningTab: false - }, - offlineComponentProvider - ).then(() => - setOnlineComponentProvider(firestoreImpl, onlineComponentProvider) - ); + return firestoreImpl._queue.enqueue(async () => { + await setOfflineComponentProvider( + firestoreImpl, + { + durable: true, + synchronizeTabs: true, + cacheSizeBytes: + settings.cacheSizeBytes || LruParams.DEFAULT_CACHE_SIZE_BYTES, + forceOwningTab: false + }, + offlineComponentProvider + ); + await setOnlineComponentProvider(firestoreImpl, onlineComponentProvider); + }); } export function clearIndexedDbPersistence( @@ -277,43 +286,42 @@ export function waitForPendingWrites( firestore: firestore.FirebaseFirestore ): Promise { const firestoreImpl = cast(firestore, Firestore); - return getSyncEngine(firestoreImpl).then(syncEngine => - enqueueWaitForPendingWrites(firestoreImpl._queue, syncEngine) - ); + firestoreImpl._verifyNotTerminated(); + + const deferred = new Deferred(); + firestoreImpl._queue.enqueueAndForget(async () => { + const syncEngine = await getSyncEngine(firestoreImpl); + return registerPendingWritesCallback(syncEngine, deferred); + }); + return deferred.promise; } export function enableNetwork( firestore: firestore.FirebaseFirestore ): Promise { const firestoreImpl = cast(firestore, Firestore); - return Promise.all([ - getRemoteStore(firestoreImpl), - getPersistence(firestoreImpl) - ]).then(([remoteStore, persistence]) => - enqueueNetworkEnabled( - firestoreImpl._queue, - remoteStore, - persistence, - /* enabled= */ true - ) - ); + firestoreImpl._verifyNotTerminated(); + + return firestoreImpl._queue.enqueue(async () => { + const remoteStore = await getRemoteStore(firestoreImpl); + const persistence = await getPersistence(firestoreImpl); + persistence.setNetworkEnabled(true); + return remoteStore.enableNetwork(); + }); } export function disableNetwork( firestore: firestore.FirebaseFirestore ): Promise { const firestoreImpl = cast(firestore, Firestore); - return Promise.all([ - getRemoteStore(firestoreImpl), - getPersistence(firestoreImpl) - ]).then(([remoteStore, persistence]) => - enqueueNetworkEnabled( - firestoreImpl._queue, - remoteStore, - persistence, - /* enabled= */ false - ) - ); + firestoreImpl._verifyNotTerminated(); + + return firestoreImpl._queue.enqueue(async () => { + const remoteStore = await getRemoteStore(firestoreImpl); + const persistence = await getPersistence(firestoreImpl); + persistence.setNetworkEnabled(false); + return remoteStore.disableNetwork(); + }); } export function terminate( @@ -325,7 +333,7 @@ export function terminate( } function verifyNotInitialized(firestore: Firestore): void { - if (firestore._initialized) { + if (firestore._initialized || firestore._terminated) { throw new FirestoreError( Code.FAILED_PRECONDITION, 'Firestore has already been started and persistence can no longer be ' + diff --git a/packages/firestore/exp/src/api/reference.ts b/packages/firestore/exp/src/api/reference.ts index 6880ccf07c2..b3c29a07d9f 100644 --- a/packages/firestore/exp/src/api/reference.ts +++ b/packages/firestore/exp/src/api/reference.ts @@ -44,7 +44,11 @@ import { Query } from '../../../lite/src/api/reference'; import { Document } from '../../../src/model/document'; -import { DeleteMutation, Precondition } from '../../../src/model/mutation'; +import { + DeleteMutation, + Mutation, + Precondition +} from '../../../src/model/mutation'; import { FieldPath } from '../../../lite/src/api/field_path'; import { CompleteFn, @@ -56,27 +60,46 @@ import { } from '../../../src/api/observer'; import { getEventManager, getLocalStore, getSyncEngine } from './components'; import { - enqueueListen, - enqueueWrite, - enqueueExecuteQueryViaSnapshotListener, - enqueueReadDocumentViaSnapshotListener, - enqueueReadDocumentFromCache, - enqueueExecuteQueryFromCache, - enqueueSnapshotsInSyncListen + executeQueryViaSnapshotListener, + readDocumentViaSnapshotListener, + readDocumentFromCache, + executeQueryFromCache } from '../../../src/core/firestore_client'; -import { newQueryForPath } from '../../../src/core/query'; +import { + newQueryForPath, + Query as InternalQuery +} from '../../../src/core/query'; +import { Deferred } from '../../../src/util/promise'; +import { syncEngineWrite } from '../../../src/core/sync_engine'; +import { AsyncObserver } from '../../../src/util/async_observer'; +import { + addSnapshotsInSyncListener, + eventManagerListen, + eventManagerUnlisten, + QueryListener, + removeSnapshotsInSyncListener +} from '../../../src/core/event_manager'; export function getDoc( reference: firestore.DocumentReference ): Promise> { const ref = cast>(reference, DocumentReference); const firestore = cast(ref.firestore, Firestore); - return getEventManager(firestore).then(eventManager => - enqueueReadDocumentViaSnapshotListener( - firestore._queue, + firestore._verifyNotTerminated(); + + const deferred = new Deferred(); + firestore._queue.enqueueAndForget(async () => { + const eventManager = await getEventManager(firestore); + await readDocumentViaSnapshotListener( eventManager, - ref._key - ).then(doc => convertToDocSnapshot(firestore, ref, doc)) + firestore._queue, + ref._key, + { source: 'default' }, + deferred + ); + }); + return deferred.promise.then(snapshot => + convertToDocSnapshot(firestore, ref, snapshot) ); } @@ -85,20 +108,25 @@ export function getDocFromCache( ): Promise> { const ref = cast>(reference, DocumentReference); const firestore = cast(ref.firestore, Firestore); - return getLocalStore(firestore).then(localStore => - enqueueReadDocumentFromCache(firestore._queue, localStore, ref._key).then( - doc => - new DocumentSnapshot( - firestore, - ref._key, - doc, - new SnapshotMetadata( - doc instanceof Document ? doc.hasLocalMutations : false, - /* fromCache= */ true - ), - ref._converter - ) - ) + firestore._verifyNotTerminated(); + + const deferred = new Deferred(); + firestore._queue.enqueueAndForget(async () => { + const localStore = await getLocalStore(firestore); + await readDocumentFromCache(localStore, ref._key, deferred); + }); + return deferred.promise.then( + doc => + new DocumentSnapshot( + firestore, + ref._key, + doc, + new SnapshotMetadata( + doc instanceof Document ? doc.hasLocalMutations : false, + /* fromCache= */ true + ), + ref._converter + ) ); } @@ -107,13 +135,21 @@ export function getDocFromServer( ): Promise> { const ref = cast>(reference, DocumentReference); const firestore = cast(ref.firestore, Firestore); - return getEventManager(firestore).then(eventManager => - enqueueReadDocumentViaSnapshotListener( - firestore._queue, + firestore._verifyNotTerminated(); + + const deferred = new Deferred(); + firestore._queue.enqueueAndForget(async () => { + const eventManager = await getEventManager(firestore); + await readDocumentViaSnapshotListener( eventManager, + firestore._queue, ref._key, - { source: 'server' } - ).then(viewSnapshot => convertToDocSnapshot(firestore, ref, viewSnapshot)) + { source: 'server' }, + deferred + ); + }); + return deferred.promise.then(snapshot => + convertToDocSnapshot(firestore, ref, snapshot) ); } @@ -122,14 +158,23 @@ export function getDocs( ): Promise> { const internalQuery = cast>(query, Query); const firestore = cast(query.firestore, Firestore); + firestore._verifyNotTerminated(); validateHasExplicitOrderByForLimitToLast(internalQuery._query); - return getEventManager(firestore).then(eventManager => - enqueueExecuteQueryViaSnapshotListener( - firestore._queue, + + const deferred = new Deferred(); + firestore._queue.enqueueAndForget(async () => { + const eventManager = await getEventManager(firestore); + await executeQueryViaSnapshotListener( eventManager, - internalQuery._query - ).then(snapshot => new QuerySnapshot(firestore, internalQuery, snapshot)) + firestore._queue, + internalQuery._query, + { source: 'default' }, + deferred + ); + }); + return deferred.promise.then( + snapshot => new QuerySnapshot(firestore, internalQuery, snapshot) ); } @@ -138,12 +183,15 @@ export function getDocsFromCache( ): Promise> { const internalQuery = cast>(query, Query); const firestore = cast(query.firestore, Firestore); - return getLocalStore(firestore).then(localStore => - enqueueExecuteQueryFromCache( - firestore._queue, - localStore, - internalQuery._query - ).then(snapshot => new QuerySnapshot(firestore, internalQuery, snapshot)) + firestore._verifyNotTerminated(); + + const deferred = new Deferred(); + firestore._queue.enqueueAndForget(async () => { + const localStore = await getLocalStore(firestore); + await executeQueryFromCache(localStore, internalQuery._query, deferred); + }); + return deferred.promise.then( + snapshot => new QuerySnapshot(firestore, internalQuery, snapshot) ); } @@ -152,13 +200,21 @@ export function getDocsFromServer( ): Promise> { const internalQuery = cast>(query, Query); const firestore = cast(query.firestore, Firestore); - return getEventManager(firestore).then(eventManager => - enqueueExecuteQueryViaSnapshotListener( - firestore._queue, + firestore._verifyNotTerminated(); + + const deferred = new Deferred(); + firestore._queue.enqueueAndForget(async () => { + const eventManager = await getEventManager(firestore); + await executeQueryViaSnapshotListener( eventManager, + firestore._queue, internalQuery._query, - { source: 'server' } - ).then(snapshot => new QuerySnapshot(firestore, internalQuery, snapshot)) + { source: 'server' }, + deferred + ); + }); + return deferred.promise.then( + snapshot => new QuerySnapshot(firestore, internalQuery, snapshot) ); } @@ -178,6 +234,7 @@ export function setDoc( ): Promise { const ref = cast>(reference, DocumentReference); const firestore = cast(ref.firestore, Firestore); + firestore._verifyNotTerminated(); const convertedValue = applyFirestoreDataConverter( ref._converter, @@ -194,13 +251,8 @@ export function setDoc( options ); - return getSyncEngine(firestore).then(syncEngine => - enqueueWrite( - firestore._queue, - syncEngine, - parsed.toMutations(ref._key, Precondition.none()) - ) - ); + const mutations = parsed.toMutations(ref._key, Precondition.none()); + return executeWrite(firestore, mutations); } export function updateDoc( @@ -221,6 +273,8 @@ export function updateDoc( ): Promise { const ref = cast>(reference, DocumentReference); const firestore = cast(ref.firestore, Firestore); + firestore._verifyNotTerminated(); + const dataReader = newUserDataReader(firestore); let parsed: ParsedUpdateData; @@ -245,13 +299,8 @@ export function updateDoc( ); } - return getSyncEngine(firestore).then(syncEngine => - enqueueWrite( - firestore._queue, - syncEngine, - parsed.toMutations(ref._key, Precondition.exists(true)) - ) - ); + const mutations = parsed.toMutations(ref._key, Precondition.exists(true)); + return executeWrite(firestore, mutations); } export function deleteDoc( @@ -259,11 +308,10 @@ export function deleteDoc( ): Promise { const ref = cast>(reference, DocumentReference); const firestore = cast(ref.firestore, Firestore); - return getSyncEngine(firestore).then(syncEngine => - enqueueWrite(firestore._queue, syncEngine, [ - new DeleteMutation(ref._key, Precondition.none()) - ]) - ); + firestore._verifyNotTerminated(); + + const mutations = [new DeleteMutation(ref._key, Precondition.none())]; + return executeWrite(firestore, mutations); } export function addDoc( @@ -272,8 +320,9 @@ export function addDoc( ): Promise> { const collRef = cast>(reference, CollectionReference); const firestore = cast(collRef.firestore, Firestore); - const docRef = doc(collRef); + firestore._verifyNotTerminated(); + const docRef = doc(collRef); const convertedValue = applyFirestoreDataConverter(collRef.converter, data); const dataReader = newUserDataReader(collRef.firestore); @@ -286,15 +335,8 @@ export function addDoc( {} ); - return getSyncEngine(firestore) - .then(syncEngine => - enqueueWrite( - firestore._queue, - syncEngine, - parsed.toMutations(docRef._key, Precondition.exists(false)) - ) - ) - .then(() => docRef); + const mutations = parsed.toMutations(docRef._key, Precondition.exists(false)); + return executeWrite(firestore, mutations).then(() => docRef); } // TODO(firestorexp): Make sure these overloads are tested via the Firestore @@ -385,12 +427,15 @@ export function onSnapshot( args[currArg + 2] = userObserver.complete?.bind(userObserver); } - let asyncUnsubscribe: Promise; + let observer: PartialObserver; + let firestore: Firestore; + let internalQuery: InternalQuery; if (ref instanceof DocumentReference) { - const firestore = cast(ref.firestore, Firestore); + firestore = cast(ref.firestore, Firestore); + internalQuery = newQueryForPath(ref._key.path); - const observer: PartialObserver = { + observer = { next: snapshot => { if (args[currArg]) { (args[currArg] as NextFn>)( @@ -401,21 +446,12 @@ export function onSnapshot( error: args[currArg + 1] as ErrorFn, complete: args[currArg + 2] as CompleteFn }; - - asyncUnsubscribe = getEventManager(firestore).then(eventManager => - enqueueListen( - firestore._queue, - eventManager, - newQueryForPath(ref._key.path), - internalOptions, - observer - ) - ); } else { const query = cast>(ref, Query); - const firestore = cast(query.firestore, Firestore); + firestore = cast(query.firestore, Firestore); + internalQuery = query._query; - const observer: PartialObserver = { + observer = { next: snapshot => { if (args[currArg]) { (args[currArg] as NextFn>)( @@ -428,23 +464,27 @@ export function onSnapshot( }; validateHasExplicitOrderByForLimitToLast(query._query); - - asyncUnsubscribe = getEventManager(firestore).then(eventManager => - enqueueListen( - firestore._queue, - eventManager, - query._query, - internalOptions, - observer - ) - ); } - // TODO(firestorexp): Add test that verifies that we don't raise a snapshot if - // unsubscribe is called before `asyncObserver` resolves. + firestore._verifyNotTerminated(); + + const wrappedObserver = new AsyncObserver(observer); + const listener = new QueryListener( + internalQuery, + wrappedObserver, + internalOptions + ); + firestore._queue.enqueueAndForget(async () => { + const eventManager = await getEventManager(firestore); + return eventManagerListen(eventManager, listener); + }); + return () => { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - asyncUnsubscribe.then(unsubscribe => unsubscribe()); + wrappedObserver.mute(); + firestore._queue.enqueueAndForget(async () => { + const eventManager = await getEventManager(firestore); + return eventManagerUnlisten(eventManager, listener); + }); }; } @@ -467,24 +507,42 @@ export function onSnapshotsInSync( arg: unknown ): Unsubscribe { const firestoreImpl = cast(firestore, Firestore); + firestoreImpl._verifyNotTerminated(); + const observer = isPartialObserver(arg) ? (arg as PartialObserver) : { next: arg as () => void }; - const asyncObserver = getEventManager(firestoreImpl).then(eventManager => - enqueueSnapshotsInSyncListen(firestoreImpl._queue, eventManager, observer) - ); + const wrappedObserver = new AsyncObserver(observer); + firestoreImpl._queue.enqueueAndForget(async () => { + const eventManager = await getEventManager(firestoreImpl); + addSnapshotsInSyncListener(eventManager, wrappedObserver); + }); - // TODO(firestorexp): Add test that verifies that we don't raise a snapshot if - // unsubscribe is called before `asyncObserver` resolves. return () => { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - asyncObserver.then(unsubscribe => unsubscribe()); + wrappedObserver.mute(); + firestoreImpl._queue.enqueueAndForget(async () => { + const eventManager = await getEventManager(firestoreImpl); + removeSnapshotsInSyncListener(eventManager, wrappedObserver); + }); }; } +/** Locally writes `mutations` on the async queue. */ +export function executeWrite( + firestore: Firestore, + mutations: Mutation[] +): Promise { + const deferred = new Deferred(); + firestore._queue.enqueueAndForget(async () => { + const syncEngine = await getSyncEngine(firestore); + return syncEngineWrite(syncEngine, mutations, deferred); + }); + return deferred.promise; +} + /** * Converts a ViewSnapshot that contains the single document specified by `ref` * to a DocumentSnapshot. diff --git a/packages/firestore/exp/src/api/transaction.ts b/packages/firestore/exp/src/api/transaction.ts index 8f69a4016c0..d1bc32f5ced 100644 --- a/packages/firestore/exp/src/api/transaction.ts +++ b/packages/firestore/exp/src/api/transaction.ts @@ -29,7 +29,8 @@ import { Transaction as InternalTransaction } from '../../../src/core/transactio import { validateReference } from '../../../lite/src/api/write_batch'; import { getDatastore } from '../../../lite/src/api/components'; -export class Transaction extends LiteTransaction +export class Transaction + extends LiteTransaction implements firestore.Transaction { // This class implements the same logic as the Transaction API in the Lite SDK // but is subclassed in order to return its own DocumentSnapshot types. @@ -68,14 +69,18 @@ export function runTransaction( updateFunction: (transaction: firestore.Transaction) => Promise ): Promise { const firestoreClient = cast(firestore, Firestore); - const datastore = getDatastore(firestoreClient); + firestoreClient._verifyNotTerminated(); + const deferred = new Deferred(); - new TransactionRunner( - new AsyncQueue(), - datastore, - internalTransaction => - updateFunction(new Transaction(firestoreClient, internalTransaction)), - deferred - ).run(); + firestoreClient._queue.enqueueAndForget(async () => { + const datastore = await getDatastore(firestoreClient); + new TransactionRunner( + new AsyncQueue(), + datastore, + internalTransaction => + updateFunction(new Transaction(firestoreClient, internalTransaction)), + deferred + ).run(); + }); return deferred.promise; } diff --git a/packages/firestore/exp/src/api/write_batch.ts b/packages/firestore/exp/src/api/write_batch.ts index 00f2551afef..be259c49185 100644 --- a/packages/firestore/exp/src/api/write_batch.ts +++ b/packages/firestore/exp/src/api/write_batch.ts @@ -22,16 +22,14 @@ import * as firestore from '../../../exp-types'; import { cast } from '../../../lite/src/api/util'; import { WriteBatch } from '../../../lite/src/api/write_batch'; import { Firestore } from './database'; -import { getSyncEngine } from './components'; -import { enqueueWrite } from '../../../src/core/firestore_client'; +import { executeWrite } from './reference'; export function writeBatch( firestore: firestore.FirebaseFirestore ): firestore.WriteBatch { const firestoreImpl = cast(firestore, Firestore); - return new WriteBatch(firestoreImpl, writes => - getSyncEngine(firestoreImpl).then(syncEngine => - enqueueWrite(firestoreImpl._queue, syncEngine, writes) - ) + firestoreImpl._verifyNotTerminated(); + return new WriteBatch(firestoreImpl, mutations => + executeWrite(firestoreImpl, mutations) ); } diff --git a/packages/firestore/lite/src/api/database.ts b/packages/firestore/lite/src/api/database.ts index 6945397f4ac..5075b95f3e1 100644 --- a/packages/firestore/lite/src/api/database.ts +++ b/packages/firestore/lite/src/api/database.ts @@ -52,7 +52,7 @@ export class Firestore readonly app: FirebaseApp, authProvider: Provider ) { - this._databaseId = Firestore.databaseIdFromApp(app); + this._databaseId = Firestore._databaseIdFromApp(app); this._credentials = new FirebaseCredentialsProvider(authProvider); } @@ -84,7 +84,7 @@ export class Firestore return this._settings; } - private static databaseIdFromApp(app: FirebaseApp): DatabaseId { + private static _databaseIdFromApp(app: FirebaseApp): DatabaseId { if (!Object.prototype.hasOwnProperty.apply(app.options, ['projectId'])) { throw new FirestoreError( Code.INVALID_ARGUMENT, diff --git a/packages/firestore/src/core/firestore_client.ts b/packages/firestore/src/core/firestore_client.ts index e435af13f66..2a9d7a704dd 100644 --- a/packages/firestore/src/core/firestore_client.ts +++ b/packages/firestore/src/core/firestore_client.ts @@ -34,14 +34,14 @@ import { Code, FirestoreError } from '../util/error'; import { logDebug } from '../util/log'; import { Deferred } from '../util/promise'; import { + addSnapshotsInSyncListener, EventManager, + eventManagerListen, + eventManagerUnlisten, ListenOptions, Observer, QueryListener, - eventManagerListen, - eventManagerUnlisten, - removeSnapshotsInSyncListener, - addSnapshotsInSyncListener + removeSnapshotsInSyncListener } from './event_manager'; import { registerPendingWritesCallback, @@ -62,7 +62,6 @@ import { OfflineComponentProvider, OnlineComponentProvider } from './component_provider'; -import { PartialObserver, Unsubscribe } from '../api/observer'; import { AsyncObserver } from '../util/async_observer'; import { debugAssert } from '../util/assert'; import { TransactionRunner } from './transaction_runner'; @@ -448,49 +447,59 @@ export class FirestoreClient { ): Promise { this.verifyNotTerminated(); await this.initializationDone.promise; - return enqueueReadDocumentFromCache( - this.asyncQueue, - this.localStore, - docKey + const deferred = new Deferred(); + this.asyncQueue.enqueueAndForget(() => + readDocumentFromCache(this.localStore, docKey, deferred) ); + return deferred.promise; } async getDocumentViaSnapshotListener( key: DocumentKey, - options?: GetOptions + options: GetOptions = {} ): Promise { this.verifyNotTerminated(); await this.initializationDone.promise; - return enqueueReadDocumentViaSnapshotListener( - this.asyncQueue, - this.eventMgr, - key, - options + const deferred = new Deferred(); + this.asyncQueue.enqueueAndForget(() => + readDocumentViaSnapshotListener( + this.eventMgr, + this.asyncQueue, + key, + options, + deferred + ) ); + return deferred.promise; } async getDocumentsFromLocalCache(query: Query): Promise { this.verifyNotTerminated(); await this.initializationDone.promise; - return enqueueExecuteQueryFromCache( - this.asyncQueue, - this.localStore, - query + const deferred = new Deferred(); + this.asyncQueue.enqueueAndForget(() => + executeQueryFromCache(this.localStore, query, deferred) ); + return deferred.promise; } async getDocumentsViaSnapshotListener( query: Query, - options?: GetOptions + options: GetOptions = {} ): Promise { this.verifyNotTerminated(); await this.initializationDone.promise; - return enqueueExecuteQueryViaSnapshotListener( - this.asyncQueue, - this.eventMgr, - query, - options + const deferred = new Deferred(); + this.asyncQueue.enqueueAndForget(() => + executeQueryViaSnapshotListener( + this.eventMgr, + this.asyncQueue, + query, + options, + deferred + ) ); + return deferred.promise; } write(mutations: Mutation[]): Promise { @@ -560,252 +569,174 @@ export class FirestoreClient { } } -export function enqueueWrite( - asyncQueue: AsyncQueue, - syncEngine: SyncEngine, - mutations: Mutation[] -): Promise { - const deferred = new Deferred(); - asyncQueue.enqueueAndForget(() => - syncEngineWrite(syncEngine, mutations, deferred) - ); - return deferred.promise; -} - -export function enqueueNetworkEnabled( - asyncQueue: AsyncQueue, - remoteStore: RemoteStore, - persistence: Persistence, - enabled: boolean -): Promise { - return asyncQueue.enqueue(() => { - persistence.setNetworkEnabled(enabled); - return enabled ? remoteStore.enableNetwork() : remoteStore.disableNetwork(); - }); -} - -export function enqueueWaitForPendingWrites( - asyncQueue: AsyncQueue, - syncEngine: SyncEngine +export async function readDocumentFromCache( + localStore: LocalStore, + docKey: DocumentKey, + result: Deferred ): Promise { - const deferred = new Deferred(); - asyncQueue.enqueueAndForget(() => - registerPendingWritesCallback(syncEngine, deferred) - ); - return deferred.promise; -} - -export function enqueueListen( - asyncQueue: AsyncQueue, - eventManger: EventManager, - query: Query, - options: ListenOptions, - observer: PartialObserver -): Unsubscribe { - const wrappedObserver = new AsyncObserver(observer); - const listener = new QueryListener(query, wrappedObserver, options); - asyncQueue.enqueueAndForget(() => eventManagerListen(eventManger, listener)); - return () => { - wrappedObserver.mute(); - asyncQueue.enqueueAndForget(() => - eventManagerUnlisten(eventManger, listener) + try { + const maybeDoc = await readLocalDocument(localStore, docKey); + if (maybeDoc instanceof Document) { + result.resolve(maybeDoc); + } else if (maybeDoc instanceof NoDocument) { + result.resolve(null); + } else { + result.reject( + new FirestoreError( + Code.UNAVAILABLE, + 'Failed to get document from cache. (However, this document may ' + + "exist on the server. Run again without setting 'source' in " + + 'the GetOptions to attempt to retrieve the document from the ' + + 'server.)' + ) + ); + } + } catch (e) { + const firestoreError = wrapInUserErrorIfRecoverable( + e, + `Failed to get document '${docKey} from cache` ); - }; + result.reject(firestoreError); + } } -export function enqueueSnapshotsInSyncListen( - asyncQueue: AsyncQueue, +/** + * Retrieves a latency-compensated document from the backend via a + * SnapshotListener. + */ +export function readDocumentViaSnapshotListener( eventManager: EventManager, - observer: PartialObserver -): Unsubscribe { - const wrappedObserver = new AsyncObserver(observer); - asyncQueue.enqueueAndForget(async () => - addSnapshotsInSyncListener(eventManager, wrappedObserver) - ); - return () => { - wrappedObserver.mute(); - asyncQueue.enqueueAndForget(async () => - removeSnapshotsInSyncListener(eventManager, wrappedObserver) - ); - }; -} - -export async function enqueueReadDocumentFromCache( asyncQueue: AsyncQueue, - localStore: LocalStore, - docKey: DocumentKey -): Promise { - const deferred = new Deferred(); - await asyncQueue.enqueue(async () => { - try { - const maybeDoc = await readLocalDocument(localStore, docKey); - if (maybeDoc instanceof Document) { - deferred.resolve(maybeDoc); - } else if (maybeDoc instanceof NoDocument) { - deferred.resolve(null); - } else { - deferred.reject( + key: DocumentKey, + options: GetOptions, + result: Deferred +): Promise { + const wrappedObserver = new AsyncObserver({ + next: (snap: ViewSnapshot) => { + // Remove query first before passing event to user to avoid + // user actions affecting the now stale query. + asyncQueue.enqueueAndForget(() => + eventManagerUnlisten(eventManager, listener) + ); + + const exists = snap.docs.has(key); + if (!exists && snap.fromCache) { + // TODO(dimond): If we're online and the document doesn't + // exist then we resolve with a doc.exists set to false. If + // we're offline however, we reject the Promise in this + // case. Two options: 1) Cache the negative response from + // the server so we can deliver that even when you're + // offline 2) Actually reject the Promise in the online case + // if the document doesn't exist. + result.reject( + new FirestoreError( + Code.UNAVAILABLE, + 'Failed to get document because the client is offline.' + ) + ); + } else if ( + exists && + snap.fromCache && + options && + options.source === 'server' + ) { + result.reject( new FirestoreError( Code.UNAVAILABLE, - 'Failed to get document from cache. (However, this document may ' + - "exist on the server. Run again without setting 'source' in " + - 'the GetOptions to attempt to retrieve the document from the ' + - 'server.)' + 'Failed to get document from server. (However, this ' + + 'document does exist in the local cache. Run again ' + + 'without setting source to "server" to ' + + 'retrieve the cached document.)' ) ); + } else { + debugAssert( + snap.docs.size <= 1, + 'Expected zero or a single result on a document-only query' + ); + result.resolve(snap); } - } catch (e) { - const firestoreError = wrapInUserErrorIfRecoverable( - e, - `Failed to get document '${docKey} from cache` - ); - deferred.reject(firestoreError); - } + }, + error: e => result.reject(e) }); - return deferred.promise; -} -/** - * Retrieves a latency-compensated document from the backend via a - * SnapshotListener. - */ -export function enqueueReadDocumentViaSnapshotListener( - asyncQueue: AsyncQueue, - eventManager: EventManager, - key: DocumentKey, - options?: GetOptions -): Promise { - const result = new Deferred(); - const unlisten = enqueueListen( - asyncQueue, - eventManager, + const listener = new QueryListener( newQueryForPath(key.path), + wrappedObserver, { includeMetadataChanges: true, waitForSyncWhenOnline: true - }, - { - next: (snap: ViewSnapshot) => { - // Remove query first before passing event to user to avoid - // user actions affecting the now stale query. - unlisten(); - - const exists = snap.docs.has(key); - if (!exists && snap.fromCache) { - // TODO(dimond): If we're online and the document doesn't - // exist then we resolve with a doc.exists set to false. If - // we're offline however, we reject the Promise in this - // case. Two options: 1) Cache the negative response from - // the server so we can deliver that even when you're - // offline 2) Actually reject the Promise in the online case - // if the document doesn't exist. - result.reject( - new FirestoreError( - Code.UNAVAILABLE, - 'Failed to get document because the client is ' + 'offline.' - ) - ); - } else if ( - exists && - snap.fromCache && - options && - options.source === 'server' - ) { - result.reject( - new FirestoreError( - Code.UNAVAILABLE, - 'Failed to get document from server. (However, this ' + - 'document does exist in the local cache. Run again ' + - 'without setting source to "server" to ' + - 'retrieve the cached document.)' - ) - ); - } else { - debugAssert( - snap.docs.size <= 1, - 'Expected zero or a single result on a document-only query' - ); - result.resolve(snap); - } - }, - error: e => result.reject(e) } ); - return result.promise; + return eventManagerListen(eventManager, listener); } -export async function enqueueExecuteQueryFromCache( - asyncQueue: AsyncQueue, +export async function executeQueryFromCache( localStore: LocalStore, - query: Query -): Promise { - const deferred = new Deferred(); - await asyncQueue.enqueue(async () => { - try { - const queryResult = await executeQuery( - localStore, - query, - /* usePreviousResults= */ true - ); - const view = new View(query, queryResult.remoteKeys); - const viewDocChanges = view.computeDocChanges(queryResult.documents); - const viewChange = view.applyChanges( - viewDocChanges, - /* updateLimboDocuments= */ false - ); - deferred.resolve(viewChange.snapshot!); - } catch (e) { - const firestoreError = wrapInUserErrorIfRecoverable( - e, - `Failed to execute query '${query} against cache` - ); - deferred.reject(firestoreError); - } - }); - return deferred.promise; + query: Query, + result: Deferred +): Promise { + try { + const queryResult = await executeQuery( + localStore, + query, + /* usePreviousResults= */ true + ); + const view = new View(query, queryResult.remoteKeys); + const viewDocChanges = view.computeDocChanges(queryResult.documents); + const viewChange = view.applyChanges( + viewDocChanges, + /* updateLimboDocuments= */ false + ); + result.resolve(viewChange.snapshot!); + } catch (e) { + const firestoreError = wrapInUserErrorIfRecoverable( + e, + `Failed to execute query '${query} against cache` + ); + result.reject(firestoreError); + } } /** * Retrieves a latency-compensated query snapshot from the backend via a * SnapshotListener. */ -export function enqueueExecuteQueryViaSnapshotListener( - asyncQueue: AsyncQueue, +export function executeQueryViaSnapshotListener( eventManager: EventManager, + asyncQueue: AsyncQueue, query: Query, - options?: GetOptions -): Promise { - const result = new Deferred(); - const unlisten = enqueueListen( - asyncQueue, - eventManager, - query, - { - includeMetadataChanges: true, - waitForSyncWhenOnline: true + options: GetOptions, + result: Deferred +): Promise { + const wrappedObserver = new AsyncObserver({ + next: snapshot => { + // Remove query first before passing event to user to avoid + // user actions affecting the now stale query. + asyncQueue.enqueueAndForget(() => + eventManagerUnlisten(eventManager, listener) + ); + + if (snapshot.fromCache && options.source === 'server') { + result.reject( + new FirestoreError( + Code.UNAVAILABLE, + 'Failed to get documents from server. (However, these ' + + 'documents may exist in the local cache. Run again ' + + 'without setting source to "server" to ' + + 'retrieve the cached documents.)' + ) + ); + } else { + result.resolve(snapshot); + } }, - { - next: snapshot => { - // Remove query first before passing event to user to avoid - // user actions affecting the now stale query. - unlisten(); - - if (snapshot.fromCache && options && options.source === 'server') { - result.reject( - new FirestoreError( - Code.UNAVAILABLE, - 'Failed to get documents from server. (However, these ' + - 'documents may exist in the local cache. Run again ' + - 'without setting source to "server" to ' + - 'retrieve the cached documents.)' - ) - ); - } else { - result.resolve(snapshot); - } - }, - error: e => result.reject(e) - } - ); - return result.promise; + error: e => result.reject(e) + }); + + const listener = new QueryListener(query, wrappedObserver, { + includeMetadataChanges: true, + waitForSyncWhenOnline: true + }); + return eventManagerListen(eventManager, listener); }