Skip to content

Implement limbo resolution listen throttling #5310

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 15, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Firestore/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Unreleased
- [changed] Firestore now limits the number of concurrent document lookups it
will perform when resolving inconsistencies in the local cache (#5310).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really should link to the issue rather than the PR--it doesn't matter that the issue was in another repository. See below for a what a link looks like to an issue in firebase-js-sdk.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Changed the PR number to the link to the issue firebase/firebase-js-sdk#449.


# v1.11.3
- [changed] Internal changes.
Expand Down
81 changes: 69 additions & 12 deletions Firestore/Example/Tests/SpecTests/FSTSpecTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
#import <FirebaseFirestore/FIRFirestoreErrors.h>

#include <algorithm>
#include <cstddef>
#include <limits>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
Expand Down Expand Up @@ -152,16 +155,24 @@ ByteString MakeResumeToken(NSString *specString) {
return MakeByteString([specString dataUsingEncoding:NSUTF8StringEncoding]);
}

NSString *ToDocumentListString(const std::map<DocumentKey, TargetId> &map) {
NSString *ToDocumentListString(const std::set<DocumentKey> &keys) {
std::vector<std::string> strings;
strings.reserve(map.size());
for (const auto &kv : map) {
strings.push_back(kv.first.ToString());
strings.reserve(keys.size());
for (const auto &key : keys) {
strings.push_back(key.ToString());
}
std::sort(strings.begin(), strings.end());
return MakeNSString(absl::StrJoin(strings, ", "));
}

NSString *ToDocumentListString(const std::map<DocumentKey, TargetId> &map) {
std::set<DocumentKey> keys;
for (const auto &kv : map) {
keys.insert(kv.first);
}
return ToDocumentListString(keys);
}

NSString *ToTargetIdListString(const ActiveTargetMap &map) {
std::vector<model::TargetId> targetIds;
targetIds.reserve(map.size());
Expand All @@ -181,6 +192,7 @@ @interface FSTSpecTests ()

@implementation FSTSpecTests {
BOOL _gcEnabled;
size_t _maxConcurrentLimboResolutions;
BOOL _networkEnabled;
FSTUserDataConverter *_converter;
}
Expand Down Expand Up @@ -212,12 +224,20 @@ - (void)setUpForSpecWithConfig:(NSDictionary *)config {
// Store GCEnabled so we can re-use it in doRestart.
NSNumber *GCEnabled = config[@"useGarbageCollection"];
_gcEnabled = [GCEnabled boolValue];
NSNumber *maxConcurrentLimboResolutions = config[@"maxConcurrentLimboResolutions"];
_maxConcurrentLimboResolutions = (maxConcurrentLimboResolutions == nil)
? std::numeric_limits<size_t>::max()
: maxConcurrentLimboResolutions.unsignedIntValue;
NSNumber *numClients = config[@"numClients"];
if (numClients) {
XCTAssertEqualObjects(numClients, @1, @"The iOS client does not support multi-client tests");
}
std::unique_ptr<Persistence> persistence = [self persistenceWithGCEnabled:_gcEnabled];
self.driver = [[FSTSyncEngineTestDriver alloc] initWithPersistence:std::move(persistence)];
self.driver =
[[FSTSyncEngineTestDriver alloc] initWithPersistence:std::move(persistence)
initialUser:User::Unauthenticated()
outstandingWrites:{}
maxConcurrentLimboResolutions:_maxConcurrentLimboResolutions];
[self.driver start];
}

Expand Down Expand Up @@ -522,9 +542,11 @@ - (void)doRestart {
[self.driver shutdown];

std::unique_ptr<Persistence> persistence = [self persistenceWithGCEnabled:_gcEnabled];
self.driver = [[FSTSyncEngineTestDriver alloc] initWithPersistence:std::move(persistence)
initialUser:currentUser
outstandingWrites:outstandingWrites];
self.driver =
[[FSTSyncEngineTestDriver alloc] initWithPersistence:std::move(persistence)
initialUser:currentUser
outstandingWrites:outstandingWrites
maxConcurrentLimboResolutions:_maxConcurrentLimboResolutions];
[self.driver start];
}

Expand Down Expand Up @@ -689,9 +711,18 @@ - (void)validateExpectedState:(nullable NSDictionary *)expectedState {
for (NSString *name in docNames) {
expectedActiveLimboDocuments = expectedActiveLimboDocuments.insert(FSTTestDocKey(name));
}
// Update the expected limbo documents
// Update the expected active limbo documents
[self.driver setExpectedActiveLimboDocuments:std::move(expectedActiveLimboDocuments)];
}
if (expectedState[@"enqueuedLimboDocs"]) {
DocumentKeySet expectedEnqueuedLimboDocuments;
NSArray *docNames = expectedState[@"enqueuedLimboDocs"];
for (NSString *name in docNames) {
expectedEnqueuedLimboDocuments = expectedEnqueuedLimboDocuments.insert(FSTTestDocKey(name));
}
// Update the expected enqueued limbo documents
[self.driver setExpectedEnqueuedLimboDocuments:std::move(expectedEnqueuedLimboDocuments)];
}
if (expectedState[@"activeTargets"]) {
__block ActiveTargetMap expectedActiveTargets;
[expectedState[@"activeTargets"]
Expand Down Expand Up @@ -719,7 +750,8 @@ - (void)validateExpectedState:(nullable NSDictionary *)expectedState {
// Always validate the we received the expected number of callbacks.
[self validateUserCallbacks:expectedState];
// Always validate that the expected limbo docs match the actual limbo docs.
[self validateLimboDocuments];
[self validateActiveLimboDocuments];
[self validateEnqueuedLimboDocuments];
// Always validate that the expected active targets match the actual active targets.
[self validateActiveTargets];
}
Expand All @@ -744,9 +776,9 @@ - (void)validateUserCallbacks:(nullable NSDictionary *)expected {
}
}

- (void)validateLimboDocuments {
- (void)validateActiveLimboDocuments {
// Make a copy so it can modified while checking against the expected limbo docs.
std::map<DocumentKey, TargetId> actualLimboDocs = self.driver.currentLimboDocuments;
std::map<DocumentKey, TargetId> actualLimboDocs = self.driver.activeLimboDocumentResolutions;

// Validate that each active limbo doc has an expected active target
for (const auto &kv : actualLimboDocs) {
Expand All @@ -767,6 +799,31 @@ - (void)validateLimboDocuments {
ToDocumentListString(actualLimboDocs));
}

- (void)validateEnqueuedLimboDocuments {
std::set<DocumentKey> actualLimboDocs;
for (const auto &key : self.driver.enqueuedLimboDocumentResolutions) {
actualLimboDocs.insert(key);
}
std::set<DocumentKey> expectedLimboDocs;
for (const auto &key : self.driver.expectedEnqueuedLimboDocuments) {
expectedLimboDocs.insert(key);
}

for (const auto &key : actualLimboDocs) {
XCTAssertTrue(expectedLimboDocs.find(key) != expectedLimboDocs.end(),
@"Found enqueued limbo doc %s, but it was not in the set of "
@"expected enqueued limbo documents (%@)",
key.ToString().c_str(), ToDocumentListString(expectedLimboDocs));
}

for (const auto &key : expectedLimboDocs) {
XCTAssertTrue(actualLimboDocs.find(key) != actualLimboDocs.end(),
@"Expected doc %s to be enqueued for limbo resolution, "
@"but it was not in the queue (%@)",
key.ToString().c_str(), ToDocumentListString(actualLimboDocs));
}
}

- (void)validateActiveTargets {
if (!_networkEnabled) {
return;
Expand Down
23 changes: 14 additions & 9 deletions Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#import <Foundation/Foundation.h>

#include <cstddef>
#include <deque>
#include <map>
#include <memory>
#include <unordered_map>
Expand Down Expand Up @@ -115,12 +117,6 @@ typedef std::unordered_map<auth::User, NSMutableArray<FSTOutstandingWrite *> *,
*/
@interface FSTSyncEngineTestDriver : NSObject

/**
* Initializes the underlying FSTSyncEngine with the given local persistence implementation and
* garbage collection policy.
*/
- (instancetype)initWithPersistence:(std::unique_ptr<local::Persistence>)persistence;

/**
* Initializes the underlying FSTSyncEngine with the given local persistence implementation and
* a set of existing outstandingWrites (useful when your Persistence object has persisted
Expand All @@ -129,7 +125,7 @@ typedef std::unordered_map<auth::User, NSMutableArray<FSTOutstandingWrite *> *,
- (instancetype)initWithPersistence:(std::unique_ptr<local::Persistence>)persistence
initialUser:(const auth::User &)initialUser
outstandingWrites:(const FSTOutstandingWriteQueues &)outstandingWrites
NS_DESIGNATED_INITIALIZER;
maxConcurrentLimboResolutions:(size_t)maxConcurrentLimboResolutions NS_DESIGNATED_INITIALIZER;

- (instancetype)init NS_UNAVAILABLE;

Expand Down Expand Up @@ -289,15 +285,24 @@ typedef std::unordered_map<auth::User, NSMutableArray<FSTOutstandingWrite *> *,
*/
- (NSArray<NSString *> *)capturedRejectedWritesSinceLastCall;

/** The current set of documents in limbo. */
- (std::map<model::DocumentKey, model::TargetId>)currentLimboDocuments;
/** The current set of documents in limbo with active targets. */
- (std::map<model::DocumentKey, model::TargetId>)activeLimboDocumentResolutions;

/** The current set of documents in limbo that are enqueued for resolution. */
- (std::deque<model::DocumentKey>)enqueuedLimboDocumentResolutions;

/** The expected set of documents in limbo with an active target. */
- (const model::DocumentKeySet &)expectedActiveLimboDocuments;

/** Sets the expected set of documents in limbo with an active target. */
- (void)setExpectedActiveLimboDocuments:(model::DocumentKeySet)docs;

/** The expected set of documents in limbo that are enqueued for resolution. */
- (const model::DocumentKeySet &)expectedEnqueuedLimboDocuments;

/** Sets the expected set of documents in limbo that are enqueued for resolution. */
- (void)setExpectedEnqueuedLimboDocuments:(model::DocumentKeySet)docs;

/**
* The writes that have been sent to the FSTSyncEngine via writeUserMutation: but not yet
* acknowledged by calling receiveWriteAck/Error:. They are tracked per-user.
Expand Down
34 changes: 24 additions & 10 deletions Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.mm
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#import <FirebaseFirestore/FIRFirestoreErrors.h>

#include <cstddef>
#include <map>
#include <memory>
#include <string>
Expand Down Expand Up @@ -154,6 +155,8 @@ @interface FSTSyncEngineTestDriver ()
@end

@implementation FSTSyncEngineTestDriver {
size_t _maxConcurrentLimboResolutions;

std::unique_ptr<Persistence> _persistence;

std::unique_ptr<LocalStore> _localStore;
Expand All @@ -173,6 +176,7 @@ @implementation FSTSyncEngineTestDriver {
// ivar is declared as mutable.
std::unordered_map<User, NSMutableArray<FSTOutstandingWrite *> *, HashUser> _outstandingWrites;
DocumentKeySet _expectedActiveLimboDocuments;
DocumentKeySet _expectedEnqueuedLimboDocuments;

/** A dictionary for tracking the listens on queries. */
std::unordered_map<Query, std::shared_ptr<QueryListener>> _queryListeners;
Expand All @@ -188,16 +192,13 @@ @implementation FSTSyncEngineTestDriver {
int _snapshotsInSyncEvents;
}

- (instancetype)initWithPersistence:(std::unique_ptr<Persistence>)persistence {
return [self initWithPersistence:std::move(persistence)
initialUser:User::Unauthenticated()
outstandingWrites:{}];
}

- (instancetype)initWithPersistence:(std::unique_ptr<Persistence>)persistence
initialUser:(const User &)initialUser
outstandingWrites:(const FSTOutstandingWriteQueues &)outstandingWrites {
outstandingWrites:(const FSTOutstandingWriteQueues &)outstandingWrites
maxConcurrentLimboResolutions:(size_t)maxConcurrentLimboResolutions {
if (self = [super init]) {
_maxConcurrentLimboResolutions = maxConcurrentLimboResolutions;

// Do a deep copy.
for (const auto &pair : outstandingWrites) {
_outstandingWrites[pair.first] = [pair.second mutableCopy];
Expand All @@ -219,7 +220,8 @@ - (instancetype)initWithPersistence:(std::unique_ptr<Persistence>)persistence
[self](OnlineState onlineState) { _syncEngine->HandleOnlineStateChange(onlineState); });
;

_syncEngine = absl::make_unique<SyncEngine>(_localStore.get(), _remoteStore.get(), initialUser);
_syncEngine = absl::make_unique<SyncEngine>(_localStore.get(), _remoteStore.get(), initialUser,
_maxConcurrentLimboResolutions);
_remoteStore->set_sync_engine(_syncEngine.get());
_eventManager.Init(_syncEngine.get());

Expand Down Expand Up @@ -251,6 +253,14 @@ - (void)setExpectedActiveLimboDocuments:(DocumentKeySet)docs {
_expectedActiveLimboDocuments = std::move(docs);
}

- (const DocumentKeySet &)expectedEnqueuedLimboDocuments {
return _expectedEnqueuedLimboDocuments;
}

- (void)setExpectedEnqueuedLimboDocuments:(DocumentKeySet)docs {
_expectedEnqueuedLimboDocuments = std::move(docs);
}

- (void)drainQueue {
_workerQueue->EnqueueBlocking([] {});
}
Expand Down Expand Up @@ -472,8 +482,12 @@ - (void)receiveWatchStreamError:(int)errorCode userInfo:(NSDictionary<NSString *
});
}

- (std::map<DocumentKey, TargetId>)currentLimboDocuments {
return _syncEngine->GetCurrentLimboDocuments();
- (std::map<DocumentKey, TargetId>)activeLimboDocumentResolutions {
return _syncEngine->GetActiveLimboDocumentResolutions();
}

- (std::deque<DocumentKey>)enqueuedLimboDocumentResolutions {
return _syncEngine->GetEnqueuedLimboDocumentResolutions();
}

- (const std::unordered_map<TargetId, TargetData> &)activeTargets {
Expand Down
8 changes: 0 additions & 8 deletions Firestore/Example/Tests/SpecTests/json/limbo_spec_test.json
Original file line number Diff line number Diff line change
Expand Up @@ -4656,8 +4656,6 @@
"describeName": "Limbo Documents:",
"itName": "Limbo resolution throttling when a limbo listen is rejected.",
"tags": [
"no-android",
"no-ios"
],
"config": {
"maxConcurrentLimboResolutions": 1,
Expand Down Expand Up @@ -4992,8 +4990,6 @@
"describeName": "Limbo Documents:",
"itName": "Limbo resolution throttling with all results at once from watch",
"tags": [
"no-android",
"no-ios"
],
"config": {
"maxConcurrentLimboResolutions": 2,
Expand Down Expand Up @@ -5571,8 +5567,6 @@
"describeName": "Limbo Documents:",
"itName": "Limbo resolution throttling with existence filter mismatch",
"tags": [
"no-android",
"no-ios"
],
"config": {
"maxConcurrentLimboResolutions": 2,
Expand Down Expand Up @@ -6185,8 +6179,6 @@
"describeName": "Limbo Documents:",
"itName": "Limbo resolution throttling with results one at a time from watch",
"tags": [
"no-android",
"no-ios"
],
"config": {
"maxConcurrentLimboResolutions": 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ using util::StatusOrCallback;
using util::ThrowIllegalState;
using util::TimerId;

static const size_t kMaxConcurrentLimboResolutions = 100;

std::shared_ptr<FirestoreClient> FirestoreClient::Create(
const DatabaseInfo& database_info,
const api::Settings& settings,
Expand Down Expand Up @@ -202,8 +204,9 @@ void FirestoreClient::Initialize(const User& user, const Settings& settings) {
weak_this.lock()->sync_engine_->HandleOnlineStateChange(online_state);
});

sync_engine_ = absl::make_unique<SyncEngine>(local_store_.get(),
remote_store_.get(), user);
sync_engine_ =
absl::make_unique<SyncEngine>(local_store_.get(), remote_store_.get(),
user, kMaxConcurrentLimboResolutions);

event_manager_ = absl::make_unique<EventManager>(sync_engine_.get());

Expand Down
Loading