diff --git a/packages/firestore/CHANGELOG.md b/packages/firestore/CHANGELOG.md index ca0b5555545..a9180db70fa 100644 --- a/packages/firestore/CHANGELOG.md +++ b/packages/firestore/CHANGELOG.md @@ -1,4 +1,9 @@ # Unreleased +- [changed] Improved how Firestore handles idle queries to reduce the cost of + re-listening within 30 minutes. +- [changed] Improved offline performance with many outstanding writes. + +# 0.6.0 - [fixed] Fixed an issue where queries returned fewer results than they should, caused by documents that were cached as deleted when they should not have been (firebase/firebase-ios-sdk#1548). Because some cache data is cleared, diff --git a/packages/firestore/src/core/sync_engine.ts b/packages/firestore/src/core/sync_engine.ts index 212cd5cb86b..6319a2e85f5 100644 --- a/packages/firestore/src/core/sync_engine.ts +++ b/packages/firestore/src/core/sync_engine.ts @@ -50,7 +50,6 @@ import { MutationBatchState, OnlineState, OnlineStateSource, - ProtoByteString, TargetId } from './types'; import { @@ -88,12 +87,6 @@ class QueryView { * stream to identify this query. */ public targetId: TargetId, - /** - * An identifier from the datastore backend that indicates the last state - * of the results that was received. This can be used to indicate where - * to continue receiving new doc changes for the query. - */ - public resumeToken: ProtoByteString, /** * The view is responsible for computing the final merged truth of what * docs are in the query. It gets notified of local and remote changes, @@ -274,12 +267,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { 'applyChanges for new view should always return a snapshot' ); - const data = new QueryView( - query, - queryData.targetId, - queryData.resumeToken, - view - ); + const data = new QueryView(query, queryData.targetId, view); this.queryViewsByQuery.set(query, data); this.queryViewsByTarget[queryData.targetId] = data; return viewChange.snapshot!; diff --git a/packages/firestore/src/local/indexeddb_mutation_queue.ts b/packages/firestore/src/local/indexeddb_mutation_queue.ts index 5cf51246c27..1b0c902b22c 100644 --- a/packages/firestore/src/local/indexeddb_mutation_queue.ts +++ b/packages/firestore/src/local/indexeddb_mutation_queue.ts @@ -18,6 +18,7 @@ import { Timestamp } from '../api/timestamp'; import { User } from '../auth/user'; import { Query } from '../core/query'; import { BatchId, ProtoByteString } from '../core/types'; +import { DocumentKeySet } from '../model/collections'; import { DocumentKey } from '../model/document_key'; import { Mutation } from '../model/mutation'; import { BATCHID_UNKNOWN, MutationBatch } from '../model/mutation_batch'; @@ -40,8 +41,8 @@ import { LocalSerializer } from './local_serializer'; import { MutationQueue } from './mutation_queue'; import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; -import { SimpleDb, SimpleDbStore } from './simple_db'; -import { DocumentKeySet } from '../model/collections'; +import { SimpleDbStore } from './simple_db'; +import { IndexedDbPersistence } from './indexeddb_persistence'; /** A mutation queue for a specific user, backed by IndexedDB. 
*/ export class IndexedDbMutationQueue implements MutationQueue { @@ -342,6 +343,50 @@ export class IndexedDbMutationQueue implements MutationQueue { .next(() => results); } + getAllMutationBatchesAffectingDocumentKeys( + transaction: PersistenceTransaction, + documentKeys: DocumentKeySet + ): PersistencePromise { + let uniqueBatchIDs = new SortedSet(primitiveComparator); + + const promises: Array> = []; + documentKeys.forEach(documentKey => { + const indexStart = DbDocumentMutation.prefixForPath( + this.userId, + documentKey.path + ); + const range = IDBKeyRange.lowerBound(indexStart); + + const promise = documentMutationsStore(transaction).iterate( + { range }, + (indexKey, _, control) => { + const [userID, encodedPath, batchID] = indexKey; + + // Only consider rows matching exactly the specific key of + // interest. Note that because we order by path first, and we + // order terminators before path separators, we'll encounter all + // the index rows for documentKey contiguously. In particular, all + // the rows for documentKey will occur before any rows for + // documents nested in a subcollection beneath documentKey so we + // can stop as soon as we hit any such row. + const path = EncodedResourcePath.decode(encodedPath); + if (userID !== this.userId || !documentKey.path.isEqual(path)) { + control.done(); + return; + } + + uniqueBatchIDs = uniqueBatchIDs.add(batchID); + } + ); + + promises.push(promise); + }); + + return PersistencePromise.waitFor(promises).next(() => + this.lookupMutationBatches(transaction, uniqueBatchIDs) + ); + } + getAllMutationBatchesAffectingQuery( transaction: PersistenceTransaction, query: Query @@ -393,34 +438,39 @@ export class IndexedDbMutationQueue implements MutationQueue { } uniqueBatchIDs = uniqueBatchIDs.add(batchID); }) - .next(() => { - const results: MutationBatch[] = []; - const promises: Array> = []; - // TODO(rockwood): Implement this using iterate. - uniqueBatchIDs.forEach(batchId => { - promises.push( - mutationsStore(transaction) - .get(batchId) - .next(mutation => { - if (!mutation) { - fail( - 'Dangling document-mutation reference found, ' + - 'which points to ' + - batchId - ); - } - assert( - mutation.userId === this.userId, - `Unexpected user '${ - mutation.userId - }' for mutation batch ${batchId}` - ); - results.push(this.serializer.fromDbMutationBatch(mutation!)); - }) - ); - }); - return PersistencePromise.waitFor(promises).next(() => results); - }); + .next(() => this.lookupMutationBatches(transaction, uniqueBatchIDs)); + } + + private lookupMutationBatches( + transaction: PersistenceTransaction, + batchIDs: SortedSet + ): PersistencePromise { + const results: MutationBatch[] = []; + const promises: Array> = []; + // TODO(rockwood): Implement this using iterate. 
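The new getAllMutationBatchesAffectingDocumentKeys above leans on the DbDocumentMutation index key being ordered as [userId, encodedPath, batchId]: all rows for a document key are contiguous, so the cursor can stop at the first row that no longer matches. A minimal in-memory sketch of that early-termination scan, with plain string paths standing in for EncodedResourcePath and a sorted array standing in for the IndexedDB index:

```ts
type IndexRow = [userId: string, path: string, batchId: number];

// Collects batch ids for one document key from rows sorted by
// (userId, path, batchId); stops at the first row past the key, mirroring
// control.done() in the cursor-based version above.
function batchIdsForKey(
  rows: IndexRow[],
  userId: string,
  path: string
): Set<number> {
  const ids = new Set<number>();
  let i = rows.findIndex(([u, p]) => u === userId && p >= path);
  if (i < 0) return ids;
  for (; i < rows.length; i++) {
    const [u, p, batchId] = rows[i];
    if (u !== userId || p !== path) break; // past the contiguous block of rows
    ids.add(batchId);
  }
  return ids;
}

// Rows for 'users/a' are contiguous and precede rows for documents nested
// beneath it, so the scan never has to look further.
const rows: IndexRow[] = [
  ['alice', 'users/a', 1],
  ['alice', 'users/a', 3],
  ['alice', 'users/a/posts/p1', 2],
  ['alice', 'users/b', 4]
];
console.log(batchIdsForKey(rows, 'alice', 'users/a')); // Set { 1, 3 }
```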
+ batchIDs.forEach(batchId => { + promises.push( + mutationsStore(transaction) + .get(batchId) + .next(mutation => { + if (mutation === null) { + fail( + 'Dangling document-mutation reference found, ' + + 'which points to ' + + batchId + ); + } + assert( + mutation.userId === this.userId, + `Unexpected user '${ + mutation.userId + }' for mutation batch ${batchId}` + ); + results.push(this.serializer.fromDbMutationBatch(mutation!)); + }) + ); + }); + return PersistencePromise.waitFor(promises).next(() => results); } removeMutationBatches( @@ -567,7 +617,7 @@ function convertStreamToken(token: ProtoByteString): string { function mutationsStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbMutationBatch.store ); @@ -579,10 +629,10 @@ function mutationsStore( function documentMutationsStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( - txn, - DbDocumentMutation.store - ); + return IndexedDbPersistence.getStore< + DbDocumentMutationKey, + DbDocumentMutation + >(txn, DbDocumentMutation.store); } /** @@ -591,7 +641,7 @@ function documentMutationsStore( function mutationQueuesStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbMutationQueue.store ); diff --git a/packages/firestore/src/local/indexeddb_persistence.ts b/packages/firestore/src/local/indexeddb_persistence.ts index a60d3dc883d..830df9b30ab 100644 --- a/packages/firestore/src/local/indexeddb_persistence.ts +++ b/packages/firestore/src/local/indexeddb_persistence.ts @@ -17,7 +17,7 @@ import { User } from '../auth/user'; import { DatabaseInfo } from '../core/database_info'; import { JsonProtoSerializer } from '../remote/serializer'; -import { assert } from '../util/assert'; +import { assert, fail } from '../util/assert'; import { Code, FirestoreError } from '../util/error'; import * as log from '../util/log'; @@ -83,6 +83,12 @@ const UNSUPPORTED_PLATFORM_ERROR_MSG = // firestore_zombie__ const ZOMBIED_CLIENTS_KEY_PREFIX = 'firestore_zombie'; +export class IndexedDbTransaction extends PersistenceTransaction { + constructor(readonly simpleDbTransaction: SimpleDbTransaction) { + super(); + } +} + /** * An IndexedDB-backed instance of Persistence. Data is stored persistently * across sessions. @@ -115,6 +121,17 @@ const ZOMBIED_CLIENTS_KEY_PREFIX = 'firestore_zombie'; * TODO(multitab): Update this comment with multi-tab changes. */ export class IndexedDbPersistence implements Persistence { + static getStore( + txn: PersistenceTransaction, + store: string + ): SimpleDbStore { + if (txn instanceof IndexedDbTransaction) { + return SimpleDb.getStore(txn.simpleDbTransaction, store); + } else { + fail('IndexedDbPersistence must use instances of IndexedDbTransaction'); + } + } + /** * The name of the main (and currently only) IndexedDB database. this name is * appended to the prefix provided to the IndexedDbPersistence constructor. @@ -470,7 +487,7 @@ export class IndexedDbPersistence implements Persistence { action: string, requirePrimaryLease: boolean, transactionOperation: ( - transaction: PersistenceTransaction + transaction: IndexedDbTransaction ) => PersistencePromise ): Promise { // TODO(multitab): Consider removing `requirePrimaryLease` and exposing @@ -483,39 +500,47 @@ export class IndexedDbPersistence implements Persistence { // Do all transactions as readwrite against all object stores, since we // are the only reader/writer. 
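The store helpers above now route through IndexedDbPersistence.getStore, which unwraps the SimpleDbTransaction carried inside an IndexedDbTransaction and fails on any other PersistenceTransaction, keeping SimpleDb types private to the IndexedDB layer. A stripped-down sketch of the pattern; the classes here are simplified stand-ins, not the real SimpleDb API:

```ts
// Simplified stand-ins for the real transaction types.
class SimpleDbTransaction {
  store(name: string): string {
    return `object store: ${name}`;
  }
}

abstract class PersistenceTransaction {}

class IndexedDbTransaction extends PersistenceTransaction {
  constructor(readonly simpleDbTransaction: SimpleDbTransaction) {
    super();
  }
}

// Callers hold only the opaque PersistenceTransaction; the IndexedDB layer
// downcasts it to reach the underlying SimpleDbTransaction.
function getStore(txn: PersistenceTransaction, name: string): string {
  if (txn instanceof IndexedDbTransaction) {
    return txn.simpleDbTransaction.store(name);
  }
  throw new Error('IndexedDbPersistence must use instances of IndexedDbTransaction');
}

const txn = new IndexedDbTransaction(new SimpleDbTransaction());
console.log(getStore(txn, 'mutations')); // object store: mutations
```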
- return this.simpleDb.runTransaction('readwrite', ALL_STORES, txn => { - if (requirePrimaryLease) { - // While we merely verify that we have (or can acquire) the lease - // immediately, we wait to extend the primary lease until after - // executing transactionOperation(). This ensures that even if the - // transactionOperation takes a long time, we'll use a recent - // leaseTimestampMs in the extended (or newly acquired) lease. - return this.canActAsPrimary(txn) - .next(canActAsPrimary => { - if (!canActAsPrimary) { - // TODO(multitab): Handle this gracefully and transition back to - // secondary state. - log.error( - `Failed to obtain primary lease for action '${action}'.` + return this.simpleDb.runTransaction( + 'readwrite', + ALL_STORES, + simpleDbTxn => { + if (requirePrimaryLease) { + // While we merely verify that we have (or can acquire) the lease + // immediately, we wait to extend the primary lease until after + // executing transactionOperation(). This ensures that even if the + // transactionOperation takes a long time, we'll use a recent + // leaseTimestampMs in the extended (or newly acquired) lease. + return this.canActAsPrimary(simpleDbTxn) + .next(canActAsPrimary => { + if (!canActAsPrimary) { + // TODO(multitab): Handle this gracefully and transition back to + // secondary state. + log.error( + `Failed to obtain primary lease for action '${action}'.` + ); + this.isPrimary = false; + this.queue.enqueue(() => this.primaryStateListener(false)); + throw new FirestoreError( + Code.FAILED_PRECONDITION, + PRIMARY_LEASE_LOST_ERROR_MSG + ); + } + return transactionOperation( + new IndexedDbTransaction(simpleDbTxn) ); - this.isPrimary = false; - this.queue.enqueue(() => this.primaryStateListener(false)); - throw new FirestoreError( - Code.FAILED_PRECONDITION, - PRIMARY_LEASE_LOST_ERROR_MSG + }) + .next(result => { + return this.acquireOrExtendPrimaryLease(simpleDbTxn).next( + () => result ); - } - return transactionOperation(txn); - }) - .next(result => { - return this.acquireOrExtendPrimaryLease(txn).next(() => result); - }); - } else { - return this.verifyAllowTabSynchronization(txn).next(() => - transactionOperation(txn) - ); + }); + } else { + return this.verifyAllowTabSynchronization(simpleDbTxn).next(() => + transactionOperation(new IndexedDbTransaction(simpleDbTxn)) + ); + } } - }); + ); } /** diff --git a/packages/firestore/src/local/indexeddb_query_cache.ts b/packages/firestore/src/local/indexeddb_query_cache.ts index 79e107ad067..e268a7680a9 100644 --- a/packages/firestore/src/local/indexeddb_query_cache.ts +++ b/packages/firestore/src/local/indexeddb_query_cache.ts @@ -38,8 +38,9 @@ import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; import { QueryCache } from './query_cache'; import { QueryData } from './query_data'; -import { SimpleDb, SimpleDbStore } from './simple_db'; import { TargetIdGenerator } from '../core/target_id_generator'; +import { SimpleDbStore } from './simple_db'; +import { IndexedDbPersistence } from './indexeddb_persistence'; export class IndexedDbQueryCache implements QueryCache { constructor(private serializer: LocalSerializer) {} @@ -221,7 +222,7 @@ export class IndexedDbQueryCache implements QueryCache { targetId: TargetId ): PersistencePromise { // PORTING NOTE: The reverse index (documentsTargets) is maintained by - // Indexeddb. + // IndexedDb. 
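The reshaped runTransaction body above keeps one invariant worth calling out: the primary lease is verified before the operation runs but only extended afterwards, so long-running operations still record a fresh lease timestamp. A compressed async/await sketch of that flow, with placeholders for the lease checks and the transaction object, and the verifyAllowTabSynchronization branch reduced to a plain pass-through:

```ts
interface Txn {
  label: string;
}

async function runGuardedTransaction<T>(
  requirePrimaryLease: boolean,
  canActAsPrimary: () => Promise<boolean>,
  extendPrimaryLease: () => Promise<void>,
  op: (txn: Txn) => Promise<T>
): Promise<T> {
  // Wrap the raw storage transaction before handing it to the caller.
  const txn: Txn = { label: 'IndexedDbTransaction' };

  if (!requirePrimaryLease) {
    return op(txn);
  }

  // Verify the lease up front...
  if (!(await canActAsPrimary())) {
    throw new Error('failed-precondition: primary lease lost');
  }
  // ...run the caller's operation...
  const result = await op(txn);
  // ...and only then extend the lease, so its timestamp stays recent even if
  // the operation took a while.
  await extendPrimaryLease();
  return result;
}

runGuardedTransaction(
  /* requirePrimaryLease= */ true,
  async () => true,
  async () => {},
  async txn => `ran inside ${txn.label}`
).then(console.log);
```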
const promises: Array> = []; const store = documentTargetStore(txn); keys.forEach(key => { @@ -316,6 +317,8 @@ export class IndexedDbQueryCache implements QueryCache { this.garbageCollector = gc; } + // TODO(gsoltis): we can let the compiler assert that txn !== null if we + // drop null from the type bounds on txn. containsKey( txn: PersistenceTransaction | null, key: DocumentKey @@ -369,7 +372,10 @@ export class IndexedDbQueryCache implements QueryCache { function targetsStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore(txn, DbTarget.store); + return IndexedDbPersistence.getStore( + txn, + DbTarget.store + ); } /** @@ -378,7 +384,7 @@ function targetsStore( function globalTargetStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbTargetGlobal.store ); @@ -390,7 +396,7 @@ function globalTargetStore( function documentTargetStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbTargetDocument.store ); diff --git a/packages/firestore/src/local/indexeddb_remote_document_cache.ts b/packages/firestore/src/local/indexeddb_remote_document_cache.ts index ef4bebdea80..53a4340c1b1 100644 --- a/packages/firestore/src/local/indexeddb_remote_document_cache.ts +++ b/packages/firestore/src/local/indexeddb_remote_document_cache.ts @@ -31,13 +31,14 @@ import { DbRemoteDocumentChanges, DbRemoteDocumentChangesKey } from './indexeddb_schema'; +import { IndexedDbPersistence } from './indexeddb_persistence'; import { LocalSerializer } from './local_serializer'; import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; import { RemoteDocumentCache } from './remote_document_cache'; -import { SimpleDb, SimpleDbStore } from './simple_db'; import { SnapshotVersion } from '../core/snapshot_version'; import { assert } from '../util/assert'; +import { SimpleDbStore } from './simple_db'; export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache { /** The last id read by `getNewDocumentChanges()`. 
*/ @@ -191,7 +192,7 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache { function remoteDocumentsStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( + return IndexedDbPersistence.getStore( txn, DbRemoteDocument.store ); @@ -204,10 +205,10 @@ function remoteDocumentsStore( function documentChangesStore( txn: PersistenceTransaction ): SimpleDbStore { - return SimpleDb.getStore( - txn, - DbRemoteDocumentChanges.store - ); + return IndexedDbPersistence.getStore< + DbRemoteDocumentChangesKey, + DbRemoteDocumentChanges + >(txn, DbRemoteDocumentChanges.store); } function dbKey(docKey: DocumentKey): DbRemoteDocumentKey { diff --git a/packages/firestore/src/local/local_documents_view.ts b/packages/firestore/src/local/local_documents_view.ts index 6466e8a26c2..78f33d9e050 100644 --- a/packages/firestore/src/local/local_documents_view.ts +++ b/packages/firestore/src/local/local_documents_view.ts @@ -17,7 +17,6 @@ import { Query } from '../core/query'; import { SnapshotVersion } from '../core/snapshot_version'; import { - documentKeySet, DocumentKeySet, DocumentMap, documentMap, @@ -26,6 +25,7 @@ import { } from '../model/collections'; import { Document, MaybeDocument, NoDocument } from '../model/document'; import { DocumentKey } from '../model/document_key'; +import { MutationBatch } from '../model/mutation_batch'; import { ResourcePath } from '../model/path'; import { fail } from '../util/assert'; @@ -56,11 +56,23 @@ export class LocalDocumentsView { transaction: PersistenceTransaction, key: DocumentKey ): PersistencePromise { - return this.remoteDocumentCache - .getEntry(transaction, key) - .next(remoteDoc => { - return this.computeLocalDocument(transaction, key, remoteDoc); - }); + return this.mutationQueue + .getAllMutationBatchesAffectingDocumentKey(transaction, key) + .next(batches => this.getDocumentInternal(transaction, key, batches)); + } + + /** Internal version of `getDocument` that allows reusing batches. */ + private getDocumentInternal( + transaction: PersistenceTransaction, + key: DocumentKey, + inBatches: MutationBatch[] + ): PersistencePromise { + return this.remoteDocumentCache.getEntry(transaction, key).next(doc => { + for (const batch of inBatches) { + doc = batch.applyToLocalView(key, doc); + } + return doc; + }); } /** @@ -73,20 +85,29 @@ export class LocalDocumentsView { transaction: PersistenceTransaction, keys: DocumentKeySet ): PersistencePromise { - const promises = [] as Array>; - let results = maybeDocumentMap(); - keys.forEach(key => { - promises.push( - this.getDocument(transaction, key).next(maybeDoc => { - // TODO(http://b/32275378): Don't conflate missing / deleted. - if (!maybeDoc) { - maybeDoc = new NoDocument(key, SnapshotVersion.forDeletedDoc()); - } - results = results.insert(key, maybeDoc); - }) - ); - }); - return PersistencePromise.waitFor(promises).next(() => results); + return this.mutationQueue + .getAllMutationBatchesAffectingDocumentKeys(transaction, keys) + .next(batches => { + const promises = [] as Array>; + let results = maybeDocumentMap(); + keys.forEach(key => { + promises.push( + this.getDocumentInternal(transaction, key, batches).next( + maybeDoc => { + // TODO(http://b/32275378): Don't conflate missing / deleted. 
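getDocument above now fetches the affected batches once and getDocumentInternal folds them over whatever the remote document cache returned, relying on the queue handing batches back in batch-id order. A toy model of that overlay; the document and batch shapes are invented for illustration, and only the applyToLocalView call shape matches the code above:

```ts
interface FakeDoc {
  key: string;
  data: Record<string, unknown>;
}

interface FakeBatch {
  batchId: number;
  // Mirrors batch.applyToLocalView(key, doc) above, with simplified types.
  applyToLocalView(key: string, doc: FakeDoc | null): FakeDoc | null;
}

// Overlay every batch that may affect `key` onto the cached remote document.
function localView(
  key: string,
  remoteDoc: FakeDoc | null,
  batches: FakeBatch[] // assumed sorted by batchId, as the queue guarantees
): FakeDoc | null {
  let doc = remoteDoc;
  for (const batch of batches) {
    doc = batch.applyToLocalView(key, doc);
  }
  return doc;
}

const setFoo: FakeBatch = {
  batchId: 1,
  applyToLocalView: (key, doc) => ({ key, data: { ...doc?.data, foo: 1 } })
};
// A pending set mutation is visible even though nothing is cached remotely.
console.log(localView('coll/a', null, [setFoo])); // { key: 'coll/a', data: { foo: 1 } }
```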
+ if (!maybeDoc) { + maybeDoc = new NoDocument( + key, + SnapshotVersion.forDeletedDoc() + ); + } + results = results.insert(key, maybeDoc); + } + ) + ); + }); + return PersistencePromise.waitFor(promises).next(() => results); + }); } /** @@ -126,48 +147,40 @@ export class LocalDocumentsView { query: Query ): PersistencePromise { // Query the remote documents and overlay mutations. - // TODO(mikelehen): There may be significant overlap between the mutations - // affecting these remote documents and the - // getAllMutationBatchesAffectingQuery() mutations. Consider optimizing. let results: DocumentMap; return this.remoteDocumentCache .getDocumentsMatchingQuery(transaction, query) .next(queryResults => { - return this.computeLocalDocuments(transaction, queryResults); - }) - .next(promisedResults => { - results = promisedResults; - // Now use the mutation queue to discover any other documents that may - // match the query after applying mutations. + results = queryResults; return this.mutationQueue.getAllMutationBatchesAffectingQuery( transaction, query ); }) .next(matchingMutationBatches => { - let matchingKeys = documentKeySet(); for (const batch of matchingMutationBatches) { for (const mutation of batch.mutations) { - // TODO(mikelehen): PERF: Check if this mutation actually - // affects the query to reduce work. - if (!results.get(mutation.key)) { - matchingKeys = matchingKeys.add(mutation.key); + const key = mutation.key; + // Only process documents belonging to the collection. + if (!query.path.isImmediateParentOf(key.path)) { + continue; + } + + const baseDoc = results.get(key); + const mutatedDoc = mutation.applyToLocalView( + baseDoc, + baseDoc, + batch.localWriteTime + ); + if (!mutatedDoc || mutatedDoc instanceof NoDocument) { + results = results.remove(key); + } else if (mutatedDoc instanceof Document) { + results = results.insert(key, mutatedDoc); + } else { + fail('Unknown MaybeDocument: ' + mutatedDoc); } } } - - // Now add in the results for the matchingKeys. - const promises = [] as Array>; - matchingKeys.forEach(key => { - promises.push( - this.getDocument(transaction, key).next(doc => { - if (doc instanceof Document) { - results = results.insert(doc.key, doc); - } - }) - ); - }); - return PersistencePromise.waitFor(promises); }) .next(() => { // Finally, filter out any documents that don't actually match @@ -181,57 +194,4 @@ export class LocalDocumentsView { return results; }); } - - /** - * Takes a remote document and applies local mutations to generate the local - * view of the document. - * @param transaction The transaction in which to perform any persistence - * operations. - * @param documentKey The key of the document (necessary when remoteDocument - * is null). - * @param document The base remote document to apply mutations to or null. - */ - private computeLocalDocument( - transaction: PersistenceTransaction, - documentKey: DocumentKey, - document: MaybeDocument | null - ): PersistencePromise { - return this.mutationQueue - .getAllMutationBatchesAffectingDocumentKey(transaction, documentKey) - .next(batches => { - for (const batch of batches) { - document = batch.applyToLocalView(documentKey, document); - } - return document; - }); - } - - /** - * Takes a set of remote documents and applies local mutations to generate the - * local view of the documents. - * @param transaction The transaction in which to perform any persistence - * operations. - * @param documents The base remote documents to apply mutations to. - * @return The local view of the documents. 
- */ - private computeLocalDocuments( - transaction: PersistenceTransaction, - documents: DocumentMap - ): PersistencePromise { - const promises = [] as Array>; - documents.forEach((key, doc) => { - promises.push( - this.computeLocalDocument(transaction, key, doc).next(mutatedDoc => { - if (mutatedDoc instanceof Document) { - documents = documents.insert(mutatedDoc.key, mutatedDoc); - } else if (mutatedDoc instanceof NoDocument) { - documents = documents.remove(mutatedDoc.key); - } else { - fail('Unknown MaybeDocument: ' + mutatedDoc); - } - }) - ); - }); - return PersistencePromise.waitFor(promises).next(() => documents); - } } diff --git a/packages/firestore/src/local/local_store.ts b/packages/firestore/src/local/local_store.ts index 340c6bfb804..b258d9eb4ac 100644 --- a/packages/firestore/src/local/local_store.ts +++ b/packages/firestore/src/local/local_store.ts @@ -118,6 +118,15 @@ export interface UserChangeResult { * unrecoverable error (should be caught / reported by the async_queue). */ export class LocalStore { + /** + * The maximum time to leave a resume token buffered without writing it out. + * This value is arbitrary: it's long enough to avoid several writes + * (possibly indefinitely if updates come more frequently than this) but + * short enough that restarting after crashing will still have a pretty + * recent resume token. + */ + private static readonly RESUME_TOKEN_MAX_AGE_MICROS = 5 * 60 * 1e6; + /** * The set of all mutations that have been sent but not yet been applied to * the backend. @@ -543,12 +552,18 @@ export class LocalStore { // any preexisting value. const resumeToken = change.resumeToken; if (resumeToken.length > 0) { + const oldQueryData = queryData; queryData = queryData.copy({ resumeToken, snapshotVersion: remoteEvent.snapshotVersion }); this.targetIds[targetId] = queryData; - promises.push(this.queryCache.updateQueryData(txn, queryData)); + + if ( + LocalStore.shouldPersistQueryData(oldQueryData, queryData, change) + ) { + promises.push(this.queryCache.updateQueryData(txn, queryData)); + } } } ); @@ -630,6 +645,50 @@ export class LocalStore { }); } + /** + * Returns true if the newQueryData should be persisted during an update of + * an active target. QueryData should always be persisted when a target is + * being released and should not call this function. + * + * While the target is active, QueryData updates can be omitted when nothing + * about the target has changed except metadata like the resume token or + * snapshot version. Occasionally it's worth the extra write to prevent these + * values from getting too stale after a crash, but this doesn't have to be + * too frequent. + */ + private static shouldPersistQueryData( + oldQueryData: QueryData, + newQueryData: QueryData, + change: TargetChange + ): boolean { + // Avoid clearing any existing value + if (newQueryData.resumeToken.length === 0) return false; + + // Any resume token is interesting if there isn't one already. + if (oldQueryData.resumeToken.length === 0) return true; + + // Don't allow resume token changes to be buffered indefinitely. This + // allows us to be reasonably up-to-date after a crash and avoids needing + // to loop over all active queries on shutdown. Especially in the browser + // we may not get time to do anything interesting while the current tab is + // closing. 
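The collection filter used in getDocumentsMatchingQuery above, query.path.isImmediateParentOf(key.path), with isImmediateParentOf added to path.ts later in this diff, is what keeps writes to nested subcollections out of a parent collection query. A quick sketch of the check over plain segment arrays:

```ts
// True when `child` has exactly one more segment than `parent` and shares
// every parent segment, i.e. it names a document directly in that collection.
function isImmediateParentOf(parent: string[], child: string[]): boolean {
  if (parent.length + 1 !== child.length) {
    return false;
  }
  return parent.every((segment, i) => segment === child[i]);
}

const queryPath = ['rooms'];
console.log(isImmediateParentOf(queryPath, ['rooms', 'a'])); // true
console.log(isImmediateParentOf(queryPath, ['rooms', 'a', 'messages', 'm1'])); // false
console.log(isImmediateParentOf(queryPath, ['halls', 'a'])); // false
```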
+ const timeDelta = + newQueryData.snapshotVersion.toMicroseconds() - + oldQueryData.snapshotVersion.toMicroseconds(); + if (timeDelta >= this.RESUME_TOKEN_MAX_AGE_MICROS) return true; + + // Otherwise if the only thing that has changed about a target is its resume + // token it's not worth persisting. Note that the RemoteStore keeps an + // in-memory view of the currently active targets which includes the current + // resume token, so stream failure or user changes will still use an + // up-to-date resume token regardless of what we do here. + const changes = + change.addedDocuments.size + + change.modifiedDocuments.size + + change.removedDocuments.size; + return changes > 0; + } + /** * Notify local store of the changed views to locally pin documents. */ @@ -732,10 +791,21 @@ export class LocalStore { queryData != null, 'Tried to release nonexistent query: ' + query ); - this.localViewReferences.removeReferencesForId(queryData!.targetId); - delete this.targetIds[queryData!.targetId]; + const targetId = queryData!.targetId; + const cachedQueryData = this.targetIds[targetId]; + + this.localViewReferences.removeReferencesForId(targetId); + delete this.targetIds[targetId]; if (!keepPersistedQueryData && this.garbageCollector.isEager) { return this.queryCache.removeQueryData(txn, queryData!); + } else if ( + cachedQueryData.snapshotVersion > queryData!.snapshotVersion + ) { + // If we've been avoiding persisting the resumeToken (see + // shouldPersistQueryData for conditions and rationale) we need to + // persist the token now because there will no longer be an + // in-memory version to fall back on. + return this.queryCache.updateQueryData(txn, cachedQueryData); } else { return PersistencePromise.resolve(); } @@ -748,12 +818,8 @@ export class LocalStore { this.remoteDocuments ); return this.releaseHeldBatchResults(txn, documentBuffer).next( - () => { - documentBuffer.apply(txn); - } + () => documentBuffer.apply(txn) ); - } else { - return PersistencePromise.resolve(); } }); } diff --git a/packages/firestore/src/local/memory_mutation_queue.ts b/packages/firestore/src/local/memory_mutation_queue.ts index d87876a86e1..ba5af955576 100644 --- a/packages/firestore/src/local/memory_mutation_queue.ts +++ b/packages/firestore/src/local/memory_mutation_queue.ts @@ -17,6 +17,7 @@ import { Timestamp } from '../api/timestamp'; import { Query } from '../core/query'; import { BatchId, ProtoByteString } from '../core/types'; +import { DocumentKeySet } from '../model/collections'; import { DocumentKey } from '../model/document_key'; import { Mutation } from '../model/mutation'; import { BATCHID_UNKNOWN, MutationBatch } from '../model/mutation_batch'; @@ -30,7 +31,6 @@ import { MutationQueue } from './mutation_queue'; import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; import { DocReference } from './reference_set'; -import { DocumentKeySet } from '../model/collections'; export class MemoryMutationQueue implements MutationQueue { /** @@ -249,6 +249,28 @@ export class MemoryMutationQueue implements MutationQueue { return PersistencePromise.resolve(result); } + getAllMutationBatchesAffectingDocumentKeys( + transaction: PersistenceTransaction, + documentKeys: DocumentKeySet + ): PersistencePromise { + let uniqueBatchIDs = new SortedSet(primitiveComparator); + + documentKeys.forEach(documentKey => { + const start = new DocReference(documentKey, 0); + const end = new DocReference(documentKey, Number.POSITIVE_INFINITY); + 
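Taken together, the rule above is: persist a resume token when it is the first one, when the buffered one has aged past RESUME_TOKEN_MAX_AGE_MICROS, or when the target change actually moved documents; otherwise keep buffering it in memory. A standalone sketch of the predicate, with plain numbers standing in for SnapshotVersion and ByteString:

```ts
const RESUME_TOKEN_MAX_AGE_MICROS = 5 * 60 * 1e6; // five minutes, as above

interface QueryDataLike {
  resumeTokenLength: number;
  snapshotVersionMicros: number;
}

function shouldPersistQueryData(
  oldData: QueryDataLike,
  newData: QueryDataLike,
  changedDocCount: number
): boolean {
  // Never clear an existing token with an empty one.
  if (newData.resumeTokenLength === 0) return false;
  // Any token is worth keeping if none was persisted before.
  if (oldData.resumeTokenLength === 0) return true;
  // Don't let the buffered token grow arbitrarily stale.
  const age = newData.snapshotVersionMicros - oldData.snapshotVersionMicros;
  if (age >= RESUME_TOKEN_MAX_AGE_MICROS) return true;
  // Otherwise only write when the target change carried document changes.
  return changedDocCount > 0;
}

// A metadata-only update one second after the last persisted token is skipped.
console.log(
  shouldPersistQueryData(
    { resumeTokenLength: 8, snapshotVersionMicros: 1e6 },
    { resumeTokenLength: 8, snapshotVersionMicros: 2e6 },
    /* changedDocCount= */ 0
  )
); // false
```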
this.batchesByDocumentKey.forEachInRange([start, end], ref => { + assert( + documentKey.isEqual(ref.key), + "For each key, should only iterate over a single key's batches" + ); + + uniqueBatchIDs = uniqueBatchIDs.add(ref.targetOrBatchId); + }); + }); + + return PersistencePromise.resolve(this.findMutationBatches(uniqueBatchIDs)); + } + getAllMutationBatchesAffectingQuery( transaction: PersistenceTransaction, query: Query @@ -290,16 +312,20 @@ export class MemoryMutationQueue implements MutationQueue { } }, start); + return PersistencePromise.resolve(this.findMutationBatches(uniqueBatchIDs)); + } + + private findMutationBatches(batchIDs: SortedSet): MutationBatch[] { // Construct an array of matching batches, sorted by batchID to ensure that // multiple mutations affecting the same document key are applied in order. const result: MutationBatch[] = []; - uniqueBatchIDs.forEach(batchId => { + batchIDs.forEach(batchId => { const batch = this.findMutationBatch(batchId); if (batch !== null) { result.push(batch); } }); - return PersistencePromise.resolve(result); + return result; } removeMutationBatches( diff --git a/packages/firestore/src/local/memory_persistence.ts b/packages/firestore/src/local/memory_persistence.ts index e17e8536d4d..f5d69e54b4c 100644 --- a/packages/firestore/src/local/memory_persistence.ts +++ b/packages/firestore/src/local/memory_persistence.ts @@ -109,9 +109,12 @@ export class MemoryPersistence implements Persistence { ) => PersistencePromise ): Promise { debug(LOG_TAG, 'Starting transaction:', action); - return transactionOperation(new MemoryPersistenceTransaction()).toPromise(); + return transactionOperation(new MemoryTransaction()).toPromise(); } } -/** Dummy class since memory persistence doesn't actually use transactions. */ -class MemoryPersistenceTransaction implements PersistenceTransaction {} +/** + * Memory persistence is not actually transactional, but future implementations + * may have transaction-scoped state. + */ +export class MemoryTransaction implements PersistenceTransaction {} diff --git a/packages/firestore/src/local/mutation_queue.ts b/packages/firestore/src/local/mutation_queue.ts index a26ac14f1a2..4db5e60fa31 100644 --- a/packages/firestore/src/local/mutation_queue.ts +++ b/packages/firestore/src/local/mutation_queue.ts @@ -17,6 +17,7 @@ import { Timestamp } from '../api/timestamp'; import { Query } from '../core/query'; import { BatchId, ProtoByteString } from '../core/types'; +import { DocumentKeySet } from '../model/collections'; import { DocumentKey } from '../model/document_key'; import { Mutation } from '../model/mutation'; import { MutationBatch } from '../model/mutation_batch'; @@ -150,23 +151,44 @@ export interface MutationQueue extends GarbageSource { * document key, so when looping through the batch you'll need to check that * the mutation itself matches the key. * + * Batches are guaranteed to be in sorted order. + * * Note that because of this requirement implementations are free to return * mutation batches that don't contain the document key at all if it's * convenient. */ // TODO(mcg): This should really return an enumerator - // also for b/32992024, all backing stores should really index by document key getAllMutationBatchesAffectingDocumentKey( transaction: PersistenceTransaction, documentKey: DocumentKey ): PersistencePromise; + /** + * Finds all mutation batches that could possibly affect the given set of + * document keys. 
Not all mutations in a batch will necessarily affect each + * key, so when looping through the batch you'll need to check that the + * mutation itself matches the key. + * + * Batches are guaranteed to be in sorted order. + * + * Note that because of this requirement implementations are free to return + * mutation batches that don't contain any of the document keys at all if it's + * convenient. + */ + // TODO(mcg): This should really return an enumerator + getAllMutationBatchesAffectingDocumentKeys( + transaction: PersistenceTransaction, + documentKeys: DocumentKeySet + ): PersistencePromise; + /** * Finds all mutation batches that could affect the results for the given * query. Not all mutations in a batch will necessarily affect the query, so * when looping through the batch you'll need to check that the mutation * itself matches the query. * + * Batches are guaranteed to be in sorted order. + * * Note that because of this requirement implementations are free to return * mutation batches that don't match the query at all if it's convenient. * diff --git a/packages/firestore/src/local/persistence.ts b/packages/firestore/src/local/persistence.ts index 7a9cedc17d6..0f77a0ecdbe 100644 --- a/packages/firestore/src/local/persistence.ts +++ b/packages/firestore/src/local/persistence.ts @@ -17,7 +17,6 @@ import { User } from '../auth/user'; import { MutationQueue } from './mutation_queue'; -import { PersistenceTransaction } from './persistence'; import { PersistencePromise } from './persistence_promise'; import { QueryCache } from './query_cache'; import { RemoteDocumentCache } from './remote_document_cache'; @@ -30,7 +29,7 @@ import { ClientId } from './shared_client_state'; * pass it to your callback. You then pass it to any method that operates * on persistence. */ -export interface PersistenceTransaction {} +export abstract class PersistenceTransaction {} /** * Callback type for primary state notifications. This callback can be diff --git a/packages/firestore/src/local/simple_db.ts b/packages/firestore/src/local/simple_db.ts index 91646c3eab2..e429b736341 100644 --- a/packages/firestore/src/local/simple_db.ts +++ b/packages/firestore/src/local/simple_db.ts @@ -14,14 +14,12 @@ * limitations under the License. */ -import { assert, fail } from '../util/assert'; +import { assert } from '../util/assert'; import { debug } from '../util/log'; import { AnyDuringMigration } from '../util/misc'; - import { PersistencePromise } from './persistence_promise'; import { SCHEMA_VERSION } from './indexeddb_schema'; import { Deferred } from '../util/promise'; -import { PersistenceTransaction } from './persistence'; import { Code, FirestoreError } from '../util/error'; const LOG_TAG = 'SimpleDb'; @@ -150,14 +148,10 @@ export class SimpleDb { /** Helper to get a typed SimpleDbStore from a transaction. 
*/ static getStore( - txn: PersistenceTransaction, + txn: SimpleDbTransaction, store: string ): SimpleDbStore { - if (txn instanceof SimpleDbTransaction) { - return txn.store(store); - } else { - return fail('Invalid transaction object provided!'); - } + return txn.store(store); } constructor(private db: IDBDatabase) {} @@ -547,25 +541,19 @@ export class SimpleDbStore { } private cursor(options: IterateOptions): IDBRequest { - let direction = 'next'; + let direction: IDBCursorDirection = 'next'; if (options.reverse) { direction = 'prev'; } if (options.index) { const index = this.store.index(options.index); if (options.keysOnly) { - return index.openKeyCursor( - options.range, - direction as AnyDuringMigration - ); + return index.openKeyCursor(options.range, direction); } else { - return index.openCursor(options.range, direction as AnyDuringMigration); + return index.openCursor(options.range, direction); } } else { - return this.store.openCursor( - options.range, - direction as AnyDuringMigration - ); + return this.store.openCursor(options.range, direction); } } } diff --git a/packages/firestore/src/model/path.ts b/packages/firestore/src/model/path.ts index 4226ca7d1b2..6ca8271a7f4 100644 --- a/packages/firestore/src/model/path.ts +++ b/packages/firestore/src/model/path.ts @@ -143,6 +143,20 @@ export abstract class Path { return true; } + isImmediateParentOf(potentialChild: this): boolean { + if (this.length + 1 !== potentialChild.length) { + return false; + } + + for (let i = 0; i < this.length; i++) { + if (this.get(i) !== potentialChild.get(i)) { + return false; + } + } + + return true; + } + forEach(fn: (segment: string) => void): void { for (let i = this.offset, end = this.limit(); i < end; i++) { fn(this.segments[i]); diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index 8d20cd9195c..05139841220 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -64,21 +64,21 @@ export interface WriteRequest extends api.WriteRequest { enum PersistentStreamState { /** * The streaming RPC is not yet running and there's no error condition. - * Calling `start` will start the stream immediately without backoff. - * While in this state isStarted will return false. + * Calling start() will start the stream immediately without backoff. + * While in this state isStarted() will return false. */ Initial, /** * The stream is starting, either waiting for an auth token or for the stream - * to successfully open. While in this state, isStarted will return true but - * isOpen will return false. + * to successfully open. While in this state, isStarted() will return true but + * isOpen() will return false. */ Starting, /** * The streaming RPC is up and running. Requests and responses can flow - * freely. Both isStarted and isOpen will return true. + * freely. Both isStarted() and isOpen() will return true. */ Open, @@ -91,7 +91,7 @@ enum PersistentStreamState { /** * An in-between state after an error where the stream is waiting before * re-starting. After waiting is complete, the stream will try to open. - * While in this state isStarted() will return true but isOpen will return + * While in this state isStarted() will return true but isOpen() will return * false. */ Backoff @@ -144,12 +144,12 @@ const IDLE_TIMEOUT_MS = 60 * 1000; * * ## Starting and Stopping * - * Streaming RPCs are stateful and need to be `start`ed before messages can - * be sent and received. 
The PersistentStream will call the onOpen function + * Streaming RPCs are stateful and need to be start()ed before messages can + * be sent and received. The PersistentStream will call the onOpen() function * of the listener once the stream is ready to accept requests. * - * Should a `start` fail, PersistentStream will call the registered - * onClose with a FirestoreError indicating what went wrong. + * Should a start() fail, PersistentStream will call the registered onClose() + * listener with a FirestoreError indicating what went wrong. * * A PersistentStream can be started and stopped repeatedly. * @@ -173,7 +173,7 @@ export abstract class PersistentStream< */ private closeCount = 0; - private inactivityTimerPromise: CancelablePromise | null = null; + private idleTimer: CancelablePromise | null = null; private stream: Stream | null = null; protected backoff: ExponentialBackoff; @@ -196,10 +196,10 @@ export abstract class PersistentStream< } /** - * Returns true if `start` has been called and no error has occurred. True + * Returns true if start() has been called and no error has occurred. True * indicates the stream is open or in the process of opening (which * encompasses respecting backoff, getting auth tokens, and starting the - * actual RPC). Use `isOpen` to determine if the stream is open and ready for + * actual RPC). Use isOpen() to determine if the stream is open and ready for * outbound requests. */ isStarted(): boolean { @@ -211,7 +211,7 @@ export abstract class PersistentStream< } /** - * Returns true if the underlying RPC is open (the onOpen callback has been + * Returns true if the underlying RPC is open (the onOpen() listener has been * called) and the stream is ready for outbound requests. */ isOpen(): boolean { @@ -219,11 +219,11 @@ export abstract class PersistentStream< } /** - * Starts the RPC. Only allowed if isStarted returns false. The stream is - * not immediately ready for use: onOpen will be invoked when the RPC is ready - * for outbound requests, at which point isOpen will return true. + * Starts the RPC. Only allowed if isStarted() returns false. The stream is + * not immediately ready for use: onOpen() will be invoked when the RPC is + * ready for outbound requests, at which point isOpen() will return true. * - * When start returns, isStarted will return true. + * When start returns, isStarted() will return true. */ start(): void { if (this.state === PersistentStreamState.Error) { @@ -237,9 +237,9 @@ export abstract class PersistentStream< /** * Stops the RPC. This call is idempotent and allowed regardless of the - * current isStarted state. + * current isStarted() state. * - * When stop returns, isStarted and isOpen will both return false. + * When stop returns, isStarted() and isOpen() will both return false. */ stop(): void { if (this.isStarted()) { @@ -252,7 +252,7 @@ export abstract class PersistentStream< * start it. If the error warrants an immediate restart of the stream, the * sender can use this to indicate that the receiver should not back off. * - * Each error will call the onClose function. That function can decide to + * Each error will call the onClose() listener. That function can decide to * inhibit backoff if required. */ inhibitBackoff(): void { @@ -275,8 +275,8 @@ export abstract class PersistentStream< markIdle(): void { // Starts the idle time if we are in state 'Open' and are not yet already // running a timer (in which case the previous idle timeout still applies). 
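Beyond the rename, the idle handling above follows a simple pattern: markIdle schedules exactly one cancelable delayed close, and sending a request (or an explicit cancel) clears it. A self-contained sketch using setTimeout in place of the async queue's delayed operation, with the isOpen() guard omitted:

```ts
const IDLE_TIMEOUT_MS = 60 * 1000;

class IdleCloser {
  private idleTimer: ReturnType<typeof setTimeout> | null = null;

  constructor(private readonly onIdle: () => void) {}

  markIdle(): void {
    // Only schedule a close if no idle timer is already pending.
    if (this.idleTimer === null) {
      this.idleTimer = setTimeout(() => {
        this.idleTimer = null;
        this.onIdle();
      }, IDLE_TIMEOUT_MS);
    }
  }

  cancelIdleCheck(): void {
    // A new request marks the stream as active again and keeps it open.
    if (this.idleTimer !== null) {
      clearTimeout(this.idleTimer);
      this.idleTimer = null;
    }
  }
}

const closer = new IdleCloser(() => console.log('closing idle stream'));
closer.markIdle();        // starts the 60s countdown
closer.cancelIdleCheck(); // cancels it before it fires
```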
- if (this.isOpen() && this.inactivityTimerPromise === null) { - this.inactivityTimerPromise = this.queue.enqueueAfterDelay( + if (this.isOpen() && this.idleTimer === null) { + this.idleTimer = this.queue.enqueueAfterDelay( this.idleTimerId, IDLE_TIMEOUT_MS, () => this.handleIdleCloseTimer() @@ -301,9 +301,9 @@ export abstract class PersistentStream< /** Marks the stream as active again. */ private cancelIdleCheck(): void { - if (this.inactivityTimerPromise) { - this.inactivityTimerPromise.cancel(); - this.inactivityTimerPromise = null; + if (this.idleTimer) { + this.idleTimer.cancel(); + this.idleTimer = null; } } @@ -315,7 +315,7 @@ export abstract class PersistentStream< * * sets internal stream state to 'finalState'; * * adjusts the backoff timer based on the error * - * A new stream can be opened by calling `start`. + * A new stream can be opened by calling start(). * * @param finalState the intended state of the stream after closing. * @param error the error the connection was closed with. @@ -532,9 +532,9 @@ export interface WatchStreamListener extends PersistentStreamListener { /** * A PersistentStream that implements the Listen RPC. * - * Once the Listen stream has called the openHandler, any number of listen and - * unlisten calls calls can be sent to control what changes will be sent from - * the server for ListenResponses. + * Once the Listen stream has called the onOpen() listener, any number of + * listen() and unlisten() calls can be made to control what changes will be + * sent from the server for ListenResponses. */ export class PersistentListenStream extends PersistentStream< api.ListenRequest, diff --git a/packages/firestore/src/remote/watch_change.ts b/packages/firestore/src/remote/watch_change.ts index faeef82abe5..8ccebed2c69 100644 --- a/packages/firestore/src/remote/watch_change.ts +++ b/packages/firestore/src/remote/watch_change.ts @@ -297,7 +297,7 @@ export class WatchChangeAggregator { /** Processes and adds the WatchTargetChange to the current set of changes. */ handleTargetChange(targetChange: WatchTargetChange): void { - targetChange.targetIds.forEach(targetId => { + this.forEachTarget(targetChange, targetId => { const targetState = this.ensureTargetState(targetId); switch (targetChange.state) { case WatchTargetChangeState.NoChange: @@ -352,6 +352,22 @@ export class WatchChangeAggregator { }); } + /** + * Iterates over all targetIds that the watch change applies to: either the + * targetIds explicitly listed in the change or the targetIds of all currently + * active targets. + */ + forEachTarget( + targetChange: WatchTargetChange, + fn: (targetId: TargetId) => void + ): void { + if (targetChange.targetIds.length > 0) { + targetChange.targetIds.forEach(fn); + } else { + objUtils.forEachNumber(this.targetStates, fn); + } + } + /** * Handles existence filters and synthesizes deletes for filter mismatches. 
* Targets that are invalidated by filter mismatches are added to diff --git a/packages/firestore/test/unit/local/eager_garbage_collector.test.ts b/packages/firestore/test/unit/local/eager_garbage_collector.test.ts index f12a579b217..af5d748a2be 100644 --- a/packages/firestore/test/unit/local/eager_garbage_collector.test.ts +++ b/packages/firestore/test/unit/local/eager_garbage_collector.test.ts @@ -65,7 +65,7 @@ describe('EagerGarbageCollector', () => { expect(referenceSet.isEmpty()).to.equal(false); referenceSet.removeReferencesForId(2); - return gc.collectGarbage(true).toPromise(); + return gc.collectGarbage(null).toPromise(); }) .then(garbage => { expectSetToEqual(garbage, [key3]); diff --git a/packages/firestore/test/unit/local/mutation_queue.test.ts b/packages/firestore/test/unit/local/mutation_queue.test.ts index afd672d9245..08c4e4abd55 100644 --- a/packages/firestore/test/unit/local/mutation_queue.test.ts +++ b/packages/firestore/test/unit/local/mutation_queue.test.ts @@ -20,6 +20,7 @@ import { Query } from '../../../src/core/query'; import { EagerGarbageCollector } from '../../../src/local/eager_garbage_collector'; import { IndexedDbPersistence } from '../../../src/local/indexeddb_persistence'; import { Persistence } from '../../../src/local/persistence'; +import { documentKeySet } from '../../../src/model/collections'; import { BATCHID_UNKNOWN, MutationBatch @@ -40,7 +41,6 @@ import { addEqualityMatcher } from '../../util/equality_matcher'; let persistence: Persistence; let mutationQueue: TestMutationQueue; - describe('MemoryMutationQueue', () => { beforeEach(() => { return persistenceHelpers.testMemoryPersistence().then(p => { @@ -73,7 +73,7 @@ describe('IndexedDbMutationQueue', () => { function genericMutationQueueTests(): void { addEqualityMatcher(); - beforeEach(() => { + beforeEach(async () => { mutationQueue = new TestMutationQueue( persistence, persistence.getMutationQueue(new User('user')) @@ -83,6 +83,58 @@ function genericMutationQueueTests(): void { afterEach(() => persistence.shutdown(/* deleteData= */ true)); + /** + * Creates a new MutationBatch with the next batch ID and a set of dummy + * mutations. + */ + function addMutationBatch(key?: string): Promise { + let keyStr = key; + if (keyStr === undefined) { + keyStr = 'foo/bar'; + } + const mutation = setMutation(keyStr, { a: 1 }); + return mutationQueue.addMutationBatch([mutation]); + } + + /** + * Creates an array of batches containing count dummy MutationBatches. Each + * has a different batchID. + */ + async function createBatches(count: number): Promise { + const batches = []; + for (let i = 0; i < count; i++) { + const batch = await addMutationBatch(); + batches.push(batch); + } + return batches; + } + + /** + * Removes entries from from the given a batches and returns them. + * + * @param holes An array of indexes in the batches array; in increasing order. + * Indexes are relative to the original state of the batches array, not any + * intermediate state that might occur. + * @param batches The array to mutate, removing entries from it. + * @return A new array containing all the entries that were removed from + * batches. 
+ */ + async function makeHolesInBatches( + holes: number[], + batches: MutationBatch[] + ): Promise { + const removed = []; + for (let i = 0; i < holes.length; i++) { + const index = holes[i] - i; + const batch = batches[index]; + await mutationQueue.removeMutationBatches([batch]); + + batches.splice(index, 1); + removed.push(batch); + } + return removed; + } + it('can count batches', async () => { expect(await mutationQueue.countBatches()).to.equal(0); expect(await mutationQueue.checkEmpty()).to.equal(true); @@ -329,7 +381,30 @@ function genericMutationQueueTests(): void { const matches = await mutationQueue.getAllMutationBatchesAffectingDocumentKey( key('foo/bar') ); - expect(matches.length).to.deep.equal(expected.length); + expectEqualArrays(matches, expected); + }); + + it('can getAllMutationBatchesAffectingDocumentKeys()', async () => { + const mutations = [ + setMutation('fob/bar', { a: 1 }), + setMutation('foo/bar', { a: 1 }), + patchMutation('foo/bar', { b: 1 }), + setMutation('foo/bar/suffix/key', { a: 1 }), + setMutation('foo/baz', { a: 1 }), + setMutation('food/bar', { a: 1 }) + ]; + // Store all the mutations. + const batches: MutationBatch[] = []; + for (const mutation of mutations) { + const batch = await mutationQueue.addMutationBatch([mutation]); + batches.push(batch); + } + const expected = [batches[1], batches[2], batches[4]]; + const matches = await mutationQueue.getAllMutationBatchesAffectingDocumentKeys( + documentKeySet() + .add(key('foo/bar')) + .add(key('foo/baz')) + ); expectEqualArrays(matches, expected); }); @@ -498,55 +573,3 @@ function genericMutationQueueTests(): void { expect(await mutationQueue.checkEmpty()).to.equal(true); }); } - -/** - * Creates a new MutationBatch with the next batch ID and a set of dummy - * mutations. - */ -function addMutationBatch(key?: string): Promise { - let keyStr = key; - if (keyStr === undefined) { - keyStr = 'foo/bar'; - } - const mutation = setMutation(keyStr, { a: 1 }); - return mutationQueue.addMutationBatch([mutation]); -} - -/** - * Creates an array of batches containing count dummy MutationBatches. Each - * has a different batchID. - */ -async function createBatches(count: number): Promise { - const batches = []; - for (let i = 0; i < count; i++) { - const batch = await addMutationBatch(); - batches.push(batch); - } - return batches; -} - -/** - * Removes entries from from the given a batches and returns them. - * - * @param holes An array of indexes in the batches array; in increasing order. - * Indexes are relative to the original state of the batches array, not any - * intermediate state that might occur. - * @param batches The array to mutate, removing entries from it. - * @return A new array containing all the entries that were removed from - * batches. 
- */ -async function makeHolesInBatches( - holes: number[], - batches: MutationBatch[] -): Promise { - const removed = []; - for (let i = 0; i < holes.length; i++) { - const index = holes[i] - i; - const batch = batches[index]; - await mutationQueue.removeMutationBatches([batch]); - - batches.splice(index, 1); - removed.push(batch); - } - return removed; -} diff --git a/packages/firestore/test/unit/local/query_cache.test.ts b/packages/firestore/test/unit/local/query_cache.test.ts index 2a3ede52eba..bb4f4f0da03 100644 --- a/packages/firestore/test/unit/local/query_cache.test.ts +++ b/packages/firestore/test/unit/local/query_cache.test.ts @@ -35,17 +35,8 @@ import * as persistenceHelpers from './persistence_test_helpers'; import { TestGarbageCollector } from './test_garbage_collector'; import { TestQueryCache } from './test_query_cache'; -let persistence: Persistence; -let cache: TestQueryCache; - describe('MemoryQueryCache', () => { - beforeEach(() => { - return persistenceHelpers.testMemoryPersistence().then(p => { - persistence = p; - }); - }); - - genericQueryCacheTests(); + genericQueryCacheTests(persistenceHelpers.testMemoryPersistence); }); describe('IndexedDbQueryCache', () => { @@ -54,22 +45,22 @@ describe('IndexedDbQueryCache', () => { return; } - beforeEach(() => { - return persistenceHelpers.testIndexedDbPersistence().then(p => { - persistence = p; - }); + let persistencePromise: Promise; + beforeEach(async () => { + persistencePromise = persistenceHelpers.testIndexedDbPersistence(); }); - afterEach(() => persistence.shutdown(/* deleteData= */ true)); - - genericQueryCacheTests(); + genericQueryCacheTests(() => persistencePromise); }); /** * Defines the set of tests to run against both query cache implementations. */ -function genericQueryCacheTests(): void { +function genericQueryCacheTests( + persistencePromise: () => Promise +): void { addEqualityMatcher(); + let cache: TestQueryCache; const QUERY_ROOMS = Query.atPath(path('rooms')); const QUERY_HALLS = Query.atPath(path('halls')); @@ -98,11 +89,17 @@ function genericQueryCacheTests(): void { ); } + let persistence: Persistence; beforeEach(async () => { + persistence = await persistencePromise(); cache = new TestQueryCache(persistence, persistence.getQueryCache()); await cache.start(); }); + afterEach(async () => { + persistence.shutdown(/* deleteData= */ true); + }); + it('returns null for query not in cache', () => { return cache.getQueryData(QUERY_ROOMS).then(queryData => { expect(queryData).to.equal(null); diff --git a/packages/firestore/test/unit/local/remote_document_cache.test.ts b/packages/firestore/test/unit/local/remote_document_cache.test.ts index 59491cc0bac..bc19e796233 100644 --- a/packages/firestore/test/unit/local/remote_document_cache.test.ts +++ b/packages/firestore/test/unit/local/remote_document_cache.test.ts @@ -32,17 +32,8 @@ import * as persistenceHelpers from './persistence_test_helpers'; import { TestRemoteDocumentCache } from './test_remote_document_cache'; import { MaybeDocumentMap } from '../../../src/model/collections'; -let persistence: Persistence; -let cache: TestRemoteDocumentCache; - describe('MemoryRemoteDocumentCache', () => { - beforeEach(() => { - return persistenceHelpers.testMemoryPersistence().then(p => { - persistence = p; - }); - }); - - genericRemoteDocumentCacheTests(); + genericRemoteDocumentCacheTests(persistenceHelpers.testMemoryPersistence); }); describe('IndexedDbRemoteDocumentCache', () => { @@ -51,31 +42,27 @@ describe('IndexedDbRemoteDocumentCache', () => { return; } - 
beforeEach(() => { - // We turn on `synchronizeTabs` to test the document change log. - return persistenceHelpers - .testIndexedDbPersistence(/* synchronizeTabs= */ true) - .then(p => { - persistence = p; - }); - }); - - afterEach(() => persistence.shutdown(/* deleteData= */ true)); - - genericRemoteDocumentCacheTests(); + genericRemoteDocumentCacheTests(() => + persistenceHelpers.testIndexedDbPersistence(/* synchronizeTabs= */ true) + ); }); /** * Defines the set of tests to run against both remote document cache * implementations. */ -function genericRemoteDocumentCacheTests(): void { +function genericRemoteDocumentCacheTests( + persistencePromise: () => Promise +): void { // Helpers for use throughout tests. const DOC_PATH = 'a/b'; const LONG_DOC_PATH = 'a/b/c/d/e/f'; const DOC_DATA = { a: 1, b: 2 }; const VERSION = 42; + let persistence: Persistence; + let cache: TestRemoteDocumentCache; + function setAndReadDocument(doc: MaybeDocument): Promise { return cache .addEntries([doc]) @@ -105,15 +92,16 @@ function genericRemoteDocumentCacheTests(): void { }); } - beforeEach(() => { + beforeEach(async () => { + persistence = await persistencePromise(); cache = new TestRemoteDocumentCache( persistence, persistence.getRemoteDocumentCache() ); - - return cache.start(); }); + afterEach(() => persistence.shutdown(/* deleteData= */ true)); + it('returns null for document not in cache', () => { return cache.getEntry(key(DOC_PATH)).then(doc => { expect(doc).to.equal(null); diff --git a/packages/firestore/test/unit/local/remote_document_change_buffer.test.ts b/packages/firestore/test/unit/local/remote_document_change_buffer.test.ts index e19af434519..ad95e86620c 100644 --- a/packages/firestore/test/unit/local/remote_document_change_buffer.test.ts +++ b/packages/firestore/test/unit/local/remote_document_change_buffer.test.ts @@ -16,7 +16,6 @@ import { expect } from 'chai'; import { IndexedDbPersistence } from '../../../src/local/indexeddb_persistence'; -import { Persistence } from '../../../src/local/persistence'; import { RemoteDocumentChangeBuffer } from '../../../src/local/remote_document_change_buffer'; import { deletedDoc, doc, expectEqual, key } from '../../util/helpers'; @@ -24,7 +23,7 @@ import { testIndexedDbPersistence } from './persistence_test_helpers'; import { TestRemoteDocumentCache } from './test_remote_document_cache'; import { TestRemoteDocumentChangeBuffer } from './test_remote_document_change_buffer'; -let persistence: Persistence; +let persistence: IndexedDbPersistence; let cache: TestRemoteDocumentCache; let buffer: TestRemoteDocumentChangeBuffer; const INITIAL_DOC = doc('coll/a', 42, { test: 'data' }); diff --git a/packages/firestore/test/unit/local/test_mutation_queue.ts b/packages/firestore/test/unit/local/test_mutation_queue.ts index 12ca96614ea..a3094dff5d0 100644 --- a/packages/firestore/test/unit/local/test_mutation_queue.ts +++ b/packages/firestore/test/unit/local/test_mutation_queue.ts @@ -127,7 +127,7 @@ export class TestMutationQueue { ): Promise { return this.persistence.runTransaction( 'getAllMutationBatchesThroughBatchId', - true, + true, txn => { return this.queue.getAllMutationBatchesThroughBatchId(txn, batchId); } @@ -149,6 +149,21 @@ export class TestMutationQueue { ); } + getAllMutationBatchesAffectingDocumentKeys( + documentKeys: DocumentKeySet + ): Promise { + return this.persistence.runTransaction( + 'getAllMutationBatchesAffectingDocumentKeys', + true, + txn => { + return this.queue.getAllMutationBatchesAffectingDocumentKeys( + txn, + documentKeys + ); + } + 
+    );
+  }
+
   getAllMutationBatchesAffectingQuery(query: Query): Promise<MutationBatch[]> {
     return this.persistence.runTransaction(
       'getAllMutationBatchesAffectingQuery',
diff --git a/packages/firestore/test/unit/specs/limbo_spec.test.ts b/packages/firestore/test/unit/specs/limbo_spec.test.ts
index 90f53cc6a6a..6ba4f233137 100644
--- a/packages/firestore/test/unit/specs/limbo_spec.test.ts
+++ b/packages/firestore/test/unit/specs/limbo_spec.test.ts
@@ -362,34 +362,30 @@ describeSpec('Limbo Documents:', [], () => {
       const docB = doc('collection/b', 1001, { key: 'b' });
       const deletedDocB = deletedDoc('collection/b', 1005);
 
-      return (
-        client(0, false)
-          .expectPrimaryState(true)
-          .client(1)
-          .userListens(query)
-          .client(0)
-          .expectListen(query)
-          .watchAcksFull(query, 1002, docA, docB)
-          .client(1)
-          .expectEvents(query, { added: [docA, docB] })
-          .client(0)
-          .watchRemovesDoc(docB.key, query)
-          .watchSnapshots(1003)
-          .expectLimboDocs(docB.key)
-          .shutdown()
-          .client(1)
-          .expectEvents(query, { fromCache: true })
-          .runTimer(TimerId.ClientMetadataRefresh)
-          .expectPrimaryState(true)
-          // TODO(37254270): This should be 'resume-token-1003' from the last
-          // global snapshot.
-          .expectListen(query, 'resume-token-1002')
-          .watchAcksFull(query, 1004)
-          .expectLimboDocs(docB.key)
-          .ackLimbo(1005, deletedDocB)
-          .expectLimboDocs()
-          .expectEvents(query, { removed: [docB] })
-      );
+      return client(0, false)
+        .expectPrimaryState(true)
+        .client(1)
+        .userListens(query)
+        .client(0)
+        .expectListen(query)
+        .watchAcksFull(query, 1 * 1e6, docA, docB)
+        .client(1)
+        .expectEvents(query, { added: [docA, docB] })
+        .client(0)
+        .watchRemovesDoc(docB.key, query)
+        .watchSnapshots(2 * 1e6)
+        .expectLimboDocs(docB.key)
+        .shutdown()
+        .client(1)
+        .expectEvents(query, { fromCache: true })
+        .runTimer(TimerId.ClientMetadataRefresh)
+        .expectPrimaryState(true)
+        .expectListen(query, 'resume-token-1000000')
+        .watchAcksFull(query, 3 * 1e6)
+        .expectLimboDocs(docB.key)
+        .ackLimbo(4 * 1e6, deletedDocB)
+        .expectLimboDocs()
+        .expectEvents(query, { removed: [docB] });
     }
   );
 
@@ -404,41 +400,37 @@ describeSpec('Limbo Documents:', [], () => {
       const deletedDocB = deletedDoc('collection/b', 1006);
       const deletedDocC = deletedDoc('collection/c', 1008);
 
-      return (
-        client(0, false)
-          .expectPrimaryState(true)
-          .userListens(query)
-          .watchAcksFull(query, 1002, docA, docB, docC)
-          .expectEvents(query, { added: [docA, docB, docC] })
-          .watchRemovesDoc(docB.key, query)
-          .watchRemovesDoc(docC.key, query)
-          .watchSnapshots(1003)
-          .expectEvents(query, { fromCache: true })
-          .expectLimboDocs(docB.key, docC.key)
-          .client(1)
-          .stealPrimaryLease()
-          .client(0)
-          .runTimer(TimerId.ClientMetadataRefresh)
-          .expectPrimaryState(false)
-          .expectLimboDocs()
-          .client(1)
-          // TODO(37254270): This should be 'resume-token-1003' from the last
-          // global snapshot.
-          .expectListen(query, 'resume-token-1002')
-          .watchAcksFull(query, 1005)
-          .expectLimboDocs(docB.key, docC.key)
-          .ackLimbo(1006, deletedDocB)
-          .expectLimboDocs(docC.key)
-          .client(0)
-          .expectEvents(query, { removed: [docB], fromCache: true })
-          .stealPrimaryLease()
-          .expectListen(query, 'resume-token-1005')
-          .watchAcksFull(query, 1007)
-          .expectLimboDocs(docC.key)
-          .ackLimbo(1007, deletedDocC)
-          .expectLimboDocs()
-          .expectEvents(query, { removed: [docC] })
-      );
+      return client(0, false)
+        .expectPrimaryState(true)
+        .userListens(query)
+        .watchAcksFull(query, 1 * 1e6, docA, docB, docC)
+        .expectEvents(query, { added: [docA, docB, docC] })
+        .watchRemovesDoc(docB.key, query)
+        .watchRemovesDoc(docC.key, query)
+        .watchSnapshots(2 * 1e6)
+        .expectEvents(query, { fromCache: true })
+        .expectLimboDocs(docB.key, docC.key)
+        .client(1)
+        .stealPrimaryLease()
+        .client(0)
+        .runTimer(TimerId.ClientMetadataRefresh)
+        .expectPrimaryState(false)
+        .expectLimboDocs()
+        .client(1)
+        .expectListen(query, 'resume-token-1000000')
+        .watchAcksFull(query, 3 * 1e6)
+        .expectLimboDocs(docB.key, docC.key)
+        .ackLimbo(3 * 1e6, deletedDocB)
+        .expectLimboDocs(docC.key)
+        .client(0)
+        .expectEvents(query, { removed: [docB], fromCache: true })
+        .stealPrimaryLease()
+        .expectListen(query, 'resume-token-1000000')
+        .watchAcksFull(query, 5 * 1e6)
+        .expectLimboDocs(docC.key)
+        .ackLimbo(6 * 1e6, deletedDocC)
+        .expectLimboDocs()
+        .expectEvents(query, { removed: [docC] });
     }
   );
 });
diff --git a/packages/firestore/test/unit/specs/listen_spec.test.ts b/packages/firestore/test/unit/specs/listen_spec.test.ts
index 4f12ac1940c..a1864515284 100644
--- a/packages/firestore/test/unit/specs/listen_spec.test.ts
+++ b/packages/firestore/test/unit/specs/listen_spec.test.ts
@@ -544,6 +544,90 @@ describeSpec('Listens:', [], () => {
       .expectEvents(query, {});
   });
 
+  specTest('Persists global resume tokens on unlisten', [], () => {
+    const query = Query.atPath(path('collection'));
+    const docA = doc('collection/a', 1000, { key: 'a' });
+
+    return (
+      spec()
+        .withGCEnabled(false)
+        .userListens(query)
+        .watchAcksFull(query, 1000, docA)
+        .expectEvents(query, { added: [docA] })
+
+        // Some time later, watch sends an updated resume token and the user stops
+        // listening.
+        .watchSnapshots(2000, [], 'resume-token-2000')
+        .userUnlistens(query)
+        .watchRemoves(query)
+
+        .userListens(query, 'resume-token-2000')
+        .expectEvents(query, { added: [docA], fromCache: true })
+        .watchAcks(query)
+        .watchCurrents(query, 'resume-token-3000')
+        .watchSnapshots(3000)
+        .expectEvents(query, { fromCache: false })
+    );
+  });
+
+  specTest('Omits global resume tokens for a short while', [], () => {
+    const query = Query.atPath(path('collection'));
+    const docA = doc('collection/a', 1000, { key: 'a' });
+
+    return (
+      spec()
+        .withGCEnabled(false)
+        .userListens(query)
+        .watchAcksFull(query, 1000, docA)
+        .expectEvents(query, { added: [docA] })
+
+        // One millisecond later, watch sends an updated resume token but the
+        // user doesn't manage to unlisten before restart.
+        .watchSnapshots(2000, [], 'resume-token-2000')
+        .restart()
+
+        .userListens(query, 'resume-token-1000')
+        .expectEvents(query, { added: [docA], fromCache: true })
+        .watchAcks(query)
+        .watchCurrents(query, 'resume-token-3000')
+        .watchSnapshots(3000)
+        .expectEvents(query, { fromCache: false })
+    );
+  });
+
+  specTest(
+    'Persists global resume tokens if the snapshot is old enough',
+    [],
+    () => {
+      const initialVersion = 1000;
+      const minutesLater = 5 * 60 * 1e6 + initialVersion;
+      const evenLater = 1000 + minutesLater;
+
+      const query = Query.atPath(path('collection'));
+      const docA = doc('collection/a', initialVersion, { key: 'a' });
+
+      return (
+        spec()
+          .withGCEnabled(false)
+          .userListens(query)
+          .watchAcksFull(query, initialVersion, docA)
+          .expectEvents(query, { added: [docA] })
+
+          // 5 minutes later, watch sends an updated resume token but the user
+          // doesn't manage to unlisten before restart.
+          .watchSnapshots(minutesLater, [], 'resume-token-minutes-later')
+          .restart()
+
+          .userListens(query, 'resume-token-minutes-later')
+          .expectEvents(query, { added: [docA], fromCache: true })
+          .watchAcks(query)
+          .watchCurrents(query, 'resume-token-even-later')
+          .watchSnapshots(evenLater)
+          .expectEvents(query, { fromCache: false })
+      );
+    }
+  );
+
   specTest('Query is executed by primary client', ['multi-client'], () => {
     const query = Query.atPath(path('collection'));
     const docA = doc('collection/a', 1000, { key: 'a' });
diff --git a/packages/firestore/test/unit/specs/spec_test_runner.ts b/packages/firestore/test/unit/specs/spec_test_runner.ts
index b983650137e..ca1a0217d00 100644
--- a/packages/firestore/test/unit/specs/spec_test_runner.ts
+++ b/packages/firestore/test/unit/specs/spec_test_runner.ts
@@ -1146,7 +1146,6 @@ class MemoryTestRunner extends TestRunner {
  */
 class IndexedDbTestRunner extends TestRunner {
   static TEST_DB_NAME = 'firestore/[DEFAULT]/specs';
-
   protected getSharedClientState(): SharedClientState {
     return new WebStorageSharedClientState(
       this.queue,