Skip to content

Add expectedCount to Target in listen request #6854

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions packages/firestore/src/core/sync_engine_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,6 @@ export async function syncEngineListen(
syncEngineImpl.localStore,
queryToTarget(query)
);
if (syncEngineImpl.isPrimaryClient) {
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
}

const status = syncEngineImpl.sharedClientState.addLocalQueryTarget(
targetData.targetId
Expand All @@ -331,6 +328,10 @@ export async function syncEngineListen(
status === 'current',
targetData.resumeToken
);

if (syncEngineImpl.isPrimaryClient) {
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
}
}

return viewSnapshot;
Expand Down
39 changes: 32 additions & 7 deletions packages/firestore/src/local/target_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ export class TargetData {
* matches the target. The resume token essentially identifies a point in
* time from which the server should resume sending results.
*/
readonly resumeToken: ByteString = ByteString.EMPTY_BYTE_STRING
readonly resumeToken: ByteString = ByteString.EMPTY_BYTE_STRING,

/** The number of documents that last matched the query at the resume token or
* read time.
*/
readonly expectedCount: number = 0
) {}

/** Creates a new target data instance with an updated sequence number. */
Expand All @@ -78,17 +83,19 @@ export class TargetData {
sequenceNumber,
this.snapshotVersion,
this.lastLimboFreeSnapshotVersion,
this.resumeToken
this.resumeToken,
this.expectedCount
);
}

/**
* Creates a new target data instance with an updated resume token and
* snapshot version.
* Creates a new target data instance with an updated resume token,
* snapshot version, and expectedCount if presented.
*/
withResumeToken(
resumeToken: ByteString,
snapshotVersion: SnapshotVersion
snapshotVersion: SnapshotVersion,
expectedCount?: number
): TargetData {
return new TargetData(
this.target,
Expand All @@ -97,7 +104,24 @@ export class TargetData {
this.sequenceNumber,
snapshotVersion,
this.lastLimboFreeSnapshotVersion,
resumeToken
resumeToken,
expectedCount || this.expectedCount
);
}

/**
* Creates a new target data instance with an updated expected count.
*/
withExpectedCount(expectedCount: number): TargetData {
return new TargetData(
this.target,
this.targetId,
this.purpose,
this.sequenceNumber,
this.snapshotVersion,
this.lastLimboFreeSnapshotVersion,
this.resumeToken,
expectedCount
);
}

Expand All @@ -115,7 +139,8 @@ export class TargetData {
this.sequenceNumber,
this.snapshotVersion,
lastLimboFreeSnapshotVersion,
this.resumeToken
this.resumeToken,
this.expectedCount
);
}
}
11 changes: 11 additions & 0 deletions packages/firestore/src/remote/remote_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,17 @@ function sendWatchRequest(
remoteStoreImpl.watchChangeAggregator!.recordPendingTargetRequest(
targetData.targetId
);

if (
targetData.resumeToken.approximateByteSize() > 0 ||
targetData.snapshotVersion.compareTo(SnapshotVersion.min()) > 0
) {
const expectedCount = remoteStoreImpl.remoteSyncer.getRemoteKeysForTarget!(
targetData.targetId
).size;
targetData = targetData.withExpectedCount(expectedCount);
}

ensureWatchStream(remoteStoreImpl).watch(targetData);
}

Expand Down
2 changes: 2 additions & 0 deletions packages/firestore/src/remote/serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,7 @@ export function toTarget(

if (targetData.resumeToken.approximateByteSize() > 0) {
result.resumeToken = toBytes(serializer, targetData.resumeToken);
result.expectedCount = targetData.expectedCount;
} else if (targetData.snapshotVersion.compareTo(SnapshotVersion.min()) > 0) {
// TODO(wuandy): Consider removing above check because it is most likely true.
// Right now, many tests depend on this behaviour though (leaving min() out
Expand All @@ -1020,6 +1021,7 @@ export function toTarget(
serializer,
targetData.snapshotVersion.toTimestamp()
);
result.expectedCount = targetData.expectedCount;
}

return result;
Expand Down
3 changes: 2 additions & 1 deletion packages/firestore/test/unit/remote/serializer.helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1778,7 +1778,8 @@ export function serializerTest(
}
},
resumeToken: new Uint8Array([1, 2, 3]),
targetId: 1
targetId: 1,
expectedCount: 0
};
expect(result).to.deep.equal(expected);
});
Expand Down
2 changes: 1 addition & 1 deletion packages/firestore/test/unit/specs/bundle_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ describeSpec('Bundles:', ['no-ios'], () => {
// Read named query from loaded bundle by secondary.
.client(0)
.expectListen(query1, { readTime: 400 })
.expectActiveTargets({ query: query1, readTime: 400 })
.expectActiveTargets({ query: query1, resume: { readTime: 400 } })
.userListens(query2, { readTime: 560 })
.expectEvents(query2, {
added: [doc('collection/a', 550, { value: 'c' })],
Expand Down
12 changes: 6 additions & 6 deletions packages/firestore/test/unit/specs/existence_filter_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ describeSpec('Existence Filters:', [], () => {
.watchSnapshots(2000)
// query is now marked as "inconsistent" because of filter mismatch
.expectEvents(query1, { fromCache: true })
.expectActiveTargets({ query: query1, resumeToken: '' })
.expectActiveTargets({ query: query1, resume: { resumeToken: '' } })
.watchRemoves(query1) // Acks removal of query
.watchAcksFull(query1, 2000, doc1)
.expectLimboDocs(doc2.key) // doc2 is now in limbo
Expand Down Expand Up @@ -146,7 +146,7 @@ describeSpec('Existence Filters:', [], () => {
.watchSnapshots(2000)
// query is now marked as "inconsistent" because of filter mismatch
.expectEvents(query1, { fromCache: true })
.expectActiveTargets({ query: query1, resumeToken: '' })
.expectActiveTargets({ query: query1, resume: { resumeToken: '' } })
.watchRemoves(query1) // Acks removal of query
.watchAcksFull(query1, 2000, doc1)
.expectLimboDocs(doc2.key) // doc2 is now in limbo
Expand All @@ -173,14 +173,14 @@ describeSpec('Existence Filters:', [], () => {
.watchStreamCloses(Code.UNAVAILABLE)
.expectActiveTargets({
query: query1,
resumeToken: 'existence-filter-resume-token'
resume: { resumeToken: 'existence-filter-resume-token' }
})
.watchAcks(query1)
.watchFilters([query1], [doc1.key]) // in the next sync doc2 was deleted
.watchSnapshots(2000)
// query is now marked as "inconsistent" because of filter mismatch
.expectEvents(query1, { fromCache: true })
.expectActiveTargets({ query: query1, resumeToken: '' })
.expectActiveTargets({ query: query1, resume: { resumeToken: '' } })
.watchRemoves(query1) // Acks removal of query
.watchAcksFull(query1, 2000, doc1)
.expectLimboDocs(doc2.key) // doc2 is now in limbo
Expand Down Expand Up @@ -211,7 +211,7 @@ describeSpec('Existence Filters:', [], () => {
// The query result includes doc3, but is marked as "inconsistent"
// due to the filter mismatch
.expectEvents(query1, { added: [doc3], fromCache: true })
.expectActiveTargets({ query: query1, resumeToken: '' })
.expectActiveTargets({ query: query1, resume: { resumeToken: '' } })
.watchRemoves(query1) // Acks removal of query
.watchAcksFull(query1, 3000, doc1, doc2, doc3)
.expectEvents(query1, { added: [doc2] })
Expand Down Expand Up @@ -243,7 +243,7 @@ describeSpec('Existence Filters:', [], () => {
.watchSnapshots(2000)
// query is now marked as "inconsistent" because of filter mismatch
.expectEvents(query1, { fromCache: true })
.expectActiveTargets({ query: query1, resumeToken: '' })
.expectActiveTargets({ query: query1, resume: { resumeToken: '' } })
.watchRemoves(query1) // Acks removal of query
.watchAcksFull(query1, 2000, doc1)
.expectLimboDocs(doc2.key) // doc2 is now in limbo
Expand Down
2 changes: 1 addition & 1 deletion packages/firestore/test/unit/specs/limbo_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ describeSpec('Limbo Documents:', [], () => {
// The view now contains the docAs and the docBs (6 documents), but
// the existence filter indicated only 3 should match. This causes
// the client to re-listen without a resume token.
.expectActiveTargets({ query: query1, resumeToken: '' })
.expectActiveTargets({ query: query1, resume: { resumeToken: '' } })
// When the existence filter mismatch was detected, the client removed
// then re-added the target. Watch needs to acknowledge the removal.
.watchRemoves(query1)
Expand Down
5 changes: 4 additions & 1 deletion packages/firestore/test/unit/specs/limit_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,10 @@ describeSpec('Limits:', [], () => {
.watchSends({ affects: [limitQuery] }, secondDocument)
.watchFilters([limitQuery], [secondDocument.key])
.watchSnapshots(1004)
.expectActiveTargets({ query: limitQuery, resumeToken: '' })
.expectActiveTargets({
query: limitQuery,
resume: { resumeToken: '' }
})
.watchRemoves(limitQuery)
.watchAcksFull(limitQuery, 1005, secondDocument)
// The snapshot after the existence filter mismatch triggers limbo
Expand Down
81 changes: 81 additions & 0 deletions packages/firestore/test/unit/specs/listen_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1809,4 +1809,85 @@ describeSpec('Listens:', [], () => {
);
}
);

specTest(
'Query with resume target can pass in expectedCount to listen request',
[],
() => {
const query1 = query('collection');
const docA = doc('collection/a', 1000, { key: 'a' });
const docB = doc('collection/b', 1000, { key: 'b' });

return (
spec()
.withGCEnabled(false)
.userListens(query1)
.watchAcksFull(query1, 1000)
.expectEvents(query1, {})
.userUnlistens(query1)
.watchRemoves(query1)
// There is 0 remote document from previous listen.
.userListens(query1, { resumeToken: 'resume-token-1000' }, 0)
.expectEvents(query1, { fromCache: true })
.watchAcksFull(query1, 2000, docA, docB)
.expectEvents(query1, { added: [docA, docB] })
.userUnlistens(query1)
.userListens(query1, { resumeToken: 'resume-token-2000' }, 2)
.expectEvents(query1, { added: [docA, docB], fromCache: true })
);
}
);

specTest(
'ExpectedCount should equal to the number of documents that last matched the query at the resume token',
[],
() => {
const query1 = query('collection');
const docA = doc('collection/a', 1000, { key: 'a' });
const docBLocal = doc('collection/b', 1000, {
key: 'b'
}).setHasLocalMutations();

return spec()
.withGCEnabled(false)
.userListens(query1)
.watchAcksFull(query1, 1000, docA)
.expectEvents(query1, { added: [docA] })
.userUnlistens(query1)
.userSets('collection/b', { key: 'b' })
.userListens(query1, { resumeToken: 'resume-token-1000' }, 1)
.expectEvents(query1, {
added: [docA, docBLocal],
fromCache: true,
hasPendingWrites: true
});
}
);

specTest(
'ExpectedCount in listen request should work after coming back online',
[],
() => {
const query1 = query('collection');
const docA = doc('collection/a', 1000, { key: 'a' });
const docBLocal = doc('collection/b', 1000, {
key: 'b'
}).setHasLocalMutations();

return spec()
.withGCEnabled(false)
.userListens(query1)
.watchAcksFull(query1, 1000, docA)
.expectEvents(query1, { added: [docA] })
.userSets('collection/b', { key: 'b' })
.expectEvents(query1, {
hasPendingWrites: true,
added: [docBLocal]
})
.disableNetwork()
.expectEvents(query1, { hasPendingWrites: true, fromCache: true })
.enableNetwork()
.restoreListen(query1, 'resume-token-1000', 1);
}
);
});
5 changes: 4 additions & 1 deletion packages/firestore/test/unit/specs/persistence_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ describeSpec('Persistence:', [], () => {
.expectEvents(query1, { added: [anonDoc], hasPendingWrites: true })
.changeUser('user1')
// A user change will re-send the query with the current resume token
.expectActiveTargets({ query: query1, resumeToken: 'resume-token-500' })
.expectActiveTargets({
query: query1,
resume: { resumeToken: 'resume-token-500' }
})
.expectEvents(query1, { removed: [anonDoc] })
.userSets('users/user1', { uid: 'user1' })
.expectEvents(query1, { added: [user1Doc], hasPendingWrites: true })
Expand Down
8 changes: 4 additions & 4 deletions packages/firestore/test/unit/specs/recovery_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
// Client is back online
.expectActiveTargets({
query: query1,
resumeToken: 'resume-token-1000'
resume: { resumeToken: 'resume-token-1000' }
})
.expectUserCallbacks({ acknowledged: ['collection/a'] })
.watchAcksFull(query1, 1001, doc1)
Expand Down Expand Up @@ -549,7 +549,7 @@ describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
.runTimer(TimerId.AsyncQueueRetry)
.expectActiveTargets({
query: query1,
resumeToken: 'resume-token-1000'
resume: { resumeToken: 'resume-token-1000' }
})
.watchAcksFull(query1, 2000, doc2)
.expectEvents(query1, {
Expand Down Expand Up @@ -639,7 +639,7 @@ describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
.expectActiveTargets(
{
query: filteredQuery,
resumeToken: 'resume-token-2000'
resume: { resumeToken: 'resume-token-2000' }
},
{ query: limboQuery }
)
Expand Down Expand Up @@ -681,7 +681,7 @@ describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
.recoverDatabase()
.runTimer(TimerId.AsyncQueueRetry)
.expectActiveTargets(
{ query: filteredQuery, resumeToken: 'resume-token-2000' },
{ query: filteredQuery, resume: { resumeToken: 'resume-token-2000' } },
{ query: limboQuery }
)
.watchAcksFull(filteredQuery, 3000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ describeSpec('Remote store:', [], () => {
// Change user (will shut down existing streams and start new ones).
.changeUser('abc')
// Our query should be sent to the new stream.
.expectActiveTargets({ query: query1, resumeToken: '' })
.expectActiveTargets({ query: query1, resume: { resumeToken: '' } })

// Close the (newly-created) stream as if it too failed (should trigger
// retry with backoff, potentially reproducing the crash in b/74749605).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describeSpec('Resume tokens:', [], () => {
.watchStreamCloses(Code.UNAVAILABLE)
.expectActiveTargets({
query: query1,
resumeToken: 'custom-query-resume-token'
resume: { resumeToken: 'custom-query-resume-token' }
});
});

Expand Down
Loading