diff --git a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.mm b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.mm index 4c645635dd9..27ad5e9c7c9 100644 --- a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.mm +++ b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.mm @@ -115,6 +115,11 @@ void WatchQuery(const TargetData& query) override { // Snapshot version is ignored on the wire TargetData sentTargetData = query.WithResumeToken(query.resume_token(), SnapshotVersion::None()); + + if (query.expected_count().has_value()) { + sentTargetData = sentTargetData.WithExpectedCount(query.expected_count().value()); + } + datastore_->IncrementWatchStreamRequests(); active_targets_[query.target_id()] = sentTargetData; } diff --git a/Firestore/Example/Tests/SpecTests/FSTSpecTests.mm b/Firestore/Example/Tests/SpecTests/FSTSpecTests.mm index c6c5ba015c9..698fe830cd8 100644 --- a/Firestore/Example/Tests/SpecTests/FSTSpecTests.mm +++ b/Firestore/Example/Tests/SpecTests/FSTSpecTests.mm @@ -780,6 +780,10 @@ - (void)validateExpectedState:(nullable NSDictionary *)expectedState { target_data = target_data.WithResumeToken( ByteString(), [self parseVersion:queryData[@"readTime"]]); } + + if ([queryData objectForKey:@"expectedCount"] != nil) { + target_data = target_data.WithExpectedCount([queryData[@"expectedCount"] intValue]); + } queries.push_back(std::move(target_data)); } expectedActiveTargets[targetID] = std::move(queries); @@ -896,7 +900,13 @@ - (void)validateActiveTargets { XCTAssertEqual(actual.target_id(), targetData.target_id()); XCTAssertEqual(actual.snapshot_version(), targetData.snapshot_version()); XCTAssertEqual(actual.resume_token(), targetData.resume_token()); - + if (targetData.expected_count().has_value()) { + if (!actual.expected_count().has_value()) { + XCTFail("Actual target data doesn't have an expected_count."); + } else { + XCTAssertEqual(actual.expected_count().value(), targetData.expected_count().value()); + } + } actualTargets.erase(targetID); } diff --git a/Firestore/Example/Tests/SpecTests/json/listen_spec_test.json b/Firestore/Example/Tests/SpecTests/json/listen_spec_test.json index 5755019b048..85170a91d71 100644 --- a/Firestore/Example/Tests/SpecTests/json/listen_spec_test.json +++ b/Firestore/Example/Tests/SpecTests/json/listen_spec_test.json @@ -3337,6 +3337,158 @@ } ] }, + "ExpectedCount in listen request should work after coming back online": { + "describeName": "Listens:", + "itName": "ExpectedCount in listen request should work after coming back online", + "tags": [ + ], + "config": { + "numClients": 1, + "useGarbageCollection": false + }, + "steps": [ + { + "userListen": { + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + }, + "targetId": 2 + }, + "expectedState": { + "activeTargets": { + "2": { + "queries": [ + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "resumeToken": "" + } + } + } + }, + { + "watchAck": [ + 2 + ] + }, + { + "watchEntity": { + "docs": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + } + ], + "targets": [ + 2 + ] + } + }, + { + "watchCurrent": [ + [ + 2 + ], + "resume-token-1000" + ] + }, + { + "watchSnapshot": { + "targetIds": [ + ], + "version": 1000 + }, + "expectedSnapshotEvents": [ + { + "added": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + } + ], + "errorCode": 0, + "fromCache": false, + "hasPendingWrites": false, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ] + }, + { + "enableNetwork": false, + "expectedSnapshotEvents": [ + { + "errorCode": 0, + "fromCache": true, + "hasPendingWrites": false, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ], + "expectedState": { + "activeLimboDocs": [ + ], + "activeTargets": { + }, + "enqueuedLimboDocs": [ + ] + } + }, + { + "enableNetwork": true, + "expectedState": { + "activeTargets": { + "2": { + "queries": [ + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "resumeToken": "resume-token-1000", + "expectedCount": 1 + } + } + } + } + ] + }, "Ignores update from inactive target": { "describeName": "Listens:", "itName": "Ignores update from inactive target", @@ -12409,6 +12561,525 @@ } ] }, + "Resuming a query should specify expectedCount that does not include pending mutations": { + "describeName": "Listens:", + "itName": "Resuming a query should specify expectedCount that does not include pending mutations", + "tags": [ + ], + "config": { + "numClients": 1, + "useGarbageCollection": false + }, + "steps": [ + { + "userListen": { + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + }, + "targetId": 2 + }, + "expectedState": { + "activeTargets": { + "2": { + "queries": [ + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "resumeToken": "" + } + } + } + }, + { + "watchAck": [ + 2 + ] + }, + { + "watchEntity": { + "docs": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + } + ], + "targets": [ + 2 + ] + } + }, + { + "watchCurrent": [ + [ + 2 + ], + "resume-token-1000" + ] + }, + { + "watchSnapshot": { + "targetIds": [ + ], + "version": 1000 + }, + "expectedSnapshotEvents": [ + { + "added": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + } + ], + "errorCode": 0, + "fromCache": false, + "hasPendingWrites": false, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ] + }, + { + "userUnlisten": [ + 2, + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "expectedState": { + "activeTargets": { + } + } + }, + { + "userSet": [ + "collection/b", + { + "key": "b" + } + ] + }, + { + "userListen": { + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + }, + "targetId": 2 + }, + "expectedSnapshotEvents": [ + { + "added": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + }, + { + "key": "collection/b", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": true + }, + "value": { + "key": "b" + }, + "version": 0 + } + ], + "errorCode": 0, + "fromCache": true, + "hasPendingWrites": true, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ], + "expectedState": { + "activeTargets": { + "2": { + "queries": [ + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "resumeToken": "resume-token-1000", + "expectedCount": 1 + } + } + } + } + ] + }, + "Resuming a query should specify expectedCount when adding the target": { + "describeName": "Listens:", + "itName": "Resuming a query should specify expectedCount when adding the target", + "tags": [ + ], + "config": { + "numClients": 1, + "useGarbageCollection": false + }, + "steps": [ + { + "userListen": { + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + }, + "targetId": 2 + }, + "expectedState": { + "activeTargets": { + "2": { + "queries": [ + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "resumeToken": "" + } + } + } + }, + { + "watchAck": [ + 2 + ] + }, + { + "watchEntity": { + "docs": [ + ], + "targets": [ + 2 + ] + } + }, + { + "watchCurrent": [ + [ + 2 + ], + "resume-token-1000" + ] + }, + { + "watchSnapshot": { + "targetIds": [ + ], + "version": 1000 + }, + "expectedSnapshotEvents": [ + { + "errorCode": 0, + "fromCache": false, + "hasPendingWrites": false, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ] + }, + { + "userUnlisten": [ + 2, + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "expectedState": { + "activeTargets": { + } + } + }, + { + "watchRemove": { + "targetIds": [ + 2 + ] + } + }, + { + "userListen": { + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + }, + "targetId": 2 + }, + "expectedSnapshotEvents": [ + { + "errorCode": 0, + "fromCache": true, + "hasPendingWrites": false, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ], + "expectedState": { + "activeTargets": { + "2": { + "queries": [ + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "resumeToken": "resume-token-1000", + "expectedCount": 0 + } + } + } + }, + { + "watchAck": [ + 2 + ] + }, + { + "watchEntity": { + "docs": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + }, + { + "key": "collection/b", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "b" + }, + "version": 1000 + } + ], + "targets": [ + 2 + ] + } + }, + { + "watchCurrent": [ + [ + 2 + ], + "resume-token-2000" + ] + }, + { + "watchSnapshot": { + "targetIds": [ + ], + "version": 2000 + }, + "expectedSnapshotEvents": [ + { + "added": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + }, + { + "key": "collection/b", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "b" + }, + "version": 1000 + } + ], + "errorCode": 0, + "fromCache": false, + "hasPendingWrites": false, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ] + }, + { + "userUnlisten": [ + 2, + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "expectedState": { + "activeTargets": { + } + } + }, + { + "userListen": { + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + }, + "targetId": 2 + }, + "expectedSnapshotEvents": [ + { + "added": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + }, + { + "key": "collection/b", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "b" + }, + "version": 1000 + } + ], + "errorCode": 0, + "fromCache": true, + "hasPendingWrites": false, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ], + "expectedState": { + "activeTargets": { + "2": { + "queries": [ + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "resumeToken": "resume-token-2000", + "expectedCount": 2 + } + } + } + } + ] + }, "Secondary client advances query state with global snapshot from primary": { "describeName": "Listens:", "itName": "Secondary client advances query state with global snapshot from primary", diff --git a/Firestore/Protos/nanopb/google/firestore/v1/firestore.nanopb.cc b/Firestore/Protos/nanopb/google/firestore/v1/firestore.nanopb.cc index 2d6f479416c..4013605af20 100644 --- a/Firestore/Protos/nanopb/google/firestore/v1/firestore.nanopb.cc +++ b/Firestore/Protos/nanopb/google/firestore/v1/firestore.nanopb.cc @@ -221,7 +221,7 @@ const pb_field_t google_firestore_v1_Target_fields[8] = { PB_ONEOF_FIELD(resume_type, 11, MESSAGE , ONEOF, STATIC , UNION, google_firestore_v1_Target, read_time, target_type.documents, &google_protobuf_Timestamp_fields), PB_FIELD( 5, INT32 , SINGULAR, STATIC , OTHER, google_firestore_v1_Target, target_id, resume_type.read_time, 0), PB_FIELD( 6, BOOL , SINGULAR, STATIC , OTHER, google_firestore_v1_Target, once, target_id, 0), - PB_FIELD( 12, MESSAGE , SINGULAR, STATIC , OTHER, google_firestore_v1_Target, expected_count, once, &google_protobuf_Int32Value_fields), + PB_FIELD( 12, MESSAGE , OPTIONAL, STATIC , OTHER, google_firestore_v1_Target, expected_count, once, &google_protobuf_Int32Value_fields), PB_LAST_FIELD }; @@ -834,8 +834,10 @@ std::string google_firestore_v1_Target::ToString(int indent) const { tostring_result += PrintPrimitiveField("target_id: ", target_id, indent + 1, false); tostring_result += PrintPrimitiveField("once: ", once, indent + 1, false); - tostring_result += PrintMessageField("expected_count ", - expected_count, indent + 1, false); + if (has_expected_count) { + tostring_result += PrintMessageField("expected_count ", + expected_count, indent + 1, true); + } std::string tostring_tail = PrintTail(indent); return tostring_header + tostring_result + tostring_tail; diff --git a/Firestore/Protos/nanopb/google/firestore/v1/firestore.nanopb.h b/Firestore/Protos/nanopb/google/firestore/v1/firestore.nanopb.h index 513a2fe5064..b2c87a26be2 100644 --- a/Firestore/Protos/nanopb/google/firestore/v1/firestore.nanopb.h +++ b/Firestore/Protos/nanopb/google/firestore/v1/firestore.nanopb.h @@ -375,6 +375,7 @@ typedef struct _google_firestore_v1_Target { } resume_type; int32_t target_id; bool once; + bool has_expected_count; google_protobuf_Int32Value expected_count; std::string ToString(int indent = 0) const; @@ -421,7 +422,7 @@ typedef struct _google_firestore_v1_ListenRequest { #define google_firestore_v1_ListenRequest_init_default {NULL, 0, {google_firestore_v1_Target_init_default}, 0, NULL} #define google_firestore_v1_ListenRequest_LabelsEntry_init_default {NULL, NULL} #define google_firestore_v1_ListenResponse_init_default {0, {google_firestore_v1_TargetChange_init_default}} -#define google_firestore_v1_Target_init_default {0, {google_firestore_v1_Target_QueryTarget_init_default}, 0, {NULL}, 0, 0, google_protobuf_Int32Value_init_default} +#define google_firestore_v1_Target_init_default {0, {google_firestore_v1_Target_QueryTarget_init_default}, 0, {NULL}, 0, 0, false, google_protobuf_Int32Value_init_default} #define google_firestore_v1_Target_DocumentsTarget_init_default {0, NULL} #define google_firestore_v1_Target_QueryTarget_init_default {NULL, 0, {google_firestore_v1_StructuredQuery_init_default}} #define google_firestore_v1_TargetChange_init_default {_google_firestore_v1_TargetChange_TargetChangeType_MIN, 0, NULL, false, google_rpc_Status_init_default, NULL, google_protobuf_Timestamp_init_default} @@ -450,7 +451,7 @@ typedef struct _google_firestore_v1_ListenRequest { #define google_firestore_v1_ListenRequest_init_zero {NULL, 0, {google_firestore_v1_Target_init_zero}, 0, NULL} #define google_firestore_v1_ListenRequest_LabelsEntry_init_zero {NULL, NULL} #define google_firestore_v1_ListenResponse_init_zero {0, {google_firestore_v1_TargetChange_init_zero}} -#define google_firestore_v1_Target_init_zero {0, {google_firestore_v1_Target_QueryTarget_init_zero}, 0, {NULL}, 0, 0, google_protobuf_Int32Value_init_zero} +#define google_firestore_v1_Target_init_zero {0, {google_firestore_v1_Target_QueryTarget_init_zero}, 0, {NULL}, 0, 0, false, google_protobuf_Int32Value_init_zero} #define google_firestore_v1_Target_DocumentsTarget_init_zero {0, NULL} #define google_firestore_v1_Target_QueryTarget_init_zero {NULL, 0, {google_firestore_v1_StructuredQuery_init_zero}} #define google_firestore_v1_TargetChange_init_zero {_google_firestore_v1_TargetChange_TargetChangeType_MIN, 0, NULL, false, google_rpc_Status_init_zero, NULL, google_protobuf_Timestamp_init_zero} diff --git a/Firestore/Protos/protos/google/firestore/v1/firestore.options b/Firestore/Protos/protos/google/firestore/v1/firestore.options index a577ead78df..14984149d50 100644 --- a/Firestore/Protos/protos/google/firestore/v1/firestore.options +++ b/Firestore/Protos/protos/google/firestore/v1/firestore.options @@ -21,3 +21,8 @@ # cause is not set if everything is OK, serializer needs to be able to tell # that is the case. google.firestore.v1.TargetChange.cause proto3:false + +# expected_count is set only if there is a resume token present in the +# Target. The backend may differentiate between an expected_count of 0 and +# expected_count being omitted. +google.firestore.v1.Target.expected_count proto3:false diff --git a/Firestore/core/src/core/sync_engine.cc b/Firestore/core/src/core/sync_engine.cc index cc664585200..6899c236870 100644 --- a/Firestore/core/src/core/sync_engine.cc +++ b/Firestore/core/src/core/sync_engine.cc @@ -111,7 +111,6 @@ TargetId SyncEngine::Listen(Query query) { TargetData target_data = local_store_->AllocateTarget(query.ToTarget()); TargetId target_id = target_data.target_id(); nanopb::ByteString resume_token = target_data.resume_token(); - remote_store_->Listen(std::move(target_data)); ViewSnapshot view_snapshot = InitializeViewAndComputeSnapshot( query, target_id, std::move(resume_token)); @@ -120,6 +119,7 @@ TargetId SyncEngine::Listen(Query query) { snapshots.push_back(std::move(view_snapshot)); sync_engine_callback_->OnViewSnapshots(std::move(snapshots)); + remote_store_->Listen(std::move(target_data)); return target_id; } diff --git a/Firestore/core/src/local/local_serializer.cc b/Firestore/core/src/local/local_serializer.cc index 92d82380513..14d1a5502b9 100644 --- a/Firestore/core/src/local/local_serializer.cc +++ b/Firestore/core/src/local/local_serializer.cc @@ -289,7 +289,8 @@ TargetData LocalSerializer::DecodeTargetData( if (!reader->status().ok()) return TargetData(); return TargetData(std::move(target), target_id, sequence_number, QueryPurpose::Listen, version, - last_limbo_free_snapshot_version, std::move(resume_token)); + last_limbo_free_snapshot_version, std::move(resume_token), + /*expected_count=*/absl::nullopt); } Message LocalSerializer::EncodeMutationBatch( diff --git a/Firestore/core/src/local/target_data.cc b/Firestore/core/src/local/target_data.cc index e5759d84c85..d056b41c38a 100644 --- a/Firestore/core/src/local/target_data.cc +++ b/Firestore/core/src/local/target_data.cc @@ -60,7 +60,8 @@ TargetData::TargetData(Target target, QueryPurpose purpose, SnapshotVersion snapshot_version, SnapshotVersion last_limbo_free_snapshot_version, - ByteString resume_token) + ByteString resume_token, + absl::optional expected_count) : target_(std::move(target)), target_id_(target_id), sequence_number_(sequence_number), @@ -68,7 +69,8 @@ TargetData::TargetData(Target target, snapshot_version_(std::move(snapshot_version)), last_limbo_free_snapshot_version_( std::move(last_limbo_free_snapshot_version)), - resume_token_(std::move(resume_token)) { + resume_token_(std::move(resume_token)), + expected_count_(std::move(expected_count)) { } TargetData::TargetData(Target target, @@ -81,35 +83,46 @@ TargetData::TargetData(Target target, purpose, SnapshotVersion::None(), SnapshotVersion::None(), - ByteString()) { + ByteString(), + /*expected_count=*/absl::nullopt) { } TargetData TargetData::Invalid() { return TargetData({}, /*target_id=*/-1, /*sequence_number=*/-1, QueryPurpose::Listen, SnapshotVersion(SnapshotVersion::None()), - SnapshotVersion(SnapshotVersion::None()), {}); + SnapshotVersion(SnapshotVersion::None()), {}, + /*expected_count=*/absl::nullopt); } TargetData TargetData::WithSequenceNumber( ListenSequenceNumber sequence_number) const { return TargetData(target_, target_id_, sequence_number, purpose_, snapshot_version_, last_limbo_free_snapshot_version_, - resume_token_); + resume_token_, expected_count_); } TargetData TargetData::WithResumeToken(ByteString resume_token, SnapshotVersion snapshot_version) const { return TargetData(target_, target_id_, sequence_number_, purpose_, std::move(snapshot_version), - last_limbo_free_snapshot_version_, std::move(resume_token)); + last_limbo_free_snapshot_version_, std::move(resume_token), + /*expected_count=*/absl::nullopt); +} + +TargetData TargetData::WithExpectedCount( + absl::optional expected_count) const { + return TargetData(target_, target_id_, sequence_number_, purpose_, + snapshot_version_, last_limbo_free_snapshot_version_, + resume_token_, std::move(expected_count)); } TargetData TargetData::WithLastLimboFreeSnapshotVersion( SnapshotVersion last_limbo_free_snapshot_version) const { return TargetData(target_, target_id_, sequence_number_, purpose_, snapshot_version_, - std::move(last_limbo_free_snapshot_version), resume_token_); + std::move(last_limbo_free_snapshot_version), resume_token_, + expected_count_); } bool operator==(const TargetData& lhs, const TargetData& rhs) { @@ -117,12 +130,13 @@ bool operator==(const TargetData& lhs, const TargetData& rhs) { lhs.sequence_number() == rhs.sequence_number() && lhs.purpose() == rhs.purpose() && lhs.snapshot_version() == rhs.snapshot_version() && - lhs.resume_token() == rhs.resume_token(); + lhs.resume_token() == rhs.resume_token() && + lhs.expected_count() == rhs.expected_count(); } size_t TargetData::Hash() const { return util::Hash(target_, target_id_, sequence_number_, purpose_, - snapshot_version_, resume_token_); + snapshot_version_, resume_token_, expected_count_); } std::string TargetData::ToString() const { @@ -138,7 +152,11 @@ std::ostream& operator<<(std::ostream& os, const TargetData& value) { << ", version=" << value.snapshot_version_ << ", last_limbo_free_snapshot_version=" << value.last_limbo_free_snapshot_version_ - << ", resume_token=" << value.resume_token_ << ")"; + << ", resume_token=" << value.resume_token_ << ", expected_count=" + << (value.expected_count_.has_value() + ? std::to_string(value.expected_count_.value()) + : "null") + << ")"; } } // namespace local diff --git a/Firestore/core/src/local/target_data.h b/Firestore/core/src/local/target_data.h index 5206145b052..5bdb94a423e 100644 --- a/Firestore/core/src/local/target_data.h +++ b/Firestore/core/src/local/target_data.h @@ -67,6 +67,9 @@ class TargetData { * target to be resumed after disconnecting without retransmitting all the * data that matches the query. The resume token essentially identifies a * point in time from which the server should resume sending results. + * @param expected_count The number of documents that last matched the query + * at the resume token or read time. Documents are counted only when making a + * listen request with resume token or read time, otherwise, keep it null. */ TargetData(core::Target target, model::TargetId target_id, @@ -74,7 +77,8 @@ class TargetData { QueryPurpose purpose, model::SnapshotVersion snapshot_version, model::SnapshotVersion last_limbo_free_snapshot_version, - nanopb::ByteString resume_token); + nanopb::ByteString resume_token, + absl::optional expected_count); /** * Convenience constructor for use when creating a TargetData for the first @@ -142,6 +146,15 @@ class TargetData { return resume_token_; } + /** + * The number of documents that last matched the query at the resume token or + * read time. Documents are counted only when making a listen request with + * resume token or read time, otherwise, keep it null. + */ + const absl::optional& expected_count() const { + return expected_count_; + } + /** Creates a new target data instance with an updated sequence number. */ TargetData WithSequenceNumber( model::ListenSequenceNumber sequence_number) const; @@ -153,6 +166,9 @@ class TargetData { TargetData WithResumeToken(nanopb::ByteString resume_token, model::SnapshotVersion snapshot_version) const; + /** Creates a new target data instance with an updated expected count. */ + TargetData WithExpectedCount(absl::optional expected_count) const; + /** * Creates a new target data instance with an updated last limbo free snapshot * version. @@ -176,6 +192,7 @@ class TargetData { model::SnapshotVersion snapshot_version_; model::SnapshotVersion last_limbo_free_snapshot_version_; nanopb::ByteString resume_token_; + absl::optional expected_count_; }; inline bool operator!=(const TargetData& lhs, const TargetData& rhs) { diff --git a/Firestore/core/src/remote/remote_store.cc b/Firestore/core/src/remote/remote_store.cc index 85c84f8fff9..cf98ff67296 100644 --- a/Firestore/core/src/remote/remote_store.cc +++ b/Firestore/core/src/remote/remote_store.cc @@ -193,7 +193,16 @@ void RemoteStore::SendWatchRequest(const TargetData& target_data) { // We need to increment the expected number of pending responses we're due // from watch so we wait for the ack to process any messages from this target. watch_change_aggregator_->RecordPendingTargetRequest(target_data.target_id()); - watch_stream_->WatchQuery(target_data); + + // Add expectedCount to target if there is a resume token. + if (!target_data.resume_token().empty()) { + int32_t expectedCount = + GetRemoteKeysForTarget(target_data.target_id()).size(); + TargetData new_target_data = target_data.WithExpectedCount(expectedCount); + watch_stream_->WatchQuery(new_target_data); + } else { + watch_stream_->WatchQuery(target_data); + } } void RemoteStore::SendUnwatchRequest(TargetId target_id) { diff --git a/Firestore/core/src/remote/serializer.cc b/Firestore/core/src/remote/serializer.cc index 402197739b9..d6eb0737cbc 100644 --- a/Firestore/core/src/remote/serializer.cc +++ b/Firestore/core/src/remote/serializer.cc @@ -635,6 +635,12 @@ google_firestore_v1_Target Serializer::EncodeTarget( result.which_resume_type = google_firestore_v1_Target_resume_token_tag; result.resume_type.resume_token = nanopb::CopyBytesArray(target_data.resume_token().get()); + + if (target_data.expected_count().has_value()) { + result.has_expected_count = true; + int32_t expected_count = target_data.expected_count().value(); + result.expected_count.value = expected_count; + } } return result; diff --git a/Firestore/core/test/unit/local/local_serializer_test.cc b/Firestore/core/test/unit/local/local_serializer_test.cc index 1e76027f732..ab760e73a57 100644 --- a/Firestore/core/test/unit/local/local_serializer_test.cc +++ b/Firestore/core/test/unit/local/local_serializer_test.cc @@ -286,7 +286,10 @@ class LocalSerializerTest : public ::testing::Test { serializer.DecodeTargetData(&reader, *message); EXPECT_OK(reader.status()); - EXPECT_EQ(target_data, actual_target_data); + // Set the expected_count in expected TargetData to null, as serializing + // a TargetData into local Target proto will drop the expected_count and + // the deserialized actual TargetData will not include expected_count. + EXPECT_EQ(target_data.WithExpectedCount(absl::nullopt), actual_target_data); } ByteString EncodeTargetData(local::LocalSerializer* localSerializer, @@ -466,10 +469,46 @@ TEST_F(LocalSerializerTest, EncodesTargetData) { SnapshotVersion limbo_free_version = testutil::Version(1000); ByteString resume_token = testutil::ResumeToken(1039); + TargetData target_data( + query.ToTarget(), target_id, sequence_number, QueryPurpose::Listen, + SnapshotVersion(version), SnapshotVersion(limbo_free_version), + ByteString(resume_token), /*expected_count=*/absl::nullopt); + + ::firestore::client::Target expected; + expected.set_target_id(target_id); + expected.set_last_listen_sequence_number(sequence_number); + expected.mutable_snapshot_version()->set_nanos(1039000); + expected.mutable_last_limbo_free_snapshot_version()->set_nanos(1000000); + expected.set_resume_token(resume_token.data(), resume_token.size()); + v1::Target::QueryTarget* query_proto = expected.mutable_query(); + + // Add expected collection. + query_proto->set_parent("projects/p/databases/d/documents"); + v1::StructuredQuery::CollectionSelector from; + from.set_collection_id("room"); + *query_proto->mutable_structured_query()->add_from() = std::move(from); + + // Add default order_by. + v1::StructuredQuery::Order order; + order.mutable_field()->set_field_path(FieldPath::kDocumentKeyPath); + order.set_direction(v1::StructuredQuery::ASCENDING); + *query_proto->mutable_structured_query()->add_order_by() = std::move(order); + + ExpectRoundTrip(target_data, expected); +} + +TEST_F(LocalSerializerTest, EncodesTargetDataWillDropExpectedCount) { + core::Query query = Query("room"); + TargetId target_id = 42; + ListenSequenceNumber sequence_number = 10; + SnapshotVersion version = testutil::Version(1039); + SnapshotVersion limbo_free_version = testutil::Version(1000); + ByteString resume_token = testutil::ResumeToken(1039); + TargetData target_data(query.ToTarget(), target_id, sequence_number, QueryPurpose::Listen, SnapshotVersion(version), SnapshotVersion(limbo_free_version), - ByteString(resume_token)); + ByteString(resume_token), /*expected_count=*/1234); ::firestore::client::Target expected; expected.set_target_id(target_id); @@ -530,10 +569,36 @@ TEST_F(LocalSerializerTest, EncodesTargetDataWithDocumentQuery) { SnapshotVersion limbo_free_version = testutil::Version(1000); ByteString resume_token = testutil::ResumeToken(1039); + TargetData target_data( + query.ToTarget(), target_id, sequence_number, QueryPurpose::Listen, + SnapshotVersion(version), SnapshotVersion(limbo_free_version), + ByteString(resume_token), /*expected_count=*/absl::nullopt); + + ::firestore::client::Target expected; + expected.set_target_id(target_id); + expected.set_last_listen_sequence_number(sequence_number); + expected.mutable_snapshot_version()->set_nanos(1039000); + expected.mutable_last_limbo_free_snapshot_version()->set_nanos(1000000); + expected.set_resume_token(resume_token.data(), resume_token.size()); + v1::Target::DocumentsTarget* documents_proto = expected.mutable_documents(); + documents_proto->add_documents("projects/p/databases/d/documents/room/1"); + + ExpectRoundTrip(target_data, expected); +} + +TEST_F(LocalSerializerTest, + EncodesTargetDataWithDocumentQueryWillDropExpectedCount) { + core::Query query = Query("room/1"); + TargetId target_id = 42; + ListenSequenceNumber sequence_number = 10; + SnapshotVersion version = testutil::Version(1039); + SnapshotVersion limbo_free_version = testutil::Version(1000); + ByteString resume_token = testutil::ResumeToken(1039); + TargetData target_data(query.ToTarget(), target_id, sequence_number, QueryPurpose::Listen, SnapshotVersion(version), SnapshotVersion(limbo_free_version), - ByteString(resume_token)); + ByteString(resume_token), /*expected_count=*/1234); ::firestore::client::Target expected; expected.set_target_id(target_id); diff --git a/Firestore/core/test/unit/local/target_cache_test.cc b/Firestore/core/test/unit/local/target_cache_test.cc index f42c17f0b94..262d46edfca 100644 --- a/Firestore/core/test/unit/local/target_cache_test.cc +++ b/Firestore/core/test/unit/local/target_cache_test.cc @@ -79,7 +79,7 @@ TargetData TargetCacheTestBase::MakeTargetData( ByteString resume_token = ResumeToken(version); return TargetData(query.ToTarget(), target_id, sequence_number, QueryPurpose::Listen, Version(version), Version(version), - resume_token); + resume_token, /*expected_count=*/absl::nullopt); } void TargetCacheTestBase::AddMatchingKey(const DocumentKey& key, diff --git a/Firestore/core/test/unit/remote/serializer_test.cc b/Firestore/core/test/unit/remote/serializer_test.cc index 5c8d08216ed..0be416c9fe5 100644 --- a/Firestore/core/test/unit/remote/serializer_test.cc +++ b/Firestore/core/test/unit/remote/serializer_test.cc @@ -75,6 +75,7 @@ namespace { namespace v1 = google::firestore::v1; using core::Bound; +using google::protobuf::Int32Value; using google::protobuf::util::MessageDifferencer; using local::QueryPurpose; using local::TargetData; @@ -1489,7 +1490,7 @@ TEST_F(SerializerTest, EncodesResumeTokens) { core::Query q = Query("docs"); TargetData model(q.ToTarget(), 1, 0, QueryPurpose::Listen, SnapshotVersion::None(), SnapshotVersion::None(), - Bytes({1, 2, 3})); + Bytes({1, 2, 3}), /*expected_count=*/absl::nullopt); v1::Target proto; proto.mutable_query()->set_parent(ResourceName("")); @@ -1512,6 +1513,63 @@ TEST_F(SerializerTest, EncodesResumeTokens) { ExpectRoundTrip(model, proto); } +TEST_F(SerializerTest, EncodesExpectedCount) { + core::Query q = Query("docs"); + TargetData model(q.ToTarget(), 1, 0, QueryPurpose::Listen, + SnapshotVersion::None(), SnapshotVersion::None(), + Bytes({1, 2, 3}), /*expected_count=*/1234); + + v1::Target proto; + proto.mutable_query()->set_parent(ResourceName("")); + proto.set_target_id(1); + + v1::StructuredQuery::CollectionSelector from; + from.set_collection_id("docs"); + *proto.mutable_query()->mutable_structured_query()->add_from() = + std::move(from); + + v1::StructuredQuery::Order order; + order.mutable_field()->set_field_path(FieldPath::kDocumentKeyPath); + order.set_direction(v1::StructuredQuery::ASCENDING); + *proto.mutable_query()->mutable_structured_query()->add_order_by() = + std::move(order); + + proto.set_resume_token("\001\002\003"); + + google::protobuf::Int32Value int32_value; + google::protobuf::Int32Value* expected_count = int32_value.New(); + expected_count->set_value(1234); + proto.set_allocated_expected_count(expected_count); + + EXPECT_TRUE(proto.has_expected_count()); + ExpectRoundTrip(model, proto); +} + +TEST_F(SerializerTest, EncodeExpectedCountSkippedWithoutResumeToken) { + core::Query q = Query("docs"); + TargetData model(q.ToTarget(), 1, 0, QueryPurpose::Listen, + SnapshotVersion::None(), SnapshotVersion::None(), + ByteString(), /*expected_count=*/1234); + + v1::Target proto; + proto.mutable_query()->set_parent(ResourceName("")); + proto.set_target_id(1); + + v1::StructuredQuery::CollectionSelector from; + from.set_collection_id("docs"); + *proto.mutable_query()->mutable_structured_query()->add_from() = + std::move(from); + + v1::StructuredQuery::Order order; + order.mutable_field()->set_field_path(FieldPath::kDocumentKeyPath); + order.set_direction(v1::StructuredQuery::ASCENDING); + *proto.mutable_query()->mutable_structured_query()->add_order_by() = + std::move(order); + + EXPECT_FALSE(proto.has_expected_count()); + ExpectRoundTrip(model, proto); +} + TEST_F(SerializerTest, EncodesListenRequestLabels) { core::Query q = Query("docs");