Skip to content

Commit 49e3609

Browse files
authored
Merge cef7db7 into 036849f
2 parents 036849f + cef7db7 commit 49e3609

File tree

8 files changed

+182
-46
lines changed

8 files changed

+182
-46
lines changed

packages/firestore/src/core/sync_engine_impl.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,6 @@ export async function syncEngineListen(
316316
syncEngineImpl.localStore,
317317
queryToTarget(query)
318318
);
319-
if (syncEngineImpl.isPrimaryClient) {
320-
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
321-
}
322319

323320
const status = syncEngineImpl.sharedClientState.addLocalQueryTarget(
324321
targetData.targetId
@@ -331,6 +328,10 @@ export async function syncEngineListen(
331328
status === 'current',
332329
targetData.resumeToken
333330
);
331+
332+
if (syncEngineImpl.isPrimaryClient) {
333+
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
334+
}
334335
}
335336

336337
return viewSnapshot;

packages/firestore/src/local/target_data.ts

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,13 @@ export class TargetData {
6666
* matches the target. The resume token essentially identifies a point in
6767
* time from which the server should resume sending results.
6868
*/
69-
readonly resumeToken: ByteString = ByteString.EMPTY_BYTE_STRING
69+
readonly resumeToken: ByteString = ByteString.EMPTY_BYTE_STRING,
70+
/**
71+
* The number of documents that last matched the query at the resume token or
72+
* read time. Documents are counted only when making a listen request with
73+
* resume token or read time, otherwise, keep it null.
74+
*/
75+
readonly expectedCount: number | null = null
7076
) {}
7177

7278
/** Creates a new target data instance with an updated sequence number. */
@@ -78,7 +84,8 @@ export class TargetData {
7884
sequenceNumber,
7985
this.snapshotVersion,
8086
this.lastLimboFreeSnapshotVersion,
81-
this.resumeToken
87+
this.resumeToken,
88+
this.expectedCount
8289
);
8390
}
8491

@@ -97,7 +104,24 @@ export class TargetData {
97104
this.sequenceNumber,
98105
snapshotVersion,
99106
this.lastLimboFreeSnapshotVersion,
100-
resumeToken
107+
resumeToken,
108+
/* expectedCount= */ null
109+
);
110+
}
111+
112+
/**
113+
* Creates a new target data instance with an updated expected count.
114+
*/
115+
withExpectedCount(expectedCount: number): TargetData {
116+
return new TargetData(
117+
this.target,
118+
this.targetId,
119+
this.purpose,
120+
this.sequenceNumber,
121+
this.snapshotVersion,
122+
this.lastLimboFreeSnapshotVersion,
123+
this.resumeToken,
124+
expectedCount
101125
);
102126
}
103127

@@ -115,7 +139,8 @@ export class TargetData {
115139
this.sequenceNumber,
116140
this.snapshotVersion,
117141
lastLimboFreeSnapshotVersion,
118-
this.resumeToken
142+
this.resumeToken,
143+
this.expectedCount
119144
);
120145
}
121146
}

packages/firestore/src/remote/remote_store.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,17 @@ function sendWatchRequest(
334334
remoteStoreImpl.watchChangeAggregator!.recordPendingTargetRequest(
335335
targetData.targetId
336336
);
337+
338+
if (
339+
targetData.resumeToken.approximateByteSize() > 0 ||
340+
targetData.snapshotVersion.compareTo(SnapshotVersion.min()) > 0
341+
) {
342+
const expectedCount = remoteStoreImpl.remoteSyncer.getRemoteKeysForTarget!(
343+
targetData.targetId
344+
).size;
345+
targetData = targetData.withExpectedCount(expectedCount);
346+
}
347+
337348
ensureWatchStream(remoteStoreImpl).watch(targetData);
338349
}
339350

packages/firestore/src/remote/serializer.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,6 +1012,7 @@ export function toTarget(
10121012

10131013
if (targetData.resumeToken.approximateByteSize() > 0) {
10141014
result.resumeToken = toBytes(serializer, targetData.resumeToken);
1015+
result.expectedCount = targetData.expectedCount ?? undefined;
10151016
} else if (targetData.snapshotVersion.compareTo(SnapshotVersion.min()) > 0) {
10161017
// TODO(wuandy): Consider removing above check because it is most likely true.
10171018
// Right now, many tests depend on this behaviour though (leaving min() out
@@ -1020,6 +1021,7 @@ export function toTarget(
10201021
serializer,
10211022
targetData.snapshotVersion.toTimestamp()
10221023
);
1024+
result.expectedCount = targetData.expectedCount ?? undefined;
10231025
}
10241026

10251027
return result;

packages/firestore/test/unit/remote/serializer.helper.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1778,7 +1778,8 @@ export function serializerTest(
17781778
}
17791779
},
17801780
resumeToken: new Uint8Array([1, 2, 3]),
1781-
targetId: 1
1781+
targetId: 1,
1782+
expectedCount: undefined
17821783
};
17831784
expect(result).to.deep.equal(expected);
17841785
});

packages/firestore/test/unit/specs/listen_spec.test.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1809,4 +1809,86 @@ describeSpec('Listens:', [], () => {
18091809
);
18101810
}
18111811
);
1812+
1813+
specTest(
1814+
'Resuming a query should specify expectedCount when adding the target',
1815+
[],
1816+
() => {
1817+
const query1 = query('collection');
1818+
const docA = doc('collection/a', 1000, { key: 'a' });
1819+
const docB = doc('collection/b', 1000, { key: 'b' });
1820+
1821+
return (
1822+
spec()
1823+
.withGCEnabled(false)
1824+
.userListens(query1)
1825+
.watchAcksFull(query1, 1000)
1826+
.expectEvents(query1, {})
1827+
.userUnlistens(query1)
1828+
.watchRemoves(query1)
1829+
// There are 0 remote documents from previous listen.
1830+
.userListens(query1, {
1831+
resumeToken: 'resume-token-1000',
1832+
expectedCount: 0
1833+
})
1834+
.expectEvents(query1, { fromCache: true })
1835+
.watchAcksFull(query1, 2000, docA, docB)
1836+
.expectEvents(query1, { added: [docA, docB] })
1837+
.userUnlistens(query1)
1838+
.userListens(query1, {
1839+
resumeToken: 'resume-token-2000',
1840+
expectedCount: 2
1841+
})
1842+
.expectEvents(query1, { added: [docA, docB], fromCache: true })
1843+
);
1844+
}
1845+
);
1846+
1847+
specTest(
1848+
'Resuming a query should specify expectedCount that does not include pending mutations',
1849+
[],
1850+
() => {
1851+
const query1 = query('collection');
1852+
const docA = doc('collection/a', 1000, { key: 'a' });
1853+
const docBLocal = doc('collection/b', 1000, {
1854+
key: 'b'
1855+
}).setHasLocalMutations();
1856+
1857+
return spec()
1858+
.withGCEnabled(false)
1859+
.userListens(query1)
1860+
.watchAcksFull(query1, 1000, docA)
1861+
.expectEvents(query1, { added: [docA] })
1862+
.userUnlistens(query1)
1863+
.userSets('collection/b', { key: 'b' })
1864+
.userListens(query1, {
1865+
resumeToken: 'resume-token-1000',
1866+
expectedCount: 1
1867+
})
1868+
.expectEvents(query1, {
1869+
added: [docA, docBLocal],
1870+
fromCache: true,
1871+
hasPendingWrites: true
1872+
});
1873+
}
1874+
);
1875+
1876+
specTest(
1877+
'ExpectedCount in listen request should work after coming back online',
1878+
[],
1879+
() => {
1880+
const query1 = query('collection');
1881+
const docA = doc('collection/a', 1000, { key: 'a' });
1882+
1883+
return spec()
1884+
.withGCEnabled(false)
1885+
.userListens(query1)
1886+
.watchAcksFull(query1, 1000, docA)
1887+
.expectEvents(query1, { added: [docA] })
1888+
.disableNetwork()
1889+
.expectEvents(query1, { fromCache: true })
1890+
.enableNetwork()
1891+
.restoreListen(query1, 'resume-token-1000', /* expectedCount= */ 1);
1892+
}
1893+
);
18121894
});

packages/firestore/test/unit/specs/spec_builder.ts

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,19 @@ export interface ActiveTargetSpec {
7676
queries: SpecQuery[];
7777
resumeToken?: string;
7878
readTime?: TestSnapshotVersion;
79+
expectedCount?: number;
7980
}
8081

8182
export interface ActiveTargetMap {
8283
[targetId: string]: ActiveTargetSpec;
8384
}
8485

86+
export interface ResumeSpec {
87+
resumeToken?: string;
88+
readTime?: TestSnapshotVersion;
89+
expectedCount?: number;
90+
}
91+
8592
/**
8693
* Tracks the expected memory state of a client (e.g. the expected active watch
8794
* targets based on userListens(), userUnlistens(), and watchRemoves()
@@ -256,10 +263,7 @@ export class SpecBuilder {
256263
return this;
257264
}
258265

259-
userListens(
260-
query: Query,
261-
resume?: { resumeToken?: string; readTime?: TestSnapshotVersion }
262-
): this {
266+
userListens(query: Query, resume?: ResumeSpec): this {
263267
this.nextStep();
264268

265269
const target = queryToTarget(query);
@@ -278,12 +282,7 @@ export class SpecBuilder {
278282
}
279283

280284
this.queryMapping.set(target, targetId);
281-
this.addQueryToActiveTargets(
282-
targetId,
283-
query,
284-
resume?.resumeToken,
285-
resume?.readTime
286-
);
285+
this.addQueryToActiveTargets(targetId, query, resume);
287286
this.currentStep = {
288287
userListen: { targetId, query: SpecBuilder.queryToSpec(query) },
289288
expectedState: { activeTargets: { ...this.activeTargets } }
@@ -296,14 +295,21 @@ export class SpecBuilder {
296295
* Registers a previously active target with the test expectations after a
297296
* stream disconnect.
298297
*/
299-
restoreListen(query: Query, resumeToken: string): this {
298+
restoreListen(
299+
query: Query,
300+
resumeToken: string,
301+
expectedCount?: number
302+
): this {
300303
const targetId = this.queryMapping.get(queryToTarget(query));
301304

302305
if (isNullOrUndefined(targetId)) {
303306
throw new Error("Can't restore an unknown query: " + query);
304307
}
305308

306-
this.addQueryToActiveTargets(targetId!, query, resumeToken);
309+
this.addQueryToActiveTargets(targetId!, query, {
310+
resumeToken,
311+
expectedCount
312+
});
307313

308314
const currentStep = this.currentStep!;
309315
currentStep.expectedState = currentStep.expectedState || {};
@@ -531,18 +537,18 @@ export class SpecBuilder {
531537
query: Query;
532538
resumeToken?: string;
533539
readTime?: TestSnapshotVersion;
540+
expectedCount?: number;
534541
}>
535542
): this {
536543
this.assertStep('Active target expectation requires previous step');
537544
const currentStep = this.currentStep!;
538545
this.clientState.activeTargets = {};
539-
targets.forEach(({ query, resumeToken, readTime }) => {
540-
this.addQueryToActiveTargets(
541-
this.getTargetId(query),
542-
query,
546+
targets.forEach(({ query, resumeToken, readTime, expectedCount }) => {
547+
this.addQueryToActiveTargets(this.getTargetId(query), query, {
543548
resumeToken,
544-
readTime
545-
);
549+
readTime,
550+
expectedCount
551+
});
546552
});
547553
currentStep.expectedState = currentStep.expectedState || {};
548554
currentStep.expectedState.activeTargets = { ...this.activeTargets };
@@ -573,7 +579,7 @@ export class SpecBuilder {
573579
this.addQueryToActiveTargets(
574580
this.limboMapping[path],
575581
newQueryForPath(key.path),
576-
''
582+
{ resumeToken: '' }
577583
);
578584
});
579585

@@ -912,22 +918,14 @@ export class SpecBuilder {
912918
}
913919

914920
/** Registers a query that is active in another tab. */
915-
expectListen(
916-
query: Query,
917-
resume?: { resumeToken?: string; readTime?: TestSnapshotVersion }
918-
): this {
921+
expectListen(query: Query, resume?: ResumeSpec): this {
919922
this.assertStep('Expectations require previous step');
920923

921924
const target = queryToTarget(query);
922925
const targetId = this.queryIdGenerator.cachedId(target);
923926
this.queryMapping.set(target, targetId);
924927

925-
this.addQueryToActiveTargets(
926-
targetId,
927-
query,
928-
resume?.resumeToken,
929-
resume?.readTime
930-
);
928+
this.addQueryToActiveTargets(targetId, query, resume);
931929

932930
const currentStep = this.currentStep!;
933931
currentStep.expectedState = currentStep.expectedState || {};
@@ -1095,9 +1093,12 @@ export class SpecBuilder {
10951093
private addQueryToActiveTargets(
10961094
targetId: number,
10971095
query: Query,
1098-
resumeToken?: string,
1099-
readTime?: TestSnapshotVersion
1096+
resume?: ResumeSpec
11001097
): void {
1098+
if (!(resume?.resumeToken || resume?.readTime) && resume?.expectedCount) {
1099+
fail('Expected count is present without a resume token or read time.');
1100+
}
1101+
11011102
if (this.activeTargets[targetId]) {
11021103
const activeQueries = this.activeTargets[targetId].queries;
11031104
if (
@@ -1108,21 +1109,24 @@ export class SpecBuilder {
11081109
// `query` is not added yet.
11091110
this.activeTargets[targetId] = {
11101111
queries: [SpecBuilder.queryToSpec(query), ...activeQueries],
1111-
resumeToken: resumeToken || '',
1112-
readTime
1112+
resumeToken: resume?.resumeToken || '',
1113+
readTime: resume?.readTime,
1114+
expectedCount: resume?.expectedCount
11131115
};
11141116
} else {
11151117
this.activeTargets[targetId] = {
11161118
queries: activeQueries,
1117-
resumeToken: resumeToken || '',
1118-
readTime
1119+
resumeToken: resume?.resumeToken || '',
1120+
readTime: resume?.readTime,
1121+
expectedCount: resume?.expectedCount
11191122
};
11201123
}
11211124
} else {
11221125
this.activeTargets[targetId] = {
11231126
queries: [SpecBuilder.queryToSpec(query)],
1124-
resumeToken: resumeToken || '',
1125-
readTime
1127+
resumeToken: resume?.resumeToken || '',
1128+
readTime: resume?.readTime,
1129+
expectedCount: resume?.expectedCount
11261130
};
11271131
}
11281132
}
@@ -1134,7 +1138,8 @@ export class SpecBuilder {
11341138
if (queriesAfterRemoval.length > 0) {
11351139
this.activeTargets[targetId] = {
11361140
queries: queriesAfterRemoval,
1137-
resumeToken: this.activeTargets[targetId].resumeToken
1141+
resumeToken: this.activeTargets[targetId].resumeToken,
1142+
expectedCount: this.activeTargets[targetId].expectedCount
11381143
};
11391144
} else {
11401145
delete this.activeTargets[targetId];

0 commit comments

Comments
 (0)