diff --git a/packages/firestore/src/core/firestore_client.ts b/packages/firestore/src/core/firestore_client.ts index e2b2357a188..747fe52f673 100644 --- a/packages/firestore/src/core/firestore_client.ts +++ b/packages/firestore/src/core/firestore_client.ts @@ -53,6 +53,7 @@ import { OnlineState, OnlineStateSource } from './types'; import { ViewSnapshot } from './view_snapshot'; const LOG_TAG = 'FirestoreClient'; +const MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100; /** DOMException error code constants. */ const DOM_EXCEPTION_INVALID_STATE = 11; @@ -369,7 +370,8 @@ export class FirestoreClient { this.localStore, this.remoteStore, this.sharedClientState, - user + user, + MAX_CONCURRENT_LIMBO_RESOLUTIONS ); this.sharedClientState.onlineStateHandler = sharedClientStateOnlineStateChangedHandler; diff --git a/packages/firestore/src/core/sync_engine.ts b/packages/firestore/src/core/sync_engine.ts index 7e9119234a0..71a69549df0 100644 --- a/packages/firestore/src/core/sync_engine.ts +++ b/packages/firestore/src/core/sync_engine.ts @@ -148,10 +148,23 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { q.canonicalId() ); private queriesByTarget: { [targetId: number]: Query[] } = {}; - private limboTargetsByKey = new SortedMap( + /** + * The keys of documents that are in limbo for which we haven't yet started a + * limbo resolution query. + */ + private enqueuedLimboResolutions: DocumentKey[] = []; + /** + * Keeps track of the target ID for each document that is in limbo with an + * active target. + */ + private activeLimboTargetsByKey = new SortedMap( DocumentKey.comparator ); - private limboResolutionsByTarget: { + /** + * Keeps track of the information about an active limbo resolution for each + * active target ID that was started for the purpose of limbo resolution. + */ + private activeLimboResolutionsByTarget: { [targetId: number]: LimboResolution; } = {}; private limboDocumentRefs = new ReferenceSet(); @@ -174,7 +187,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { private remoteStore: RemoteStore, // PORTING NOTE: Manages state synchronization in multi-tab environments. private sharedClientState: SharedClientState, - private currentUser: User + private currentUser: User, + private maxConcurrentLimboResolutions: number ) {} // Only used for testing. @@ -400,7 +414,9 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { const changes = await this.localStore.applyRemoteEvent(remoteEvent); // Update `receivedDocument` as appropriate for any limbo targets. objUtils.forEach(remoteEvent.targetChanges, (targetId, targetChange) => { - const limboResolution = this.limboResolutionsByTarget[Number(targetId)]; + const limboResolution = this.activeLimboResolutionsByTarget[ + Number(targetId) + ]; if (limboResolution) { // Since this is a limbo resolution lookup, it's for a single document // and it could be added, modified, or removed, but not a combination. @@ -479,13 +495,16 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { // PORTING NOTE: Multi-tab only. this.sharedClientState.updateQueryState(targetId, 'rejected', err); - const limboResolution = this.limboResolutionsByTarget[targetId]; + const limboResolution = this.activeLimboResolutionsByTarget[targetId]; const limboKey = limboResolution && limboResolution.key; if (limboKey) { // Since this query failed, we won't want to manually unlisten to it. // So go ahead and remove it from bookkeeping. - this.limboTargetsByKey = this.limboTargetsByKey.remove(limboKey); - delete this.limboResolutionsByTarget[targetId]; + this.activeLimboTargetsByKey = this.activeLimboTargetsByKey.remove( + limboKey + ); + delete this.activeLimboResolutionsByTarget[targetId]; + this.pumpEnqueuedLimboResolutions(); // TODO(klimt): We really only should do the following on permission // denied errors, but we don't have the cause code here. @@ -731,15 +750,16 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { private removeLimboTarget(key: DocumentKey): void { // It's possible that the target already got removed because the query failed. In that case, // the key won't exist in `limboTargetsByKey`. Only do the cleanup if we still have the target. - const limboTargetId = this.limboTargetsByKey.get(key); + const limboTargetId = this.activeLimboTargetsByKey.get(key); if (limboTargetId === null) { // This target already got removed, because the query failed. return; } this.remoteStore.unlisten(limboTargetId); - this.limboTargetsByKey = this.limboTargetsByKey.remove(key); - delete this.limboResolutionsByTarget[limboTargetId]; + this.activeLimboTargetsByKey = this.activeLimboTargetsByKey.remove(key); + delete this.activeLimboResolutionsByTarget[limboTargetId]; + this.pumpEnqueuedLimboResolutions(); } private updateTrackedLimbos( @@ -768,29 +788,54 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { private trackLimboChange(limboChange: AddedLimboDocument): void { const key = limboChange.key; - if (!this.limboTargetsByKey.get(key)) { + if (!this.activeLimboTargetsByKey.get(key)) { logDebug(LOG_TAG, 'New document in limbo: ' + key); + this.enqueuedLimboResolutions.push(key); + this.pumpEnqueuedLimboResolutions(); + } + } + + /** + * Starts listens for documents in limbo that are enqueued for resolution, + * subject to a maximum number of concurrent resolutions. + * + * Without bounding the number of concurrent resolutions, the server can fail + * with "resource exhausted" errors which can lead to pathological client + * behavior as seen in https://github.com/firebase/firebase-js-sdk/issues/2683. + */ + private pumpEnqueuedLimboResolutions(): void { + while ( + this.enqueuedLimboResolutions.length > 0 && + this.activeLimboTargetsByKey.size < this.maxConcurrentLimboResolutions + ) { + const key = this.enqueuedLimboResolutions.shift()!; const limboTargetId = this.limboTargetIdGenerator.next(); - const query = Query.atPath(key.path); - this.limboResolutionsByTarget[limboTargetId] = new LimboResolution(key); + this.activeLimboResolutionsByTarget[limboTargetId] = new LimboResolution( + key + ); + this.activeLimboTargetsByKey = this.activeLimboTargetsByKey.insert( + key, + limboTargetId + ); this.remoteStore.listen( new TargetData( - query.toTarget(), + Query.atPath(key.path).toTarget(), limboTargetId, TargetPurpose.LimboResolution, ListenSequence.INVALID ) ); - this.limboTargetsByKey = this.limboTargetsByKey.insert( - key, - limboTargetId - ); } } // Visible for testing - currentLimboDocs(): SortedMap { - return this.limboTargetsByKey; + activeLimboDocumentResolutions(): SortedMap { + return this.activeLimboTargetsByKey; + } + + // Visible for testing + enqueuedLimboDocumentResolutions(): DocumentKey[] { + return this.enqueuedLimboResolutions; } private async emitNewSnapsAndNotifyLocalStore( @@ -936,12 +981,12 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { // PORTING NOTE: Multi-tab only. private resetLimboDocuments(): void { - objUtils.forEachNumber(this.limboResolutionsByTarget, targetId => { + objUtils.forEachNumber(this.activeLimboResolutionsByTarget, targetId => { this.remoteStore.unlisten(targetId); }); this.limboDocumentRefs.removeAllReferences(); - this.limboResolutionsByTarget = []; - this.limboTargetsByKey = new SortedMap( + this.activeLimboResolutionsByTarget = []; + this.activeLimboTargetsByKey = new SortedMap( DocumentKey.comparator ); } @@ -1138,7 +1183,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { } getRemoteKeysForTarget(targetId: TargetId): DocumentKeySet { - const limboResolution = this.limboResolutionsByTarget[targetId]; + const limboResolution = this.activeLimboResolutionsByTarget[targetId]; if (limboResolution && limboResolution.receivedDocument) { return documentKeySet().add(limboResolution.key); } else { diff --git a/packages/firestore/test/unit/specs/limbo_spec.test.ts b/packages/firestore/test/unit/specs/limbo_spec.test.ts index 82ee4fae344..322549dad07 100644 --- a/packages/firestore/test/unit/specs/limbo_spec.test.ts +++ b/packages/firestore/test/unit/specs/limbo_spec.test.ts @@ -1,6 +1,6 @@ /** * @license - * Copyright 2017 Google Inc. + * Copyright 2017 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -639,4 +639,293 @@ describeSpec('Limbo Documents:', [], () => { ); } ); + + specTest( + 'Limbo resolution throttling with all results at once from watch', + // TODO(dconeybe) Remove the 'no-*' tags as these platforms implement limbo + // resolution throttling. + ['no-android', 'no-ios'], + () => { + const query = Query.atPath(path('collection')); + const doc1 = doc('collection/a', 1000, { key: 'a' }); + const doc2 = doc('collection/b', 1000, { key: 'b' }); + const doc3 = doc('collection/c', 1000, { key: 'c' }); + const doc4 = doc('collection/d', 1000, { key: 'd' }); + const doc5 = doc('collection/e', 1000, { key: 'e' }); + const limboQuery1 = Query.atPath(doc1.key.path); + const limboQuery2 = Query.atPath(doc2.key.path); + const limboQuery3 = Query.atPath(doc3.key.path); + const limboQuery4 = Query.atPath(doc4.key.path); + const limboQuery5 = Query.atPath(doc5.key.path); + + // Simulate Watch sending us a reset if another client deletes the + // documents that match our query. Verify that limbo throttling works + // when Watch resolves the limbo documents listens in a single snapshot. + return ( + spec() + .withMaxConcurrentLimboResolutions(2) + .userListens(query) + .watchAcksFull(query, 1000, doc1, doc2, doc3, doc4, doc5) + .expectEvents(query, { + added: [doc1, doc2, doc3, doc4, doc5] + }) + .watchResets(query) + .watchSends({ affects: [query] }) + .watchCurrents(query, 'resume-token-2000') + .watchSnapshots(2000) + .expectLimboDocs(doc1.key, doc2.key) + .expectEnqueuedLimboDocs(doc3.key, doc4.key, doc5.key) + // Limbo document causes query to be "inconsistent" + .expectEvents(query, { fromCache: true }) + .watchAcks(limboQuery1) + .watchAcks(limboQuery2) + // Resolve limbo documents doc1 and doc2 in a single snapshot. + .watchCurrents(limboQuery1, 'resume-token-2001') + .watchCurrents(limboQuery2, 'resume-token-2001') + .watchSnapshots(2001) + .expectEvents(query, { + removed: [doc1, doc2], + fromCache: true + }) + // Start the second round of limbo resolutions. + .expectLimboDocs(doc3.key, doc4.key) + .expectEnqueuedLimboDocs(doc5.key) + .watchAcks(limboQuery3) + .watchAcks(limboQuery4) + // Resolve limbo documents doc3 and doc4 in a single snapshot. + .watchCurrents(limboQuery3, 'resume-token-2002') + .watchCurrents(limboQuery4, 'resume-token-2002') + .watchSnapshots(2002) + .expectEvents(query, { + removed: [doc3, doc4], + fromCache: true + }) + // Start the final round of limbo resolutions. + .expectLimboDocs(doc5.key) + .expectEnqueuedLimboDocs() + .watchAcks(limboQuery5) + // Resolve limbo document doc5. + .watchCurrents(limboQuery5, 'resume-token-2003') + .watchSnapshots(2003) + .expectEvents(query, { + removed: [doc5], + fromCache: false + }) + .expectLimboDocs() + .expectEnqueuedLimboDocs() + ); + } + ); + + specTest( + 'Limbo resolution throttling with results one at a time from watch', + // TODO(dconeybe) Remove the 'no-*' tags as these platforms implement limbo + // resolution throttling. + ['no-android', 'no-ios'], + () => { + const query = Query.atPath(path('collection')); + const doc1 = doc('collection/a', 1000, { key: 'a' }); + const doc2 = doc('collection/b', 1000, { key: 'b' }); + const doc3 = doc('collection/c', 1000, { key: 'c' }); + const doc4 = doc('collection/d', 1000, { key: 'd' }); + const doc5 = doc('collection/e', 1000, { key: 'e' }); + const limboQuery1 = Query.atPath(doc1.key.path); + const limboQuery2 = Query.atPath(doc2.key.path); + const limboQuery3 = Query.atPath(doc3.key.path); + const limboQuery4 = Query.atPath(doc4.key.path); + const limboQuery5 = Query.atPath(doc5.key.path); + + // Simulate Watch sending us a reset if another client deletes the + // documents that match our query. Verify that limbo throttling works + // when Watch resolves the limbo documents listens one per snapshot. + return ( + spec() + .withMaxConcurrentLimboResolutions(2) + .userListens(query) + .watchAcksFull(query, 1000, doc1, doc2, doc3, doc4, doc5) + .expectEvents(query, { + added: [doc1, doc2, doc3, doc4, doc5] + }) + .watchResets(query) + .watchSends({ affects: [query] }) + .watchCurrents(query, 'resume-token-2000') + .watchSnapshots(2000) + .expectLimboDocs(doc1.key, doc2.key) + .expectEnqueuedLimboDocs(doc3.key, doc4.key, doc5.key) + // Limbo document causes query to be "inconsistent" + .expectEvents(query, { fromCache: true }) + .watchAcks(limboQuery1) + .watchAcks(limboQuery2) + // Resolve the limbo documents doc1 in its own snapshot. + .watchCurrents(limboQuery1, 'resume-token-2001') + .watchSnapshots(2001) + .expectEvents(query, { removed: [doc1], fromCache: true }) + // Start the next limbo resolution since one has finished. + .expectLimboDocs(doc2.key, doc3.key) + .expectEnqueuedLimboDocs(doc4.key, doc5.key) + .watchAcks(limboQuery3) + // Resolve the limbo documents doc2 in its own snapshot. + .watchCurrents(limboQuery2, 'resume-token-2002') + .watchSnapshots(2002) + .expectEvents(query, { removed: [doc2], fromCache: true }) + // Start the next limbo resolution since one has finished. + .expectLimboDocs(doc3.key, doc4.key) + .expectEnqueuedLimboDocs(doc5.key) + .watchAcks(limboQuery4) + // Resolve the limbo documents doc3 in its own snapshot. + .watchCurrents(limboQuery3, 'resume-token-2003') + .watchSnapshots(2003) + .expectEvents(query, { removed: [doc3], fromCache: true }) + // Start the next limbo resolution since one has finished. + .expectLimboDocs(doc4.key, doc5.key) + .expectEnqueuedLimboDocs() + .watchAcks(limboQuery5) + // Resolve the limbo documents doc4 in its own snapshot. + .watchCurrents(limboQuery4, 'resume-token-2004') + .watchSnapshots(2004) + .expectEvents(query, { removed: [doc4], fromCache: true }) + // The final limbo document listen is already active; resolve it. + .expectLimboDocs(doc5.key) + .expectEnqueuedLimboDocs() + // Resolve the limbo documents doc5 in its own snapshot. + .watchCurrents(limboQuery5, 'resume-token-2005') + .watchSnapshots(2005) + .expectEvents(query, { removed: [doc5], fromCache: false }) + .expectLimboDocs() + .expectEnqueuedLimboDocs() + ); + } + ); + + specTest( + 'Limbo resolution throttling when a limbo listen is rejected.', + // TODO(dconeybe) Remove the 'no-*' tags as these platforms implement limbo + // resolution throttling. + ['no-android', 'no-ios'], + () => { + const query = Query.atPath(path('collection')); + const doc1 = doc('collection/a', 1000, { key: 'a' }); + const doc2 = doc('collection/b', 1000, { key: 'b' }); + const limboQuery1 = Query.atPath(doc1.key.path); + const limboQuery2 = Query.atPath(doc2.key.path); + + // Simulate Watch sending us a reset if another client deletes the + // documents that match our query. Verify that limbo throttling works + // when Watch rejects the listens for limbo resolution. + return ( + spec() + .withMaxConcurrentLimboResolutions(1) + .userListens(query) + .watchAcksFull(query, 1000, doc1, doc2) + .expectEvents(query, { added: [doc1, doc2] }) + .watchResets(query) + .watchSends({ affects: [query] }) + .watchCurrents(query, 'resume-token-1001') + .watchSnapshots(2000) + .expectLimboDocs(doc1.key) + .expectEnqueuedLimboDocs(doc2.key) + // Limbo document causes query to be "inconsistent" + .expectEvents(query, { fromCache: true }) + .watchRemoves( + limboQuery1, + new RpcError(Code.RESOURCE_EXHAUSTED, 'Resource exhausted') + ) + // When a limbo listen gets rejected, we assume that it was deleted. + // But now that doc1 is resolved, the limbo resolution for doc2 can + // start. + .expectEvents(query, { removed: [doc1], fromCache: true }) + .expectLimboDocs(doc2.key) + .expectEnqueuedLimboDocs() + // Reject the listen for the second limbo resolution as well, in order + // to exercise the code path of a rejected limbo resolution without + // any enqueued limbo resolutions. + .watchRemoves( + limboQuery2, + new RpcError(Code.RESOURCE_EXHAUSTED, 'Resource exhausted') + ) + .expectEvents(query, { removed: [doc2] }) + .expectLimboDocs() + .expectEnqueuedLimboDocs() + ); + } + ); + + specTest( + 'Limbo resolution throttling with existence filter mismatch', + // TODO(dconeybe) Remove the 'no-*' tags as these platforms implement limbo + // resolution throttling. + ['no-android', 'no-ios'], + () => { + const query = Query.atPath(path('collection')); + const docA1 = doc('collection/a1', 1000, { key: 'a1' }); + const docA2 = doc('collection/a2', 1000, { key: 'a2' }); + const docA3 = doc('collection/a3', 1000, { key: 'a3' }); + const docB1 = doc('collection/b1', 1000, { key: 'b1' }); + const docB2 = doc('collection/b2', 1000, { key: 'b2' }); + const docB3 = doc('collection/b3', 1000, { key: 'b3' }); + const docA1Query = Query.atPath(docA1.key.path); + const docA2Query = Query.atPath(docA2.key.path); + const docA3Query = Query.atPath(docA3.key.path); + + // Verify that limbo resolution throttling works correctly with existence + // filter mismatches. This test exercises the steps that resulted in + // unbounded reads that motivated throttling: + // https://github.com/firebase/firebase-js-sdk/issues/2683 + return ( + spec() + .withMaxConcurrentLimboResolutions(2) + .userListens(query) + .watchAcks(query) + .watchSends({ affects: [query] }, docA1, docA2, docA3) + .watchCurrents(query, 'resume-token-1000') + .watchSnapshots(1000) + .expectEvents(query, { added: [docA1, docA2, docA3] }) + // Simulate that the client loses network connection. + .disableNetwork() + // Limbo document causes query to be "inconsistent" + .expectEvents(query, { fromCache: true }) + .enableNetwork() + .restoreListen(query, 'resume-token-1000') + .watchAcks(query) + // While this client was disconnected, another client deleted all the + // docAs replaced them with docBs. If Watch has to re-run the + // underlying query when this client re-listens, Watch won't be able + // to tell that docAs were deleted and will only send us existing + // documents that changed since the resume token. This will cause it + // to just send the docBs with an existence filter with a count of 3. + .watchSends({ affects: [query] }, docB1, docB2, docB3) + .watchFilters([query], docB1.key, docB2.key, docB3.key) + .watchSnapshots(1001) + .expectEvents(query, { + added: [docB1, docB2, docB3], + fromCache: true + }) + // The view now contains the docAs and the docBs (6 documents), but + // the existence filter indicated only 3 should match. This causes + // the client to re-listen without a resume token. + .expectActiveTargets({ query, resumeToken: '' }) + // When the existence filter mismatch was detected, the client removed + // then re-added the target. Watch needs to acknowledge the removal. + .watchRemoves(query) + .watchAcksFull(query, 1002, docB1, docB2, docB3) + // The docAs are now in limbo; the client begins limbo resolution. + .expectLimboDocs(docA1.key, docA2.key) + .expectEnqueuedLimboDocs(docA3.key) + .watchAcks(docA1Query) + .watchAcks(docA2Query) + .watchCurrents(docA1Query, 'resume-token-1003') + .watchCurrents(docA2Query, 'resume-token-1003') + .watchSnapshots(1003) + .expectEvents(query, { removed: [docA1, docA2], fromCache: true }) + .expectLimboDocs(docA3.key) + .expectEnqueuedLimboDocs() + .watchAcks(docA3Query) + .watchCurrents(docA3Query, 'resume-token-1004') + .watchSnapshots(1004) + .expectEvents(query, { removed: [docA3] }) + .expectLimboDocs() + .expectEnqueuedLimboDocs() + ); + } + ); }); diff --git a/packages/firestore/test/unit/specs/spec_builder.ts b/packages/firestore/test/unit/specs/spec_builder.ts index 3dc053cd5f7..ba2f6da2603 100644 --- a/packages/firestore/test/unit/specs/spec_builder.ts +++ b/packages/firestore/test/unit/specs/spec_builder.ts @@ -217,6 +217,11 @@ export class SpecBuilder { return this; } + withMaxConcurrentLimboResolutions(value?: number): this { + this.config.maxConcurrentLimboResolutions = value; + return this; + } + userListens(query: Query, resumeToken?: string): this { this.nextStep(); @@ -355,7 +360,8 @@ export class SpecBuilder { enableNetwork: false, expectedState: { activeTargets: {}, - limboDocs: [] + activeLimboDocs: [], + enqueuedLimboDocs: [] } }; return this; @@ -383,7 +389,8 @@ export class SpecBuilder { restart: true, expectedState: { activeTargets: {}, - limboDocs: [] + activeLimboDocs: [], + enqueuedLimboDocs: [] } }; // Reset our mappings / target ids since all existing listens will be @@ -398,7 +405,8 @@ export class SpecBuilder { shutdown: true, expectedState: { activeTargets: {}, - limboDocs: [] + activeLimboDocs: [], + enqueuedLimboDocs: [] } }; // Reset our mappings / target ids since all existing listens will be @@ -461,7 +469,7 @@ export class SpecBuilder { }); currentStep.expectedState = currentStep.expectedState || {}; - currentStep.expectedState.limboDocs = keys.map(k => + currentStep.expectedState.activeLimboDocs = keys.map(k => SpecBuilder.keyToSpec(k) ); currentStep.expectedState.activeTargets = objUtils.shallowCopy( @@ -470,6 +478,22 @@ export class SpecBuilder { return this; } + /** + * Expects a document to be in limbo, enqueued for limbo resolution, and + * therefore *without* an active targetId. + */ + expectEnqueuedLimboDocs(...keys: DocumentKey[]): this { + this.assertStep('Limbo expectation requires previous step'); + const currentStep = this.currentStep!; + + currentStep.expectedState = currentStep.expectedState || {}; + currentStep.expectedState.enqueuedLimboDocs = keys.map(k => + SpecBuilder.keyToSpec(k) + ); + + return this; + } + /** * Special helper for limbo documents that acks with either a document or * with no document for NoDocument. This is translated into normal watch diff --git a/packages/firestore/test/unit/specs/spec_test_runner.ts b/packages/firestore/test/unit/specs/spec_test_runner.ts index f4f398737c8..418d4ad648a 100644 --- a/packages/firestore/test/unit/specs/spec_test_runner.ts +++ b/packages/firestore/test/unit/specs/spec_test_runner.ts @@ -113,6 +113,7 @@ import { } from '../local/persistence_test_helpers'; import { MULTI_CLIENT_TAG } from './describe_spec'; import { ByteString } from '../../../src/util/byte_string'; +import { SortedSet } from '../../../src/util/sorted_set'; const ARBITRARY_SEQUENCE_NUMBER = 2; @@ -407,7 +408,8 @@ abstract class TestRunner { q.canonicalId() ); - private expectedLimboDocs: DocumentKey[]; + private expectedActiveLimboDocs: DocumentKey[]; + private expectedEnqueuedLimboDocs: DocumentKey[]; private expectedActiveTargets: { [targetId: number]: { queries: SpecQuery[]; resumeToken: string }; }; @@ -423,6 +425,7 @@ abstract class TestRunner { private useGarbageCollection: boolean; private numClients: number; + private maxConcurrentLimboResolutions?: number; private databaseInfo: DatabaseInfo; protected user = User.UNAUTHENTICATED; @@ -456,8 +459,10 @@ abstract class TestRunner { this.useGarbageCollection = config.useGarbageCollection; this.numClients = config.numClients; + this.maxConcurrentLimboResolutions = config.maxConcurrentLimboResolutions; - this.expectedLimboDocs = []; + this.expectedActiveLimboDocs = []; + this.expectedEnqueuedLimboDocs = []; this.expectedActiveTargets = {}; this.acknowledgedDocs = []; this.rejectedDocs = []; @@ -510,7 +515,8 @@ abstract class TestRunner { this.localStore, this.remoteStore, this.sharedClientState, - this.user + this.user, + this.maxConcurrentLimboResolutions ?? Number.MAX_SAFE_INTEGER ); // Set up wiring between sync engine and other components @@ -1038,8 +1044,13 @@ abstract class TestRunner { expectedState.watchStreamRequestCount ); } - if ('limboDocs' in expectedState) { - this.expectedLimboDocs = expectedState.limboDocs!.map(key); + if ('activeLimboDocs' in expectedState) { + this.expectedActiveLimboDocs = expectedState.activeLimboDocs!.map(key); + } + if ('enqueuedLimboDocs' in expectedState) { + this.expectedEnqueuedLimboDocs = expectedState.enqueuedLimboDocs!.map( + key + ); } if ('activeTargets' in expectedState) { this.expectedActiveTargets = expectedState.activeTargets!; @@ -1076,7 +1087,8 @@ abstract class TestRunner { if (this.started) { // Always validate that the expected limbo docs match the actual limbo // docs - this.validateLimboDocs(); + this.validateActiveLimboDocs(); + this.validateEnqueuedLimboDocs(); // Always validate that the expected active targets match the actual // active targets await this.validateActiveTargets(); @@ -1090,19 +1102,20 @@ abstract class TestRunner { this.snapshotsInSyncEvents = 0; } - private validateLimboDocs(): void { - let actualLimboDocs = this.syncEngine.currentLimboDocs(); - // Validate that each limbo doc has an expected active target + private validateActiveLimboDocs(): void { + let actualLimboDocs = this.syncEngine.activeLimboDocumentResolutions(); + // Validate that each active limbo doc has an expected active target actualLimboDocs.forEach((key, targetId) => { const targetIds: number[] = []; obj.forEachNumber(this.expectedActiveTargets, id => targetIds.push(id)); expect(obj.contains(this.expectedActiveTargets, targetId)).to.equal( true, - `Found limbo doc, but its target ID ${targetId} was not in the set of ` + - `expected active target IDs (${targetIds.join(', ')})` + `Found limbo doc ${key.toString()}, but its target ID ${targetId} ` + + `was not in the set of expected active target IDs ` + + `(${targetIds.join(', ')})` ); }); - for (const expectedLimboDoc of this.expectedLimboDocs) { + for (const expectedLimboDoc of this.expectedActiveLimboDocs) { expect(actualLimboDocs.get(expectedLimboDoc)).to.not.equal( null, 'Expected doc to be in limbo, but was not: ' + @@ -1112,10 +1125,36 @@ abstract class TestRunner { } expect(actualLimboDocs.size).to.equal( 0, - 'Unexpected docs in limbo: ' + actualLimboDocs.toString() + 'Unexpected active docs in limbo: ' + actualLimboDocs.toString() ); } + private validateEnqueuedLimboDocs(): void { + let actualLimboDocs = new SortedSet(DocumentKey.comparator); + this.syncEngine.enqueuedLimboDocumentResolutions().forEach(key => { + actualLimboDocs = actualLimboDocs.add(key); + }); + let expectedLimboDocs = new SortedSet(DocumentKey.comparator); + this.expectedEnqueuedLimboDocs.forEach(key => { + expectedLimboDocs = expectedLimboDocs.add(key); + }); + actualLimboDocs.forEach(key => { + expect(expectedLimboDocs.has(key)).to.equal( + true, + `Found enqueued limbo doc ${key.toString()}, but it was not in ` + + `the set of expected enqueued limbo documents ` + + `(${expectedLimboDocs.toString()})` + ); + }); + expectedLimboDocs.forEach(key => { + expect(actualLimboDocs.has(key)).to.equal( + true, + `Expected doc ${key.toString()} to be enqueued for limbo resolution, ` + + `but it was not in the queue (${actualLimboDocs.toString()})` + ); + }); + } + private async validateActiveTargets(): Promise { if (!this.isPrimaryClient || !this.networkEnabled) { expect(this.connection.activeTargets).to.be.empty; @@ -1372,6 +1411,13 @@ export interface SpecConfig { /** The number of active clients for this test run. */ numClients: number; + + /** + * The maximum number of concurrently-active listens for limbo resolutions. + * This value must be strictly greater than zero, or undefined to use the + * default value. + */ + maxConcurrentLimboResolutions?: number; } /** @@ -1632,8 +1678,17 @@ export interface StateExpectation { writeStreamRequestCount?: number; /** Number of requests sent to the watch stream. */ watchStreamRequestCount?: number; - /** Current documents in limbo. Verified in each step until overwritten. */ - limboDocs?: string[]; + /** + * Current documents in limbo that have an active target. + * Verified in each step until overwritten. + */ + activeLimboDocs?: string[]; + /** + * Current documents in limbo that are enqueued and therefore do not have an + * active target. + * Verified in each step until overwritten. + */ + enqueuedLimboDocs?: string[]; /** * Whether the instance holds the primary lease. Used in multi-client tests. */