diff --git a/packages/firestore/src/core/sync_engine_impl.ts b/packages/firestore/src/core/sync_engine_impl.ts index cbe66dce8c4..efe65df6439 100644 --- a/packages/firestore/src/core/sync_engine_impl.ts +++ b/packages/firestore/src/core/sync_engine_impl.ts @@ -316,9 +316,6 @@ export async function syncEngineListen( syncEngineImpl.localStore, queryToTarget(query) ); - if (syncEngineImpl.isPrimaryClient) { - remoteStoreListen(syncEngineImpl.remoteStore, targetData); - } const status = syncEngineImpl.sharedClientState.addLocalQueryTarget( targetData.targetId @@ -331,6 +328,10 @@ export async function syncEngineListen( status === 'current', targetData.resumeToken ); + + if (syncEngineImpl.isPrimaryClient) { + remoteStoreListen(syncEngineImpl.remoteStore, targetData); + } } return viewSnapshot; diff --git a/packages/firestore/src/local/target_data.ts b/packages/firestore/src/local/target_data.ts index 95237e574a7..2893cbf342a 100644 --- a/packages/firestore/src/local/target_data.ts +++ b/packages/firestore/src/local/target_data.ts @@ -66,7 +66,13 @@ export class TargetData { * matches the target. The resume token essentially identifies a point in * time from which the server should resume sending results. */ - readonly resumeToken: ByteString = ByteString.EMPTY_BYTE_STRING + readonly resumeToken: ByteString = ByteString.EMPTY_BYTE_STRING, + /** + * The number of documents that last matched the query at the resume token or + * read time. Documents are counted only when making a listen request with + * resume token or read time, otherwise, keep it null. + */ + readonly expectedCount: number | null = null ) {} /** Creates a new target data instance with an updated sequence number. */ @@ -78,7 +84,8 @@ export class TargetData { sequenceNumber, this.snapshotVersion, this.lastLimboFreeSnapshotVersion, - this.resumeToken + this.resumeToken, + this.expectedCount ); } @@ -97,7 +104,24 @@ export class TargetData { this.sequenceNumber, snapshotVersion, this.lastLimboFreeSnapshotVersion, - resumeToken + resumeToken, + /* expectedCount= */ null + ); + } + + /** + * Creates a new target data instance with an updated expected count. + */ + withExpectedCount(expectedCount: number): TargetData { + return new TargetData( + this.target, + this.targetId, + this.purpose, + this.sequenceNumber, + this.snapshotVersion, + this.lastLimboFreeSnapshotVersion, + this.resumeToken, + expectedCount ); } @@ -115,7 +139,8 @@ export class TargetData { this.sequenceNumber, this.snapshotVersion, lastLimboFreeSnapshotVersion, - this.resumeToken + this.resumeToken, + this.expectedCount ); } } diff --git a/packages/firestore/src/remote/remote_store.ts b/packages/firestore/src/remote/remote_store.ts index 0e1a39e7ec3..769260e8d8d 100644 --- a/packages/firestore/src/remote/remote_store.ts +++ b/packages/firestore/src/remote/remote_store.ts @@ -334,6 +334,17 @@ function sendWatchRequest( remoteStoreImpl.watchChangeAggregator!.recordPendingTargetRequest( targetData.targetId ); + + if ( + targetData.resumeToken.approximateByteSize() > 0 || + targetData.snapshotVersion.compareTo(SnapshotVersion.min()) > 0 + ) { + const expectedCount = remoteStoreImpl.remoteSyncer.getRemoteKeysForTarget!( + targetData.targetId + ).size; + targetData = targetData.withExpectedCount(expectedCount); + } + ensureWatchStream(remoteStoreImpl).watch(targetData); } diff --git a/packages/firestore/src/remote/serializer.ts b/packages/firestore/src/remote/serializer.ts index 32da4f18d9b..cbdcb3b2d88 100644 --- a/packages/firestore/src/remote/serializer.ts +++ b/packages/firestore/src/remote/serializer.ts @@ -1012,6 +1012,7 @@ export function toTarget( if (targetData.resumeToken.approximateByteSize() > 0) { result.resumeToken = toBytes(serializer, targetData.resumeToken); + result.expectedCount = targetData.expectedCount ?? undefined; } else if (targetData.snapshotVersion.compareTo(SnapshotVersion.min()) > 0) { // TODO(wuandy): Consider removing above check because it is most likely true. // Right now, many tests depend on this behaviour though (leaving min() out @@ -1020,6 +1021,7 @@ export function toTarget( serializer, targetData.snapshotVersion.toTimestamp() ); + result.expectedCount = targetData.expectedCount ?? undefined; } return result; diff --git a/packages/firestore/test/unit/remote/serializer.helper.ts b/packages/firestore/test/unit/remote/serializer.helper.ts index 79140e73999..a9b22a9555d 100644 --- a/packages/firestore/test/unit/remote/serializer.helper.ts +++ b/packages/firestore/test/unit/remote/serializer.helper.ts @@ -1778,7 +1778,8 @@ export function serializerTest( } }, resumeToken: new Uint8Array([1, 2, 3]), - targetId: 1 + targetId: 1, + expectedCount: undefined }; expect(result).to.deep.equal(expected); }); diff --git a/packages/firestore/test/unit/specs/listen_spec.test.ts b/packages/firestore/test/unit/specs/listen_spec.test.ts index 8bc62391838..dc9955f7716 100644 --- a/packages/firestore/test/unit/specs/listen_spec.test.ts +++ b/packages/firestore/test/unit/specs/listen_spec.test.ts @@ -1809,4 +1809,86 @@ describeSpec('Listens:', [], () => { ); } ); + + specTest( + 'Resuming a query should specify expectedCount when adding the target', + [], + () => { + const query1 = query('collection'); + const docA = doc('collection/a', 1000, { key: 'a' }); + const docB = doc('collection/b', 1000, { key: 'b' }); + + return ( + spec() + .withGCEnabled(false) + .userListens(query1) + .watchAcksFull(query1, 1000) + .expectEvents(query1, {}) + .userUnlistens(query1) + .watchRemoves(query1) + // There are 0 remote documents from previous listen. + .userListens(query1, { + resumeToken: 'resume-token-1000', + expectedCount: 0 + }) + .expectEvents(query1, { fromCache: true }) + .watchAcksFull(query1, 2000, docA, docB) + .expectEvents(query1, { added: [docA, docB] }) + .userUnlistens(query1) + .userListens(query1, { + resumeToken: 'resume-token-2000', + expectedCount: 2 + }) + .expectEvents(query1, { added: [docA, docB], fromCache: true }) + ); + } + ); + + specTest( + 'Resuming a query should specify expectedCount that does not include pending mutations', + [], + () => { + const query1 = query('collection'); + const docA = doc('collection/a', 1000, { key: 'a' }); + const docBLocal = doc('collection/b', 1000, { + key: 'b' + }).setHasLocalMutations(); + + return spec() + .withGCEnabled(false) + .userListens(query1) + .watchAcksFull(query1, 1000, docA) + .expectEvents(query1, { added: [docA] }) + .userUnlistens(query1) + .userSets('collection/b', { key: 'b' }) + .userListens(query1, { + resumeToken: 'resume-token-1000', + expectedCount: 1 + }) + .expectEvents(query1, { + added: [docA, docBLocal], + fromCache: true, + hasPendingWrites: true + }); + } + ); + + specTest( + 'ExpectedCount in listen request should work after coming back online', + [], + () => { + const query1 = query('collection'); + const docA = doc('collection/a', 1000, { key: 'a' }); + + return spec() + .withGCEnabled(false) + .userListens(query1) + .watchAcksFull(query1, 1000, docA) + .expectEvents(query1, { added: [docA] }) + .disableNetwork() + .expectEvents(query1, { fromCache: true }) + .enableNetwork() + .restoreListen(query1, 'resume-token-1000', /* expectedCount= */ 1); + } + ); }); diff --git a/packages/firestore/test/unit/specs/spec_builder.ts b/packages/firestore/test/unit/specs/spec_builder.ts index 84f383335e1..e6d6b84e75f 100644 --- a/packages/firestore/test/unit/specs/spec_builder.ts +++ b/packages/firestore/test/unit/specs/spec_builder.ts @@ -76,12 +76,19 @@ export interface ActiveTargetSpec { queries: SpecQuery[]; resumeToken?: string; readTime?: TestSnapshotVersion; + expectedCount?: number; } export interface ActiveTargetMap { [targetId: string]: ActiveTargetSpec; } +export interface ResumeSpec { + resumeToken?: string; + readTime?: TestSnapshotVersion; + expectedCount?: number; +} + /** * Tracks the expected memory state of a client (e.g. the expected active watch * targets based on userListens(), userUnlistens(), and watchRemoves() @@ -256,10 +263,7 @@ export class SpecBuilder { return this; } - userListens( - query: Query, - resume?: { resumeToken?: string; readTime?: TestSnapshotVersion } - ): this { + userListens(query: Query, resume?: ResumeSpec): this { this.nextStep(); const target = queryToTarget(query); @@ -278,12 +282,7 @@ export class SpecBuilder { } this.queryMapping.set(target, targetId); - this.addQueryToActiveTargets( - targetId, - query, - resume?.resumeToken, - resume?.readTime - ); + this.addQueryToActiveTargets(targetId, query, resume); this.currentStep = { userListen: { targetId, query: SpecBuilder.queryToSpec(query) }, expectedState: { activeTargets: { ...this.activeTargets } } @@ -296,14 +295,21 @@ export class SpecBuilder { * Registers a previously active target with the test expectations after a * stream disconnect. */ - restoreListen(query: Query, resumeToken: string): this { + restoreListen( + query: Query, + resumeToken: string, + expectedCount?: number + ): this { const targetId = this.queryMapping.get(queryToTarget(query)); if (isNullOrUndefined(targetId)) { throw new Error("Can't restore an unknown query: " + query); } - this.addQueryToActiveTargets(targetId!, query, resumeToken); + this.addQueryToActiveTargets(targetId!, query, { + resumeToken, + expectedCount + }); const currentStep = this.currentStep!; currentStep.expectedState = currentStep.expectedState || {}; @@ -531,18 +537,18 @@ export class SpecBuilder { query: Query; resumeToken?: string; readTime?: TestSnapshotVersion; + expectedCount?: number; }> ): this { this.assertStep('Active target expectation requires previous step'); const currentStep = this.currentStep!; this.clientState.activeTargets = {}; - targets.forEach(({ query, resumeToken, readTime }) => { - this.addQueryToActiveTargets( - this.getTargetId(query), - query, + targets.forEach(({ query, resumeToken, readTime, expectedCount }) => { + this.addQueryToActiveTargets(this.getTargetId(query), query, { resumeToken, - readTime - ); + readTime, + expectedCount + }); }); currentStep.expectedState = currentStep.expectedState || {}; currentStep.expectedState.activeTargets = { ...this.activeTargets }; @@ -573,7 +579,7 @@ export class SpecBuilder { this.addQueryToActiveTargets( this.limboMapping[path], newQueryForPath(key.path), - '' + { resumeToken: '' } ); }); @@ -912,22 +918,14 @@ export class SpecBuilder { } /** Registers a query that is active in another tab. */ - expectListen( - query: Query, - resume?: { resumeToken?: string; readTime?: TestSnapshotVersion } - ): this { + expectListen(query: Query, resume?: ResumeSpec): this { this.assertStep('Expectations require previous step'); const target = queryToTarget(query); const targetId = this.queryIdGenerator.cachedId(target); this.queryMapping.set(target, targetId); - this.addQueryToActiveTargets( - targetId, - query, - resume?.resumeToken, - resume?.readTime - ); + this.addQueryToActiveTargets(targetId, query, resume); const currentStep = this.currentStep!; currentStep.expectedState = currentStep.expectedState || {}; @@ -1095,9 +1093,12 @@ export class SpecBuilder { private addQueryToActiveTargets( targetId: number, query: Query, - resumeToken?: string, - readTime?: TestSnapshotVersion + resume?: ResumeSpec ): void { + if (!(resume?.resumeToken || resume?.readTime) && resume?.expectedCount) { + fail('Expected count is present without a resume token or read time.'); + } + if (this.activeTargets[targetId]) { const activeQueries = this.activeTargets[targetId].queries; if ( @@ -1108,21 +1109,24 @@ export class SpecBuilder { // `query` is not added yet. this.activeTargets[targetId] = { queries: [SpecBuilder.queryToSpec(query), ...activeQueries], - resumeToken: resumeToken || '', - readTime + resumeToken: resume?.resumeToken || '', + readTime: resume?.readTime, + expectedCount: resume?.expectedCount }; } else { this.activeTargets[targetId] = { queries: activeQueries, - resumeToken: resumeToken || '', - readTime + resumeToken: resume?.resumeToken || '', + readTime: resume?.readTime, + expectedCount: resume?.expectedCount }; } } else { this.activeTargets[targetId] = { queries: [SpecBuilder.queryToSpec(query)], - resumeToken: resumeToken || '', - readTime + resumeToken: resume?.resumeToken || '', + readTime: resume?.readTime, + expectedCount: resume?.expectedCount }; } } @@ -1134,7 +1138,8 @@ export class SpecBuilder { if (queriesAfterRemoval.length > 0) { this.activeTargets[targetId] = { queries: queriesAfterRemoval, - resumeToken: this.activeTargets[targetId].resumeToken + resumeToken: this.activeTargets[targetId].resumeToken, + expectedCount: this.activeTargets[targetId].expectedCount }; } else { delete this.activeTargets[targetId]; diff --git a/packages/firestore/test/unit/specs/spec_test_runner.ts b/packages/firestore/test/unit/specs/spec_test_runner.ts index affda741e7a..107459ebce2 100644 --- a/packages/firestore/test/unit/specs/spec_test_runner.ts +++ b/packages/firestore/test/unit/specs/spec_test_runner.ts @@ -1117,6 +1117,10 @@ abstract class TestRunner { version(expected.readTime!) ); } + if (expected.expectedCount !== undefined) { + targetData = targetData.withExpectedCount(expected.expectedCount); + } + const expectedTarget = toTarget(this.serializer, targetData); expect(actualTarget.query).to.deep.equal(expectedTarget.query); expect(actualTarget.targetId).to.equal(expectedTarget.targetId); @@ -1128,6 +1132,11 @@ abstract class TestRunner { expectedTarget.resumeToken )}, actual: ${stringFromBase64String(actualTarget.resumeToken)}` ); + if (expected.expectedCount !== undefined) { + expect(actualTarget.expectedCount).to.equal( + expectedTarget.expectedCount + ); + } delete actualTargets[targetId]; }); expect(objectSize(actualTargets)).to.equal(