Skip to content

Commit 4494127

Browse files
amabluea-maurice
amablue
authored andcommitted
Added Tagged versions of sync tree functions, to bring the SyncTree in
alignment with the Java implementation. These were originally omitted because I was told that they were not necessary anymore, but it turns out they are needed to support query conditions. PiperOrigin-RevId: 270320533
1 parent eab9e30 commit 4494127

14 files changed

+291
-58
lines changed

database/src/desktop/core/info_listen_provider.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "database/src/common/query_spec.h"
1818
#include "database/src/desktop/connection/persistent_connection.h"
1919
#include "database/src/desktop/core/listen_provider.h"
20+
#include "database/src/desktop/core/tag.h"
2021
#include "database/src/desktop/util_desktop.h"
2122
#include "database/src/desktop/view/view.h"
2223

@@ -25,7 +26,7 @@ namespace database {
2526
namespace internal {
2627

2728
void InfoListenProvider::StartListening(const QuerySpec& query_spec,
28-
const View* view) {
29+
const Tag& tag, const View* view) {
2930
repo_->scheduler().Schedule([this, query_spec]() {
3031
const Variant& value = VariantGetChild(info_data_, query_spec.path);
3132
if (!VariantIsEmpty(value)) {
@@ -35,7 +36,8 @@ void InfoListenProvider::StartListening(const QuerySpec& query_spec,
3536
});
3637
}
3738

38-
void InfoListenProvider::StopListening(const QuerySpec& query_spec) {}
39+
void InfoListenProvider::StopListening(const QuerySpec& query_spec,
40+
const Tag& tag) {}
3941

4042
} // namespace internal
4143
} // namespace database

database/src/desktop/core/info_listen_provider.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "database/src/desktop/connection/persistent_connection.h"
2020
#include "database/src/desktop/core/listen_provider.h"
2121
#include "database/src/desktop/core/repo.h"
22+
#include "database/src/desktop/core/tag.h"
2223

2324
namespace firebase {
2425
namespace database {
@@ -33,9 +34,10 @@ class InfoListenProvider : public ListenProvider {
3334

3435
void set_sync_tree(SyncTree* sync_tree) { sync_tree_ = sync_tree; }
3536

36-
void StartListening(const QuerySpec& query_spec, const View* view) override;
37+
void StartListening(const QuerySpec& query_spec, const Tag& tag,
38+
const View* view) override;
3739

38-
void StopListening(const QuerySpec& query_spec) override;
40+
void StopListening(const QuerySpec& query_spec, const Tag& tag) override;
3941

4042
private:
4143
Repo* repo_;

database/src/desktop/core/listen_provider.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#define FIREBASE_DATABASE_CLIENT_CPP_SRC_DESKTOP_CORE_LISTEN_PROVIDER_H_
1717

1818
#include "database/src/common/query_spec.h"
19+
#include "database/src/desktop/core/tag.h"
1920
#include "database/src/desktop/view/view.h"
2021

2122
namespace firebase {
@@ -31,11 +32,11 @@ class ListenProvider {
3132
// Begin listening on a location with a set of parameters given by the
3233
// QuerySpec. While listening, the server will send down updates which will be
3334
// parsed and passed along to the SyncTree to be cached locally.
34-
virtual void StartListening(const QuerySpec& query_spec,
35+
virtual void StartListening(const QuerySpec& query_spec, const Tag& tag,
3536
const View* view) = 0;
3637

3738
// Stop listening on a location given by the QuerySpec.
38-
virtual void StopListening(const QuerySpec& query_spec) = 0;
39+
virtual void StopListening(const QuerySpec& query_spec, const Tag& tag) = 0;
3940
};
4041

4142
} // namespace internal

database/src/desktop/core/operation.cc

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,15 @@ namespace database {
2525
namespace internal {
2626

2727
const OperationSource OperationSource::kUser(OperationSource::kSourceUser,
28-
Optional<QueryParams>());
28+
Optional<QueryParams>(), false);
2929
const OperationSource OperationSource::kServer(OperationSource::kSourceServer,
30-
Optional<QueryParams>());
30+
Optional<QueryParams>(), false);
31+
32+
OperationSource OperationSource::ForServerTaggedQuery(
33+
const QueryParams& params) {
34+
return OperationSource(OperationSource::kSourceServer,
35+
Optional<QueryParams>(params), true);
36+
}
3137

3238
Operation Operation::Overwrite(const OperationSource& source, const Path& path,
3339
const Variant& snapshot) {

database/src/desktop/core/operation.h

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#ifndef FIREBASE_DATABASE_CLIENT_CPP_SRC_DESKTOP_CORE_OPERATION_H_
1616
#define FIREBASE_DATABASE_CLIENT_CPP_SRC_DESKTOP_CORE_OPERATION_H_
1717

18+
#include "app/src/assert.h"
1819
#include "app/src/optional.h"
1920
#include "app/src/path.h"
2021
#include "database/src/common/query_spec.h"
@@ -30,21 +31,29 @@ namespace internal {
3031
struct OperationSource {
3132
enum Source { kSourceUser, kSourceServer };
3233

33-
explicit OperationSource(Source _source) : source(_source), query_params() {}
34+
explicit OperationSource(Source _source)
35+
: source(_source), query_params(), tagged(false) {}
3436

3537
explicit OperationSource(const Optional<QueryParams>& _query_params)
36-
: source(kSourceServer), query_params(_query_params) {}
38+
: source(kSourceServer), query_params(_query_params), tagged(false) {}
3739

38-
OperationSource(Source _source, const Optional<QueryParams>& _query_params)
39-
: source(_source), query_params(_query_params) {}
40+
OperationSource(Source _source, const Optional<QueryParams>& _query_params,
41+
bool _tagged)
42+
: source(_source), query_params(_query_params), tagged(_tagged) {
43+
FIREBASE_DEV_ASSERT(!tagged || source == kSourceServer);
44+
}
4045

4146
// Whether this operation originated on the client or the server.
42-
Source source;
47+
const Source source;
4348

4449
// The parameters, if any, that are associated with this operation. This is
4550
// used to determine which View the operation should apply to.
4651
const Optional<QueryParams> query_params;
4752

53+
const bool tagged;
54+
55+
static OperationSource ForServerTaggedQuery(const QueryParams& params);
56+
4857
static const OperationSource kUser;
4958
static const OperationSource kServer;
5059
};

database/src/desktop/core/repo.cc

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -546,11 +546,21 @@ void Repo::OnDataUpdate(const Path& path, const Variant& data, bool is_merge,
546546
SAFE_REFERENCE_RETURN_VOID_IF_INVALID(ThisRefLock, lock, safe_this_);
547547

548548
std::vector<Event> events;
549-
if (is_merge) {
550-
std::map<Path, Variant> changed_children = VariantToPathMap(data);
551-
events = server_sync_tree_->ApplyServerMerge(path, changed_children);
549+
if (tag.has_value()) {
550+
if (is_merge) {
551+
std::map<Path, Variant> changed_children = VariantToPathMap(data);
552+
events =
553+
server_sync_tree_->ApplyTaggedQueryMerge(path, changed_children, tag);
554+
} else {
555+
events = server_sync_tree_->ApplyTaggedQueryOverwrite(path, data, tag);
556+
}
552557
} else {
553-
events = server_sync_tree_->ApplyServerOverwrite(path, data);
558+
if (is_merge) {
559+
std::map<Path, Variant> changed_children = VariantToPathMap(data);
560+
events = server_sync_tree_->ApplyServerMerge(path, changed_children);
561+
} else {
562+
events = server_sync_tree_->ApplyServerOverwrite(path, data);
563+
}
554564
}
555565
if (events.size() > 0) {
556566
// Since we have a listener outstanding for each transaction, receiving any

database/src/desktop/core/sync_tree.cc

Lines changed: 147 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "database/src/desktop/core/listen_provider.h"
3131
#include "database/src/desktop/core/server_values.h"
3232
#include "database/src/desktop/core/sync_point.h"
33+
#include "database/src/desktop/core/tag.h"
3334
#include "database/src/desktop/core/tree.h"
3435
#include "database/src/desktop/util_desktop.h"
3536
#include "database/src/desktop/view/event.h"
@@ -190,6 +191,14 @@ std::vector<Event> SyncTree::AddEventRegistration(
190191
// Now that we have the sync point, see if there is an existing view of the
191192
// database, and if there isn't, then set one up.
192193
bool view_already_exists = sync_point->ViewExistsForQuery(query_spec);
194+
if (!view_already_exists && !QuerySpecLoadsAllData(query_spec)) {
195+
// We need to track a tag for this query
196+
FIREBASE_DEV_ASSERT_MESSAGE(!query_spec_to_tag_map_.count(query_spec),
197+
"View does not exist but we have a tag");
198+
Tag tag = GetNextQueryTag();
199+
query_spec_to_tag_map_[query_spec] = tag;
200+
tag_to_query_spec_map_[*tag] = query_spec;
201+
}
193202
WriteTreeRef writes_cache = pending_write_tree_->ChildWrites(path);
194203
events = sync_point->AddEventRegistration(Move(event_registration),
195204
writes_cache, server_cache,
@@ -203,6 +212,86 @@ std::vector<Event> SyncTree::AddEventRegistration(
203212
return events;
204213
}
205214

215+
// Apply a listen complete to a path.
216+
std::vector<Event> SyncTree::ApplyTaggedListenComplete(const Tag& tag) {
217+
std::vector<Event> results;
218+
persistence_manager_->RunInTransaction([&, this]() -> bool {
219+
const QuerySpec* query_spec = this->QuerySpecForTag(tag);
220+
if (query_spec != nullptr) {
221+
this->persistence_manager_->SetQueryComplete(*query_spec);
222+
Operation op = Operation::ListenComplete(
223+
OperationSource::ForServerTaggedQuery(query_spec->params), Path());
224+
results = this->ApplyTaggedOperation(*query_spec, op);
225+
return true;
226+
} else {
227+
// We've already removed the query. No big deal, ignore the update
228+
return true;
229+
}
230+
});
231+
return results;
232+
}
233+
234+
std::vector<Event> SyncTree::ApplyTaggedOperation(const QuerySpec& query_spec,
235+
const Operation& operation) {
236+
const Path& query_path = query_spec.path;
237+
SyncPoint* sync_point = sync_point_tree_.GetValueAt(query_path);
238+
FIREBASE_DEV_ASSERT_MESSAGE(
239+
sync_point != nullptr,
240+
"Missing sync point for query tag that we're tracking");
241+
WriteTreeRef writes_cache = pending_write_tree_->ChildWrites(query_path);
242+
return sync_point->ApplyOperation(operation, writes_cache, nullptr,
243+
persistence_manager_.get());
244+
}
245+
246+
// Apply new server data for the specified tagged query.
247+
std::vector<Event> SyncTree::ApplyTaggedQueryOverwrite(const Path& path,
248+
const Variant& snap,
249+
const Tag& tag) {
250+
std::vector<Event> results;
251+
persistence_manager_->RunInTransaction([&, this]() -> bool {
252+
const QuerySpec* query_spec = this->QuerySpecForTag(tag);
253+
if (query_spec != nullptr) {
254+
Optional<Path> relative_path = Path::GetRelative(query_spec->path, path);
255+
QuerySpec query_to_overwrite =
256+
relative_path->empty() ? *query_spec : QuerySpec(path);
257+
this->persistence_manager_->UpdateServerCache(query_to_overwrite, snap);
258+
Operation op = Operation::Overwrite(
259+
OperationSource::ForServerTaggedQuery(query_spec->params),
260+
*relative_path, snap);
261+
results = this->ApplyTaggedOperation(*query_spec, op);
262+
return true;
263+
} else {
264+
// Query must have been removed already
265+
return true;
266+
}
267+
});
268+
return results;
269+
}
270+
271+
std::vector<Event> SyncTree::ApplyTaggedQueryMerge(
272+
const Path& path, const std::map<Path, Variant>& changed_children,
273+
const Tag& tag) {
274+
std::vector<Event> results;
275+
persistence_manager_->RunInTransaction([&, this]() -> bool {
276+
const QuerySpec* query_spec = QuerySpecForTag(tag);
277+
if (query_spec != nullptr) {
278+
Optional<Path> relative_path = Path::GetRelative(query_spec->path, path);
279+
FIREBASE_DEV_ASSERT(relative_path.has_value());
280+
CompoundWrite merge = CompoundWrite::FromPathMerge(changed_children);
281+
this->persistence_manager_->UpdateServerCache(path, merge);
282+
Operation op = Operation::Merge(
283+
OperationSource::ForServerTaggedQuery(query_spec->params),
284+
*relative_path, merge);
285+
results = ApplyTaggedOperation(*query_spec, op);
286+
return true;
287+
} else {
288+
// We've already removed the query. No big deal, ignore the update
289+
return true;
290+
}
291+
});
292+
return results;
293+
}
294+
206295
std::vector<Event> SyncTree::ApplyListenComplete(const Path& path) {
207296
std::vector<Event> results;
208297
persistence_manager_->RunInTransaction([&, this]() -> bool {
@@ -457,30 +546,40 @@ static QuerySpec QuerySpecForListening(const QuerySpec& query_spec) {
457546

458547
void SyncTree::SetupListener(const QuerySpec& query_spec, const View* view) {
459548
const Path& path = query_spec.path;
460-
listen_provider_->StartListening(QuerySpecForListening(query_spec), view);
549+
const Tag& tag = TagForQuerySpec(query_spec);
550+
listen_provider_->StartListening(QuerySpecForListening(query_spec), tag,
551+
view);
461552

462553
Tree<SyncPoint>* subtree = sync_point_tree_.GetChild(path);
463554

464555
// The root of this subtree has our query. We're here because we definitely
465556
// need to send a listen for that, but we may need to shadow other listens
466557
// as well.
467-
468-
// Shadow everything at or below this location, this is a default listener.
469-
subtree->CallOnEach(path, [this](const Path& relative_path,
470-
const SyncPoint& child_sync_point) {
471-
if (!relative_path.empty() && child_sync_point.HasCompleteView()) {
472-
const QuerySpec& query_spec =
473-
child_sync_point.GetCompleteView()->query_spec();
474-
listen_provider_->StopListening(MakeDefaultQuerySpec(query_spec));
475-
} else {
476-
// No default listener here.
477-
for (const View* sync_point_view :
478-
child_sync_point.GetIncompleteQueryViews()) {
479-
const QuerySpec& child_query_spec = sync_point_view->query_spec();
480-
listen_provider_->StopListening(MakeDefaultQuerySpec(child_query_spec));
558+
if (tag.has_value()) {
559+
FIREBASE_DEV_ASSERT_MESSAGE(
560+
!subtree->value()->HasCompleteView(),
561+
"If we're adding a query, it shouldn't be shadowed");
562+
} else {
563+
// Shadow everything at or below this location, this is a default listener.
564+
subtree->CallOnEach(path, [this](const Path& relative_path,
565+
const SyncPoint& child_sync_point) {
566+
if (!relative_path.empty() && child_sync_point.HasCompleteView()) {
567+
const QuerySpec& query_spec =
568+
child_sync_point.GetCompleteView()->query_spec();
569+
listen_provider_->StopListening(QuerySpecForListening(query_spec),
570+
TagForQuerySpec(query_spec));
571+
} else {
572+
// No default listener here.
573+
for (const View* sync_point_view :
574+
child_sync_point.GetIncompleteQueryViews()) {
575+
const QuerySpec& child_query_spec = sync_point_view->query_spec();
576+
listen_provider_->StopListening(
577+
QuerySpecForListening(child_query_spec),
578+
TagForQuerySpec(child_query_spec));
579+
}
481580
}
482-
}
483-
});
581+
});
582+
}
484583
}
485584

486585
static void CollectDistinctViewsForSubTree(Tree<SyncPoint>* subtree,
@@ -561,7 +660,7 @@ std::vector<Event> SyncTree::RemoveEventRegistration(
561660
for (const View* view : new_views) {
562661
QuerySpec new_query = view->query_spec();
563662
listen_provider_->StartListening(QuerySpecForListening(new_query),
564-
view);
663+
TagForQuerySpec(new_query), view);
565664
}
566665
} else {
567666
// There's nothing below us, so nothing we need to start listening on
@@ -578,14 +677,19 @@ std::vector<Event> SyncTree::RemoveEventRegistration(
578677
// other queries here. Just cancel the one default. Otherwise, we need
579678
// to iterate through and cancel each individual query
580679
if (removing_default) {
581-
listen_provider_->StopListening(QuerySpecForListening(query_spec));
680+
listen_provider_->StopListening(QuerySpecForListening(query_spec),
681+
Tag());
582682
} else {
583683
for (QuerySpec query_to_remove : removed) {
684+
Tag tag = TagForQuerySpec(query_to_remove);
685+
FIREBASE_DEV_ASSERT(tag.has_value());
584686
listen_provider_->StopListening(
585-
QuerySpecForListening(query_to_remove));
687+
QuerySpecForListening(query_to_remove), tag);
586688
}
587689
}
588690
}
691+
// Now, clear all of the tags we're tracking for the removed listens.
692+
RemoveTags(removed);
589693
} else {
590694
// No-op, this listener must've been already removed.
591695
}
@@ -594,6 +698,29 @@ std::vector<Event> SyncTree::RemoveEventRegistration(
594698
return cancel_events;
595699
}
596700

701+
void SyncTree::RemoveTags(const std::vector<QuerySpec>& queries) {
702+
for (const QuerySpec& removed_query : queries) {
703+
if (!QuerySpecLoadsAllData(removed_query)) {
704+
// We should have a tag for this
705+
Tag tag = TagForQuerySpec(removed_query);
706+
FIREBASE_DEV_ASSERT(tag.has_value());
707+
query_spec_to_tag_map_.erase(removed_query);
708+
tag_to_query_spec_map_.erase(*tag);
709+
}
710+
}
711+
}
712+
713+
const QuerySpec* SyncTree::QuerySpecForTag(const Tag& tag) {
714+
return MapGet(&tag_to_query_spec_map_, *tag);
715+
}
716+
717+
Tag SyncTree::TagForQuerySpec(const QuerySpec& query_spec) {
718+
const Tag* tag_ptr = MapGet(&query_spec_to_tag_map_, query_spec);
719+
return tag_ptr ? *tag_ptr : Tag();
720+
}
721+
722+
Tag SyncTree::GetNextQueryTag() { return Tag(next_query_tag_++); }
723+
597724
} // namespace internal
598725
} // namespace database
599726
} // namespace firebase

0 commit comments

Comments
 (0)