Skip to content

Add expectedCount to Target in listen request #6854

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions packages/firestore/src/core/sync_engine_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,6 @@ export async function syncEngineListen(
syncEngineImpl.localStore,
queryToTarget(query)
);
if (syncEngineImpl.isPrimaryClient) {
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
}

const status = syncEngineImpl.sharedClientState.addLocalQueryTarget(
targetData.targetId
Expand All @@ -331,6 +328,10 @@ export async function syncEngineListen(
status === 'current',
targetData.resumeToken
);

if (syncEngineImpl.isPrimaryClient) {
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
}
}

return viewSnapshot;
Expand Down
32 changes: 28 additions & 4 deletions packages/firestore/src/local/target_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ export class TargetData {
* matches the target. The resume token essentially identifies a point in
* time from which the server should resume sending results.
*/
readonly resumeToken: ByteString = ByteString.EMPTY_BYTE_STRING
readonly resumeToken: ByteString = ByteString.EMPTY_BYTE_STRING,
/**
* The number of documents that last matched the query at the resume token or
* read time.
*/
readonly expectedCount: number = 0
) {}

/** Creates a new target data instance with an updated sequence number. */
Expand All @@ -78,7 +83,8 @@ export class TargetData {
sequenceNumber,
this.snapshotVersion,
this.lastLimboFreeSnapshotVersion,
this.resumeToken
this.resumeToken,
this.expectedCount
);
}

Expand All @@ -97,7 +103,24 @@ export class TargetData {
this.sequenceNumber,
snapshotVersion,
this.lastLimboFreeSnapshotVersion,
resumeToken
resumeToken,
this.expectedCount
);
}

/**
* Creates a new target data instance with an updated expected count.
*/
withExpectedCount(expectedCount: number): TargetData {
return new TargetData(
this.target,
this.targetId,
this.purpose,
this.sequenceNumber,
this.snapshotVersion,
this.lastLimboFreeSnapshotVersion,
this.resumeToken,
expectedCount
);
}

Expand All @@ -115,7 +138,8 @@ export class TargetData {
this.sequenceNumber,
this.snapshotVersion,
lastLimboFreeSnapshotVersion,
this.resumeToken
this.resumeToken,
this.expectedCount
);
}
}
11 changes: 11 additions & 0 deletions packages/firestore/src/remote/remote_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,17 @@ function sendWatchRequest(
remoteStoreImpl.watchChangeAggregator!.recordPendingTargetRequest(
targetData.targetId
);

if (
targetData.resumeToken.approximateByteSize() > 0 ||
targetData.snapshotVersion.compareTo(SnapshotVersion.min()) > 0
) {
const expectedCount = remoteStoreImpl.remoteSyncer.getRemoteKeysForTarget!(
targetData.targetId
).size;
targetData = targetData.withExpectedCount(expectedCount);
}

ensureWatchStream(remoteStoreImpl).watch(targetData);
}

Expand Down
2 changes: 2 additions & 0 deletions packages/firestore/src/remote/serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,7 @@ export function toTarget(

if (targetData.resumeToken.approximateByteSize() > 0) {
result.resumeToken = toBytes(serializer, targetData.resumeToken);
result.expectedCount = targetData.expectedCount;
} else if (targetData.snapshotVersion.compareTo(SnapshotVersion.min()) > 0) {
// TODO(wuandy): Consider removing above check because it is most likely true.
// Right now, many tests depend on this behaviour though (leaving min() out
Expand All @@ -1020,6 +1021,7 @@ export function toTarget(
serializer,
targetData.snapshotVersion.toTimestamp()
);
result.expectedCount = targetData.expectedCount;
}

return result;
Expand Down
3 changes: 2 additions & 1 deletion packages/firestore/test/unit/remote/serializer.helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1778,7 +1778,8 @@ export function serializerTest(
}
},
resumeToken: new Uint8Array([1, 2, 3]),
targetId: 1
targetId: 1,
expectedCount: 0
};
expect(result).to.deep.equal(expected);
});
Expand Down
90 changes: 90 additions & 0 deletions packages/firestore/test/unit/specs/listen_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1809,4 +1809,94 @@ describeSpec('Listens:', [], () => {
);
}
);

specTest(
'Query with resume target can pass in expectedCount to listen request',
[],
() => {
const query1 = query('collection');
const docA = doc('collection/a', 1000, { key: 'a' });
const docB = doc('collection/b', 1000, { key: 'b' });

return (
spec()
.withGCEnabled(false)
.userListens(query1)
.watchAcksFull(query1, 1000)
.expectEvents(query1, {})
.userUnlistens(query1)
.watchRemoves(query1)
// There is 0 remote document from previous listen.
.userListens(query1, {
resumeToken: 'resume-token-1000',
expectedCount: 0
})
.expectEvents(query1, { fromCache: true })
.watchAcksFull(query1, 2000, docA, docB)
.expectEvents(query1, { added: [docA, docB] })
.userUnlistens(query1)
.userListens(query1, {
resumeToken: 'resume-token-2000',
expectedCount: 2
})
.expectEvents(query1, { added: [docA, docB], fromCache: true })
);
}
);

specTest(
'ExpectedCount should equal to the number of documents that last matched the query at the resume token',
[],
() => {
const query1 = query('collection');
const docA = doc('collection/a', 1000, { key: 'a' });
const docBLocal = doc('collection/b', 1000, {
key: 'b'
}).setHasLocalMutations();

return spec()
.withGCEnabled(false)
.userListens(query1)
.watchAcksFull(query1, 1000, docA)
.expectEvents(query1, { added: [docA] })
.userUnlistens(query1)
.userSets('collection/b', { key: 'b' })
.userListens(query1, {
resumeToken: 'resume-token-1000',
expectedCount: 1
})
.expectEvents(query1, {
added: [docA, docBLocal],
fromCache: true,
hasPendingWrites: true
});
}
);

specTest(
'ExpectedCount in listen request should work after coming back online',
[],
() => {
const query1 = query('collection');
const docA = doc('collection/a', 1000, { key: 'a' });
const docBLocal = doc('collection/b', 1000, {
key: 'b'
}).setHasLocalMutations();

return spec()
.withGCEnabled(false)
.userListens(query1)
.watchAcksFull(query1, 1000, docA)
.expectEvents(query1, { added: [docA] })
.userSets('collection/b', { key: 'b' })
.expectEvents(query1, {
hasPendingWrites: true,
added: [docBLocal]
})
.disableNetwork()
.expectEvents(query1, { hasPendingWrites: true, fromCache: true })
.enableNetwork()
.restoreListen(query1, 'resume-token-1000', 1);
}
);
});
Loading