Skip to content

Implement limbo resolution listen throttling. #1374

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
public final class FirestoreClient implements RemoteStore.RemoteStoreCallback {

private static final String LOG_TAG = "FirestoreClient";
private static final int MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100;

private final DatabaseInfo databaseInfo;
private final CredentialsProvider credentialsProvider;
Expand Down Expand Up @@ -279,7 +280,7 @@ private void initialize(Context context, User user, boolean usePersistence, long
ConnectivityMonitor connectivityMonitor = new AndroidConnectivityMonitor(context);
remoteStore = new RemoteStore(this, localStore, datastore, asyncQueue, connectivityMonitor);

syncEngine = new SyncEngine(localStore, remoteStore, user);
syncEngine = new SyncEngine(localStore, remoteStore, user, MAX_CONCURRENT_LIMBO_RESOLUTIONS);
eventManager = new EventManager(syncEngine);

// NOTE: RemoteStore depends on LocalStore (for persisting stream tokens, refilling mutation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@
import com.google.firebase.firestore.util.Logger;
import com.google.firebase.firestore.util.Util;
import io.grpc.Status;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/**
Expand Down Expand Up @@ -115,17 +117,22 @@ interface SyncEngineCallback {
/** Queries mapped to active targets, indexed by target id. */
private final Map<Integer, List<Query>> queriesByTarget;

private final int maxConcurrentLimboResolutions;

/**
* When a document is in limbo, we create a special listen to resolve it. This maps the
* DocumentKey of each limbo document to the target ID of the listen resolving it.
* The keys of documents that are in limbo for which we haven't yet started a limbo resolution
* query.
*/
private final Map<DocumentKey, Integer> limboTargetsByKey;
private final Queue<DocumentKey> enqueuedLimboResolutions;

/** Keeps track of the target ID for each document that is in limbo with an active target. */
private final Map<DocumentKey, Integer> activeLimboTargetsByKey;

/**
* Basically the inverse of limboTargetsByKey, a map of target ID to a LimboResolution (which
* includes the DocumentKey as well as whether we've received a document for the target).
* Keeps track of the information about an active limbo resolution for each active target ID that
* was started for the purpose of limbo resolution.
*/
private final Map<Integer, LimboResolution> limboResolutionsByTarget;
private final Map<Integer, LimboResolution> activeLimboResolutionsByTarget;

/** Used to track any documents that are currently in limbo. */
private final ReferenceSet limboDocumentRefs;
Expand All @@ -143,15 +150,21 @@ interface SyncEngineCallback {

private SyncEngineCallback syncEngineListener;

public SyncEngine(LocalStore localStore, RemoteStore remoteStore, User initialUser) {
public SyncEngine(
LocalStore localStore,
RemoteStore remoteStore,
User initialUser,
int maxConcurrentLimboResolutions) {
this.localStore = localStore;
this.remoteStore = remoteStore;
this.maxConcurrentLimboResolutions = maxConcurrentLimboResolutions;

queryViewsByQuery = new HashMap<>();
queriesByTarget = new HashMap<>();

limboTargetsByKey = new HashMap<>();
limboResolutionsByTarget = new HashMap<>();
enqueuedLimboResolutions = new ArrayDeque<>();
activeLimboTargetsByKey = new HashMap<>();
activeLimboResolutionsByTarget = new HashMap<>();
limboDocumentRefs = new ReferenceSet();

mutationUserCallbacks = new HashMap<>();
Expand Down Expand Up @@ -298,7 +311,7 @@ public void handleRemoteEvent(RemoteEvent event) {
for (Map.Entry<Integer, TargetChange> entry : event.getTargetChanges().entrySet()) {
Integer targetId = entry.getKey();
TargetChange targetChange = entry.getValue();
LimboResolution limboResolution = limboResolutionsByTarget.get(targetId);
LimboResolution limboResolution = activeLimboResolutionsByTarget.get(targetId);
if (limboResolution != null) {
// Since this is a limbo resolution lookup, it's for a single document and it could be
// added, modified, or removed, but not a combination.
Expand Down Expand Up @@ -349,7 +362,7 @@ public void handleOnlineStateChange(OnlineState onlineState) {

@Override
public ImmutableSortedSet<DocumentKey> getRemoteKeysForTarget(int targetId) {
LimboResolution limboResolution = limboResolutionsByTarget.get(targetId);
LimboResolution limboResolution = activeLimboResolutionsByTarget.get(targetId);
if (limboResolution != null && limboResolution.receivedDocument) {
return DocumentKey.emptyKeySet().insert(limboResolution.key);
} else {
Expand All @@ -372,13 +385,14 @@ public ImmutableSortedSet<DocumentKey> getRemoteKeysForTarget(int targetId) {
public void handleRejectedListen(int targetId, Status error) {
assertCallback("handleRejectedListen");

LimboResolution limboResolution = limboResolutionsByTarget.get(targetId);
LimboResolution limboResolution = activeLimboResolutionsByTarget.get(targetId);
DocumentKey limboKey = limboResolution != null ? limboResolution.key : null;
if (limboKey != null) {
// Since this query failed, we won't want to manually unlisten to it.
// So go ahead and remove it from bookkeeping.
limboTargetsByKey.remove(limboKey);
limboResolutionsByTarget.remove(targetId);
activeLimboTargetsByKey.remove(limboKey);
activeLimboResolutionsByTarget.remove(targetId);
pumpEnqueuedLimboResolutions();

// TODO: Retry on transient errors?

Expand Down Expand Up @@ -535,11 +549,12 @@ private void removeAndCleanupTarget(int targetId, Status status) {
private void removeLimboTarget(DocumentKey key) {
// It's possible that the target already got removed because the query failed. In that case,
// the key won't exist in `limboTargetsByKey`. Only do the cleanup if we still have the target.
Integer targetId = limboTargetsByKey.get(key);
Integer targetId = activeLimboTargetsByKey.get(key);
if (targetId != null) {
remoteStore.stopListening(targetId);
limboTargetsByKey.remove(key);
limboResolutionsByTarget.remove(targetId);
activeLimboTargetsByKey.remove(key);
activeLimboResolutionsByTarget.remove(targetId);
pumpEnqueuedLimboResolutions();
}
}

Expand Down Expand Up @@ -605,26 +620,47 @@ private void updateTrackedLimboDocuments(List<LimboDocumentChange> limboChanges,

private void trackLimboChange(LimboDocumentChange change) {
DocumentKey key = change.getKey();
if (!limboTargetsByKey.containsKey(key)) {
if (!activeLimboTargetsByKey.containsKey(key)) {
Logger.debug(TAG, "New document in limbo: %s", key);
enqueuedLimboResolutions.add(key);
pumpEnqueuedLimboResolutions();
}
}

/**
* Starts listens for documents in limbo that are enqueued for resolution, subject to a maximum
* number of concurrent resolutions.
*
* <p>Without bounding the number of concurrent resolutions, the server can fail with "resource
* exhausted" errors which can lead to pathological client behavior as seen in
* https://github.com/firebase/firebase-js-sdk/issues/2683.
*/
private void pumpEnqueuedLimboResolutions() {
while (!enqueuedLimboResolutions.isEmpty()
&& activeLimboTargetsByKey.size() < maxConcurrentLimboResolutions) {
DocumentKey key = enqueuedLimboResolutions.remove();
int limboTargetId = targetIdGenerator.nextId();
Query query = Query.atPath(key.getPath());
TargetData targetData =
activeLimboResolutionsByTarget.put(limboTargetId, new LimboResolution(key));
activeLimboTargetsByKey.put(key, limboTargetId);
remoteStore.listen(
new TargetData(
query.toTarget(),
Query.atPath(key.getPath()).toTarget(),
limboTargetId,
ListenSequence.INVALID,
QueryPurpose.LIMBO_RESOLUTION);
limboResolutionsByTarget.put(limboTargetId, new LimboResolution(key));
remoteStore.listen(targetData);
limboTargetsByKey.put(key, limboTargetId);
QueryPurpose.LIMBO_RESOLUTION));
}
}

@VisibleForTesting
public Map<DocumentKey, Integer> getCurrentLimboDocuments() {
public Map<DocumentKey, Integer> getActiveLimboDocumentResolutions() {
// Make a defensive copy as the Map continues to be modified.
return new HashMap<>(limboTargetsByKey);
return new HashMap<>(activeLimboTargetsByKey);
}

@VisibleForTesting
public Queue<DocumentKey> getEnqueuedLimboDocumentResolutions() {
// Make a defensive copy as the Queue continues to be modified.
return new ArrayDeque<>(enqueuedLimboResolutions);
}

public void handleCredentialChange(User user) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public abstract class SpecTestCase implements RemoteStoreCallback {
: Sets.newHashSet("no-android", BENCHMARK_TAG, "multi-client");

private boolean garbageCollectionEnabled;
private int maxConcurrentLimboResolutions;
private boolean networkEnabled = true;

//
Expand All @@ -176,9 +177,18 @@ public abstract class SpecTestCase implements RemoteStoreCallback {
*/
private Map<Query, QueryListener> queryListeners;

/** Set of documents that are expected to be in limbo. Verified at every step. */
/**
* Set of documents that are expected to be in limbo with an active target. Verified at every
* step.
*/
private Set<DocumentKey> expectedActiveLimboDocs;

/**
* Set of documents that are expected to be in limbo, enqueued for resolution and, therefore,
* without an active target. Verified at every step.
*/
private Set<DocumentKey> expectedEnqueuedLimboDocs;

/** Set of expected active targets, keyed by target ID. */
private Map<Integer, Pair<List<TargetData>, String>> expectedActiveTargets;

Expand Down Expand Up @@ -251,6 +261,8 @@ protected void specSetUp(JSONObject config) {
outstandingWrites = new HashMap<>();

this.garbageCollectionEnabled = config.optBoolean("useGarbageCollection", false);
this.maxConcurrentLimboResolutions =
config.optInt("maxConcurrentLimboResolutions", Integer.MAX_VALUE);

currentUser = User.UNAUTHENTICATED;

Expand All @@ -265,6 +277,7 @@ protected void specSetUp(JSONObject config) {
queryListeners = new HashMap<>();

expectedActiveLimboDocs = new HashSet<>();
expectedEnqueuedLimboDocs = new HashSet<>();
expectedActiveTargets = new HashMap<>();

snapshotsInSyncListeners = Collections.synchronizedList(new ArrayList<>());
Expand Down Expand Up @@ -294,7 +307,8 @@ private void initClient() {
ConnectivityMonitor connectivityMonitor =
new AndroidConnectivityMonitor(ApplicationProvider.getApplicationContext());
remoteStore = new RemoteStore(this, localStore, datastore, queue, connectivityMonitor);
syncEngine = new SyncEngine(localStore, remoteStore, currentUser);
syncEngine =
new SyncEngine(localStore, remoteStore, currentUser, maxConcurrentLimboResolutions);
eventManager = new EventManager(syncEngine);
localStore.start();
remoteStore.start();
Expand Down Expand Up @@ -918,6 +932,13 @@ private void validateExpectedState(@Nullable JSONObject expectedState) throws JS
expectedActiveLimboDocs.add(key((String) limboDocs.get(i)));
}
}
if (expectedState.has("enqueuedLimboDocs")) {
expectedEnqueuedLimboDocs = new HashSet<>();
JSONArray limboDocs = expectedState.getJSONArray("enqueuedLimboDocs");
for (int i = 0; i < limboDocs.length(); i++) {
expectedEnqueuedLimboDocs.add(key((String) limboDocs.get(i)));
}
}
if (expectedState.has("activeTargets")) {
expectedActiveTargets = new HashMap<>();
JSONObject activeTargets = expectedState.getJSONObject("activeTargets");
Expand Down Expand Up @@ -950,7 +971,8 @@ private void validateExpectedState(@Nullable JSONObject expectedState) throws JS
// Always validate the we received the expected number of events.
validateUserCallbacks(expectedState);
// Always validate that the expected limbo docs match the actual limbo docs.
validateLimboDocs();
validateActiveLimboDocs();
validateEnqueuedLimboDocs();
// Always validate that the expected active targets match the actual active targets.
validateActiveTargets();
}
Expand Down Expand Up @@ -984,11 +1006,11 @@ private void validateUserCallbacks(@Nullable JSONObject expected) throws JSONExc
}
}

private void validateLimboDocs() {
private void validateActiveLimboDocs() {
// Make a copy so it can modified while checking against the expected limbo docs.
@SuppressWarnings("VisibleForTests")
Map<DocumentKey, Integer> actualLimboDocs =
new HashMap<>(syncEngine.getCurrentLimboDocuments());
new HashMap<>(syncEngine.getActiveLimboDocumentResolutions());

// Validate that each active limbo doc has an expected active target
for (Map.Entry<DocumentKey, Integer> limboDoc : actualLimboDocs.entrySet()) {
Expand All @@ -1014,6 +1036,37 @@ private void validateLimboDocs() {
assertTrue("Unexpected active docs in limbo: " + actualLimboDocs, actualLimboDocs.isEmpty());
}

private void validateEnqueuedLimboDocs() {
Set<DocumentKey> actualLimboDocs =
new HashSet<>(syncEngine.getEnqueuedLimboDocumentResolutions());

for (DocumentKey key : actualLimboDocs) {
assertTrue(
"Found enqueued limbo doc "
+ key.getPath().canonicalString()
+ ", but it was not in the set of expected enqueued limbo documents ("
+ expectedEnqueuedLimboDocs.stream()
.sorted()
.map(String::valueOf)
.collect(Collectors.joining(", "))
+ ")",
expectedEnqueuedLimboDocs.contains(key));
}

for (DocumentKey key : expectedEnqueuedLimboDocs) {
assertTrue(
"Expected doc "
+ key.getPath().canonicalString()
+ " to be enqueued for limbo resolution, but it was not in the queue ("
+ actualLimboDocs.stream()
.sorted()
.map(String::valueOf)
.collect(Collectors.joining(", "))
+ ")",
actualLimboDocs.contains(key));
}
}

private void validateActiveTargets() {
if (!networkEnabled) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4656,7 +4656,6 @@
"describeName": "Limbo Documents:",
"itName": "Limbo resolution throttling when a limbo listen is rejected.",
"tags": [
"no-android",
"no-ios"
],
"config": {
Expand Down Expand Up @@ -4992,7 +4991,6 @@
"describeName": "Limbo Documents:",
"itName": "Limbo resolution throttling with all results at once from watch",
"tags": [
"no-android",
"no-ios"
],
"config": {
Expand Down Expand Up @@ -5571,7 +5569,6 @@
"describeName": "Limbo Documents:",
"itName": "Limbo resolution throttling with existence filter mismatch",
"tags": [
"no-android",
"no-ios"
],
"config": {
Expand Down Expand Up @@ -6185,7 +6182,6 @@
"describeName": "Limbo Documents:",
"itName": "Limbo resolution throttling with results one at a time from watch",
"tags": [
"no-android",
"no-ios"
],
"config": {
Expand Down