17
17
import static com .google .firebase .firestore .util .Assert .fail ;
18
18
import static com .google .firebase .firestore .util .Assert .hardAssert ;
19
19
20
+ import androidx .annotation .NonNull ;
20
21
import androidx .annotation .Nullable ;
21
22
import androidx .annotation .VisibleForTesting ;
22
23
import com .google .android .gms .tasks .Task ;
48
49
import com .google .firebase .firestore .util .Logger ;
49
50
import com .google .firebase .firestore .util .Util ;
50
51
import io .grpc .Status ;
52
+ import java .util .ArrayDeque ;
51
53
import java .util .ArrayList ;
52
54
import java .util .Collections ;
53
55
import java .util .HashMap ;
54
56
import java .util .List ;
55
57
import java .util .Map ;
58
+ import java .util .Queue ;
56
59
import java .util .Set ;
57
60
58
61
/**
@@ -89,8 +92,28 @@ private static class LimboResolution {
89
92
}
90
93
}
91
94
95
+ /** An entry in {@link #limboListenQueue}. */
96
+ private static final class LimboListenQueueEntry {
97
+
98
+ final DocumentKey key ;
99
+ final TargetData targetData ;
100
+
101
+ LimboListenQueueEntry (DocumentKey key , TargetData targetData ) {
102
+ this .key = key ;
103
+ this .targetData = targetData ;
104
+ }
105
+
106
+ @ NonNull
107
+ @ Override
108
+ public String toString () {
109
+ return "key=" + key + " targetData=" + targetData ;
110
+ }
111
+ }
112
+
92
113
private static final String TAG = SyncEngine .class .getSimpleName ();
93
114
115
+ private static final int DEFAULT_MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100 ;
116
+
94
117
/** Interface implemented by EventManager to handle notifications from SyncEngine. */
95
118
interface SyncEngineCallback {
96
119
/** Handles new view snapshots. */
@@ -109,6 +132,9 @@ interface SyncEngineCallback {
109
132
/** The remote store for sending writes, watches, etc. to the backend. */
110
133
private final RemoteStore remoteStore ;
111
134
135
+ /** The maximum number of concurrent active limbo resolutions; see {@link #limboListenQueue}. */
136
+ private final int maxConcurrentLimboResolutions ;
137
+
112
138
/** QueryViews for all active queries, indexed by query. */
113
139
private final Map <Query , QueryView > queryViewsByQuery ;
114
140
@@ -127,6 +153,13 @@ interface SyncEngineCallback {
127
153
*/
128
154
private final Map <Integer , LimboResolution > limboResolutionsByTarget ;
129
155
156
+ /**
157
+ * The list of enqueued limbo resolutions. After {@link #maxConcurrentLimboResolutions} listens
158
+ * have started for the purpose of limbo resolutions then additional limbo resolution listens are
159
+ * enqueued in this queue. When a listen completes then a listen is dequeued and started.
160
+ */
161
+ private final Queue <LimboListenQueueEntry > limboListenQueue ;
162
+
130
163
/** Used to track any documents that are currently in limbo. */
131
164
private final ReferenceSet limboDocumentRefs ;
132
165
@@ -144,14 +177,24 @@ interface SyncEngineCallback {
144
177
private SyncEngineCallback syncEngineListener ;
145
178
146
179
public SyncEngine (LocalStore localStore , RemoteStore remoteStore , User initialUser ) {
180
+ this (localStore , remoteStore , initialUser , DEFAULT_MAX_CONCURRENT_LIMBO_RESOLUTIONS );
181
+ }
182
+
183
+ public SyncEngine (
184
+ LocalStore localStore ,
185
+ RemoteStore remoteStore ,
186
+ User initialUser ,
187
+ int maxConcurrentLimboResolutions ) {
147
188
this .localStore = localStore ;
148
189
this .remoteStore = remoteStore ;
190
+ this .maxConcurrentLimboResolutions = maxConcurrentLimboResolutions ;
149
191
150
192
queryViewsByQuery = new HashMap <>();
151
193
queriesByTarget = new HashMap <>();
152
194
153
195
limboTargetsByKey = new HashMap <>();
154
196
limboResolutionsByTarget = new HashMap <>();
197
+ limboListenQueue = new ArrayDeque <>();
155
198
limboDocumentRefs = new ReferenceSet ();
156
199
157
200
mutationUserCallbacks = new HashMap <>();
@@ -380,6 +423,10 @@ public void handleRejectedListen(int targetId, Status error) {
380
423
limboTargetsByKey .remove (limboKey );
381
424
limboResolutionsByTarget .remove (targetId );
382
425
426
+ if (!limboListenQueue .isEmpty ()) {
427
+ remoteStore .listen (limboListenQueue .remove ().targetData );
428
+ }
429
+
383
430
// TODO: Retry on transient errors?
384
431
385
432
// It's a limbo doc. Create a synthetic event saying it was deleted. This is kind of a hack.
@@ -540,6 +587,9 @@ private void removeLimboTarget(DocumentKey key) {
540
587
remoteStore .stopListening (targetId );
541
588
limboTargetsByKey .remove (key );
542
589
limboResolutionsByTarget .remove (targetId );
590
+ if (!limboListenQueue .isEmpty ()) {
591
+ remoteStore .listen (limboListenQueue .remove ().targetData );
592
+ }
543
593
}
544
594
}
545
595
@@ -616,7 +666,11 @@ private void trackLimboChange(LimboDocumentChange change) {
616
666
ListenSequence .INVALID ,
617
667
QueryPurpose .LIMBO_RESOLUTION );
618
668
limboResolutionsByTarget .put (limboTargetId , new LimboResolution (key ));
619
- remoteStore .listen (targetData );
669
+ if (limboResolutionsByTarget .size () <= maxConcurrentLimboResolutions ) {
670
+ remoteStore .listen (targetData );
671
+ } else {
672
+ limboListenQueue .add (new LimboListenQueueEntry (key , targetData ));
673
+ }
620
674
limboTargetsByKey .put (key , limboTargetId );
621
675
}
622
676
}
@@ -627,6 +681,15 @@ public Map<DocumentKey, Integer> getCurrentLimboDocuments() {
627
681
return new HashMap <>(limboTargetsByKey );
628
682
}
629
683
684
+ @ VisibleForTesting
685
+ public List <DocumentKey > getEnqueuedLimboDocuments () {
686
+ ArrayList <DocumentKey > list = new ArrayList <>(limboListenQueue .size ());
687
+ for (LimboListenQueueEntry entry : limboListenQueue ) {
688
+ list .add (entry .key );
689
+ }
690
+ return list ;
691
+ }
692
+
630
693
public void handleCredentialChange (User user ) {
631
694
boolean userChanged = !currentUser .equals (user );
632
695
currentUser = user ;
0 commit comments