17
17
import static com .google .firebase .firestore .util .Assert .fail ;
18
18
import static com .google .firebase .firestore .util .Assert .hardAssert ;
19
19
20
+ import androidx .annotation .NonNull ;
20
21
import androidx .annotation .Nullable ;
21
22
import androidx .annotation .VisibleForTesting ;
22
23
import com .google .android .gms .tasks .Task ;
48
49
import com .google .firebase .firestore .util .Logger ;
49
50
import com .google .firebase .firestore .util .Util ;
50
51
import io .grpc .Status ;
52
+ import java .util .ArrayDeque ;
51
53
import java .util .ArrayList ;
52
54
import java .util .Collections ;
53
55
import java .util .HashMap ;
54
56
import java .util .List ;
55
57
import java .util .Map ;
58
+ import java .util .Queue ;
56
59
import java .util .Set ;
57
60
58
61
/**
@@ -89,8 +92,28 @@ private static class LimboResolution {
89
92
}
90
93
}
91
94
95
+ /** An entry in {@link #limboListenQueue}. */
96
+ private static final class LimboListenQueueEntry {
97
+
98
+ final DocumentKey key ;
99
+ final TargetData targetData ;
100
+
101
+ LimboListenQueueEntry (DocumentKey key , TargetData targetData ) {
102
+ this .key = key ;
103
+ this .targetData = targetData ;
104
+ }
105
+
106
+ @ NonNull
107
+ @ Override
108
+ public String toString () {
109
+ return "key=" + key + " targetData=" + targetData ;
110
+ }
111
+ }
112
+
92
113
private static final String TAG = SyncEngine .class .getSimpleName ();
93
114
115
+ private static final int DEFAULT_MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100 ;
116
+
94
117
/** Interface implemented by EventManager to handle notifications from SyncEngine. */
95
118
interface SyncEngineCallback {
96
119
/** Handles new view snapshots. */
@@ -109,6 +132,8 @@ interface SyncEngineCallback {
109
132
/** The remote store for sending writes, watches, etc. to the backend. */
110
133
private final RemoteStore remoteStore ;
111
134
135
+ private final int maxConcurrentLimboResolutions ;
136
+
112
137
/** QueryViews for all active queries, indexed by query. */
113
138
private final Map <Query , QueryView > queryViewsByQuery ;
114
139
@@ -127,6 +152,13 @@ interface SyncEngineCallback {
127
152
*/
128
153
private final Map <Integer , LimboResolution > limboResolutionsByTarget ;
129
154
155
+ /**
156
+ * The list of enqueued limbo resolutions. After {@link #maxConcurrentLimboResolutions} listens
157
+ * have started for the purpose of limbo resolutions then additional limbo resolution listens are
158
+ * enqueued in this queue. When a listen completes then a listen is dequeued and started.
159
+ */
160
+ private final Queue <LimboListenQueueEntry > limboListenQueue ;
161
+
130
162
/** Used to track any documents that are currently in limbo. */
131
163
private final ReferenceSet limboDocumentRefs ;
132
164
@@ -144,14 +176,24 @@ interface SyncEngineCallback {
144
176
private SyncEngineCallback syncEngineListener ;
145
177
146
178
public SyncEngine (LocalStore localStore , RemoteStore remoteStore , User initialUser ) {
179
+ this (localStore , remoteStore , initialUser , DEFAULT_MAX_CONCURRENT_LIMBO_RESOLUTIONS );
180
+ }
181
+
182
+ public SyncEngine (
183
+ LocalStore localStore ,
184
+ RemoteStore remoteStore ,
185
+ User initialUser ,
186
+ int maxConcurrentLimboResolutions ) {
147
187
this .localStore = localStore ;
148
188
this .remoteStore = remoteStore ;
189
+ this .maxConcurrentLimboResolutions = maxConcurrentLimboResolutions ;
149
190
150
191
queryViewsByQuery = new HashMap <>();
151
192
queriesByTarget = new HashMap <>();
152
193
153
194
limboTargetsByKey = new HashMap <>();
154
195
limboResolutionsByTarget = new HashMap <>();
196
+ limboListenQueue = new ArrayDeque <>();
155
197
limboDocumentRefs = new ReferenceSet ();
156
198
157
199
mutationUserCallbacks = new HashMap <>();
@@ -380,6 +422,10 @@ public void handleRejectedListen(int targetId, Status error) {
380
422
limboTargetsByKey .remove (limboKey );
381
423
limboResolutionsByTarget .remove (targetId );
382
424
425
+ if (!limboListenQueue .isEmpty ()) {
426
+ remoteStore .listen (limboListenQueue .remove ().targetData );
427
+ }
428
+
383
429
// TODO: Retry on transient errors?
384
430
385
431
// It's a limbo doc. Create a synthetic event saying it was deleted. This is kind of a hack.
@@ -540,6 +586,9 @@ private void removeLimboTarget(DocumentKey key) {
540
586
remoteStore .stopListening (targetId );
541
587
limboTargetsByKey .remove (key );
542
588
limboResolutionsByTarget .remove (targetId );
589
+ if (!limboListenQueue .isEmpty ()) {
590
+ remoteStore .listen (limboListenQueue .remove ().targetData );
591
+ }
543
592
}
544
593
}
545
594
@@ -616,7 +665,11 @@ private void trackLimboChange(LimboDocumentChange change) {
616
665
ListenSequence .INVALID ,
617
666
QueryPurpose .LIMBO_RESOLUTION );
618
667
limboResolutionsByTarget .put (limboTargetId , new LimboResolution (key ));
619
- remoteStore .listen (targetData );
668
+ if (limboResolutionsByTarget .size () <= maxConcurrentLimboResolutions ) {
669
+ remoteStore .listen (targetData );
670
+ } else {
671
+ limboListenQueue .add (new LimboListenQueueEntry (key , targetData ));
672
+ }
620
673
limboTargetsByKey .put (key , limboTargetId );
621
674
}
622
675
}
@@ -627,6 +680,15 @@ public Map<DocumentKey, Integer> getCurrentLimboDocuments() {
627
680
return new HashMap <>(limboTargetsByKey );
628
681
}
629
682
683
+ @ VisibleForTesting
684
+ public List <DocumentKey > getEnqueuedLimboDocuments () {
685
+ ArrayList <DocumentKey > list = new ArrayList <>(limboListenQueue .size ());
686
+ for (LimboListenQueueEntry entry : limboListenQueue ) {
687
+ list .add (entry .key );
688
+ }
689
+ return list ;
690
+ }
691
+
630
692
public void handleCredentialChange (User user ) {
631
693
boolean userChanged = !currentUser .equals (user );
632
694
currentUser = user ;
0 commit comments