16
16
17
17
import static com .google .firebase .firestore .util .Assert .hardAssert ;
18
18
import static java .util .Arrays .asList ;
19
- import static java .util .Collections .singletonList ;
20
19
21
20
import android .util .SparseArray ;
22
21
import com .google .firebase .Timestamp ;
36
35
import com .google .firebase .firestore .remote .TargetChange ;
37
36
import com .google .firebase .firestore .util .Logger ;
38
37
import com .google .protobuf .ByteString ;
39
- import java .util .ArrayList ;
40
- import java .util .Collections ;
41
38
import java .util .HashSet ;
42
39
import java .util .List ;
43
40
import java .util .Map ;
@@ -122,17 +119,6 @@ public final class LocalStore {
122
119
/** Used to generate targetIds for queries tracked locally. */
123
120
private final TargetIdGenerator targetIdGenerator ;
124
121
125
- /**
126
- * A heldBatchResult is a mutation batch result (from a write acknowledgement) that arrived before
127
- * the watch stream got notified of a snapshot that includes the write. So we "hold" it until the
128
- * watch stream catches up. It ensures that the local write remains visible (latency compensation)
129
- * and doesn't temporarily appear reverted because the watch stream is slower than the write
130
- * stream and so wasn't reflecting it.
131
- *
132
- * <p>NOTE: Eventually we want to move this functionality into the remote store.
133
- */
134
- private final List <MutationBatchResult > heldBatchResults ;
135
-
136
122
public LocalStore (Persistence persistence , User initialUser ) {
137
123
hardAssert (
138
124
persistence .isStarted (), "LocalStore was passed an unstarted persistence implementation" );
@@ -149,39 +135,10 @@ public LocalStore(Persistence persistence, User initialUser) {
149
135
persistence .getReferenceDelegate ().setAdditionalReferences (localViewReferences );
150
136
151
137
targetIds = new SparseArray <>();
152
- heldBatchResults = new ArrayList <>();
153
138
}
154
139
155
140
public void start () {
156
- startMutationQueue ();
157
- }
158
-
159
- private void startMutationQueue () {
160
- persistence .runTransaction (
161
- "Start MutationQueue" ,
162
- () -> {
163
- mutationQueue .start ();
164
-
165
- // If we have any leftover mutation batch results from a prior run, just drop them.
166
- // TODO: We may need to repopulate heldBatchResults or similar instead,
167
- // but that is not straightforward since we're not persisting the write ack versions.
168
- heldBatchResults .clear ();
169
-
170
- // TODO: This is the only usage of getAllMutationBatchesThroughBatchId().
171
- // Consider removing it in favor of a getAcknowledgedBatches method.
172
- int highestAck = mutationQueue .getHighestAcknowledgedBatchId ();
173
- if (highestAck != MutationBatch .UNKNOWN ) {
174
- List <MutationBatch > batches =
175
- mutationQueue .getAllMutationBatchesThroughBatchId (highestAck );
176
- if (!batches .isEmpty ()) {
177
- // NOTE: This could be more efficient if we had a removeBatchesThroughBatchID, but
178
- // this set should be very small and this code should go away eventually.
179
- for (MutationBatch batch : batches ) {
180
- mutationQueue .removeMutationBatch (batch );
181
- }
182
- }
183
- }
184
- });
141
+ mutationQueue .start ();
185
142
}
186
143
187
144
// PORTING NOTE: no shutdown for LocalStore or persistence components on Android.
@@ -191,7 +148,7 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> handleUserChange(User user
191
148
List <MutationBatch > oldBatches = mutationQueue .getAllMutationBatches ();
192
149
193
150
mutationQueue = persistence .getMutationQueue (user );
194
- startMutationQueue ();
151
+ mutationQueue . start ();
195
152
196
153
List <MutationBatch > newBatches = mutationQueue .getAllMutationBatches ();
197
154
@@ -249,18 +206,11 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> acknowledgeBatch(
249
206
return persistence .runTransaction (
250
207
"Acknowledge batch" ,
251
208
() -> {
252
- mutationQueue .acknowledgeBatch (batchResult .getBatch (), batchResult .getStreamToken ());
253
-
254
- Set <DocumentKey > affected ;
255
- if (shouldHoldBatchResult (batchResult .getCommitVersion ())) {
256
- heldBatchResults .add (batchResult );
257
- affected = Collections .emptySet ();
258
- } else {
259
- affected = releaseBatchResults (singletonList (batchResult ));
260
- }
261
-
209
+ MutationBatch batch = batchResult .getBatch ();
210
+ mutationQueue .acknowledgeBatch (batch , batchResult .getStreamToken ());
211
+ applyBatchResult (batchResult );
262
212
mutationQueue .performConsistencyCheck ();
263
- return localDocuments .getDocuments (affected );
213
+ return localDocuments .getDocuments (batch . getKeys () );
264
214
});
265
215
}
266
216
@@ -282,9 +232,9 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> rejectBatch(int batchId) {
282
232
int lastAcked = mutationQueue .getHighestAcknowledgedBatchId ();
283
233
hardAssert (batchId > lastAcked , "Acknowledged batches can't be rejected." );
284
234
285
- Set < DocumentKey > affectedKeys = removeMutationBatch (toReject );
235
+ mutationQueue . removeMutationBatch (toReject );
286
236
mutationQueue .performConsistencyCheck ();
287
- return localDocuments .getDocuments (affectedKeys );
237
+ return localDocuments .getDocuments (toReject . getKeys () );
288
238
});
289
239
}
290
240
@@ -421,10 +371,6 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEve
421
371
queryCache .setLastRemoteSnapshotVersion (remoteVersion );
422
372
}
423
373
424
- Set <DocumentKey > releasedWriteKeys = releaseHeldBatchResults ();
425
-
426
- // Union the two key sets.
427
- changedDocKeys .addAll (releasedWriteKeys );
428
374
return localDocuments .getDocuments (changedDocKeys );
429
375
});
430
376
}
@@ -563,12 +509,6 @@ public void releaseQuery(Query query) {
563
509
localViewReferences .removeReferencesForId (queryData .getTargetId ());
564
510
persistence .getReferenceDelegate ().removeTarget (queryData );
565
511
targetIds .remove (queryData .getTargetId ());
566
-
567
- // If this was the last watch target, then we won't get any more watch snapshots, so we
568
- // should release any held batch results.
569
- if (targetIds .size () == 0 ) {
570
- releaseHeldBatchResults ();
571
- }
572
512
});
573
513
}
574
514
@@ -585,70 +525,6 @@ public ImmutableSortedSet<DocumentKey> getRemoteDocumentKeys(int targetId) {
585
525
return queryCache .getMatchingKeysForTargetId (targetId );
586
526
}
587
527
588
- /**
589
- * Releases all the held batch results up to the current remote version received, and applies
590
- * their mutations to the docs in the remote documents cache.
591
- *
592
- * @return the set of keys of docs that were modified by those writes.
593
- */
594
- private Set <DocumentKey > releaseHeldBatchResults () {
595
- ArrayList <MutationBatchResult > toRelease = new ArrayList <>();
596
- for (MutationBatchResult batchResult : heldBatchResults ) {
597
- if (!isRemoteUpToVersion (batchResult .getCommitVersion ())) {
598
- break ;
599
- }
600
- toRelease .add (batchResult );
601
- }
602
-
603
- if (toRelease .isEmpty ()) {
604
- return Collections .emptySet ();
605
- } else {
606
- heldBatchResults .subList (0 , toRelease .size ()).clear ();
607
- return releaseBatchResults (toRelease );
608
- }
609
- }
610
-
611
- private boolean isRemoteUpToVersion (SnapshotVersion snapshotVersion ) {
612
- // If there are no watch targets, then we won't get remote snapshots, and are always
613
- // "up-to-date."
614
- return snapshotVersion .compareTo (queryCache .getLastRemoteSnapshotVersion ()) <= 0
615
- || targetIds .size () == 0 ;
616
- }
617
-
618
- private boolean shouldHoldBatchResult (SnapshotVersion snapshotVersion ) {
619
- // Check if watcher isn't up to date or prior results are already held.
620
- return !isRemoteUpToVersion (snapshotVersion ) || !heldBatchResults .isEmpty ();
621
- }
622
-
623
- private Set <DocumentKey > releaseBatchResults (List <MutationBatchResult > batchResults ) {
624
- ArrayList <MutationBatch > batches = new ArrayList <>(batchResults .size ());
625
- // TODO: Call queryEngine.handleDocumentChange() as appropriate.
626
- for (MutationBatchResult batchResult : batchResults ) {
627
- applyBatchResult (batchResult );
628
- batches .add (batchResult .getBatch ());
629
- }
630
-
631
- return removeMutationBatches (batches );
632
- }
633
-
634
- private Set <DocumentKey > removeMutationBatch (MutationBatch batch ) {
635
- return removeMutationBatches (singletonList (batch ));
636
- }
637
-
638
- /** Removes the given mutation batches. */
639
- private Set <DocumentKey > removeMutationBatches (List <MutationBatch > batches ) {
640
-
641
- Set <DocumentKey > affectedDocs = new HashSet <>();
642
- for (MutationBatch batch : batches ) {
643
- for (Mutation mutation : batch .getMutations ()) {
644
- affectedDocs .add (mutation .getKey ());
645
- }
646
- mutationQueue .removeMutationBatch (batch );
647
- }
648
-
649
- return affectedDocs ;
650
- }
651
-
652
528
private void applyBatchResult (MutationBatchResult batchResult ) {
653
529
MutationBatch batch = batchResult .getBatch ();
654
530
Set <DocumentKey > docKeys = batch .getKeys ();
@@ -671,5 +547,7 @@ private void applyBatchResult(MutationBatchResult batchResult) {
671
547
}
672
548
}
673
549
}
550
+
551
+ mutationQueue .removeMutationBatch (batch );
674
552
}
675
553
}
0 commit comments