Skip to content

Commit 3f0d9ff

Browse files
committed
Implement limbo resolution throttling
1 parent f0bda45 commit 3f0d9ff

File tree

4 files changed

+123
-37
lines changed

4 files changed

+123
-37
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/core/FirestoreClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
public final class FirestoreClient implements RemoteStore.RemoteStoreCallback {
6868

6969
private static final String LOG_TAG = "FirestoreClient";
70+
private static final int MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100;
7071

7172
private final DatabaseInfo databaseInfo;
7273
private final CredentialsProvider credentialsProvider;
@@ -279,7 +280,7 @@ private void initialize(Context context, User user, boolean usePersistence, long
279280
ConnectivityMonitor connectivityMonitor = new AndroidConnectivityMonitor(context);
280281
remoteStore = new RemoteStore(this, localStore, datastore, asyncQueue, connectivityMonitor);
281282

282-
syncEngine = new SyncEngine(localStore, remoteStore, user);
283+
syncEngine = new SyncEngine(localStore, remoteStore, user, MAX_CONCURRENT_LIMBO_RESOLUTIONS);
283284
eventManager = new EventManager(syncEngine);
284285

285286
// NOTE: RemoteStore depends on LocalStore (for persisting stream tokens, refilling mutation

firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java

Lines changed: 63 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,13 @@
4848
import com.google.firebase.firestore.util.Logger;
4949
import com.google.firebase.firestore.util.Util;
5050
import io.grpc.Status;
51+
import java.util.ArrayDeque;
5152
import java.util.ArrayList;
5253
import java.util.Collections;
5354
import java.util.HashMap;
5455
import java.util.List;
5556
import java.util.Map;
57+
import java.util.Queue;
5658
import java.util.Set;
5759

5860
/**
@@ -115,17 +117,22 @@ interface SyncEngineCallback {
115117
/** Queries mapped to active targets, indexed by target id. */
116118
private final Map<Integer, List<Query>> queriesByTarget;
117119

120+
private final int maxConcurrentLimboResolutions;
121+
118122
/**
119-
* When a document is in limbo, we create a special listen to resolve it. This maps the
120-
* DocumentKey of each limbo document to the target ID of the listen resolving it.
123+
* The keys of documents that are in limbo for which we haven't yet started a limbo resolution
124+
* query.
121125
*/
122-
private final Map<DocumentKey, Integer> limboTargetsByKey;
126+
private final Queue<DocumentKey> enqueuedLimboResolutions;
127+
128+
/** Keeps track of the target ID for each document that is in limbo with an active target. */
129+
private final Map<DocumentKey, Integer> activeLimboTargetsByKey;
123130

124131
/**
125-
* Basically the inverse of limboTargetsByKey, a map of target ID to a LimboResolution (which
126-
* includes the DocumentKey as well as whether we've received a document for the target).
132+
* Keeps track of the information about an active limbo resolution for each active target ID that
133+
* was started for the purpose of limbo resolution.
127134
*/
128-
private final Map<Integer, LimboResolution> limboResolutionsByTarget;
135+
private final Map<Integer, LimboResolution> activeLimboResolutionsByTarget;
129136

130137
/** Used to track any documents that are currently in limbo. */
131138
private final ReferenceSet limboDocumentRefs;
@@ -143,15 +150,21 @@ interface SyncEngineCallback {
143150

144151
private SyncEngineCallback syncEngineListener;
145152

146-
public SyncEngine(LocalStore localStore, RemoteStore remoteStore, User initialUser) {
153+
public SyncEngine(
154+
LocalStore localStore,
155+
RemoteStore remoteStore,
156+
User initialUser,
157+
int maxConcurrentLimboResolutions) {
147158
this.localStore = localStore;
148159
this.remoteStore = remoteStore;
160+
this.maxConcurrentLimboResolutions = maxConcurrentLimboResolutions;
149161

150162
queryViewsByQuery = new HashMap<>();
151163
queriesByTarget = new HashMap<>();
152164

153-
limboTargetsByKey = new HashMap<>();
154-
limboResolutionsByTarget = new HashMap<>();
165+
enqueuedLimboResolutions = new ArrayDeque<>();
166+
activeLimboTargetsByKey = new HashMap<>();
167+
activeLimboResolutionsByTarget = new HashMap<>();
155168
limboDocumentRefs = new ReferenceSet();
156169

157170
mutationUserCallbacks = new HashMap<>();
@@ -298,7 +311,7 @@ public void handleRemoteEvent(RemoteEvent event) {
298311
for (Map.Entry<Integer, TargetChange> entry : event.getTargetChanges().entrySet()) {
299312
Integer targetId = entry.getKey();
300313
TargetChange targetChange = entry.getValue();
301-
LimboResolution limboResolution = limboResolutionsByTarget.get(targetId);
314+
LimboResolution limboResolution = activeLimboResolutionsByTarget.get(targetId);
302315
if (limboResolution != null) {
303316
// Since this is a limbo resolution lookup, it's for a single document and it could be
304317
// added, modified, or removed, but not a combination.
@@ -349,7 +362,7 @@ public void handleOnlineStateChange(OnlineState onlineState) {
349362

350363
@Override
351364
public ImmutableSortedSet<DocumentKey> getRemoteKeysForTarget(int targetId) {
352-
LimboResolution limboResolution = limboResolutionsByTarget.get(targetId);
365+
LimboResolution limboResolution = activeLimboResolutionsByTarget.get(targetId);
353366
if (limboResolution != null && limboResolution.receivedDocument) {
354367
return DocumentKey.emptyKeySet().insert(limboResolution.key);
355368
} else {
@@ -372,13 +385,14 @@ public ImmutableSortedSet<DocumentKey> getRemoteKeysForTarget(int targetId) {
372385
public void handleRejectedListen(int targetId, Status error) {
373386
assertCallback("handleRejectedListen");
374387

375-
LimboResolution limboResolution = limboResolutionsByTarget.get(targetId);
388+
LimboResolution limboResolution = activeLimboResolutionsByTarget.get(targetId);
376389
DocumentKey limboKey = limboResolution != null ? limboResolution.key : null;
377390
if (limboKey != null) {
378391
// Since this query failed, we won't want to manually unlisten to it.
379392
// So go ahead and remove it from bookkeeping.
380-
limboTargetsByKey.remove(limboKey);
381-
limboResolutionsByTarget.remove(targetId);
393+
activeLimboTargetsByKey.remove(limboKey);
394+
activeLimboResolutionsByTarget.remove(targetId);
395+
pumpEnqueuedLimboResolutions();
382396

383397
// TODO: Retry on transient errors?
384398

@@ -535,11 +549,12 @@ private void removeAndCleanupTarget(int targetId, Status status) {
535549
private void removeLimboTarget(DocumentKey key) {
536550
// It's possible that the target already got removed because the query failed. In that case,
537551
// the key won't exist in `limboTargetsByKey`. Only do the cleanup if we still have the target.
538-
Integer targetId = limboTargetsByKey.get(key);
552+
Integer targetId = activeLimboTargetsByKey.get(key);
539553
if (targetId != null) {
540554
remoteStore.stopListening(targetId);
541-
limboTargetsByKey.remove(key);
542-
limboResolutionsByTarget.remove(targetId);
555+
activeLimboTargetsByKey.remove(key);
556+
activeLimboResolutionsByTarget.remove(targetId);
557+
pumpEnqueuedLimboResolutions();
543558
}
544559
}
545560

@@ -605,26 +620,47 @@ private void updateTrackedLimboDocuments(List<LimboDocumentChange> limboChanges,
605620

606621
private void trackLimboChange(LimboDocumentChange change) {
607622
DocumentKey key = change.getKey();
608-
if (!limboTargetsByKey.containsKey(key)) {
623+
if (!activeLimboTargetsByKey.containsKey(key)) {
609624
Logger.debug(TAG, "New document in limbo: %s", key);
625+
enqueuedLimboResolutions.add(key);
626+
pumpEnqueuedLimboResolutions();
627+
}
628+
}
629+
630+
/**
631+
* Starts listens for documents in limbo that are enqueued for resolution, subject to a maximum
632+
* number of concurrent resolutions.
633+
*
634+
* <p>Without bounding the number of concurrent resolutions, the server can fail with "resource
635+
* exhausted" errors which can lead to pathological client behavior as seen in
636+
* https://github.com/firebase/firebase-js-sdk/issues/2683.
637+
*/
638+
private void pumpEnqueuedLimboResolutions() {
639+
while (!enqueuedLimboResolutions.isEmpty()
640+
&& activeLimboTargetsByKey.size() < maxConcurrentLimboResolutions) {
641+
DocumentKey key = enqueuedLimboResolutions.remove();
610642
int limboTargetId = targetIdGenerator.nextId();
611-
Query query = Query.atPath(key.getPath());
612-
TargetData targetData =
643+
activeLimboResolutionsByTarget.put(limboTargetId, new LimboResolution(key));
644+
activeLimboTargetsByKey.put(key, limboTargetId);
645+
remoteStore.listen(
613646
new TargetData(
614-
query.toTarget(),
647+
Query.atPath(key.getPath()).toTarget(),
615648
limboTargetId,
616649
ListenSequence.INVALID,
617-
QueryPurpose.LIMBO_RESOLUTION);
618-
limboResolutionsByTarget.put(limboTargetId, new LimboResolution(key));
619-
remoteStore.listen(targetData);
620-
limboTargetsByKey.put(key, limboTargetId);
650+
QueryPurpose.LIMBO_RESOLUTION));
621651
}
622652
}
623653

624654
@VisibleForTesting
625-
public Map<DocumentKey, Integer> getCurrentLimboDocuments() {
655+
public Map<DocumentKey, Integer> getActiveLimboDocumentResolutions() {
626656
// Make a defensive copy as the Map continues to be modified.
627-
return new HashMap<>(limboTargetsByKey);
657+
return new HashMap<>(activeLimboTargetsByKey);
658+
}
659+
660+
@VisibleForTesting
661+
public Queue<DocumentKey> getEnqueuedLimboDocumentResolutions() {
662+
// Make a defensive copy as the Queue continues to be modified.
663+
return new ArrayDeque<>(enqueuedLimboResolutions);
628664
}
629665

630666
public void handleCredentialChange(User user) {

firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ public abstract class SpecTestCase implements RemoteStoreCallback {
154154
: Sets.newHashSet("no-android", BENCHMARK_TAG, "multi-client");
155155

156156
private boolean garbageCollectionEnabled;
157+
private int maxConcurrentLimboResolutions;
157158
private boolean networkEnabled = true;
158159

159160
//
@@ -176,9 +177,18 @@ public abstract class SpecTestCase implements RemoteStoreCallback {
176177
*/
177178
private Map<Query, QueryListener> queryListeners;
178179

179-
/** Set of documents that are expected to be in limbo. Verified at every step. */
180+
/**
181+
* Set of documents that are expected to be in limbo with an active target. Verified at every
182+
* step.
183+
*/
180184
private Set<DocumentKey> expectedActiveLimboDocs;
181185

186+
/**
187+
* Set of documents that are expected to be in limbo, enqueued for resolution and, therefore,
188+
* without an active target. Verified at every step.
189+
*/
190+
private Set<DocumentKey> expectedEnqueuedLimboDocs;
191+
182192
/** Set of expected active targets, keyed by target ID. */
183193
private Map<Integer, Pair<List<TargetData>, String>> expectedActiveTargets;
184194

@@ -251,6 +261,8 @@ protected void specSetUp(JSONObject config) {
251261
outstandingWrites = new HashMap<>();
252262

253263
this.garbageCollectionEnabled = config.optBoolean("useGarbageCollection", false);
264+
this.maxConcurrentLimboResolutions =
265+
config.optInt("maxConcurrentLimboResolutions", Integer.MAX_VALUE);
254266

255267
currentUser = User.UNAUTHENTICATED;
256268

@@ -265,6 +277,7 @@ protected void specSetUp(JSONObject config) {
265277
queryListeners = new HashMap<>();
266278

267279
expectedActiveLimboDocs = new HashSet<>();
280+
expectedEnqueuedLimboDocs = new HashSet<>();
268281
expectedActiveTargets = new HashMap<>();
269282

270283
snapshotsInSyncListeners = Collections.synchronizedList(new ArrayList<>());
@@ -294,7 +307,8 @@ private void initClient() {
294307
ConnectivityMonitor connectivityMonitor =
295308
new AndroidConnectivityMonitor(ApplicationProvider.getApplicationContext());
296309
remoteStore = new RemoteStore(this, localStore, datastore, queue, connectivityMonitor);
297-
syncEngine = new SyncEngine(localStore, remoteStore, currentUser);
310+
syncEngine =
311+
new SyncEngine(localStore, remoteStore, currentUser, maxConcurrentLimboResolutions);
298312
eventManager = new EventManager(syncEngine);
299313
localStore.start();
300314
remoteStore.start();
@@ -918,6 +932,13 @@ private void validateExpectedState(@Nullable JSONObject expectedState) throws JS
918932
expectedActiveLimboDocs.add(key((String) limboDocs.get(i)));
919933
}
920934
}
935+
if (expectedState.has("enqueuedLimboDocs")) {
936+
expectedEnqueuedLimboDocs = new HashSet<>();
937+
JSONArray limboDocs = expectedState.getJSONArray("enqueuedLimboDocs");
938+
for (int i = 0; i < limboDocs.length(); i++) {
939+
expectedEnqueuedLimboDocs.add(key((String) limboDocs.get(i)));
940+
}
941+
}
921942
if (expectedState.has("activeTargets")) {
922943
expectedActiveTargets = new HashMap<>();
923944
JSONObject activeTargets = expectedState.getJSONObject("activeTargets");
@@ -950,7 +971,8 @@ private void validateExpectedState(@Nullable JSONObject expectedState) throws JS
950971
// Always validate the we received the expected number of events.
951972
validateUserCallbacks(expectedState);
952973
// Always validate that the expected limbo docs match the actual limbo docs.
953-
validateLimboDocs();
974+
validateActiveLimboDocs();
975+
validateEnqueuedLimboDocs();
954976
// Always validate that the expected active targets match the actual active targets.
955977
validateActiveTargets();
956978
}
@@ -984,11 +1006,11 @@ private void validateUserCallbacks(@Nullable JSONObject expected) throws JSONExc
9841006
}
9851007
}
9861008

987-
private void validateLimboDocs() {
1009+
private void validateActiveLimboDocs() {
9881010
// Make a copy so it can modified while checking against the expected limbo docs.
9891011
@SuppressWarnings("VisibleForTests")
9901012
Map<DocumentKey, Integer> actualLimboDocs =
991-
new HashMap<>(syncEngine.getCurrentLimboDocuments());
1013+
new HashMap<>(syncEngine.getActiveLimboDocumentResolutions());
9921014

9931015
// Validate that each active limbo doc has an expected active target
9941016
for (Map.Entry<DocumentKey, Integer> limboDoc : actualLimboDocs.entrySet()) {
@@ -1014,6 +1036,37 @@ private void validateLimboDocs() {
10141036
assertTrue("Unexpected active docs in limbo: " + actualLimboDocs, actualLimboDocs.isEmpty());
10151037
}
10161038

1039+
private void validateEnqueuedLimboDocs() {
1040+
Set<DocumentKey> actualLimboDocs =
1041+
new HashSet<>(syncEngine.getEnqueuedLimboDocumentResolutions());
1042+
1043+
for (DocumentKey key : actualLimboDocs) {
1044+
assertTrue(
1045+
"Found enqueued limbo doc "
1046+
+ key.getPath().canonicalString()
1047+
+ ", but it was not in the set of expected enqueued limbo documents ("
1048+
+ expectedEnqueuedLimboDocs.stream()
1049+
.sorted()
1050+
.map(String::valueOf)
1051+
.collect(Collectors.joining(", "))
1052+
+ ")",
1053+
expectedEnqueuedLimboDocs.contains(key));
1054+
}
1055+
1056+
for (DocumentKey key : expectedEnqueuedLimboDocs) {
1057+
assertTrue(
1058+
"Expected doc "
1059+
+ key.getPath().canonicalString()
1060+
+ " to be enqueued for limbo resolution, but it was not in the queue ("
1061+
+ actualLimboDocs.stream()
1062+
.sorted()
1063+
.map(String::valueOf)
1064+
.collect(Collectors.joining(", "))
1065+
+ ")",
1066+
actualLimboDocs.contains(key));
1067+
}
1068+
}
1069+
10171070
private void validateActiveTargets() {
10181071
if (!networkEnabled) {
10191072
return;

firebase-firestore/src/test/resources/json/limbo_spec_test.json

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4656,7 +4656,6 @@
46564656
"describeName": "Limbo Documents:",
46574657
"itName": "Limbo resolution throttling when a limbo listen is rejected.",
46584658
"tags": [
4659-
"no-android",
46604659
"no-ios"
46614660
],
46624661
"config": {
@@ -4992,7 +4991,6 @@
49924991
"describeName": "Limbo Documents:",
49934992
"itName": "Limbo resolution throttling with all results at once from watch",
49944993
"tags": [
4995-
"no-android",
49964994
"no-ios"
49974995
],
49984996
"config": {
@@ -5571,7 +5569,6 @@
55715569
"describeName": "Limbo Documents:",
55725570
"itName": "Limbo resolution throttling with existence filter mismatch",
55735571
"tags": [
5574-
"no-android",
55755572
"no-ios"
55765573
],
55775574
"config": {
@@ -6185,7 +6182,6 @@
61856182
"describeName": "Limbo Documents:",
61866183
"itName": "Limbo resolution throttling with results one at a time from watch",
61876184
"tags": [
6188-
"no-android",
61896185
"no-ios"
61906186
],
61916187
"config": {

0 commit comments

Comments
 (0)