Skip to content

Commit cd6d0f1

Browse files
authored
Implement limbo resolution listen throttling (#5310)
1 parent f355ead commit cd6d0f1

File tree

8 files changed

+192
-76
lines changed

8 files changed

+192
-76
lines changed

Firestore/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
# Unreleased
2+
- [changed] Firestore now limits the number of concurrent document lookups it
3+
will perform when resolving inconsistencies in the local cache
4+
(https://github.com/firebase/firebase-js-sdk/issues/2683).
25

36
# v1.11.3
47
- [changed] Internal changes.

Firestore/Example/Tests/SpecTests/FSTSpecTests.mm

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
#import <FirebaseFirestore/FIRFirestoreErrors.h>
2020

2121
#include <algorithm>
22+
#include <cstddef>
23+
#include <limits>
2224
#include <map>
2325
#include <memory>
26+
#include <set>
2427
#include <string>
2528
#include <unordered_map>
2629
#include <utility>
@@ -152,16 +155,24 @@ ByteString MakeResumeToken(NSString *specString) {
152155
return MakeByteString([specString dataUsingEncoding:NSUTF8StringEncoding]);
153156
}
154157

155-
NSString *ToDocumentListString(const std::map<DocumentKey, TargetId> &map) {
158+
NSString *ToDocumentListString(const std::set<DocumentKey> &keys) {
156159
std::vector<std::string> strings;
157-
strings.reserve(map.size());
158-
for (const auto &kv : map) {
159-
strings.push_back(kv.first.ToString());
160+
strings.reserve(keys.size());
161+
for (const auto &key : keys) {
162+
strings.push_back(key.ToString());
160163
}
161164
std::sort(strings.begin(), strings.end());
162165
return MakeNSString(absl::StrJoin(strings, ", "));
163166
}
164167

168+
NSString *ToDocumentListString(const std::map<DocumentKey, TargetId> &map) {
169+
std::set<DocumentKey> keys;
170+
for (const auto &kv : map) {
171+
keys.insert(kv.first);
172+
}
173+
return ToDocumentListString(keys);
174+
}
175+
165176
NSString *ToTargetIdListString(const ActiveTargetMap &map) {
166177
std::vector<model::TargetId> targetIds;
167178
targetIds.reserve(map.size());
@@ -181,6 +192,7 @@ @interface FSTSpecTests ()
181192

182193
@implementation FSTSpecTests {
183194
BOOL _gcEnabled;
195+
size_t _maxConcurrentLimboResolutions;
184196
BOOL _networkEnabled;
185197
FSTUserDataConverter *_converter;
186198
}
@@ -212,12 +224,20 @@ - (void)setUpForSpecWithConfig:(NSDictionary *)config {
212224
// Store GCEnabled so we can re-use it in doRestart.
213225
NSNumber *GCEnabled = config[@"useGarbageCollection"];
214226
_gcEnabled = [GCEnabled boolValue];
227+
NSNumber *maxConcurrentLimboResolutions = config[@"maxConcurrentLimboResolutions"];
228+
_maxConcurrentLimboResolutions = (maxConcurrentLimboResolutions == nil)
229+
? std::numeric_limits<size_t>::max()
230+
: maxConcurrentLimboResolutions.unsignedIntValue;
215231
NSNumber *numClients = config[@"numClients"];
216232
if (numClients) {
217233
XCTAssertEqualObjects(numClients, @1, @"The iOS client does not support multi-client tests");
218234
}
219235
std::unique_ptr<Persistence> persistence = [self persistenceWithGCEnabled:_gcEnabled];
220-
self.driver = [[FSTSyncEngineTestDriver alloc] initWithPersistence:std::move(persistence)];
236+
self.driver =
237+
[[FSTSyncEngineTestDriver alloc] initWithPersistence:std::move(persistence)
238+
initialUser:User::Unauthenticated()
239+
outstandingWrites:{}
240+
maxConcurrentLimboResolutions:_maxConcurrentLimboResolutions];
221241
[self.driver start];
222242
}
223243

@@ -522,9 +542,11 @@ - (void)doRestart {
522542
[self.driver shutdown];
523543

524544
std::unique_ptr<Persistence> persistence = [self persistenceWithGCEnabled:_gcEnabled];
525-
self.driver = [[FSTSyncEngineTestDriver alloc] initWithPersistence:std::move(persistence)
526-
initialUser:currentUser
527-
outstandingWrites:outstandingWrites];
545+
self.driver =
546+
[[FSTSyncEngineTestDriver alloc] initWithPersistence:std::move(persistence)
547+
initialUser:currentUser
548+
outstandingWrites:outstandingWrites
549+
maxConcurrentLimboResolutions:_maxConcurrentLimboResolutions];
528550
[self.driver start];
529551
}
530552

@@ -689,9 +711,18 @@ - (void)validateExpectedState:(nullable NSDictionary *)expectedState {
689711
for (NSString *name in docNames) {
690712
expectedActiveLimboDocuments = expectedActiveLimboDocuments.insert(FSTTestDocKey(name));
691713
}
692-
// Update the expected limbo documents
714+
// Update the expected active limbo documents
693715
[self.driver setExpectedActiveLimboDocuments:std::move(expectedActiveLimboDocuments)];
694716
}
717+
if (expectedState[@"enqueuedLimboDocs"]) {
718+
DocumentKeySet expectedEnqueuedLimboDocuments;
719+
NSArray *docNames = expectedState[@"enqueuedLimboDocs"];
720+
for (NSString *name in docNames) {
721+
expectedEnqueuedLimboDocuments = expectedEnqueuedLimboDocuments.insert(FSTTestDocKey(name));
722+
}
723+
// Update the expected enqueued limbo documents
724+
[self.driver setExpectedEnqueuedLimboDocuments:std::move(expectedEnqueuedLimboDocuments)];
725+
}
695726
if (expectedState[@"activeTargets"]) {
696727
__block ActiveTargetMap expectedActiveTargets;
697728
[expectedState[@"activeTargets"]
@@ -719,7 +750,8 @@ - (void)validateExpectedState:(nullable NSDictionary *)expectedState {
719750
// Always validate the we received the expected number of callbacks.
720751
[self validateUserCallbacks:expectedState];
721752
// Always validate that the expected limbo docs match the actual limbo docs.
722-
[self validateLimboDocuments];
753+
[self validateActiveLimboDocuments];
754+
[self validateEnqueuedLimboDocuments];
723755
// Always validate that the expected active targets match the actual active targets.
724756
[self validateActiveTargets];
725757
}
@@ -744,9 +776,9 @@ - (void)validateUserCallbacks:(nullable NSDictionary *)expected {
744776
}
745777
}
746778

747-
- (void)validateLimboDocuments {
779+
- (void)validateActiveLimboDocuments {
748780
// Make a copy so it can modified while checking against the expected limbo docs.
749-
std::map<DocumentKey, TargetId> actualLimboDocs = self.driver.currentLimboDocuments;
781+
std::map<DocumentKey, TargetId> actualLimboDocs = self.driver.activeLimboDocumentResolutions;
750782

751783
// Validate that each active limbo doc has an expected active target
752784
for (const auto &kv : actualLimboDocs) {
@@ -767,6 +799,31 @@ - (void)validateLimboDocuments {
767799
ToDocumentListString(actualLimboDocs));
768800
}
769801

802+
- (void)validateEnqueuedLimboDocuments {
803+
std::set<DocumentKey> actualLimboDocs;
804+
for (const auto &key : self.driver.enqueuedLimboDocumentResolutions) {
805+
actualLimboDocs.insert(key);
806+
}
807+
std::set<DocumentKey> expectedLimboDocs;
808+
for (const auto &key : self.driver.expectedEnqueuedLimboDocuments) {
809+
expectedLimboDocs.insert(key);
810+
}
811+
812+
for (const auto &key : actualLimboDocs) {
813+
XCTAssertTrue(expectedLimboDocs.find(key) != expectedLimboDocs.end(),
814+
@"Found enqueued limbo doc %s, but it was not in the set of "
815+
@"expected enqueued limbo documents (%@)",
816+
key.ToString().c_str(), ToDocumentListString(expectedLimboDocs));
817+
}
818+
819+
for (const auto &key : expectedLimboDocs) {
820+
XCTAssertTrue(actualLimboDocs.find(key) != actualLimboDocs.end(),
821+
@"Expected doc %s to be enqueued for limbo resolution, "
822+
@"but it was not in the queue (%@)",
823+
key.ToString().c_str(), ToDocumentListString(actualLimboDocs));
824+
}
825+
}
826+
770827
- (void)validateActiveTargets {
771828
if (!_networkEnabled) {
772829
return;

Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.h

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
#import <Foundation/Foundation.h>
1818

19+
#include <cstddef>
20+
#include <deque>
1921
#include <map>
2022
#include <memory>
2123
#include <unordered_map>
@@ -115,12 +117,6 @@ typedef std::unordered_map<auth::User, NSMutableArray<FSTOutstandingWrite *> *,
115117
*/
116118
@interface FSTSyncEngineTestDriver : NSObject
117119

118-
/**
119-
* Initializes the underlying FSTSyncEngine with the given local persistence implementation and
120-
* garbage collection policy.
121-
*/
122-
- (instancetype)initWithPersistence:(std::unique_ptr<local::Persistence>)persistence;
123-
124120
/**
125121
* Initializes the underlying FSTSyncEngine with the given local persistence implementation and
126122
* a set of existing outstandingWrites (useful when your Persistence object has persisted
@@ -129,7 +125,7 @@ typedef std::unordered_map<auth::User, NSMutableArray<FSTOutstandingWrite *> *,
129125
- (instancetype)initWithPersistence:(std::unique_ptr<local::Persistence>)persistence
130126
initialUser:(const auth::User &)initialUser
131127
outstandingWrites:(const FSTOutstandingWriteQueues &)outstandingWrites
132-
NS_DESIGNATED_INITIALIZER;
128+
maxConcurrentLimboResolutions:(size_t)maxConcurrentLimboResolutions NS_DESIGNATED_INITIALIZER;
133129

134130
- (instancetype)init NS_UNAVAILABLE;
135131

@@ -289,15 +285,24 @@ typedef std::unordered_map<auth::User, NSMutableArray<FSTOutstandingWrite *> *,
289285
*/
290286
- (NSArray<NSString *> *)capturedRejectedWritesSinceLastCall;
291287

292-
/** The current set of documents in limbo. */
293-
- (std::map<model::DocumentKey, model::TargetId>)currentLimboDocuments;
288+
/** The current set of documents in limbo with active targets. */
289+
- (std::map<model::DocumentKey, model::TargetId>)activeLimboDocumentResolutions;
290+
291+
/** The current set of documents in limbo that are enqueued for resolution. */
292+
- (std::deque<model::DocumentKey>)enqueuedLimboDocumentResolutions;
294293

295294
/** The expected set of documents in limbo with an active target. */
296295
- (const model::DocumentKeySet &)expectedActiveLimboDocuments;
297296

298297
/** Sets the expected set of documents in limbo with an active target. */
299298
- (void)setExpectedActiveLimboDocuments:(model::DocumentKeySet)docs;
300299

300+
/** The expected set of documents in limbo that are enqueued for resolution. */
301+
- (const model::DocumentKeySet &)expectedEnqueuedLimboDocuments;
302+
303+
/** Sets the expected set of documents in limbo that are enqueued for resolution. */
304+
- (void)setExpectedEnqueuedLimboDocuments:(model::DocumentKeySet)docs;
305+
301306
/**
302307
* The writes that have been sent to the FSTSyncEngine via writeUserMutation: but not yet
303308
* acknowledged by calling receiveWriteAck/Error:. They are tracked per-user.

Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.mm

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#import <FirebaseFirestore/FIRFirestoreErrors.h>
2020

21+
#include <cstddef>
2122
#include <map>
2223
#include <memory>
2324
#include <string>
@@ -154,6 +155,8 @@ @interface FSTSyncEngineTestDriver ()
154155
@end
155156

156157
@implementation FSTSyncEngineTestDriver {
158+
size_t _maxConcurrentLimboResolutions;
159+
157160
std::unique_ptr<Persistence> _persistence;
158161

159162
std::unique_ptr<LocalStore> _localStore;
@@ -173,6 +176,7 @@ @implementation FSTSyncEngineTestDriver {
173176
// ivar is declared as mutable.
174177
std::unordered_map<User, NSMutableArray<FSTOutstandingWrite *> *, HashUser> _outstandingWrites;
175178
DocumentKeySet _expectedActiveLimboDocuments;
179+
DocumentKeySet _expectedEnqueuedLimboDocuments;
176180

177181
/** A dictionary for tracking the listens on queries. */
178182
std::unordered_map<Query, std::shared_ptr<QueryListener>> _queryListeners;
@@ -188,16 +192,13 @@ @implementation FSTSyncEngineTestDriver {
188192
int _snapshotsInSyncEvents;
189193
}
190194

191-
- (instancetype)initWithPersistence:(std::unique_ptr<Persistence>)persistence {
192-
return [self initWithPersistence:std::move(persistence)
193-
initialUser:User::Unauthenticated()
194-
outstandingWrites:{}];
195-
}
196-
197195
- (instancetype)initWithPersistence:(std::unique_ptr<Persistence>)persistence
198196
initialUser:(const User &)initialUser
199-
outstandingWrites:(const FSTOutstandingWriteQueues &)outstandingWrites {
197+
outstandingWrites:(const FSTOutstandingWriteQueues &)outstandingWrites
198+
maxConcurrentLimboResolutions:(size_t)maxConcurrentLimboResolutions {
200199
if (self = [super init]) {
200+
_maxConcurrentLimboResolutions = maxConcurrentLimboResolutions;
201+
201202
// Do a deep copy.
202203
for (const auto &pair : outstandingWrites) {
203204
_outstandingWrites[pair.first] = [pair.second mutableCopy];
@@ -219,7 +220,8 @@ - (instancetype)initWithPersistence:(std::unique_ptr<Persistence>)persistence
219220
[self](OnlineState onlineState) { _syncEngine->HandleOnlineStateChange(onlineState); });
220221
;
221222

222-
_syncEngine = absl::make_unique<SyncEngine>(_localStore.get(), _remoteStore.get(), initialUser);
223+
_syncEngine = absl::make_unique<SyncEngine>(_localStore.get(), _remoteStore.get(), initialUser,
224+
_maxConcurrentLimboResolutions);
223225
_remoteStore->set_sync_engine(_syncEngine.get());
224226
_eventManager.Init(_syncEngine.get());
225227

@@ -251,6 +253,14 @@ - (void)setExpectedActiveLimboDocuments:(DocumentKeySet)docs {
251253
_expectedActiveLimboDocuments = std::move(docs);
252254
}
253255

256+
- (const DocumentKeySet &)expectedEnqueuedLimboDocuments {
257+
return _expectedEnqueuedLimboDocuments;
258+
}
259+
260+
- (void)setExpectedEnqueuedLimboDocuments:(DocumentKeySet)docs {
261+
_expectedEnqueuedLimboDocuments = std::move(docs);
262+
}
263+
254264
- (void)drainQueue {
255265
_workerQueue->EnqueueBlocking([] {});
256266
}
@@ -472,8 +482,12 @@ - (void)receiveWatchStreamError:(int)errorCode userInfo:(NSDictionary<NSString *
472482
});
473483
}
474484

475-
- (std::map<DocumentKey, TargetId>)currentLimboDocuments {
476-
return _syncEngine->GetCurrentLimboDocuments();
485+
- (std::map<DocumentKey, TargetId>)activeLimboDocumentResolutions {
486+
return _syncEngine->GetActiveLimboDocumentResolutions();
487+
}
488+
489+
- (std::deque<DocumentKey>)enqueuedLimboDocumentResolutions {
490+
return _syncEngine->GetEnqueuedLimboDocumentResolutions();
477491
}
478492

479493
- (const std::unordered_map<TargetId, TargetData> &)activeTargets {

Firestore/Example/Tests/SpecTests/json/limbo_spec_test.json

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4656,8 +4656,6 @@
46564656
"describeName": "Limbo Documents:",
46574657
"itName": "Limbo resolution throttling when a limbo listen is rejected.",
46584658
"tags": [
4659-
"no-android",
4660-
"no-ios"
46614659
],
46624660
"config": {
46634661
"maxConcurrentLimboResolutions": 1,
@@ -4992,8 +4990,6 @@
49924990
"describeName": "Limbo Documents:",
49934991
"itName": "Limbo resolution throttling with all results at once from watch",
49944992
"tags": [
4995-
"no-android",
4996-
"no-ios"
49974993
],
49984994
"config": {
49994995
"maxConcurrentLimboResolutions": 2,
@@ -5571,8 +5567,6 @@
55715567
"describeName": "Limbo Documents:",
55725568
"itName": "Limbo resolution throttling with existence filter mismatch",
55735569
"tags": [
5574-
"no-android",
5575-
"no-ios"
55765570
],
55775571
"config": {
55785572
"maxConcurrentLimboResolutions": 2,
@@ -6185,8 +6179,6 @@
61856179
"describeName": "Limbo Documents:",
61866180
"itName": "Limbo resolution throttling with results one at a time from watch",
61876181
"tags": [
6188-
"no-android",
6189-
"no-ios"
61906182
],
61916183
"config": {
61926184
"maxConcurrentLimboResolutions": 2,

Firestore/core/src/firebase/firestore/core/firestore_client.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ using util::StatusOrCallback;
100100
using util::ThrowIllegalState;
101101
using util::TimerId;
102102

103+
static const size_t kMaxConcurrentLimboResolutions = 100;
104+
103105
std::shared_ptr<FirestoreClient> FirestoreClient::Create(
104106
const DatabaseInfo& database_info,
105107
const api::Settings& settings,
@@ -202,8 +204,9 @@ void FirestoreClient::Initialize(const User& user, const Settings& settings) {
202204
weak_this.lock()->sync_engine_->HandleOnlineStateChange(online_state);
203205
});
204206

205-
sync_engine_ = absl::make_unique<SyncEngine>(local_store_.get(),
206-
remote_store_.get(), user);
207+
sync_engine_ =
208+
absl::make_unique<SyncEngine>(local_store_.get(), remote_store_.get(),
209+
user, kMaxConcurrentLimboResolutions);
207210

208211
event_manager_ = absl::make_unique<EventManager>(sync_engine_.get());
209212

0 commit comments

Comments
 (0)