Skip to content

Commit 9eba6e0

Browse files
committed
Implement limbo resolution listen throttling.
1 parent 6251d0c commit 9eba6e0

File tree

3 files changed

+913
-20
lines changed

3 files changed

+913
-20
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import static com.google.firebase.firestore.util.Assert.fail;
1818
import static com.google.firebase.firestore.util.Assert.hardAssert;
1919

20+
import androidx.annotation.NonNull;
2021
import androidx.annotation.Nullable;
2122
import androidx.annotation.VisibleForTesting;
2223
import com.google.android.gms.tasks.Task;
@@ -48,11 +49,13 @@
4849
import com.google.firebase.firestore.util.Logger;
4950
import com.google.firebase.firestore.util.Util;
5051
import io.grpc.Status;
52+
import java.util.ArrayDeque;
5153
import java.util.ArrayList;
5254
import java.util.Collections;
5355
import java.util.HashMap;
5456
import java.util.List;
5557
import java.util.Map;
58+
import java.util.Queue;
5659
import java.util.Set;
5760

5861
/**
@@ -89,8 +92,28 @@ private static class LimboResolution {
8992
}
9093
}
9194

95+
/** An entry in {@link #limboListenQueue}. */
96+
private static final class LimboListenQueueEntry {
97+
98+
final DocumentKey key;
99+
final TargetData targetData;
100+
101+
LimboListenQueueEntry(DocumentKey key, TargetData targetData) {
102+
this.key = key;
103+
this.targetData = targetData;
104+
}
105+
106+
@NonNull
107+
@Override
108+
public String toString() {
109+
return "key=" + key + " targetData=" + targetData;
110+
}
111+
}
112+
92113
private static final String TAG = SyncEngine.class.getSimpleName();
93114

115+
private static final int DEFAULT_MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100;
116+
94117
/** Interface implemented by EventManager to handle notifications from SyncEngine. */
95118
interface SyncEngineCallback {
96119
/** Handles new view snapshots. */
@@ -109,6 +132,9 @@ interface SyncEngineCallback {
109132
/** The remote store for sending writes, watches, etc. to the backend. */
110133
private final RemoteStore remoteStore;
111134

135+
/** The max number of concurrent listens for limbo resolution; see {@link #limboListenQueue}. */
136+
private final int maxConcurrentLimboResolutions;
137+
112138
/** QueryViews for all active queries, indexed by query. */
113139
private final Map<Query, QueryView> queryViewsByQuery;
114140

@@ -127,6 +153,13 @@ interface SyncEngineCallback {
127153
*/
128154
private final Map<Integer, LimboResolution> limboResolutionsByTarget;
129155

156+
/**
157+
* The list of enqueued limbo resolutions. After {@link #maxConcurrentLimboResolutions} listens
158+
* have started for the purpose of limbo resolutions then additional limbo resolution listens are
159+
* enqueued in this queue. When a listen completes then a listen is dequeued and started.
160+
*/
161+
private final Queue<LimboListenQueueEntry> limboListenQueue;
162+
130163
/** Used to track any documents that are currently in limbo. */
131164
private final ReferenceSet limboDocumentRefs;
132165

@@ -144,14 +177,24 @@ interface SyncEngineCallback {
144177
private SyncEngineCallback syncEngineListener;
145178

146179
public SyncEngine(LocalStore localStore, RemoteStore remoteStore, User initialUser) {
180+
this(localStore, remoteStore, initialUser, DEFAULT_MAX_CONCURRENT_LIMBO_RESOLUTIONS);
181+
}
182+
183+
public SyncEngine(
184+
LocalStore localStore,
185+
RemoteStore remoteStore,
186+
User initialUser,
187+
int maxConcurrentLimboResolutions) {
147188
this.localStore = localStore;
148189
this.remoteStore = remoteStore;
190+
this.maxConcurrentLimboResolutions = maxConcurrentLimboResolutions;
149191

150192
queryViewsByQuery = new HashMap<>();
151193
queriesByTarget = new HashMap<>();
152194

153195
limboTargetsByKey = new HashMap<>();
154196
limboResolutionsByTarget = new HashMap<>();
197+
limboListenQueue = new ArrayDeque<>();
155198
limboDocumentRefs = new ReferenceSet();
156199

157200
mutationUserCallbacks = new HashMap<>();
@@ -380,6 +423,10 @@ public void handleRejectedListen(int targetId, Status error) {
380423
limboTargetsByKey.remove(limboKey);
381424
limboResolutionsByTarget.remove(targetId);
382425

426+
if (!limboListenQueue.isEmpty()) {
427+
remoteStore.listen(limboListenQueue.remove().targetData);
428+
}
429+
383430
// TODO: Retry on transient errors?
384431

385432
// It's a limbo doc. Create a synthetic event saying it was deleted. This is kind of a hack.
@@ -540,6 +587,9 @@ private void removeLimboTarget(DocumentKey key) {
540587
remoteStore.stopListening(targetId);
541588
limboTargetsByKey.remove(key);
542589
limboResolutionsByTarget.remove(targetId);
590+
if (!limboListenQueue.isEmpty()) {
591+
remoteStore.listen(limboListenQueue.remove().targetData);
592+
}
543593
}
544594
}
545595

@@ -616,7 +666,11 @@ private void trackLimboChange(LimboDocumentChange change) {
616666
ListenSequence.INVALID,
617667
QueryPurpose.LIMBO_RESOLUTION);
618668
limboResolutionsByTarget.put(limboTargetId, new LimboResolution(key));
619-
remoteStore.listen(targetData);
669+
if (limboResolutionsByTarget.size() <= maxConcurrentLimboResolutions) {
670+
remoteStore.listen(targetData);
671+
} else {
672+
limboListenQueue.add(new LimboListenQueueEntry(key, targetData));
673+
}
620674
limboTargetsByKey.put(key, limboTargetId);
621675
}
622676
}
@@ -627,6 +681,15 @@ public Map<DocumentKey, Integer> getCurrentLimboDocuments() {
627681
return new HashMap<>(limboTargetsByKey);
628682
}
629683

684+
@VisibleForTesting
685+
public List<DocumentKey> getEnqueuedLimboDocuments() {
686+
ArrayList<DocumentKey> list = new ArrayList<>(limboListenQueue.size());
687+
for (LimboListenQueueEntry entry : limboListenQueue) {
688+
list.add(entry.key);
689+
}
690+
return list;
691+
}
692+
630693
public void handleCredentialChange(User user) {
631694
boolean userChanged = !currentUser.equals(user);
632695
currentUser = user;

firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ public abstract class SpecTestCase implements RemoteStoreCallback {
153153
: Sets.newHashSet("no-android", BENCHMARK_TAG, "multi-client");
154154

155155
private boolean garbageCollectionEnabled;
156+
private Integer maxNumConcurrentLimboResolutions;
156157
private boolean networkEnabled = true;
157158

158159
//
@@ -250,6 +251,8 @@ protected void specSetUp(JSONObject config) {
250251
outstandingWrites = new HashMap<>();
251252

252253
this.garbageCollectionEnabled = config.optBoolean("useGarbageCollection", false);
254+
this.maxNumConcurrentLimboResolutions =
255+
(Integer) config.opt("maxNumConcurrentLimboResolutions");
253256

254257
currentUser = User.UNAUTHENTICATED;
255258

@@ -293,7 +296,12 @@ private void initClient() {
293296
ConnectivityMonitor connectivityMonitor =
294297
new AndroidConnectivityMonitor(ApplicationProvider.getApplicationContext());
295298
remoteStore = new RemoteStore(this, localStore, datastore, queue, connectivityMonitor);
296-
syncEngine = new SyncEngine(localStore, remoteStore, currentUser);
299+
if (maxNumConcurrentLimboResolutions == null) {
300+
syncEngine = new SyncEngine(localStore, remoteStore, currentUser);
301+
} else {
302+
syncEngine =
303+
new SyncEngine(localStore, remoteStore, currentUser, maxNumConcurrentLimboResolutions);
304+
}
297305
eventManager = new EventManager(syncEngine);
298306
localStore.start();
299307
remoteStore.start();
@@ -988,12 +996,20 @@ private void validateLimboDocs() {
988996
@SuppressWarnings("VisibleForTests")
989997
Map<DocumentKey, Integer> actualLimboDocs =
990998
new HashMap<>(syncEngine.getCurrentLimboDocuments());
999+
@SuppressWarnings("VisibleForTests")
1000+
List<DocumentKey> enqueuedLimboDocuments = syncEngine.getEnqueuedLimboDocuments();
9911001

9921002
// Validate that each limbo doc has an expected active target
9931003
for (Map.Entry<DocumentKey, Integer> limboDoc : actualLimboDocs.entrySet()) {
994-
assertTrue(
995-
"Found limbo doc " + limboDoc.getKey() + " without an expected active target",
996-
expectedActiveTargets.containsKey(limboDoc.getValue()));
1004+
if (enqueuedLimboDocuments.contains(limboDoc.getKey())) {
1005+
assertFalse(
1006+
"Found limbo doc " + limboDoc.getKey() + " with an unexpected active target",
1007+
expectedActiveTargets.containsKey(limboDoc.getValue()));
1008+
} else {
1009+
assertTrue(
1010+
"Found limbo doc " + limboDoc.getKey() + " without an expected active target",
1011+
expectedActiveTargets.containsKey(limboDoc.getValue()));
1012+
}
9971013
}
9981014

9991015
for (DocumentKey expectedLimboDoc : expectedLimboDocs) {

0 commit comments

Comments
 (0)