diff --git a/packages/auth/src/error_auth.js b/packages/auth/src/error_auth.js index 251718ea235..e6ae679ff62 100644 --- a/packages/auth/src/error_auth.js +++ b/packages/auth/src/error_auth.js @@ -147,6 +147,7 @@ fireauth.authenum.Error = { INVALID_PASSWORD: 'wrong-password', INVALID_PERSISTENCE: 'invalid-persistence-type', INVALID_PHONE_NUMBER: 'invalid-phone-number', + INVALID_PROVIDER_ID: 'invalid-provider-id', INVALID_RECIPIENT_EMAIL: 'invalid-recipient-email', INVALID_SENDER: 'invalid-sender', INVALID_SESSION_INFO: 'invalid-verification-id', @@ -294,6 +295,8 @@ fireauth.AuthError.MESSAGES_[fireauth.authenum.Error.INVALID_PHONE_NUMBER] = 'phone number in a format that can be parsed into E.164 format. E.164 ' + 'phone numbers are written in the format [+][country code][subscriber ' + 'number including area code].'; +fireauth.AuthError.MESSAGES_[fireauth.authenum.Error.INVALID_PROVIDER_ID] = + 'The specified provider ID is invalid.'; fireauth.AuthError.MESSAGES_[fireauth.authenum.Error.INVALID_RECIPIENT_EMAIL] = 'The email corresponding to this action failed to send as the provided ' + 'recipient email address is invalid.'; diff --git a/packages/auth/src/rpchandler.js b/packages/auth/src/rpchandler.js index 65a5352e38f..14aa28f4a9d 100644 --- a/packages/auth/src/rpchandler.js +++ b/packages/auth/src/rpchandler.js @@ -207,6 +207,7 @@ fireauth.RpcHandler.ServerError = { INVALID_OOB_CODE: 'INVALID_OOB_CODE', INVALID_PASSWORD: 'INVALID_PASSWORD', INVALID_PHONE_NUMBER: 'INVALID_PHONE_NUMBER', + INVALID_PROVIDER_ID: 'INVALID_PROVIDER_ID', INVALID_RECIPIENT_EMAIL: 'INVALID_RECIPIENT_EMAIL', INVALID_SENDER: 'INVALID_SENDER', INVALID_SESSION_INFO: 'INVALID_SESSION_INFO', @@ -2244,6 +2245,10 @@ fireauth.RpcHandler.getDeveloperError_ = errorMap[fireauth.RpcHandler.ServerError.MISSING_OOB_CODE] = fireauth.authenum.Error.INTERNAL_ERROR; + // Get Auth URI errors: + errorMap[fireauth.RpcHandler.ServerError.INVALID_PROVIDER_ID] = + fireauth.authenum.Error.INVALID_PROVIDER_ID; + 
// Operations that require ID token in request: errorMap[fireauth.RpcHandler.ServerError.CREDENTIAL_TOO_OLD_LOGIN_AGAIN] = fireauth.authenum.Error.CREDENTIAL_TOO_OLD_LOGIN_AGAIN; diff --git a/packages/auth/test/rpchandler_test.js b/packages/auth/test/rpchandler_test.js index 08fa7837219..27adb1e2471 100644 --- a/packages/auth/test/rpchandler_test.js +++ b/packages/auth/test/rpchandler_test.js @@ -5268,6 +5268,30 @@ function testGetAuthUri_success() { } +/** + * Tests server side getAuthUri error. + */ +function testGetAuthUri_caughtServerError() { + var expectedUrl = 'https://www.googleapis.com/identitytoolkit/v3/relyin' + + 'gparty/createAuthUri?key=apiKey'; + var requestBody = { + 'providerId': 'abc.com', + 'continueUri': 'http://localhost/widget', + 'customParameter': {} + }; + var errorMap = {}; + // All related server errors for getAuthUri. + errorMap[fireauth.RpcHandler.ServerError.INVALID_PROVIDER_ID] = + fireauth.authenum.Error.INVALID_PROVIDER_ID; + + assertServerErrorsAreHandled(function() { + return rpcHandler.getAuthUri( + 'abc.com', + 'http://localhost/widget'); + }, errorMap, expectedUrl, requestBody); +} + + /** * Tests successful getAuthUri request with Google provider and sessionId. */ diff --git a/packages/firestore/CHANGELOG.md b/packages/firestore/CHANGELOG.md index ca0b5555545..a9180db70fa 100644 --- a/packages/firestore/CHANGELOG.md +++ b/packages/firestore/CHANGELOG.md @@ -1,4 +1,9 @@ # Unreleased +- [changed] Improved how Firestore handles idle queries to reduce the cost of + re-listening within 30 minutes. +- [changed] Improved offline performance with many outstanding writes. + +# 0.6.0 - [fixed] Fixed an issue where queries returned fewer results than they should, caused by documents that were cached as deleted when they should not have been (firebase/firebase-ios-sdk#1548). 
Because some cache data is cleared, diff --git a/packages/firestore/src/core/sync_engine.ts b/packages/firestore/src/core/sync_engine.ts index 212cd5cb86b..6319a2e85f5 100644 --- a/packages/firestore/src/core/sync_engine.ts +++ b/packages/firestore/src/core/sync_engine.ts @@ -50,7 +50,6 @@ import { MutationBatchState, OnlineState, OnlineStateSource, - ProtoByteString, TargetId } from './types'; import { @@ -88,12 +87,6 @@ class QueryView { * stream to identify this query. */ public targetId: TargetId, - /** - * An identifier from the datastore backend that indicates the last state - * of the results that was received. This can be used to indicate where - * to continue receiving new doc changes for the query. - */ - public resumeToken: ProtoByteString, /** * The view is responsible for computing the final merged truth of what * docs are in the query. It gets notified of local and remote changes, @@ -274,12 +267,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { 'applyChanges for new view should always return a snapshot' ); - const data = new QueryView( - query, - queryData.targetId, - queryData.resumeToken, - view - ); + const data = new QueryView(query, queryData.targetId, view); this.queryViewsByQuery.set(query, data); this.queryViewsByTarget[queryData.targetId] = data; return viewChange.snapshot!; diff --git a/packages/firestore/src/local/indexeddb_mutation_queue.ts b/packages/firestore/src/local/indexeddb_mutation_queue.ts index 5cf51246c27..1b0c902b22c 100644 --- a/packages/firestore/src/local/indexeddb_mutation_queue.ts +++ b/packages/firestore/src/local/indexeddb_mutation_queue.ts @@ -18,6 +18,7 @@ import { Timestamp } from '../api/timestamp'; import { User } from '../auth/user'; import { Query } from '../core/query'; import { BatchId, ProtoByteString } from '../core/types'; +import { DocumentKeySet } from '../model/collections'; import { DocumentKey } from '../model/document_key'; import { Mutation } from '../model/mutation'; 
import { BATCHID_UNKNOWN, MutationBatch } from '../model/mutation_batch'; @@ -40,8 +41,8 @@ import { LocalSerializer } from './local_serializer'; import { MutationQueue } from './mutation_queue'; import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; -import { SimpleDb, SimpleDbStore } from './simple_db'; -import { DocumentKeySet } from '../model/collections'; +import { SimpleDbStore } from './simple_db'; +import { IndexedDbPersistence } from './indexeddb_persistence'; /** A mutation queue for a specific user, backed by IndexedDB. */ export class IndexedDbMutationQueue implements MutationQueue { @@ -342,6 +343,50 @@ export class IndexedDbMutationQueue implements MutationQueue { .next(() => results); } + getAllMutationBatchesAffectingDocumentKeys( + transaction: PersistenceTransaction, + documentKeys: DocumentKeySet + ): PersistencePromise { + let uniqueBatchIDs = new SortedSet(primitiveComparator); + + const promises: Array> = []; + documentKeys.forEach(documentKey => { + const indexStart = DbDocumentMutation.prefixForPath( + this.userId, + documentKey.path + ); + const range = IDBKeyRange.lowerBound(indexStart); + + const promise = documentMutationsStore(transaction).iterate( + { range }, + (indexKey, _, control) => { + const [userID, encodedPath, batchID] = indexKey; + + // Only consider rows matching exactly the specific key of + // interest. Note that because we order by path first, and we + // order terminators before path separators, we'll encounter all + // the index rows for documentKey contiguously. In particular, all + // the rows for documentKey will occur before any rows for + // documents nested in a subcollection beneath documentKey so we + // can stop as soon as we hit any such row. 
+ const path = EncodedResourcePath.decode(encodedPath); + if (userID !== this.userId || !documentKey.path.isEqual(path)) { + control.done(); + return; + } + + uniqueBatchIDs = uniqueBatchIDs.add(batchID); + } + ); + + promises.push(promise); + }); + + return PersistencePromise.waitFor(promises).next(() => + this.lookupMutationBatches(transaction, uniqueBatchIDs) + ); + } + getAllMutationBatchesAffectingQuery( transaction: PersistenceTransaction, query: Query @@ -393,34 +438,39 @@ export class IndexedDbMutationQueue implements MutationQueue { } uniqueBatchIDs = uniqueBatchIDs.add(batchID); }) - .next(() => { - const results: MutationBatch[] = []; - const promises: Array> = []; - // TODO(rockwood): Implement this using iterate. - uniqueBatchIDs.forEach(batchId => { - promises.push( - mutationsStore(transaction) - .get(batchId) - .next(mutation => { - if (!mutation) { - fail( - 'Dangling document-mutation reference found, ' + - 'which points to ' + - batchId - ); - } - assert( - mutation.userId === this.userId, - `Unexpected user '${ - mutation.userId - }' for mutation batch ${batchId}` - ); - results.push(this.serializer.fromDbMutationBatch(mutation!)); - }) - ); - }); - return PersistencePromise.waitFor(promises).next(() => results); - }); + .next(() => this.lookupMutationBatches(transaction, uniqueBatchIDs)); + } + + private lookupMutationBatches( + transaction: PersistenceTransaction, + batchIDs: SortedSet + ): PersistencePromise { + const results: MutationBatch[] = []; + const promises: Array> = []; + // TODO(rockwood): Implement this using iterate. 
+ batchIDs.forEach(batchId => { + promises.push( + mutationsStore(transaction) + .get(batchId) + .next(mutation => { + if (mutation === null) { + fail( + 'Dangling document-mutation reference found, ' + + 'which points to ' + + batchId + ); + } + assert( + mutation.userId === this.userId, + `Unexpected user '${ + mutation.userId + }' for mutation batch ${batchId}` + ); + results.push(this.serializer.fromDbMutationBatch(mutation!)); + }) + ); + }); + return PersistencePromise.waitFor(promises).next(() => results); } removeMutationBatches( @@ -567,7 +617,7 @@ function convertStreamToken(token: ProtoByteString): string { function mutationsStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbMutationBatch.store ); @@ -579,10 +629,10 @@ function mutationsStore( function documentMutationsStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( - txn, - DbDocumentMutation.store - ); + return IndexedDbPersistence.getStore< + DbDocumentMutationKey, + DbDocumentMutation + >(txn, DbDocumentMutation.store); } /** @@ -591,7 +641,7 @@ function documentMutationsStore( function mutationQueuesStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbMutationQueue.store ); diff --git a/packages/firestore/src/local/indexeddb_persistence.ts b/packages/firestore/src/local/indexeddb_persistence.ts index a60d3dc883d..830df9b30ab 100644 --- a/packages/firestore/src/local/indexeddb_persistence.ts +++ b/packages/firestore/src/local/indexeddb_persistence.ts @@ -17,7 +17,7 @@ import { User } from '../auth/user'; import { DatabaseInfo } from '../core/database_info'; import { JsonProtoSerializer } from '../remote/serializer'; -import { assert } from '../util/assert'; +import { assert, fail } from '../util/assert'; import { Code, FirestoreError } from '../util/error'; import * as log from '../util/log'; @@ -83,6 +83,12 @@ 
const UNSUPPORTED_PLATFORM_ERROR_MSG = // firestore_zombie__ const ZOMBIED_CLIENTS_KEY_PREFIX = 'firestore_zombie'; +export class IndexedDbTransaction extends PersistenceTransaction { + constructor(readonly simpleDbTransaction: SimpleDbTransaction) { + super(); + } +} + /** * An IndexedDB-backed instance of Persistence. Data is stored persistently * across sessions. @@ -115,6 +121,17 @@ const ZOMBIED_CLIENTS_KEY_PREFIX = 'firestore_zombie'; * TODO(multitab): Update this comment with multi-tab changes. */ export class IndexedDbPersistence implements Persistence { + static getStore( + txn: PersistenceTransaction, + store: string + ): SimpleDbStore { + if (txn instanceof IndexedDbTransaction) { + return SimpleDb.getStore(txn.simpleDbTransaction, store); + } else { + fail('IndexedDbPersistence must use instances of IndexedDbTransaction'); + } + } + /** * The name of the main (and currently only) IndexedDB database. this name is * appended to the prefix provided to the IndexedDbPersistence constructor. @@ -470,7 +487,7 @@ export class IndexedDbPersistence implements Persistence { action: string, requirePrimaryLease: boolean, transactionOperation: ( - transaction: PersistenceTransaction + transaction: IndexedDbTransaction ) => PersistencePromise ): Promise { // TODO(multitab): Consider removing `requirePrimaryLease` and exposing @@ -483,39 +500,47 @@ export class IndexedDbPersistence implements Persistence { // Do all transactions as readwrite against all object stores, since we // are the only reader/writer. - return this.simpleDb.runTransaction('readwrite', ALL_STORES, txn => { - if (requirePrimaryLease) { - // While we merely verify that we have (or can acquire) the lease - // immediately, we wait to extend the primary lease until after - // executing transactionOperation(). This ensures that even if the - // transactionOperation takes a long time, we'll use a recent - // leaseTimestampMs in the extended (or newly acquired) lease. 
- return this.canActAsPrimary(txn) - .next(canActAsPrimary => { - if (!canActAsPrimary) { - // TODO(multitab): Handle this gracefully and transition back to - // secondary state. - log.error( - `Failed to obtain primary lease for action '${action}'.` + return this.simpleDb.runTransaction( + 'readwrite', + ALL_STORES, + simpleDbTxn => { + if (requirePrimaryLease) { + // While we merely verify that we have (or can acquire) the lease + // immediately, we wait to extend the primary lease until after + // executing transactionOperation(). This ensures that even if the + // transactionOperation takes a long time, we'll use a recent + // leaseTimestampMs in the extended (or newly acquired) lease. + return this.canActAsPrimary(simpleDbTxn) + .next(canActAsPrimary => { + if (!canActAsPrimary) { + // TODO(multitab): Handle this gracefully and transition back to + // secondary state. + log.error( + `Failed to obtain primary lease for action '${action}'.` + ); + this.isPrimary = false; + this.queue.enqueue(() => this.primaryStateListener(false)); + throw new FirestoreError( + Code.FAILED_PRECONDITION, + PRIMARY_LEASE_LOST_ERROR_MSG + ); + } + return transactionOperation( + new IndexedDbTransaction(simpleDbTxn) ); - this.isPrimary = false; - this.queue.enqueue(() => this.primaryStateListener(false)); - throw new FirestoreError( - Code.FAILED_PRECONDITION, - PRIMARY_LEASE_LOST_ERROR_MSG + }) + .next(result => { + return this.acquireOrExtendPrimaryLease(simpleDbTxn).next( + () => result ); - } - return transactionOperation(txn); - }) - .next(result => { - return this.acquireOrExtendPrimaryLease(txn).next(() => result); - }); - } else { - return this.verifyAllowTabSynchronization(txn).next(() => - transactionOperation(txn) - ); + }); + } else { + return this.verifyAllowTabSynchronization(simpleDbTxn).next(() => + transactionOperation(new IndexedDbTransaction(simpleDbTxn)) + ); + } } - }); + ); } /** diff --git a/packages/firestore/src/local/indexeddb_query_cache.ts 
b/packages/firestore/src/local/indexeddb_query_cache.ts index 79e107ad067..e268a7680a9 100644 --- a/packages/firestore/src/local/indexeddb_query_cache.ts +++ b/packages/firestore/src/local/indexeddb_query_cache.ts @@ -38,8 +38,9 @@ import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; import { QueryCache } from './query_cache'; import { QueryData } from './query_data'; -import { SimpleDb, SimpleDbStore } from './simple_db'; import { TargetIdGenerator } from '../core/target_id_generator'; +import { SimpleDbStore } from './simple_db'; +import { IndexedDbPersistence } from './indexeddb_persistence'; export class IndexedDbQueryCache implements QueryCache { constructor(private serializer: LocalSerializer) {} @@ -221,7 +222,7 @@ export class IndexedDbQueryCache implements QueryCache { targetId: TargetId ): PersistencePromise { // PORTING NOTE: The reverse index (documentsTargets) is maintained by - // Indexeddb. + // IndexedDb. const promises: Array> = []; const store = documentTargetStore(txn); keys.forEach(key => { @@ -316,6 +317,8 @@ export class IndexedDbQueryCache implements QueryCache { this.garbageCollector = gc; } + // TODO(gsoltis): we can let the compiler assert that txn !== null if we + // drop null from the type bounds on txn. 
containsKey( txn: PersistenceTransaction | null, key: DocumentKey @@ -369,7 +372,10 @@ export class IndexedDbQueryCache implements QueryCache { function targetsStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore(txn, DbTarget.store); + return IndexedDbPersistence.getStore( + txn, + DbTarget.store + ); } /** @@ -378,7 +384,7 @@ function targetsStore( function globalTargetStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbTargetGlobal.store ); @@ -390,7 +396,7 @@ function globalTargetStore( function documentTargetStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbTargetDocument.store ); diff --git a/packages/firestore/src/local/indexeddb_remote_document_cache.ts b/packages/firestore/src/local/indexeddb_remote_document_cache.ts index ef4bebdea80..53a4340c1b1 100644 --- a/packages/firestore/src/local/indexeddb_remote_document_cache.ts +++ b/packages/firestore/src/local/indexeddb_remote_document_cache.ts @@ -31,13 +31,14 @@ import { DbRemoteDocumentChanges, DbRemoteDocumentChangesKey } from './indexeddb_schema'; +import { IndexedDbPersistence } from './indexeddb_persistence'; import { LocalSerializer } from './local_serializer'; import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; import { RemoteDocumentCache } from './remote_document_cache'; -import { SimpleDb, SimpleDbStore } from './simple_db'; import { SnapshotVersion } from '../core/snapshot_version'; import { assert } from '../util/assert'; +import { SimpleDbStore } from './simple_db'; export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache { /** The last id read by `getNewDocumentChanges()`. 
*/ @@ -191,7 +192,7 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache { function remoteDocumentsStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbRemoteDocument.store ); @@ -204,10 +205,10 @@ function remoteDocumentsStore( function documentChangesStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( - txn, - DbRemoteDocumentChanges.store - ); + return IndexedDbPersistence.getStore< + DbRemoteDocumentChangesKey, + DbRemoteDocumentChanges + >(txn, DbRemoteDocumentChanges.store); } function dbKey(docKey: DocumentKey): DbRemoteDocumentKey { diff --git a/packages/firestore/src/local/local_documents_view.ts b/packages/firestore/src/local/local_documents_view.ts index 6466e8a26c2..78f33d9e050 100644 --- a/packages/firestore/src/local/local_documents_view.ts +++ b/packages/firestore/src/local/local_documents_view.ts @@ -17,7 +17,6 @@ import { Query } from '../core/query'; import { SnapshotVersion } from '../core/snapshot_version'; import { - documentKeySet, DocumentKeySet, DocumentMap, documentMap, @@ -26,6 +25,7 @@ import { } from '../model/collections'; import { Document, MaybeDocument, NoDocument } from '../model/document'; import { DocumentKey } from '../model/document_key'; +import { MutationBatch } from '../model/mutation_batch'; import { ResourcePath } from '../model/path'; import { fail } from '../util/assert'; @@ -56,11 +56,23 @@ export class LocalDocumentsView { transaction: PersistenceTransaction, key: DocumentKey ): PersistencePromise { - return this.remoteDocumentCache - .getEntry(transaction, key) - .next(remoteDoc => { - return this.computeLocalDocument(transaction, key, remoteDoc); - }); + return this.mutationQueue + .getAllMutationBatchesAffectingDocumentKey(transaction, key) + .next(batches => this.getDocumentInternal(transaction, key, batches)); + } + + /** Internal version of `getDocument` that allows reusing batches. 
*/ + private getDocumentInternal( + transaction: PersistenceTransaction, + key: DocumentKey, + inBatches: MutationBatch[] + ): PersistencePromise { + return this.remoteDocumentCache.getEntry(transaction, key).next(doc => { + for (const batch of inBatches) { + doc = batch.applyToLocalView(key, doc); + } + return doc; + }); } /** @@ -73,20 +85,29 @@ export class LocalDocumentsView { transaction: PersistenceTransaction, keys: DocumentKeySet ): PersistencePromise { - const promises = [] as Array>; - let results = maybeDocumentMap(); - keys.forEach(key => { - promises.push( - this.getDocument(transaction, key).next(maybeDoc => { - // TODO(http://b/32275378): Don't conflate missing / deleted. - if (!maybeDoc) { - maybeDoc = new NoDocument(key, SnapshotVersion.forDeletedDoc()); - } - results = results.insert(key, maybeDoc); - }) - ); - }); - return PersistencePromise.waitFor(promises).next(() => results); + return this.mutationQueue + .getAllMutationBatchesAffectingDocumentKeys(transaction, keys) + .next(batches => { + const promises = [] as Array>; + let results = maybeDocumentMap(); + keys.forEach(key => { + promises.push( + this.getDocumentInternal(transaction, key, batches).next( + maybeDoc => { + // TODO(http://b/32275378): Don't conflate missing / deleted. + if (!maybeDoc) { + maybeDoc = new NoDocument( + key, + SnapshotVersion.forDeletedDoc() + ); + } + results = results.insert(key, maybeDoc); + } + ) + ); + }); + return PersistencePromise.waitFor(promises).next(() => results); + }); } /** @@ -126,48 +147,40 @@ export class LocalDocumentsView { query: Query ): PersistencePromise { // Query the remote documents and overlay mutations. - // TODO(mikelehen): There may be significant overlap between the mutations - // affecting these remote documents and the - // getAllMutationBatchesAffectingQuery() mutations. Consider optimizing. 
let results: DocumentMap; return this.remoteDocumentCache .getDocumentsMatchingQuery(transaction, query) .next(queryResults => { - return this.computeLocalDocuments(transaction, queryResults); - }) - .next(promisedResults => { - results = promisedResults; - // Now use the mutation queue to discover any other documents that may - // match the query after applying mutations. + results = queryResults; return this.mutationQueue.getAllMutationBatchesAffectingQuery( transaction, query ); }) .next(matchingMutationBatches => { - let matchingKeys = documentKeySet(); for (const batch of matchingMutationBatches) { for (const mutation of batch.mutations) { - // TODO(mikelehen): PERF: Check if this mutation actually - // affects the query to reduce work. - if (!results.get(mutation.key)) { - matchingKeys = matchingKeys.add(mutation.key); + const key = mutation.key; + // Only process documents belonging to the collection. + if (!query.path.isImmediateParentOf(key.path)) { + continue; + } + + const baseDoc = results.get(key); + const mutatedDoc = mutation.applyToLocalView( + baseDoc, + baseDoc, + batch.localWriteTime + ); + if (!mutatedDoc || mutatedDoc instanceof NoDocument) { + results = results.remove(key); + } else if (mutatedDoc instanceof Document) { + results = results.insert(key, mutatedDoc); + } else { + fail('Unknown MaybeDocument: ' + mutatedDoc); } } } - - // Now add in the results for the matchingKeys. - const promises = [] as Array>; - matchingKeys.forEach(key => { - promises.push( - this.getDocument(transaction, key).next(doc => { - if (doc instanceof Document) { - results = results.insert(doc.key, doc); - } - }) - ); - }); - return PersistencePromise.waitFor(promises); }) .next(() => { // Finally, filter out any documents that don't actually match @@ -181,57 +194,4 @@ export class LocalDocumentsView { return results; }); } - - /** - * Takes a remote document and applies local mutations to generate the local - * view of the document. 
- * @param transaction The transaction in which to perform any persistence - * operations. - * @param documentKey The key of the document (necessary when remoteDocument - * is null). - * @param document The base remote document to apply mutations to or null. - */ - private computeLocalDocument( - transaction: PersistenceTransaction, - documentKey: DocumentKey, - document: MaybeDocument | null - ): PersistencePromise { - return this.mutationQueue - .getAllMutationBatchesAffectingDocumentKey(transaction, documentKey) - .next(batches => { - for (const batch of batches) { - document = batch.applyToLocalView(documentKey, document); - } - return document; - }); - } - - /** - * Takes a set of remote documents and applies local mutations to generate the - * local view of the documents. - * @param transaction The transaction in which to perform any persistence - * operations. - * @param documents The base remote documents to apply mutations to. - * @return The local view of the documents. - */ - private computeLocalDocuments( - transaction: PersistenceTransaction, - documents: DocumentMap - ): PersistencePromise { - const promises = [] as Array>; - documents.forEach((key, doc) => { - promises.push( - this.computeLocalDocument(transaction, key, doc).next(mutatedDoc => { - if (mutatedDoc instanceof Document) { - documents = documents.insert(mutatedDoc.key, mutatedDoc); - } else if (mutatedDoc instanceof NoDocument) { - documents = documents.remove(mutatedDoc.key); - } else { - fail('Unknown MaybeDocument: ' + mutatedDoc); - } - }) - ); - }); - return PersistencePromise.waitFor(promises).next(() => documents); - } } diff --git a/packages/firestore/src/local/local_store.ts b/packages/firestore/src/local/local_store.ts index 340c6bfb804..b258d9eb4ac 100644 --- a/packages/firestore/src/local/local_store.ts +++ b/packages/firestore/src/local/local_store.ts @@ -118,6 +118,15 @@ export interface UserChangeResult { * unrecoverable error (should be caught / reported by the 
async_queue). */ export class LocalStore { + /** + * The maximum time to leave a resume token buffered without writing it out. + * This value is arbitrary: it's long enough to avoid several writes + * (possibly indefinitely if updates come more frequently than this) but + * short enough that restarting after crashing will still have a pretty + * recent resume token. + */ + private static readonly RESUME_TOKEN_MAX_AGE_MICROS = 5 * 60 * 1e6; + /** * The set of all mutations that have been sent but not yet been applied to * the backend. @@ -543,12 +552,18 @@ export class LocalStore { // any preexisting value. const resumeToken = change.resumeToken; if (resumeToken.length > 0) { + const oldQueryData = queryData; queryData = queryData.copy({ resumeToken, snapshotVersion: remoteEvent.snapshotVersion }); this.targetIds[targetId] = queryData; - promises.push(this.queryCache.updateQueryData(txn, queryData)); + + if ( + LocalStore.shouldPersistQueryData(oldQueryData, queryData, change) + ) { + promises.push(this.queryCache.updateQueryData(txn, queryData)); + } } } ); @@ -630,6 +645,50 @@ export class LocalStore { }); } + /** + * Returns true if the newQueryData should be persisted during an update of + * an active target. QueryData should always be persisted when a target is + * being released and should not call this function. + * + * While the target is active, QueryData updates can be omitted when nothing + * about the target has changed except metadata like the resume token or + * snapshot version. Occasionally it's worth the extra write to prevent these + * values from getting too stale after a crash, but this doesn't have to be + * too frequent. + */ + private static shouldPersistQueryData( + oldQueryData: QueryData, + newQueryData: QueryData, + change: TargetChange + ): boolean { + // Avoid clearing any existing value + if (newQueryData.resumeToken.length === 0) return false; + + // Any resume token is interesting if there isn't one already. 
+ if (oldQueryData.resumeToken.length === 0) return true; + + // Don't allow resume token changes to be buffered indefinitely. This + // allows us to be reasonably up-to-date after a crash and avoids needing + // to loop over all active queries on shutdown. Especially in the browser + // we may not get time to do anything interesting while the current tab is + // closing. + const timeDelta = + newQueryData.snapshotVersion.toMicroseconds() - + oldQueryData.snapshotVersion.toMicroseconds(); + if (timeDelta >= this.RESUME_TOKEN_MAX_AGE_MICROS) return true; + + // Otherwise if the only thing that has changed about a target is its resume + // token it's not worth persisting. Note that the RemoteStore keeps an + // in-memory view of the currently active targets which includes the current + // resume token, so stream failure or user changes will still use an + // up-to-date resume token regardless of what we do here. + const changes = + change.addedDocuments.size + + change.modifiedDocuments.size + + change.removedDocuments.size; + return changes > 0; + } + /** * Notify local store of the changed views to locally pin documents. */ @@ -732,10 +791,21 @@ export class LocalStore { queryData != null, 'Tried to release nonexistent query: ' + query ); - this.localViewReferences.removeReferencesForId(queryData!.targetId); - delete this.targetIds[queryData!.targetId]; + const targetId = queryData!.targetId; + const cachedQueryData = this.targetIds[targetId]; + + this.localViewReferences.removeReferencesForId(targetId); + delete this.targetIds[targetId]; if (!keepPersistedQueryData && this.garbageCollector.isEager) { return this.queryCache.removeQueryData(txn, queryData!); + } else if ( + cachedQueryData.snapshotVersion > queryData!.snapshotVersion + ) { + // If we've been avoiding persisting the resumeToken (see + // shouldPersistQueryData for conditions and rationale) we need to + // persist the token now because there will no longer be an + // in-memory version to fall back on. 
+ return this.queryCache.updateQueryData(txn, cachedQueryData); } else { return PersistencePromise.resolve(); } @@ -748,12 +818,8 @@ export class LocalStore { this.remoteDocuments ); return this.releaseHeldBatchResults(txn, documentBuffer).next( - () => { - documentBuffer.apply(txn); - } + () => documentBuffer.apply(txn) ); - } else { - return PersistencePromise.resolve(); } }); } diff --git a/packages/firestore/src/local/memory_mutation_queue.ts b/packages/firestore/src/local/memory_mutation_queue.ts index d87876a86e1..ba5af955576 100644 --- a/packages/firestore/src/local/memory_mutation_queue.ts +++ b/packages/firestore/src/local/memory_mutation_queue.ts @@ -17,6 +17,7 @@ import { Timestamp } from '../api/timestamp'; import { Query } from '../core/query'; import { BatchId, ProtoByteString } from '../core/types'; +import { DocumentKeySet } from '../model/collections'; import { DocumentKey } from '../model/document_key'; import { Mutation } from '../model/mutation'; import { BATCHID_UNKNOWN, MutationBatch } from '../model/mutation_batch'; @@ -30,7 +31,6 @@ import { MutationQueue } from './mutation_queue'; import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; import { DocReference } from './reference_set'; -import { DocumentKeySet } from '../model/collections'; export class MemoryMutationQueue implements MutationQueue { /** @@ -249,6 +249,28 @@ export class MemoryMutationQueue implements MutationQueue { return PersistencePromise.resolve(result); } + getAllMutationBatchesAffectingDocumentKeys( + transaction: PersistenceTransaction, + documentKeys: DocumentKeySet + ): PersistencePromise { + let uniqueBatchIDs = new SortedSet(primitiveComparator); + + documentKeys.forEach(documentKey => { + const start = new DocReference(documentKey, 0); + const end = new DocReference(documentKey, Number.POSITIVE_INFINITY); + this.batchesByDocumentKey.forEachInRange([start, end], ref => { + assert( + 
documentKey.isEqual(ref.key), + "For each key, should only iterate over a single key's batches" + ); + + uniqueBatchIDs = uniqueBatchIDs.add(ref.targetOrBatchId); + }); + }); + + return PersistencePromise.resolve(this.findMutationBatches(uniqueBatchIDs)); + } + getAllMutationBatchesAffectingQuery( transaction: PersistenceTransaction, query: Query @@ -290,16 +312,20 @@ export class MemoryMutationQueue implements MutationQueue { } }, start); + return PersistencePromise.resolve(this.findMutationBatches(uniqueBatchIDs)); + } + + private findMutationBatches(batchIDs: SortedSet): MutationBatch[] { // Construct an array of matching batches, sorted by batchID to ensure that // multiple mutations affecting the same document key are applied in order. const result: MutationBatch[] = []; - uniqueBatchIDs.forEach(batchId => { + batchIDs.forEach(batchId => { const batch = this.findMutationBatch(batchId); if (batch !== null) { result.push(batch); } }); - return PersistencePromise.resolve(result); + return result; } removeMutationBatches( diff --git a/packages/firestore/src/local/memory_persistence.ts b/packages/firestore/src/local/memory_persistence.ts index e17e8536d4d..f5d69e54b4c 100644 --- a/packages/firestore/src/local/memory_persistence.ts +++ b/packages/firestore/src/local/memory_persistence.ts @@ -109,9 +109,12 @@ export class MemoryPersistence implements Persistence { ) => PersistencePromise ): Promise { debug(LOG_TAG, 'Starting transaction:', action); - return transactionOperation(new MemoryPersistenceTransaction()).toPromise(); + return transactionOperation(new MemoryTransaction()).toPromise(); } } -/** Dummy class since memory persistence doesn't actually use transactions. */ -class MemoryPersistenceTransaction implements PersistenceTransaction {} +/** + * Memory persistence is not actually transactional, but future implementations + * may have transaction-scoped state. 
+ */ +export class MemoryTransaction implements PersistenceTransaction {} diff --git a/packages/firestore/src/local/mutation_queue.ts b/packages/firestore/src/local/mutation_queue.ts index a26ac14f1a2..4db5e60fa31 100644 --- a/packages/firestore/src/local/mutation_queue.ts +++ b/packages/firestore/src/local/mutation_queue.ts @@ -17,6 +17,7 @@ import { Timestamp } from '../api/timestamp'; import { Query } from '../core/query'; import { BatchId, ProtoByteString } from '../core/types'; +import { DocumentKeySet } from '../model/collections'; import { DocumentKey } from '../model/document_key'; import { Mutation } from '../model/mutation'; import { MutationBatch } from '../model/mutation_batch'; @@ -150,23 +151,44 @@ export interface MutationQueue extends GarbageSource { * document key, so when looping through the batch you'll need to check that * the mutation itself matches the key. * + * Batches are guaranteed to be in sorted order. + * * Note that because of this requirement implementations are free to return * mutation batches that don't contain the document key at all if it's * convenient. */ // TODO(mcg): This should really return an enumerator - // also for b/32992024, all backing stores should really index by document key getAllMutationBatchesAffectingDocumentKey( transaction: PersistenceTransaction, documentKey: DocumentKey ): PersistencePromise; + /** + * Finds all mutation batches that could possibly affect the given set of + * document keys. Not all mutations in a batch will necessarily affect each + * key, so when looping through the batch you'll need to check that the + * mutation itself matches the key. + * + * Batches are guaranteed to be in sorted order. + * + * Note that because of this requirement implementations are free to return + * mutation batches that don't contain any of the document keys at all if it's + * convenient. 
+ */ + // TODO(mcg): This should really return an enumerator + getAllMutationBatchesAffectingDocumentKeys( + transaction: PersistenceTransaction, + documentKeys: DocumentKeySet + ): PersistencePromise; + /** * Finds all mutation batches that could affect the results for the given * query. Not all mutations in a batch will necessarily affect the query, so * when looping through the batch you'll need to check that the mutation * itself matches the query. * + * Batches are guaranteed to be in sorted order. + * * Note that because of this requirement implementations are free to return * mutation batches that don't match the query at all if it's convenient. * diff --git a/packages/firestore/src/local/persistence.ts b/packages/firestore/src/local/persistence.ts index 7a9cedc17d6..0f77a0ecdbe 100644 --- a/packages/firestore/src/local/persistence.ts +++ b/packages/firestore/src/local/persistence.ts @@ -17,7 +17,6 @@ import { User } from '../auth/user'; import { MutationQueue } from './mutation_queue'; -import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; import { QueryCache } from './query_cache'; import { RemoteDocumentCache } from './remote_document_cache'; @@ -30,7 +29,7 @@ import { ClientId } from './shared_client_state'; * pass it to your callback. You then pass it to any method that operates * on persistence. */ -export interface PersistenceTransaction {} +export abstract class PersistenceTransaction {} /** * Callback type for primary state notifications. This callback can be diff --git a/packages/firestore/src/local/simple_db.ts b/packages/firestore/src/local/simple_db.ts index 91646c3eab2..e429b736341 100644 --- a/packages/firestore/src/local/simple_db.ts +++ b/packages/firestore/src/local/simple_db.ts @@ -14,14 +14,12 @@ * limitations under the License. 
*/ -import { assert, fail } from '../util/assert'; +import { assert } from '../util/assert'; import { debug } from '../util/log'; import { AnyDuringMigration } from '../util/misc'; - import { PersistencePromise } from './persistence_promise'; import { SCHEMA_VERSION } from './indexeddb_schema'; import { Deferred } from '../util/promise'; -import { PersistenceTransaction } from './persistence'; import { Code, FirestoreError } from '../util/error'; const LOG_TAG = 'SimpleDb'; @@ -150,14 +148,10 @@ export class SimpleDb { /** Helper to get a typed SimpleDbStore from a transaction. */ static getStore( - txn: PersistenceTransaction, + txn: SimpleDbTransaction, store: string ): SimpleDbStore { - if (txn instanceof SimpleDbTransaction) { - return txn.store(store); - } else { - return fail('Invalid transaction object provided!'); - } + return txn.store(store); } constructor(private db: IDBDatabase) {} @@ -547,25 +541,19 @@ export class SimpleDbStore { } private cursor(options: IterateOptions): IDBRequest { - let direction = 'next'; + let direction: IDBCursorDirection = 'next'; if (options.reverse) { direction = 'prev'; } if (options.index) { const index = this.store.index(options.index); if (options.keysOnly) { - return index.openKeyCursor( - options.range, - direction as AnyDuringMigration - ); + return index.openKeyCursor(options.range, direction); } else { - return index.openCursor(options.range, direction as AnyDuringMigration); + return index.openCursor(options.range, direction); } } else { - return this.store.openCursor( - options.range, - direction as AnyDuringMigration - ); + return this.store.openCursor(options.range, direction); } } } diff --git a/packages/firestore/src/model/path.ts b/packages/firestore/src/model/path.ts index 4226ca7d1b2..6ca8271a7f4 100644 --- a/packages/firestore/src/model/path.ts +++ b/packages/firestore/src/model/path.ts @@ -143,6 +143,20 @@ export abstract class Path { return true; } + isImmediateParentOf(potentialChild: this): boolean { + 
if (this.length + 1 !== potentialChild.length) { + return false; + } + + for (let i = 0; i < this.length; i++) { + if (this.get(i) !== potentialChild.get(i)) { + return false; + } + } + + return true; + } + forEach(fn: (segment: string) => void): void { for (let i = this.offset, end = this.limit(); i < end; i++) { fn(this.segments[i]); diff --git a/packages/firestore/src/remote/datastore.ts b/packages/firestore/src/remote/datastore.ts index 6930764f38a..73a2fee80a1 100644 --- a/packages/firestore/src/remote/datastore.ts +++ b/packages/firestore/src/remote/datastore.ts @@ -23,7 +23,7 @@ import { Mutation, MutationResult } from '../model/mutation'; import { assert } from '../util/assert'; import { Code, FirestoreError } from '../util/error'; import { AsyncQueue } from '../util/async_queue'; - +import { WatchStreamListener, WriteStreamListener } from './persistent_stream'; import { Connection } from './connection'; import { PersistentListenStream, @@ -54,21 +54,27 @@ export class Datastore { private serializer: JsonProtoSerializer ) {} - newPersistentWriteStream(): PersistentWriteStream { + newPersistentWriteStream( + listener: WriteStreamListener + ): PersistentWriteStream { return new PersistentWriteStream( this.queue, this.connection, this.credentials, - this.serializer + this.serializer, + listener ); } - newPersistentWatchStream(): PersistentListenStream { + newPersistentWatchStream( + listener: WatchStreamListener + ): PersistentListenStream { return new PersistentListenStream( this.queue, this.connection, this.credentials, - this.serializer + this.serializer, + listener ); } diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index bd2774e0cc0..05139841220 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -43,47 +43,58 @@ interface ListenRequest extends api.ListenRequest { export interface WriteRequest extends 
api.WriteRequest { database?: string; } - +/** + * PersistentStream can be in one of 5 states (each described in detail below) + * based on the following state transition diagram: + * + * start() called auth & connection succeeded + * INITIAL ----------------> STARTING -----------------------------> OPEN + * ^ | | + * | | error occurred | + * | \-----------------------------v-----/ + * | | + * backoff | | + * elapsed | start() called | + * \--- BACKOFF <---------------- ERROR + * + * [any state] --------------------------> INITIAL + * stop() called or + * idle timer expired + */ enum PersistentStreamState { /** - * The streaming RPC is not running and there's no error condition. - * Calling `start` will start the stream immediately without backoff. - * While in this state isStarted will return false. + * The streaming RPC is not yet running and there's no error condition. + * Calling start() will start the stream immediately without backoff. + * While in this state isStarted() will return false. */ Initial, /** - * The stream is starting, and is waiting for an auth token to attach to - * the initial request. While in this state, isStarted will return - * true but isOpen will return false. + * The stream is starting, either waiting for an auth token or for the stream + * to successfully open. While in this state, isStarted() will return true but + * isOpen() will return false. */ - Auth, + Starting, /** * The streaming RPC is up and running. Requests and responses can flow - * freely. Both isStarted and isOpen will return true. + * freely. Both isStarted() and isOpen() will return true. */ Open, /** * The stream encountered an error. The next start attempt will back off. * While in this state isStarted() will return false. - * */ Error, /** * An in-between state after an error where the stream is waiting before - * re-starting. After - * waiting is complete, the stream will try to open. 
While in this - * state isStarted() will return YES but isOpen will return false. + * re-starting. After waiting is complete, the stream will try to open. + * While in this state isStarted() will return true but isOpen() will return + * false. */ - Backoff, - - /** - * The stream has been explicitly stopped; no further events will be emitted. - */ - Stopped + Backoff } /** @@ -125,6 +136,7 @@ const IDLE_TIMEOUT_MS = 60 * 1000; * - Exponential backoff on failure * - Authentication via CredentialsProvider * - Dispatching all callbacks into the shared worker queue + * - Closing idle streams after 60 seconds of inactivity * * Subclasses of PersistentStream implement serialization of models to and * from the JSON representation of the protocol buffers for a specific @@ -132,12 +144,12 @@ const IDLE_TIMEOUT_MS = 60 * 1000; * * ## Starting and Stopping * - * Streaming RPCs are stateful and need to be `start`ed before messages can - * be sent and received. The PersistentStream will call the onOpen function + * Streaming RPCs are stateful and need to be start()ed before messages can + * be sent and received. The PersistentStream will call the onOpen() function * of the listener once the stream is ready to accept requests. * - * Should a `start` fail, PersistentStream will call the registered - * onClose with a FirestoreError indicating what went wrong. + * Should a start() fail, PersistentStream will call the registered onClose() + * listener with a FirestoreError indicating what went wrong. * * A PersistentStream can be started and stopped repeatedly. 
* @@ -153,20 +165,26 @@ export abstract class PersistentStream< ReceiveType, ListenerType extends PersistentStreamListener > { - private state: PersistentStreamState; - private inactivityTimerPromise: CancelablePromise | null = null; + private state = PersistentStreamState.Initial; + /** + * A close count that's incremented every time the stream is closed; used by + * getCloseGuardedDispatcher() to invalidate callbacks that happen after + * close. + */ + private closeCount = 0; + + private idleTimer: CancelablePromise | null = null; private stream: Stream | null = null; protected backoff: ExponentialBackoff; - protected listener: ListenerType | null = null; - constructor( private queue: AsyncQueue, connectionTimerId: TimerId, private idleTimerId: TimerId, protected connection: Connection, - private credentialsProvider: CredentialsProvider + private credentialsProvider: CredentialsProvider, + protected listener: ListenerType ) { this.backoff = new ExponentialBackoff( queue, @@ -175,26 +193,25 @@ export abstract class PersistentStream< BACKOFF_FACTOR, BACKOFF_MAX_DELAY_MS ); - this.state = PersistentStreamState.Initial; } /** - * Returns true if `start` has been called and no error has occurred. True + * Returns true if start() has been called and no error has occurred. True * indicates the stream is open or in the process of opening (which * encompasses respecting backoff, getting auth tokens, and starting the - * actual RPC). Use `isOpen` to determine if the stream is open and ready for + * actual RPC). Use isOpen() to determine if the stream is open and ready for * outbound requests. 
*/ isStarted(): boolean { return ( - this.state === PersistentStreamState.Backoff || - this.state === PersistentStreamState.Auth || - this.state === PersistentStreamState.Open + this.state === PersistentStreamState.Starting || + this.state === PersistentStreamState.Open || + this.state === PersistentStreamState.Backoff ); } /** - * Returns true if the underlying RPC is open (the openHandler has been + * Returns true if the underlying RPC is open (the onOpen() listener has been * called) and the stream is ready for outbound requests. */ isOpen(): boolean { @@ -202,32 +219,31 @@ export abstract class PersistentStream< } /** - * Starts the RPC. Only allowed if isStarted returns false. The stream is - * not immediately ready for use: onOpen will be invoked when the RPC is ready - * for outbound requests, at which point isOpen will return true. + * Starts the RPC. Only allowed if isStarted() returns false. The stream is + * not immediately ready for use: onOpen() will be invoked when the RPC is + * ready for outbound requests, at which point isOpen() will return true. * - * When start returns, isStarted will return true. + * When start returns, isStarted() will return true. */ - start(listener: ListenerType): void { + start(): void { if (this.state === PersistentStreamState.Error) { - this.performBackoff(listener); + this.performBackoff(); return; } assert(this.state === PersistentStreamState.Initial, 'Already started'); - this.listener = listener; this.auth(); } /** * Stops the RPC. This call is idempotent and allowed regardless of the - * current isStarted state. + * current isStarted() state. * - * When stop returns, isStarted and isOpen will both return false. + * When stop returns, isStarted() and isOpen() will both return false. */ stop(): void { if (this.isStarted()) { - this.close(PersistentStreamState.Stopped); + this.close(PersistentStreamState.Initial); } } @@ -236,7 +252,7 @@ export abstract class PersistentStream< * start it. 
If the error warrants an immediate restart of the stream, the * sender can use this to indicate that the receiver should not back off. * - * Each error will call the onClose function. That function can decide to + * Each error will call the onClose() listener. That function can decide to * inhibit backoff if required. */ inhibitBackoff(): void { @@ -259,8 +275,8 @@ export abstract class PersistentStream< markIdle(): void { // Starts the idle time if we are in state 'Open' and are not yet already // running a timer (in which case the previous idle timeout still applies). - if (this.isOpen() && this.inactivityTimerPromise === null) { - this.inactivityTimerPromise = this.queue.enqueueAfterDelay( + if (this.isOpen() && this.idleTimer === null) { + this.idleTimer = this.queue.enqueueAfterDelay( this.idleTimerId, IDLE_TIMEOUT_MS, () => this.handleIdleCloseTimer() @@ -285,9 +301,9 @@ export abstract class PersistentStream< /** Marks the stream as active again. */ private cancelIdleCheck(): void { - if (this.inactivityTimerPromise) { - this.inactivityTimerPromise.cancel(); - this.inactivityTimerPromise = null; + if (this.idleTimer) { + this.idleTimer.cancel(); + this.idleTimer = null; } } @@ -299,8 +315,7 @@ export abstract class PersistentStream< * * sets internal stream state to 'finalState'; * * adjusts the backoff timer based on the error * - * A new stream can be opened by calling `start` unless `finalState` is set to - * `PersistentStreamState.Stopped`. + * A new stream can be opened by calling start(). * * @param finalState the intended state of the stream after closing. * @param error the error the connection was closed with. @@ -309,18 +324,20 @@ export abstract class PersistentStream< finalState: PersistentStreamState, error?: FirestoreError ): Promise { + assert(this.isStarted(), 'Only started streams should be closed.'); assert( finalState === PersistentStreamState.Error || isNullOrUndefined(error), "Can't provide an error when not in an error state." 
); - // The stream will be closed so we don't need our idle close timer anymore. + // Cancel any outstanding timers (they're guaranteed not to execute). this.cancelIdleCheck(); - - // Ensure we don't leave a pending backoff operation queued (in case close() - // was called while we were waiting to reconnect). this.backoff.cancel(); + // Invalidates any stream-related callbacks (e.g. from auth or the + // underlying stream), guaranteeing they won't execute. + this.closeCount++; + if (finalState !== PersistentStreamState.Error) { // If this is an intentional close ensure we don't delay our next connection attempt. this.backoff.reset(); @@ -347,16 +364,9 @@ export abstract class PersistentStream< // This state must be assigned before calling onClose() to allow the callback to // inhibit backoff or otherwise manipulate the state in its non-started state. this.state = finalState; - const listener = this.listener!; - - // Clear the listener to avoid bleeding of events from the underlying streams. - this.listener = null; - // If the caller explicitly requested a stream stop, don't notify them of a closing stream (it - // could trigger undesirable recovery logic, etc.). - if (finalState !== PersistentStreamState.Stopped) { - return listener.onClose(error); - } + // Notify the listener that the stream closed. + await this.listener.onClose(error); } /** @@ -386,98 +396,84 @@ export abstract class PersistentStream< 'Must be in initial state to auth' ); - this.state = PersistentStreamState.Auth; + this.state = PersistentStreamState.Starting; + + const dispatchIfNotClosed = this.getCloseGuardedDispatcher(this.closeCount); + + // TODO(mikelehen): Just use dispatchIfNotClosed, but see TODO below. + const closeCount = this.closeCount; this.credentialsProvider.getToken().then( token => { - // Normally we'd have to schedule the callback on the AsyncQueue. 
- // However, the following calls are safe to be called outside the - // AsyncQueue since they don't chain asynchronous calls - this.startStream(token); + // Stream can be stopped while waiting for authentication. + // TODO(mikelehen): We really should just use dispatchIfNotClosed + // and let this dispatch onto the queue, but that opened a spec test can + // of worms that I don't want to deal with in this PR. + if (this.closeCount === closeCount) { + // Normally we'd have to schedule the callback on the AsyncQueue. + // However, the following calls are safe to be called outside the + // AsyncQueue since they don't chain asynchronous calls + this.startStream(token); + } }, (error: Error) => { - this.queue.enqueue(async () => { - if (this.state !== PersistentStreamState.Stopped) { - // Stream can be stopped while waiting for authorization. - const rpcError = new FirestoreError( - Code.UNKNOWN, - 'Fetching auth token failed: ' + error.message - ); - return this.handleStreamClose(rpcError); - } + dispatchIfNotClosed(() => { + const rpcError = new FirestoreError( + Code.UNKNOWN, + 'Fetching auth token failed: ' + error.message + ); + return this.handleStreamClose(rpcError); }); } ); } private startStream(token: Token | null): void { - if (this.state === PersistentStreamState.Stopped) { - // Stream can be stopped while waiting for authorization. 
- return; - } - assert( - this.state === PersistentStreamState.Auth, - 'Trying to start stream in a non-auth state' + this.state === PersistentStreamState.Starting, + 'Trying to start stream in a non-starting state' ); - // Helper function to dispatch to AsyncQueue and make sure that any - // close will seem instantaneous and events are prevented from being - // raised after the close call - const dispatchIfStillActive = ( - stream: Stream, - fn: () => Promise - ) => { - this.queue.enqueue(async () => { - // Only raise events if the stream instance has not changed - if (this.stream === stream) { - return fn(); - } - }); - }; - // Only start stream if listener has not changed - if (this.listener !== null) { - const currentStream = this.startRpc(token); - this.stream = currentStream; - this.stream.onOpen(() => { - dispatchIfStillActive(currentStream, () => { - assert( - this.state === PersistentStreamState.Auth, - 'Expected stream to be in state auth, but was ' + this.state - ); - this.state = PersistentStreamState.Open; - return this.listener!.onOpen(); - }); + const dispatchIfNotClosed = this.getCloseGuardedDispatcher(this.closeCount); + + this.stream = this.startRpc(token); + this.stream.onOpen(() => { + dispatchIfNotClosed(() => { + assert( + this.state === PersistentStreamState.Starting, + 'Expected stream to be in state Starting, but was ' + this.state + ); + this.state = PersistentStreamState.Open; + return this.listener!.onOpen(); }); - this.stream.onClose((error: FirestoreError) => { - dispatchIfStillActive(currentStream, () => { - return this.handleStreamClose(error); - }); + }); + this.stream.onClose((error: FirestoreError) => { + dispatchIfNotClosed(() => { + return this.handleStreamClose(error); }); - this.stream.onMessage((msg: ReceiveType) => { - dispatchIfStillActive(currentStream, () => { - return this.onMessage(msg); - }); + }); + this.stream.onMessage((msg: ReceiveType) => { + dispatchIfNotClosed(() => { + return this.onMessage(msg); }); - } + }); 
} - private performBackoff(listener: ListenerType): void { + private performBackoff(): void { assert( this.state === PersistentStreamState.Error, - 'Should only perform backoff in an error case' + 'Should only perform backoff when in Error state' ); this.state = PersistentStreamState.Backoff; this.backoff.backoffAndRun(async () => { - if (this.state === PersistentStreamState.Stopped) { - // We should have canceled the backoff timer when the stream was - // closed, but just in case we make this a no-op. - return; - } + assert( + this.state === PersistentStreamState.Backoff, + 'Backoff elapsed but state is now: ' + this.state + ); this.state = PersistentStreamState.Initial; - this.start(listener); + this.start(); assert(this.isStarted(), 'PersistentStream should have started'); }); } @@ -495,6 +491,30 @@ export abstract class PersistentStream< // without a backoff accidentally, we set the stream to error in all cases. return this.close(PersistentStreamState.Error, error); } + + /** + * Returns a "dispatcher" function that dispatches operations onto the + * AsyncQueue but only runs them if closeCount remains unchanged. This allows + * us to turn auth / stream callbacks into no-ops if the stream is closed / + * re-opened, etc. + */ + private getCloseGuardedDispatcher( + startCloseCount: number + ): (fn: () => Promise) => void { + return (fn: () => Promise): void => { + this.queue.enqueue(() => { + if (this.closeCount === startCloseCount) { + return fn(); + } else { + log.debug( + LOG_TAG, + 'stream callback skipped by getCloseGuardedDispatcher.' + ); + return Promise.resolve(); + } + }); + }; + } } /** Listener for the PersistentWatchStream */ @@ -512,9 +532,9 @@ export interface WatchStreamListener extends PersistentStreamListener { /** * A PersistentStream that implements the Listen RPC. 
* - * Once the Listen stream has called the openHandler, any number of listen and - * unlisten calls calls can be sent to control what changes will be sent from - * the server for ListenResponses. + * Once the Listen stream has called the onOpen() listener, any number of + * listen() and unlisten() calls can be made to control what changes will be + * sent from the server for ListenResponses. */ export class PersistentListenStream extends PersistentStream< api.ListenRequest, @@ -525,14 +545,16 @@ export class PersistentListenStream extends PersistentStream< queue: AsyncQueue, connection: Connection, credentials: CredentialsProvider, - private serializer: JsonProtoSerializer + private serializer: JsonProtoSerializer, + listener: WatchStreamListener ) { super( queue, TimerId.ListenStreamConnectionBackoff, TimerId.ListenStreamIdle, connection, - credentials + credentials, + listener ); } @@ -633,14 +655,16 @@ export class PersistentWriteStream extends PersistentStream< queue: AsyncQueue, connection: Connection, credentials: CredentialsProvider, - private serializer: JsonProtoSerializer + private serializer: JsonProtoSerializer, + listener: WriteStreamListener ) { super( queue, TimerId.WriteStreamConnectionBackoff, TimerId.WriteStreamIdle, connection, - credentials + credentials, + listener ); } @@ -663,9 +687,9 @@ export class PersistentWriteStream extends PersistentStream< } // Override of PersistentStream.start - start(listener: WriteStreamListener): void { + start(): void { this.handshakeComplete_ = false; - super.start(listener); + super.start(); } protected tearDown(): void { diff --git a/packages/firestore/src/remote/remote_store.ts b/packages/firestore/src/remote/remote_store.ts index 4c0dbe9bc46..59cc291f41c 100644 --- a/packages/firestore/src/remote/remote_store.ts +++ b/packages/firestore/src/remote/remote_store.ts @@ -76,9 +76,6 @@ const MAX_PENDING_WRITES = 10; * - pulling pending mutations from LocalStore and sending them to Datastore. 
* - retrying mutations that failed because of network problems. * - acking mutations to the SyncEngine once they are accepted or rejected. - * - * RemoteStore always starts out offline. A call to `enableNetwork()` - * initializes the network connection. */ export class RemoteStore implements TargetMetadataProvider { /** @@ -111,17 +108,16 @@ export class RemoteStore implements TargetMetadataProvider { */ private listenTargets: { [targetId: number]: QueryData } = {}; - private watchStream: PersistentListenStream = null; - private writeStream: PersistentWriteStream = null; + private watchStream: PersistentListenStream; + private writeStream: PersistentWriteStream; private watchChangeAggregator: WatchChangeAggregator = null; /** * Set to true by enableNetwork() and false by disableNetwork() and indicates - * the user-preferred network state. A network connection is only established - * if `networkAllowed` is true, the client is primary and there are - * outstanding mutations or active listens. + * the user-preferred network state. */ - private networkAllowed = true; + private networkEnabled = false; + private isPrimary = false; private onlineStateTracker: OnlineStateTracker; @@ -140,6 +136,20 @@ export class RemoteStore implements TargetMetadataProvider { asyncQueue, onlineStateHandler ); + + // Create streams (but note they're not started yet). + this.watchStream = this.datastore.newPersistentWatchStream({ + onOpen: this.onWatchStreamOpen.bind(this), + onClose: this.onWatchStreamClose.bind(this), + onWatchChange: this.onWatchStreamChange.bind(this) + }); + + this.writeStream = this.datastore.newPersistentWriteStream({ + onOpen: this.onWriteStreamOpen.bind(this), + onClose: this.onWriteStreamClose.bind(this), + onHandshakeComplete: this.onWriteHandshakeComplete.bind(this), + onMutationResult: this.onMutationResult.bind(this) + }); } /** SyncEngine to notify of watch and write events. 
*/ @@ -150,43 +160,24 @@ export class RemoteStore implements TargetMetadataProvider { * LocalStore, etc. */ start(): Promise { - // Start is a no-op for RemoteStore. - return Promise.resolve(); - } - - private isNetworkEnabled(): boolean { - assert( - (this.watchStream == null) === (this.writeStream == null), - 'WatchStream and WriteStream should both be null or non-null' - ); - return this.watchStream != null; + return this.enableNetwork(); } /** Re-enables the network. Idempotent. */ async enableNetwork(): Promise { - this.networkAllowed = true; + this.networkEnabled = true; - if (this.isPrimary) { - if (this.isNetworkEnabled()) { - return; - } - - // Create new streams (but note they're not started yet). - this.watchStream = this.datastore.newPersistentWatchStream(); - this.writeStream = this.datastore.newPersistentWriteStream(); + if (this.canUseNetwork()) { + this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken(); - // Load any saved stream token from persistent storage - return this.localStore.getLastStreamToken().then(token => { - this.writeStream.lastStreamToken = token; - - if (this.shouldStartWatchStream()) { - this.startWatchStream(); - } else { - this.onlineStateTracker.set(OnlineState.Unknown); - } + if (this.shouldStartWatchStream()) { + this.startWatchStream(); + } else { + this.onlineStateTracker.set(OnlineState.Unknown); + } - return this.fillWritePipeline(); // This may start the writeStream. - }); + // This will start the write stream if necessary. + await this.fillWritePipeline(); } } @@ -195,43 +186,33 @@ export class RemoteStore implements TargetMetadataProvider { * enableNetwork(). */ async disableNetwork(): Promise { - this.networkAllowed = false; - + this.networkEnabled = false; this.disableNetworkInternal(); + // Set the OnlineState to Offline so get()s return from cache, etc. this.onlineStateTracker.set(OnlineState.Offline); } - /** - * Disables the network, if it is currently enabled. 
- */ private disableNetworkInternal(): void { - if (this.isNetworkEnabled()) { - // NOTE: We're guaranteed not to get any further events from these streams (not even a close - // event). - this.watchStream.stop(); - this.writeStream.stop(); - - this.cleanUpWatchStreamState(); + this.writeStream.stop(); + this.watchStream.stop(); + if (this.writePipeline.length > 0) { log.debug( LOG_TAG, - 'Stopping write stream with ' + - this.writePipeline.length + - ' pending writes' + `Stopping write stream with ${this.writePipeline.length} pending writes` ); - // TODO(mikelehen): We only actually need to clear the write pipeline if - // this is being called as part of handleUserChange(). Consider reworking. this.writePipeline = []; - - this.writeStream = null; - this.watchStream = null; } + + this.cleanUpWatchStreamState(); } shutdown(): Promise { log.debug(LOG_TAG, 'RemoteStore shutting down.'); + this.networkEnabled = false; this.disableNetworkInternal(); + // Set the OnlineState to Unknown (rather than Offline) to avoid potentially // triggering spurious listener events with cached data, etc. this.onlineStateTracker.set(OnlineState.Unknown); @@ -250,7 +231,7 @@ export class RemoteStore implements TargetMetadataProvider { if (this.shouldStartWatchStream()) { // The listen will be sent in onWatchStreamOpen this.startWatchStream(); - } else if (this.isNetworkEnabled() && this.watchStream.isOpen()) { + } else if (this.watchStream.isOpen()) { this.sendWatchRequest(queryData); } } @@ -262,7 +243,7 @@ export class RemoteStore implements TargetMetadataProvider { 'unlisten called without assigned target ID!' 
); delete this.listenTargets[targetId]; - if (this.isNetworkEnabled() && this.watchStream.isOpen()) { + if (this.watchStream.isOpen()) { this.sendUnwatchRequest(targetId); if (objUtils.isEmpty(this.listenTargets)) { this.watchStream.markIdle(); @@ -302,15 +283,11 @@ export class RemoteStore implements TargetMetadataProvider { private startWatchStream(): void { assert( this.shouldStartWatchStream(), - 'startWriteStream() called when shouldStartWatchStream() is false.' + 'startWatchStream() called when shouldStartWatchStream() is false.' ); this.watchChangeAggregator = new WatchChangeAggregator(this); - this.watchStream.start({ - onOpen: this.onWatchStreamOpen.bind(this), - onClose: this.onWatchStreamClose.bind(this), - onWatchChange: this.onWatchStreamChange.bind(this) - }); + this.watchStream.start(); this.onlineStateTracker.handleWatchStreamStart(); } @@ -320,39 +297,41 @@ export class RemoteStore implements TargetMetadataProvider { */ private shouldStartWatchStream(): boolean { return ( - this.isNetworkEnabled() && + this.canUseNetwork() && !this.watchStream.isStarted() && !objUtils.isEmpty(this.listenTargets) ); } + private canUseNetwork(): boolean { + return this.isPrimary && this.networkEnabled; + } + private cleanUpWatchStreamState(): void { this.watchChangeAggregator = null; } private async onWatchStreamOpen(): Promise { - // TODO(b/35852690): close the stream again (with some timeout?) if no watch - // targets are active objUtils.forEachNumber(this.listenTargets, (targetId, queryData) => { this.sendWatchRequest(queryData); }); } private async onWatchStreamClose(error?: FirestoreError): Promise { - assert( - this.isNetworkEnabled(), - 'onWatchStreamClose() should only be called when the network is enabled' - ); + if (error === undefined) { + // Graceful stop (due to stop() or idle timeout). Make sure that's + // desirable. + assert( + !this.shouldStartWatchStream(), + 'Watch stream was stopped gracefully while still needed.' 
+ ); + } this.cleanUpWatchStreamState(); // If we still need the watch stream, retry the connection. if (this.shouldStartWatchStream()) { - // There should generally be an error if the watch stream was closed when - // it's still needed, but it's not quite worth asserting. - if (error) { - this.onlineStateTracker.handleWatchStreamFailure(error); - } + this.onlineStateTracker.handleWatchStreamFailure(error); this.startWatchStream(); } else { @@ -498,28 +477,32 @@ export class RemoteStore implements TargetMetadataProvider { this.writePipeline.length > 0 ? this.writePipeline[this.writePipeline.length - 1].batchId : BATCHID_UNKNOWN; - return this.localStore - .nextMutationBatch(lastBatchIdRetrieved) - .then(batch => { - if (batch === null) { - if (this.writePipeline.length === 0) { - this.writeStream.markIdle(); - } - } else { - this.addToWritePipeline(batch); - return this.fillWritePipeline(); - } - }); + const batch = await this.localStore.nextMutationBatch( + lastBatchIdRetrieved + ); + + if (batch === null) { + if (this.writePipeline.length === 0) { + this.writeStream.markIdle(); + } + } else { + this.addToWritePipeline(batch); + await this.fillWritePipeline(); + } + } + + if (this.shouldStartWriteStream()) { + this.startWriteStream(); } } /** - * Returns true if we can add to the write pipeline (i.e. it is not full and - * the network is enabled). + * Returns true if we can add to the write pipeline (i.e. the network is + * enabled and the write pipeline is not full). */ private canAddToWritePipeline(): boolean { return ( - this.isNetworkEnabled() && this.writePipeline.length < MAX_PENDING_WRITES + this.canUseNetwork() && this.writePipeline.length < MAX_PENDING_WRITES ); } @@ -530,8 +513,7 @@ export class RemoteStore implements TargetMetadataProvider { /** * Queues additional writes to be sent to the write stream, sending them - * immediately if the write stream is established, else starting the write - * stream if it is not yet started. 
+ * immediately if the write stream is established. */ private addToWritePipeline(batch: MutationBatch): void { assert( @@ -540,16 +522,14 @@ export class RemoteStore implements TargetMetadataProvider { ); this.writePipeline.push(batch); - if (this.shouldStartWriteStream()) { - this.startWriteStream(); - } else if (this.isNetworkEnabled() && this.writeStream.handshakeComplete) { + if (this.writeStream.isOpen() && this.writeStream.handshakeComplete) { this.writeStream.writeMutations(batch.mutations); } } private shouldStartWriteStream(): boolean { return ( - this.isNetworkEnabled() && + this.canUseNetwork() && !this.writeStream.isStarted() && this.writePipeline.length > 0 ); @@ -560,12 +540,7 @@ export class RemoteStore implements TargetMetadataProvider { this.shouldStartWriteStream(), 'startWriteStream() called when shouldStartWriteStream() is false.' ); - this.writeStream.start({ - onOpen: this.onWriteStreamOpen.bind(this), - onClose: this.onWriteStreamClose.bind(this), - onHandshakeComplete: this.onWriteHandshakeComplete.bind(this), - onMutationResult: this.onMutationResult.bind(this) - }); + this.writeStream.start(); } private async onWriteStreamOpen(): Promise { @@ -626,10 +601,14 @@ export class RemoteStore implements TargetMetadataProvider { } private async onWriteStreamClose(error?: FirestoreError): Promise { - assert( - this.isNetworkEnabled(), - 'onWriteStreamClose() should only be called when the network is enabled' - ); + if (error === undefined) { + // Graceful stop (due to stop() or idle timeout). Make sure that's + // desirable. + assert( + !this.shouldStartWriteStream(), + 'Write stream was stopped gracefully while still needed.' + ); + } // If the write stream closed due to an error, invoke the error callbacks if // there are pending writes. 
@@ -704,18 +683,17 @@ export class RemoteStore implements TargetMetadataProvider { return new Transaction(this.datastore); } - handleUserChange(user: User): Promise { + async handleUserChange(user: User): Promise { log.debug(LOG_TAG, 'RemoteStore changing users: uid=', user.uid); - // If the network has been explicitly disabled, make sure we don't - // accidentally re-enable it. - if (this.isNetworkEnabled()) { + if (this.canUseNetwork()) { // Tear down and re-create our network streams. This will ensure we get a fresh auth token // for the new user and re-fill the write pipeline with new mutations from the LocalStore // (since mutations are per-user). + this.networkEnabled = false; this.disableNetworkInternal(); this.onlineStateTracker.set(OnlineState.Unknown); - return this.enableNetwork(); + await this.enableNetwork(); } } @@ -725,9 +703,9 @@ export class RemoteStore implements TargetMetadataProvider { async applyPrimaryState(isPrimary: boolean): Promise { this.isPrimary = isPrimary; - if (isPrimary && this.networkAllowed) { + if (isPrimary && this.networkEnabled) { await this.enableNetwork(); - } else if (!isPrimary && this.isNetworkEnabled()) { + } else if (!isPrimary) { this.disableNetworkInternal(); this.onlineStateTracker.set(OnlineState.Unknown); } diff --git a/packages/firestore/src/remote/watch_change.ts b/packages/firestore/src/remote/watch_change.ts index faeef82abe5..8ccebed2c69 100644 --- a/packages/firestore/src/remote/watch_change.ts +++ b/packages/firestore/src/remote/watch_change.ts @@ -297,7 +297,7 @@ export class WatchChangeAggregator { /** Processes and adds the WatchTargetChange to the current set of changes. 
*/ handleTargetChange(targetChange: WatchTargetChange): void { - targetChange.targetIds.forEach(targetId => { + this.forEachTarget(targetChange, targetId => { const targetState = this.ensureTargetState(targetId); switch (targetChange.state) { case WatchTargetChangeState.NoChange: @@ -352,6 +352,22 @@ export class WatchChangeAggregator { }); } + /** + * Iterates over all targetIds that the watch change applies to: either the + * targetIds explicitly listed in the change or the targetIds of all currently + * active targets. + */ + forEachTarget( + targetChange: WatchTargetChange, + fn: (targetId: TargetId) => void + ): void { + if (targetChange.targetIds.length > 0) { + targetChange.targetIds.forEach(fn); + } else { + objUtils.forEachNumber(this.targetStates, fn); + } + } + /** * Handles existence filters and synthesizes deletes for filter mismatches. * Targets that are invalidated by filter mismatches are added to diff --git a/packages/firestore/test/integration/remote/stream.test.ts b/packages/firestore/test/integration/remote/stream.test.ts index 24997b816ce..94d2c747dcd 100644 --- a/packages/firestore/test/integration/remote/stream.test.ts +++ b/packages/firestore/test/integration/remote/stream.test.ts @@ -134,20 +134,20 @@ describe('Watch Stream', () => { }); /** - * Verifies that the watch stream does not issue an onClose callback after a + * Verifies that the watch stream issues an onClose callback after a * call to stop(). */ it('can be stopped before handshake', () => { let watchStream: PersistentListenStream; return withTestDatastore(ds => { - watchStream = ds.newPersistentWatchStream(); - watchStream.start(streamListener); + watchStream = ds.newPersistentWatchStream(streamListener); + watchStream.start(); return streamListener.awaitCallback('open').then(() => { - // Stop must not call onClose because the full implementation of the callback could - // attempt to restart the stream in the event it had pending watches. 
watchStream.stop(); + + return streamListener.awaitCallback('close'); }); }); }); @@ -183,22 +183,20 @@ describe('Write Stream', () => { }); /** - * Verifies that the write stream does not issue an onClose callback after a - * call to stop(). + * Verifies that the write stream issues an onClose callback after a call to + * stop(). */ it('can be stopped before handshake', () => { let writeStream: PersistentWriteStream; return withTestDatastore(ds => { - writeStream = ds.newPersistentWriteStream(); - writeStream.start(streamListener); + writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener.awaitCallback('open'); }).then(() => { - // Don't start the handshake. - - // Stop must not call onClose because the full implementation of the callback could - // attempt to restart the stream in the event it had pending writes. writeStream.stop(); + + return streamListener.awaitCallback('close'); }); }); @@ -206,8 +204,8 @@ describe('Write Stream', () => { let writeStream: PersistentWriteStream; return withTestDatastore(ds => { - writeStream = ds.newPersistentWriteStream(); - writeStream.start(streamListener); + writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener.awaitCallback('open'); }) .then(() => { @@ -225,6 +223,8 @@ describe('Write Stream', () => { }) .then(() => { writeStream.stop(); + + return streamListener.awaitCallback('close'); }); }); @@ -232,8 +232,8 @@ describe('Write Stream', () => { const queue = new AsyncQueue(); return withTestDatastore(ds => { - const writeStream = ds.newPersistentWriteStream(); - writeStream.start(streamListener); + const writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener .awaitCallback('open') .then(() => { @@ -259,8 +259,8 @@ describe('Write Stream', () => { const queue = new AsyncQueue(); return withTestDatastore(ds => { - const writeStream = ds.newPersistentWriteStream(); - 
writeStream.start(streamListener); + const writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener .awaitCallback('open') .then(() => { @@ -288,8 +288,8 @@ describe('Write Stream', () => { return withTestDatastore( ds => { - const writeStream = ds.newPersistentWriteStream(); - writeStream.start(streamListener); + const writeStream = ds.newPersistentWriteStream(streamListener); + writeStream.start(); return streamListener .awaitCallback('open') .then(() => { @@ -301,7 +301,7 @@ describe('Write Stream', () => { return streamListener.awaitCallback('close'); }) .then(() => { - writeStream.start(streamListener); + writeStream.start(); return streamListener.awaitCallback('open'); }) .then(() => { @@ -312,7 +312,7 @@ describe('Write Stream', () => { return streamListener.awaitCallback('close'); }) .then(() => { - writeStream.start(streamListener); + writeStream.start(); return streamListener.awaitCallback('open'); }) .then(() => { diff --git a/packages/firestore/test/unit/local/eager_garbage_collector.test.ts b/packages/firestore/test/unit/local/eager_garbage_collector.test.ts index f12a579b217..af5d748a2be 100644 --- a/packages/firestore/test/unit/local/eager_garbage_collector.test.ts +++ b/packages/firestore/test/unit/local/eager_garbage_collector.test.ts @@ -65,7 +65,7 @@ describe('EagerGarbageCollector', () => { expect(referenceSet.isEmpty()).to.equal(false); referenceSet.removeReferencesForId(2); - return gc.collectGarbage(true).toPromise(); + return gc.collectGarbage(null).toPromise(); }) .then(garbage => { expectSetToEqual(garbage, [key3]); diff --git a/packages/firestore/test/unit/local/mutation_queue.test.ts b/packages/firestore/test/unit/local/mutation_queue.test.ts index afd672d9245..08c4e4abd55 100644 --- a/packages/firestore/test/unit/local/mutation_queue.test.ts +++ b/packages/firestore/test/unit/local/mutation_queue.test.ts @@ -20,6 +20,7 @@ import { Query } from '../../../src/core/query'; import { 
EagerGarbageCollector } from '../../../src/local/eager_garbage_collector'; import { IndexedDbPersistence } from '../../../src/local/indexeddb_persistence'; import { Persistence } from '../../../src/local/persistence'; +import { documentKeySet } from '../../../src/model/collections'; import { BATCHID_UNKNOWN, MutationBatch @@ -40,7 +41,6 @@ import { addEqualityMatcher } from '../../util/equality_matcher'; let persistence: Persistence; let mutationQueue: TestMutationQueue; - describe('MemoryMutationQueue', () => { beforeEach(() => { return persistenceHelpers.testMemoryPersistence().then(p => { @@ -73,7 +73,7 @@ describe('IndexedDbMutationQueue', () => { function genericMutationQueueTests(): void { addEqualityMatcher(); - beforeEach(() => { + beforeEach(async () => { mutationQueue = new TestMutationQueue( persistence, persistence.getMutationQueue(new User('user')) @@ -83,6 +83,58 @@ function genericMutationQueueTests(): void { afterEach(() => persistence.shutdown(/* deleteData= */ true)); + /** + * Creates a new MutationBatch with the next batch ID and a set of dummy + * mutations. + */ + function addMutationBatch(key?: string): Promise { + let keyStr = key; + if (keyStr === undefined) { + keyStr = 'foo/bar'; + } + const mutation = setMutation(keyStr, { a: 1 }); + return mutationQueue.addMutationBatch([mutation]); + } + + /** + * Creates an array of batches containing count dummy MutationBatches. Each + * has a different batchID. + */ + async function createBatches(count: number): Promise { + const batches = []; + for (let i = 0; i < count; i++) { + const batch = await addMutationBatch(); + batches.push(batch); + } + return batches; + } + + /** + * Removes entries from from the given a batches and returns them. + * + * @param holes An array of indexes in the batches array; in increasing order. + * Indexes are relative to the original state of the batches array, not any + * intermediate state that might occur. 
+ * @param batches The array to mutate, removing entries from it. + * @return A new array containing all the entries that were removed from + * batches. + */ + async function makeHolesInBatches( + holes: number[], + batches: MutationBatch[] + ): Promise { + const removed = []; + for (let i = 0; i < holes.length; i++) { + const index = holes[i] - i; + const batch = batches[index]; + await mutationQueue.removeMutationBatches([batch]); + + batches.splice(index, 1); + removed.push(batch); + } + return removed; + } + it('can count batches', async () => { expect(await mutationQueue.countBatches()).to.equal(0); expect(await mutationQueue.checkEmpty()).to.equal(true); @@ -329,7 +381,30 @@ function genericMutationQueueTests(): void { const matches = await mutationQueue.getAllMutationBatchesAffectingDocumentKey( key('foo/bar') ); - expect(matches.length).to.deep.equal(expected.length); + expectEqualArrays(matches, expected); + }); + + it('can getAllMutationBatchesAffectingDocumentKeys()', async () => { + const mutations = [ + setMutation('fob/bar', { a: 1 }), + setMutation('foo/bar', { a: 1 }), + patchMutation('foo/bar', { b: 1 }), + setMutation('foo/bar/suffix/key', { a: 1 }), + setMutation('foo/baz', { a: 1 }), + setMutation('food/bar', { a: 1 }) + ]; + // Store all the mutations. + const batches: MutationBatch[] = []; + for (const mutation of mutations) { + const batch = await mutationQueue.addMutationBatch([mutation]); + batches.push(batch); + } + const expected = [batches[1], batches[2], batches[4]]; + const matches = await mutationQueue.getAllMutationBatchesAffectingDocumentKeys( + documentKeySet() + .add(key('foo/bar')) + .add(key('foo/baz')) + ); expectEqualArrays(matches, expected); }); @@ -498,55 +573,3 @@ function genericMutationQueueTests(): void { expect(await mutationQueue.checkEmpty()).to.equal(true); }); } - -/** - * Creates a new MutationBatch with the next batch ID and a set of dummy - * mutations. 
- */ -function addMutationBatch(key?: string): Promise { - let keyStr = key; - if (keyStr === undefined) { - keyStr = 'foo/bar'; - } - const mutation = setMutation(keyStr, { a: 1 }); - return mutationQueue.addMutationBatch([mutation]); -} - -/** - * Creates an array of batches containing count dummy MutationBatches. Each - * has a different batchID. - */ -async function createBatches(count: number): Promise { - const batches = []; - for (let i = 0; i < count; i++) { - const batch = await addMutationBatch(); - batches.push(batch); - } - return batches; -} - -/** - * Removes entries from from the given a batches and returns them. - * - * @param holes An array of indexes in the batches array; in increasing order. - * Indexes are relative to the original state of the batches array, not any - * intermediate state that might occur. - * @param batches The array to mutate, removing entries from it. - * @return A new array containing all the entries that were removed from - * batches. - */ -async function makeHolesInBatches( - holes: number[], - batches: MutationBatch[] -): Promise { - const removed = []; - for (let i = 0; i < holes.length; i++) { - const index = holes[i] - i; - const batch = batches[index]; - await mutationQueue.removeMutationBatches([batch]); - - batches.splice(index, 1); - removed.push(batch); - } - return removed; -} diff --git a/packages/firestore/test/unit/local/query_cache.test.ts b/packages/firestore/test/unit/local/query_cache.test.ts index 2a3ede52eba..bb4f4f0da03 100644 --- a/packages/firestore/test/unit/local/query_cache.test.ts +++ b/packages/firestore/test/unit/local/query_cache.test.ts @@ -35,17 +35,8 @@ import * as persistenceHelpers from './persistence_test_helpers'; import { TestGarbageCollector } from './test_garbage_collector'; import { TestQueryCache } from './test_query_cache'; -let persistence: Persistence; -let cache: TestQueryCache; - describe('MemoryQueryCache', () => { - beforeEach(() => { - return 
persistenceHelpers.testMemoryPersistence().then(p => { - persistence = p; - }); - }); - - genericQueryCacheTests(); + genericQueryCacheTests(persistenceHelpers.testMemoryPersistence); }); describe('IndexedDbQueryCache', () => { @@ -54,22 +45,22 @@ describe('IndexedDbQueryCache', () => { return; } - beforeEach(() => { - return persistenceHelpers.testIndexedDbPersistence().then(p => { - persistence = p; - }); + let persistencePromise: Promise; + beforeEach(async () => { + persistencePromise = persistenceHelpers.testIndexedDbPersistence(); }); - afterEach(() => persistence.shutdown(/* deleteData= */ true)); - - genericQueryCacheTests(); + genericQueryCacheTests(() => persistencePromise); }); /** * Defines the set of tests to run against both query cache implementations. */ -function genericQueryCacheTests(): void { +function genericQueryCacheTests( + persistencePromise: () => Promise +): void { addEqualityMatcher(); + let cache: TestQueryCache; const QUERY_ROOMS = Query.atPath(path('rooms')); const QUERY_HALLS = Query.atPath(path('halls')); @@ -98,11 +89,17 @@ function genericQueryCacheTests(): void { ); } + let persistence: Persistence; beforeEach(async () => { + persistence = await persistencePromise(); cache = new TestQueryCache(persistence, persistence.getQueryCache()); await cache.start(); }); + afterEach(async () => { + persistence.shutdown(/* deleteData= */ true); + }); + it('returns null for query not in cache', () => { return cache.getQueryData(QUERY_ROOMS).then(queryData => { expect(queryData).to.equal(null); diff --git a/packages/firestore/test/unit/local/remote_document_cache.test.ts b/packages/firestore/test/unit/local/remote_document_cache.test.ts index 59491cc0bac..bc19e796233 100644 --- a/packages/firestore/test/unit/local/remote_document_cache.test.ts +++ b/packages/firestore/test/unit/local/remote_document_cache.test.ts @@ -32,17 +32,8 @@ import * as persistenceHelpers from './persistence_test_helpers'; import { TestRemoteDocumentCache } from 
'./test_remote_document_cache'; import { MaybeDocumentMap } from '../../../src/model/collections'; -let persistence: Persistence; -let cache: TestRemoteDocumentCache; - describe('MemoryRemoteDocumentCache', () => { - beforeEach(() => { - return persistenceHelpers.testMemoryPersistence().then(p => { - persistence = p; - }); - }); - - genericRemoteDocumentCacheTests(); + genericRemoteDocumentCacheTests(persistenceHelpers.testMemoryPersistence); }); describe('IndexedDbRemoteDocumentCache', () => { @@ -51,31 +42,27 @@ describe('IndexedDbRemoteDocumentCache', () => { return; } - beforeEach(() => { - // We turn on `synchronizeTabs` to test the document change log. - return persistenceHelpers - .testIndexedDbPersistence(/* synchronizeTabs= */ true) - .then(p => { - persistence = p; - }); - }); - - afterEach(() => persistence.shutdown(/* deleteData= */ true)); - - genericRemoteDocumentCacheTests(); + genericRemoteDocumentCacheTests(() => + persistenceHelpers.testIndexedDbPersistence(/* synchronizeTabs= */ true) + ); }); /** * Defines the set of tests to run against both remote document cache * implementations. */ -function genericRemoteDocumentCacheTests(): void { +function genericRemoteDocumentCacheTests( + persistencePromise: () => Promise +): void { // Helpers for use throughout tests. 
const DOC_PATH = 'a/b'; const LONG_DOC_PATH = 'a/b/c/d/e/f'; const DOC_DATA = { a: 1, b: 2 }; const VERSION = 42; + let persistence: Persistence; + let cache: TestRemoteDocumentCache; + function setAndReadDocument(doc: MaybeDocument): Promise { return cache .addEntries([doc]) @@ -105,15 +92,16 @@ function genericRemoteDocumentCacheTests(): void { }); } - beforeEach(() => { + beforeEach(async () => { + persistence = await persistencePromise(); cache = new TestRemoteDocumentCache( persistence, persistence.getRemoteDocumentCache() ); - - return cache.start(); }); + afterEach(() => persistence.shutdown(/* deleteData= */ true)); + it('returns null for document not in cache', () => { return cache.getEntry(key(DOC_PATH)).then(doc => { expect(doc).to.equal(null); diff --git a/packages/firestore/test/unit/local/remote_document_change_buffer.test.ts b/packages/firestore/test/unit/local/remote_document_change_buffer.test.ts index e19af434519..ad95e86620c 100644 --- a/packages/firestore/test/unit/local/remote_document_change_buffer.test.ts +++ b/packages/firestore/test/unit/local/remote_document_change_buffer.test.ts @@ -16,7 +16,6 @@ import { expect } from 'chai'; import { IndexedDbPersistence } from '../../../src/local/indexeddb_persistence'; -import { Persistence } from '../../../src/local/persistence'; import { RemoteDocumentChangeBuffer } from '../../../src/local/remote_document_change_buffer'; import { deletedDoc, doc, expectEqual, key } from '../../util/helpers'; @@ -24,7 +23,7 @@ import { testIndexedDbPersistence } from './persistence_test_helpers'; import { TestRemoteDocumentCache } from './test_remote_document_cache'; import { TestRemoteDocumentChangeBuffer } from './test_remote_document_change_buffer'; -let persistence: Persistence; +let persistence: IndexedDbPersistence; let cache: TestRemoteDocumentCache; let buffer: TestRemoteDocumentChangeBuffer; const INITIAL_DOC = doc('coll/a', 42, { test: 'data' }); diff --git 
a/packages/firestore/test/unit/local/test_mutation_queue.ts b/packages/firestore/test/unit/local/test_mutation_queue.ts index 12ca96614ea..a3094dff5d0 100644 --- a/packages/firestore/test/unit/local/test_mutation_queue.ts +++ b/packages/firestore/test/unit/local/test_mutation_queue.ts @@ -127,7 +127,7 @@ export class TestMutationQueue { ): Promise { return this.persistence.runTransaction( 'getAllMutationBatchesThroughBatchId', - true, + true, txn => { return this.queue.getAllMutationBatchesThroughBatchId(txn, batchId); } @@ -149,6 +149,21 @@ export class TestMutationQueue { ); } + getAllMutationBatchesAffectingDocumentKeys( + documentKeys: DocumentKeySet + ): Promise { + return this.persistence.runTransaction( + 'getAllMutationBatchesAffectingDocumentKeys', + true, + txn => { + return this.queue.getAllMutationBatchesAffectingDocumentKeys( + txn, + documentKeys + ); + } + ); + } + getAllMutationBatchesAffectingQuery(query: Query): Promise { return this.persistence.runTransaction( 'getAllMutationBatchesAffectingQuery', diff --git a/packages/firestore/test/unit/specs/limbo_spec.test.ts b/packages/firestore/test/unit/specs/limbo_spec.test.ts index 90f53cc6a6a..6ba4f233137 100644 --- a/packages/firestore/test/unit/specs/limbo_spec.test.ts +++ b/packages/firestore/test/unit/specs/limbo_spec.test.ts @@ -362,34 +362,30 @@ describeSpec('Limbo Documents:', [], () => { const docB = doc('collection/b', 1001, { key: 'b' }); const deletedDocB = deletedDoc('collection/b', 1005); - return ( - client(0, false) - .expectPrimaryState(true) - .client(1) - .userListens(query) - .client(0) - .expectListen(query) - .watchAcksFull(query, 1002, docA, docB) - .client(1) - .expectEvents(query, { added: [docA, docB] }) - .client(0) - .watchRemovesDoc(docB.key, query) - .watchSnapshots(1003) - .expectLimboDocs(docB.key) - .shutdown() - .client(1) - .expectEvents(query, { fromCache: true }) - .runTimer(TimerId.ClientMetadataRefresh) - .expectPrimaryState(true) - // TODO(37254270): This should 
be 'resume-token-1003' from the last - // global snapshot. - .expectListen(query, 'resume-token-1002') - .watchAcksFull(query, 1004) - .expectLimboDocs(docB.key) - .ackLimbo(1005, deletedDocB) - .expectLimboDocs() - .expectEvents(query, { removed: [docB] }) - ); + return client(0, false) + .expectPrimaryState(true) + .client(1) + .userListens(query) + .client(0) + .expectListen(query) + .watchAcksFull(query, 1 * 1e6, docA, docB) + .client(1) + .expectEvents(query, { added: [docA, docB] }) + .client(0) + .watchRemovesDoc(docB.key, query) + .watchSnapshots(2 * 1e6) + .expectLimboDocs(docB.key) + .shutdown() + .client(1) + .expectEvents(query, { fromCache: true }) + .runTimer(TimerId.ClientMetadataRefresh) + .expectPrimaryState(true) + .expectListen(query, 'resume-token-1000000') + .watchAcksFull(query, 3 * 1e6) + .expectLimboDocs(docB.key) + .ackLimbo(4 * 1e6, deletedDocB) + .expectLimboDocs() + .expectEvents(query, { removed: [docB] }); } ); @@ -404,41 +400,37 @@ describeSpec('Limbo Documents:', [], () => { const deletedDocB = deletedDoc('collection/b', 1006); const deletedDocC = deletedDoc('collection/c', 1008); - return ( - client(0, false) - .expectPrimaryState(true) - .userListens(query) - .watchAcksFull(query, 1002, docA, docB, docC) - .expectEvents(query, { added: [docA, docB, docC] }) - .watchRemovesDoc(docB.key, query) - .watchRemovesDoc(docC.key, query) - .watchSnapshots(1003) - .expectEvents(query, { fromCache: true }) - .expectLimboDocs(docB.key, docC.key) - .client(1) - .stealPrimaryLease() - .client(0) - .runTimer(TimerId.ClientMetadataRefresh) - .expectPrimaryState(false) - .expectLimboDocs() - .client(1) - // TODO(37254270): This should be 'resume-token-1003' from the last - // global snapshot. 
- .expectListen(query, 'resume-token-1002') - .watchAcksFull(query, 1005) - .expectLimboDocs(docB.key, docC.key) - .ackLimbo(1006, deletedDocB) - .expectLimboDocs(docC.key) - .client(0) - .expectEvents(query, { removed: [docB], fromCache: true }) - .stealPrimaryLease() - .expectListen(query, 'resume-token-1005') - .watchAcksFull(query, 1007) - .expectLimboDocs(docC.key) - .ackLimbo(1007, deletedDocC) - .expectLimboDocs() - .expectEvents(query, { removed: [docC] }) - ); + return client(0, false) + .expectPrimaryState(true) + .userListens(query) + .watchAcksFull(query, 1 * 1e6, docA, docB, docC) + .expectEvents(query, { added: [docA, docB, docC] }) + .watchRemovesDoc(docB.key, query) + .watchRemovesDoc(docC.key, query) + .watchSnapshots(2 * 1e6) + .expectEvents(query, { fromCache: true }) + .expectLimboDocs(docB.key, docC.key) + .client(1) + .stealPrimaryLease() + .client(0) + .runTimer(TimerId.ClientMetadataRefresh) + .expectPrimaryState(false) + .expectLimboDocs() + .client(1) + .expectListen(query, 'resume-token-1000000') + .watchAcksFull(query, 3 * 1e6) + .expectLimboDocs(docB.key, docC.key) + .ackLimbo(3 * 1e6, deletedDocB) + .expectLimboDocs(docC.key) + .client(0) + .expectEvents(query, { removed: [docB], fromCache: true }) + .stealPrimaryLease() + .expectListen(query, 'resume-token-1000000') + .watchAcksFull(query, 5 * 1e6) + .expectLimboDocs(docC.key) + .ackLimbo(6 * 1e6, deletedDocC) + .expectLimboDocs() + .expectEvents(query, { removed: [docC] }); } ); }); diff --git a/packages/firestore/test/unit/specs/listen_spec.test.ts b/packages/firestore/test/unit/specs/listen_spec.test.ts index 4f12ac1940c..a1864515284 100644 --- a/packages/firestore/test/unit/specs/listen_spec.test.ts +++ b/packages/firestore/test/unit/specs/listen_spec.test.ts @@ -544,6 +544,90 @@ describeSpec('Listens:', [], () => { .expectEvents(query, {}); }); + specTest('Persists global resume tokens on unlisten', [], () => { + const query = Query.atPath(path('collection')); + const docA = 
doc('collection/a', 1000, { key: 'a' }); + + return ( + spec() + .withGCEnabled(false) + .userListens(query) + .watchAcksFull(query, 1000, docA) + .expectEvents(query, { added: [docA] }) + + // Some time later, watch sends an updated resume token and the user stops + // listening. + .watchSnapshots(2000, [], 'resume-token-2000') + .userUnlistens(query) + .watchRemoves(query) + + .userListens(query, 'resume-token-2000') + .expectEvents(query, { added: [docA], fromCache: true }) + .watchAcks(query) + .watchCurrents(query, 'resume-token-3000') + .watchSnapshots(3000) + .expectEvents(query, { fromCache: false }) + ); + }); + + specTest('Omits global resume tokens for a short while', [], () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 1000, { key: 'a' }); + + return ( + spec() + .withGCEnabled(false) + .userListens(query) + .watchAcksFull(query, 1000, docA) + .expectEvents(query, { added: [docA] }) + + // One millisecond later, watch sends an updated resume token but the + // user doesn't manage to unlisten before restart. 
+ .watchSnapshots(2000, [], 'resume-token-2000') + .restart() + + .userListens(query, 'resume-token-1000') + .expectEvents(query, { added: [docA], fromCache: true }) + .watchAcks(query) + .watchCurrents(query, 'resume-token-3000') + .watchSnapshots(3000) + .expectEvents(query, { fromCache: false }) + ); + }); + + specTest( + 'Persists global resume tokens if the snapshot is old enough', + [], + () => { + const initialVersion = 1000; + const minutesLater = 5 * 60 * 1e6 + initialVersion; + const evenLater = 1000 + minutesLater; + + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', initialVersion, { key: 'a' }); + + return ( + spec() + .withGCEnabled(false) + .userListens(query) + .watchAcksFull(query, initialVersion, docA) + .expectEvents(query, { added: [docA] }) + + // 5 minutes later, watch sends an updated resume token but the user + // doesn't manage to unlisten before restart. + .watchSnapshots(minutesLater, [], 'resume-token-minutes-later') + .restart() + + .userListens(query, 'resume-token-minutes-later') + .expectEvents(query, { added: [docA], fromCache: true }) + .watchAcks(query) + .watchCurrents(query, 'resume-token-even-later') + .watchSnapshots(evenLater) + .expectEvents(query, { fromCache: false }) + ); + } + ); + specTest('Query is executed by primary client', ['multi-client'], () => { const query = Query.atPath(path('collection')); const docA = doc('collection/a', 1000, { key: 'a' }); diff --git a/packages/firestore/test/unit/specs/spec_test_runner.ts b/packages/firestore/test/unit/specs/spec_test_runner.ts index 13eaaeca895..ca1a0217d00 100644 --- a/packages/firestore/test/unit/specs/spec_test_runner.ts +++ b/packages/firestore/test/unit/specs/spec_test_runner.ts @@ -494,9 +494,11 @@ abstract class TestRunner { } async shutdown(): Promise { - if (this.started) { - await this.doShutdown(); - } + await this.queue.enqueue(async () => { + if (this.started) { + await this.doShutdown(); + } + }); } /** Runs a single 
SpecStep on this runner. */ @@ -1144,7 +1146,6 @@ class MemoryTestRunner extends TestRunner { */ class IndexedDbTestRunner extends TestRunner { static TEST_DB_NAME = 'firestore/[DEFAULT]/specs'; - protected getSharedClientState(): SharedClientState { return new WebStorageSharedClientState( this.queue, diff --git a/packages/firestore/test/unit/specs/write_spec.test.ts b/packages/firestore/test/unit/specs/write_spec.test.ts index 11db44ba686..62ff28a955e 100644 --- a/packages/firestore/test/unit/specs/write_spec.test.ts +++ b/packages/firestore/test/unit/specs/write_spec.test.ts @@ -1245,4 +1245,30 @@ describeSpec('Writes:', [], () => { ); } ); + + specTest( + 'Mutation are not sent twice after primary failover', + ['multi-client'], + () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 0, { k: 'a' }); + const docB = doc('collection/b', 0, { k: 'b' }); + + return client(0) + .expectPrimaryState(true) + .userSets('collection/a', { k: 'a' }) + .userSets('collection/b', { k: 'b' }) + .client(1) + .stealPrimaryLease() + .writeAcks('collection/a', 1000, { expectUserCallback: false }) + .client(0) + .expectUserCallbacks({ + acknowledged: ['collection/a'] + }) + .stealPrimaryLease() + .writeAcks('collection/b', 2000) + .userListens(query) + .expectEvents(query, { added: [docA, docB], fromCache: true }); + } + ); }); diff --git a/packages/rxfire/docs/storage.md b/packages/rxfire/docs/storage.md new file mode 100644 index 00000000000..0f99640ca27 --- /dev/null +++ b/packages/rxfire/docs/storage.md @@ -0,0 +1,172 @@ +# RxFire Storage + +## Task Observables + +### `fromTask()` +The `fromTask()` function creates an observable that emits progress changes. 
+
+| | |
+|-----------------|--------------------------------------------|
+| **function** | `fromTask()` |
+| **params** | `storage.UploadTask` |
+| **import path** | `rxfire/storage` |
+| **return** | `Observable` |
+
+#### TypeScript Example
+```ts
+import { fromTask } from 'rxfire/storage';
+import * as firebase from 'firebase';
+import 'firebase/storage';
+
+// Set up Firebase
+const app = firebase.initializeApp({ /* config */ });
+const storage = app.storage();
+const davidRef = storage.ref('users/david.png');
+
+// Upload a transparent 1x1 pixel image
+const task = davidRef.putString('R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7', 'base64');
+
+fromTask(task)
+  .subscribe(snap => { console.log(snap.bytesTransferred); });
+```
+
+### `percentage()`
+The `percentage()` function creates an observable that emits the percentage of the uploaded bytes.
+
+| | |
+|-----------------|--------------------------------------------|
+| **function** | `percentage()` |
+| **params** | `storage.UploadTask` |
+| **import path** | `rxfire/storage` |
+| **return** | `Observable` |
+
+#### TypeScript Example
+```ts
+import { percentage } from 'rxfire/storage';
+import * as firebase from 'firebase';
+import 'firebase/storage';
+
+// Set up Firebase
+const app = firebase.initializeApp({ /* config */ });
+const storage = app.storage();
+const davidRef = storage.ref('users/david.png');
+
+// Upload a transparent 1x1 pixel image
+const task = davidRef.putString('R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7', 'base64');
+
+percentage(task)
+  .subscribe(uploadProgress => { console.log(uploadProgress); });
+```
+
+## Reference Observables
+
+### `getDownloadURL()`
+The `getDownloadURL()` function creates an observable that emits the URL of the file.
+
+| | |
+|-----------------|------------------------------------------|
+| **function** | `getDownloadURL()` |
+| **params** | `storage.Reference` |
+| **import path** | `rxfire/storage` |
+| **return** | `Observable` |
+
+#### TypeScript Example
+```ts
+import { getDownloadURL } from 'rxfire/storage';
+import * as firebase from 'firebase';
+import 'firebase/storage';
+
+// Set up Firebase
+const app = firebase.initializeApp({ /* config */ });
+const storage = app.storage();
+
+// Assume this exists
+const davidRef = storage.ref('users/david.png');
+
+getDownloadURL(davidRef)
+  .subscribe(url => { console.log(url) });
+```
+
+### `getMetadata()`
+The `getMetadata()` function creates an observable that emits the file's metadata.
+
+| | |
+|-----------------|------------------------------------------|
+| **function** | `getMetadata()` |
+| **params** | `storage.Reference` |
+| **import path** | `rxfire/storage` |
+| **return** | `Observable` |
+
+#### TypeScript Example
+```ts
+import { getMetadata } from 'rxfire/storage';
+import * as firebase from 'firebase';
+import 'firebase/storage';
+
+// Set up Firebase
+const app = firebase.initializeApp({ /* config */ });
+const storage = app.storage();
+
+// Assume this exists
+const davidRef = storage.ref('users/david.png');
+
+getMetadata(davidRef)
+  .subscribe(meta => { console.log(meta) });
+```
+
+### `put()`
+The `put()` function creates an observable that emits the upload progress of a file.
+
+| | |
+|-----------------|------------------------------------------|
+| **function** | `put()` |
+| **params** | ref: `storage.Reference`, data: `any`, metadata?: `storage.UploadMetadata` |
+| **import path** | `rxfire/storage` |
+| **return** | `Observable` |
+
+#### TypeScript Example
+```ts
+import { put } from 'rxfire/storage';
+import * as firebase from 'firebase';
+import 'firebase/storage';
+
+// Set up Firebase
+const app = firebase.initializeApp({ /* config */ });
+const storage = app.storage();
+const dataRef = storage.ref('users/david.json');
+
+const blob = new Blob(
+  [JSON.stringify({ name: 'david'}, null, 2)],
+  { type : 'application/json' }
+);
+
+put(dataRef, blob, { contentType: 'application/json' })
+  .subscribe(snap => { console.log(snap.bytesTransferred) });
+```
+
+### `putString()`
+The `putString()` function creates an observable that emits the upload progress of a file.
+
+| | |
+|-----------------|------------------------------------------|
+| **function** | `putString()` |
+| **params** | ref: `storage.Reference`, data: `string`, metadata?: `storage.UploadMetadata` |
+| **import path** | `rxfire/storage` |
+| **return** | `Observable` |
+
+#### TypeScript Example
+```ts
+import { putString } from 'rxfire/storage';
+import * as firebase from 'firebase';
+import 'firebase/storage';
+
+// Set up Firebase
+const app = firebase.initializeApp({ /* config */ });
+const storage = app.storage();
+const davidRef = storage.ref('users/david.png');
+
+const base64 = 'R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7';
+
+putString(davidRef, base64, { contentType: 'image/png' })
+  .subscribe(snap => { console.log(snap.bytesTransferred) });
+```
diff --git a/packages/rxfire/firestore/collection/index.ts b/packages/rxfire/firestore/collection/index.ts
index 817de03722b..ecc647ad66c 100644
--- a/packages/rxfire/firestore/collection/index.ts
+++ b/packages/rxfire/firestore/collection/index.ts
@@ -117,7 +117,7 @@ function processDocumentChanges(
 * order of occurence.
* @param query */ -export function docChanges( +export function collectionChanges( query: firestore.Query, events: firestore.DocumentChangeType[] = ALL_EVENTS ) { @@ -144,7 +144,7 @@ export function sortedChanges( query: firestore.Query, events?: firestore.DocumentChangeType[] ) { - return docChanges(query, events).pipe( + return collectionChanges(query, events).pipe( scan( ( current: firestore.DocumentChange[], @@ -163,7 +163,7 @@ export function auditTrail( query: firestore.Query, events?: firestore.DocumentChangeType[] ): Observable { - return docChanges(query, events).pipe( + return collectionChanges(query, events).pipe( scan((current, action) => [...current, ...action], []) ); }