48
48
import com .google .firebase .firestore .util .Logger ;
49
49
import com .google .firebase .firestore .util .Util ;
50
50
import io .grpc .Status ;
51
+ import java .util .ArrayDeque ;
51
52
import java .util .ArrayList ;
52
53
import java .util .Collections ;
53
54
import java .util .HashMap ;
54
55
import java .util .List ;
55
56
import java .util .Map ;
57
+ import java .util .Queue ;
56
58
import java .util .Set ;
57
59
58
60
/**
@@ -115,17 +117,22 @@ interface SyncEngineCallback {
115
117
/** Queries mapped to active targets, indexed by target id. */
116
118
private final Map <Integer , List <Query >> queriesByTarget ;
117
119
120
+ private final int maxConcurrentLimboResolutions ;
121
+
118
122
/**
119
- * When a document is in limbo, we create a special listen to resolve it. This maps the
120
- * DocumentKey of each limbo document to the target ID of the listen resolving it .
123
+ * The keys of documents that are in limbo for which we haven't yet started a limbo resolution
124
+ * query .
121
125
*/
122
- private final Map <DocumentKey , Integer > limboTargetsByKey ;
126
+ private final Queue <DocumentKey > enqueuedLimboResolutions ;
127
+
128
+ /** Keeps track of the target ID for each document that is in limbo with an active target. */
129
+ private final Map <DocumentKey , Integer > activeLimboTargetsByKey ;
123
130
124
131
/**
125
- * Basically the inverse of limboTargetsByKey, a map of target ID to a LimboResolution (which
126
- * includes the DocumentKey as well as whether we've received a document for the target) .
132
+ * Keeps track of the information about an active limbo resolution for each active target ID that
133
+ * was started for the purpose of limbo resolution .
127
134
*/
128
- private final Map <Integer , LimboResolution > limboResolutionsByTarget ;
135
+ private final Map <Integer , LimboResolution > activeLimboResolutionsByTarget ;
129
136
130
137
/** Used to track any documents that are currently in limbo. */
131
138
private final ReferenceSet limboDocumentRefs ;
@@ -143,15 +150,21 @@ interface SyncEngineCallback {
143
150
144
151
private SyncEngineCallback syncEngineListener ;
145
152
146
- public SyncEngine (LocalStore localStore , RemoteStore remoteStore , User initialUser ) {
153
+ public SyncEngine (
154
+ LocalStore localStore ,
155
+ RemoteStore remoteStore ,
156
+ User initialUser ,
157
+ int maxConcurrentLimboResolutions ) {
147
158
this .localStore = localStore ;
148
159
this .remoteStore = remoteStore ;
160
+ this .maxConcurrentLimboResolutions = maxConcurrentLimboResolutions ;
149
161
150
162
queryViewsByQuery = new HashMap <>();
151
163
queriesByTarget = new HashMap <>();
152
164
153
- limboTargetsByKey = new HashMap <>();
154
- limboResolutionsByTarget = new HashMap <>();
165
+ enqueuedLimboResolutions = new ArrayDeque <>();
166
+ activeLimboTargetsByKey = new HashMap <>();
167
+ activeLimboResolutionsByTarget = new HashMap <>();
155
168
limboDocumentRefs = new ReferenceSet ();
156
169
157
170
mutationUserCallbacks = new HashMap <>();
@@ -298,7 +311,7 @@ public void handleRemoteEvent(RemoteEvent event) {
298
311
for (Map .Entry <Integer , TargetChange > entry : event .getTargetChanges ().entrySet ()) {
299
312
Integer targetId = entry .getKey ();
300
313
TargetChange targetChange = entry .getValue ();
301
- LimboResolution limboResolution = limboResolutionsByTarget .get (targetId );
314
+ LimboResolution limboResolution = activeLimboResolutionsByTarget .get (targetId );
302
315
if (limboResolution != null ) {
303
316
// Since this is a limbo resolution lookup, it's for a single document and it could be
304
317
// added, modified, or removed, but not a combination.
@@ -349,7 +362,7 @@ public void handleOnlineStateChange(OnlineState onlineState) {
349
362
350
363
@ Override
351
364
public ImmutableSortedSet <DocumentKey > getRemoteKeysForTarget (int targetId ) {
352
- LimboResolution limboResolution = limboResolutionsByTarget .get (targetId );
365
+ LimboResolution limboResolution = activeLimboResolutionsByTarget .get (targetId );
353
366
if (limboResolution != null && limboResolution .receivedDocument ) {
354
367
return DocumentKey .emptyKeySet ().insert (limboResolution .key );
355
368
} else {
@@ -372,13 +385,14 @@ public ImmutableSortedSet<DocumentKey> getRemoteKeysForTarget(int targetId) {
372
385
public void handleRejectedListen (int targetId , Status error ) {
373
386
assertCallback ("handleRejectedListen" );
374
387
375
- LimboResolution limboResolution = limboResolutionsByTarget .get (targetId );
388
+ LimboResolution limboResolution = activeLimboResolutionsByTarget .get (targetId );
376
389
DocumentKey limboKey = limboResolution != null ? limboResolution .key : null ;
377
390
if (limboKey != null ) {
378
391
// Since this query failed, we won't want to manually unlisten to it.
379
392
// So go ahead and remove it from bookkeeping.
380
- limboTargetsByKey .remove (limboKey );
381
- limboResolutionsByTarget .remove (targetId );
393
+ activeLimboTargetsByKey .remove (limboKey );
394
+ activeLimboResolutionsByTarget .remove (targetId );
395
+ pumpEnqueuedLimboResolutions ();
382
396
383
397
// TODO: Retry on transient errors?
384
398
@@ -535,11 +549,12 @@ private void removeAndCleanupTarget(int targetId, Status status) {
535
549
private void removeLimboTarget (DocumentKey key ) {
536
550
// It's possible that the target already got removed because the query failed. In that case,
537
551
// the key won't exist in `limboTargetsByKey`. Only do the cleanup if we still have the target.
538
- Integer targetId = limboTargetsByKey .get (key );
552
+ Integer targetId = activeLimboTargetsByKey .get (key );
539
553
if (targetId != null ) {
540
554
remoteStore .stopListening (targetId );
541
- limboTargetsByKey .remove (key );
542
- limboResolutionsByTarget .remove (targetId );
555
+ activeLimboTargetsByKey .remove (key );
556
+ activeLimboResolutionsByTarget .remove (targetId );
557
+ pumpEnqueuedLimboResolutions ();
543
558
}
544
559
}
545
560
@@ -605,26 +620,47 @@ private void updateTrackedLimboDocuments(List<LimboDocumentChange> limboChanges,
605
620
606
621
private void trackLimboChange (LimboDocumentChange change ) {
607
622
DocumentKey key = change .getKey ();
608
- if (!limboTargetsByKey .containsKey (key )) {
623
+ if (!activeLimboTargetsByKey .containsKey (key )) {
609
624
Logger .debug (TAG , "New document in limbo: %s" , key );
625
+ enqueuedLimboResolutions .add (key );
626
+ pumpEnqueuedLimboResolutions ();
627
+ }
628
+ }
629
+
630
+ /**
631
+ * Starts listens for documents in limbo that are enqueued for resolution, subject to a maximum
632
+ * number of concurrent resolutions.
633
+ *
634
+ * <p>Without bounding the number of concurrent resolutions, the server can fail with "resource
635
+ * exhausted" errors which can lead to pathological client behavior as seen in
636
+ * https://github.com/firebase/firebase-js-sdk/issues/2683.
637
+ */
638
+ private void pumpEnqueuedLimboResolutions () {
639
+ while (!enqueuedLimboResolutions .isEmpty ()
640
+ && activeLimboTargetsByKey .size () < maxConcurrentLimboResolutions ) {
641
+ DocumentKey key = enqueuedLimboResolutions .remove ();
610
642
int limboTargetId = targetIdGenerator .nextId ();
611
- Query query = Query .atPath (key .getPath ());
612
- TargetData targetData =
643
+ activeLimboResolutionsByTarget .put (limboTargetId , new LimboResolution (key ));
644
+ activeLimboTargetsByKey .put (key , limboTargetId );
645
+ remoteStore .listen (
613
646
new TargetData (
614
- query .toTarget (),
647
+ Query . atPath ( key . getPath ()) .toTarget (),
615
648
limboTargetId ,
616
649
ListenSequence .INVALID ,
617
- QueryPurpose .LIMBO_RESOLUTION );
618
- limboResolutionsByTarget .put (limboTargetId , new LimboResolution (key ));
619
- remoteStore .listen (targetData );
620
- limboTargetsByKey .put (key , limboTargetId );
650
+ QueryPurpose .LIMBO_RESOLUTION ));
621
651
}
622
652
}
623
653
624
654
@ VisibleForTesting
625
- public Map <DocumentKey , Integer > getCurrentLimboDocuments () {
655
+ public Map <DocumentKey , Integer > getActiveLimboDocumentResolutions () {
626
656
// Make a defensive copy as the Map continues to be modified.
627
- return new HashMap <>(limboTargetsByKey );
657
+ return new HashMap <>(activeLimboTargetsByKey );
658
+ }
659
+
660
+ @ VisibleForTesting
661
+ public Queue <DocumentKey > getEnqueuedLimboDocumentResolutions () {
662
+ // Make a defensive copy as the Queue continues to be modified.
663
+ return new ArrayDeque <>(enqueuedLimboResolutions );
628
664
}
629
665
630
666
public void handleCredentialChange (User user ) {
0 commit comments