diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/Transaction.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/Transaction.java index 961ff82b53f..1986fef30d8 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/Transaction.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/Transaction.java @@ -14,6 +14,8 @@ package com.google.firebase.firestore.core; +import static com.google.firebase.firestore.util.Assert.fail; + import com.google.android.gms.tasks.Task; import com.google.android.gms.tasks.Tasks; import com.google.firebase.firestore.FirebaseFirestoreException; @@ -41,8 +43,6 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; -import static com.google.firebase.firestore.util.Assert.fail; - /** * Internal transaction object responsible for accumulating the mutations to perform and the base * versions for any documents read. diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java index 0529b72167f..a2b34dc7396 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java @@ -16,7 +16,6 @@ import static com.google.firebase.firestore.util.Assert.hardAssert; import static java.util.Arrays.asList; -import static java.util.Collections.singletonList; import android.util.SparseArray; import com.google.firebase.Timestamp; @@ -36,8 +35,6 @@ import com.google.firebase.firestore.remote.TargetChange; import com.google.firebase.firestore.util.Logger; import com.google.protobuf.ByteString; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -122,17 +119,6 @@ public final class LocalStore { /** Used to generate targetIds for queries tracked locally. */ private final TargetIdGenerator targetIdGenerator; - /** - * A heldBatchResult is a mutation batch result (from a write acknowledgement) that arrived before - * the watch stream got notified of a snapshot that includes the write. So we "hold" it until the - * watch stream catches up. It ensures that the local write remains visible (latency compensation) - * and doesn't temporarily appear reverted because the watch stream is slower than the write - * stream and so wasn't reflecting it. - * - *

NOTE: Eventually we want to move this functionality into the remote store. - */ - private final List heldBatchResults; - public LocalStore(Persistence persistence, User initialUser) { hardAssert( persistence.isStarted(), "LocalStore was passed an unstarted persistence implementation"); @@ -149,39 +135,10 @@ public LocalStore(Persistence persistence, User initialUser) { persistence.getReferenceDelegate().setAdditionalReferences(localViewReferences); targetIds = new SparseArray<>(); - heldBatchResults = new ArrayList<>(); } public void start() { - startMutationQueue(); - } - - private void startMutationQueue() { - persistence.runTransaction( - "Start MutationQueue", - () -> { - mutationQueue.start(); - - // If we have any leftover mutation batch results from a prior run, just drop them. - // TODO: We may need to repopulate heldBatchResults or similar instead, - // but that is not straightforward since we're not persisting the write ack versions. - heldBatchResults.clear(); - - // TODO: This is the only usage of getAllMutationBatchesThroughBatchId(). - // Consider removing it in favor of a getAcknowledgedBatches method. - int highestAck = mutationQueue.getHighestAcknowledgedBatchId(); - if (highestAck != MutationBatch.UNKNOWN) { - List batches = - mutationQueue.getAllMutationBatchesThroughBatchId(highestAck); - if (!batches.isEmpty()) { - // NOTE: This could be more efficient if we had a removeBatchesThroughBatchID, but - // this set should be very small and this code should go away eventually. - for (MutationBatch batch : batches) { - mutationQueue.removeMutationBatch(batch); - } - } - } - }); + mutationQueue.start(); } // PORTING NOTE: no shutdown for LocalStore or persistence components on Android. @@ -191,7 +148,7 @@ public ImmutableSortedMap handleUserChange(User user List oldBatches = mutationQueue.getAllMutationBatches(); mutationQueue = persistence.getMutationQueue(user); - startMutationQueue(); + mutationQueue.start(); List newBatches = mutationQueue.getAllMutationBatches(); @@ -249,18 +206,11 @@ public ImmutableSortedMap acknowledgeBatch( return persistence.runTransaction( "Acknowledge batch", () -> { - mutationQueue.acknowledgeBatch(batchResult.getBatch(), batchResult.getStreamToken()); - - Set affected; - if (shouldHoldBatchResult(batchResult.getCommitVersion())) { - heldBatchResults.add(batchResult); - affected = Collections.emptySet(); - } else { - affected = releaseBatchResults(singletonList(batchResult)); - } - + MutationBatch batch = batchResult.getBatch(); + mutationQueue.acknowledgeBatch(batch, batchResult.getStreamToken()); + applyBatchResult(batchResult); mutationQueue.performConsistencyCheck(); - return localDocuments.getDocuments(affected); + return localDocuments.getDocuments(batch.getKeys()); }); } @@ -282,9 +232,9 @@ public ImmutableSortedMap rejectBatch(int batchId) { int lastAcked = mutationQueue.getHighestAcknowledgedBatchId(); hardAssert(batchId > lastAcked, "Acknowledged batches can't be rejected."); - Set affectedKeys = removeMutationBatch(toReject); + mutationQueue.removeMutationBatch(toReject); mutationQueue.performConsistencyCheck(); - return localDocuments.getDocuments(affectedKeys); + return localDocuments.getDocuments(toReject.getKeys()); }); } @@ -421,10 +371,6 @@ public ImmutableSortedMap applyRemoteEvent(RemoteEve queryCache.setLastRemoteSnapshotVersion(remoteVersion); } - Set releasedWriteKeys = releaseHeldBatchResults(); - - // Union the two key sets. - changedDocKeys.addAll(releasedWriteKeys); return localDocuments.getDocuments(changedDocKeys); }); } @@ -563,12 +509,6 @@ public void releaseQuery(Query query) { localViewReferences.removeReferencesForId(queryData.getTargetId()); persistence.getReferenceDelegate().removeTarget(queryData); targetIds.remove(queryData.getTargetId()); - - // If this was the last watch target, then we won't get any more watch snapshots, so we - // should release any held batch results. - if (targetIds.size() == 0) { - releaseHeldBatchResults(); - } }); } @@ -585,70 +525,6 @@ public ImmutableSortedSet getRemoteDocumentKeys(int targetId) { return queryCache.getMatchingKeysForTargetId(targetId); } - /** - * Releases all the held batch results up to the current remote version received, and applies - * their mutations to the docs in the remote documents cache. - * - * @return the set of keys of docs that were modified by those writes. - */ - private Set releaseHeldBatchResults() { - ArrayList toRelease = new ArrayList<>(); - for (MutationBatchResult batchResult : heldBatchResults) { - if (!isRemoteUpToVersion(batchResult.getCommitVersion())) { - break; - } - toRelease.add(batchResult); - } - - if (toRelease.isEmpty()) { - return Collections.emptySet(); - } else { - heldBatchResults.subList(0, toRelease.size()).clear(); - return releaseBatchResults(toRelease); - } - } - - private boolean isRemoteUpToVersion(SnapshotVersion snapshotVersion) { - // If there are no watch targets, then we won't get remote snapshots, and are always - // "up-to-date." - return snapshotVersion.compareTo(queryCache.getLastRemoteSnapshotVersion()) <= 0 - || targetIds.size() == 0; - } - - private boolean shouldHoldBatchResult(SnapshotVersion snapshotVersion) { - // Check if watcher isn't up to date or prior results are already held. - return !isRemoteUpToVersion(snapshotVersion) || !heldBatchResults.isEmpty(); - } - - private Set releaseBatchResults(List batchResults) { - ArrayList batches = new ArrayList<>(batchResults.size()); - // TODO: Call queryEngine.handleDocumentChange() as appropriate. - for (MutationBatchResult batchResult : batchResults) { - applyBatchResult(batchResult); - batches.add(batchResult.getBatch()); - } - - return removeMutationBatches(batches); - } - - private Set removeMutationBatch(MutationBatch batch) { - return removeMutationBatches(singletonList(batch)); - } - - /** Removes the given mutation batches. */ - private Set removeMutationBatches(List batches) { - - Set affectedDocs = new HashSet<>(); - for (MutationBatch batch : batches) { - for (Mutation mutation : batch.getMutations()) { - affectedDocs.add(mutation.getKey()); - } - mutationQueue.removeMutationBatch(batch); - } - - return affectedDocs; - } - private void applyBatchResult(MutationBatchResult batchResult) { MutationBatch batch = batchResult.getBatch(); Set docKeys = batch.getKeys(); @@ -671,5 +547,7 @@ private void applyBatchResult(MutationBatchResult batchResult) { } } } + + mutationQueue.removeMutationBatch(batch); } } diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalStoreTestCase.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalStoreTestCase.java index 816ab038f8c..8d4f96580c2 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalStoreTestCase.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalStoreTestCase.java @@ -255,8 +255,15 @@ public void testHandlesAckThenRejectThenRemoteEvent() { // The last seen version is zero, so this ack must be held. acknowledgeMutation(1); - assertChanged(); - assertContains(doc("foo/bar", 0, map("foo", "bar"), Document.DocumentState.LOCAL_MUTATIONS)); + assertChanged(doc("foo/bar", 1, map("foo", "bar"), Document.DocumentState.COMMITTED_MUTATIONS)); + if (garbageCollectorIsEager()) { + // Nothing is pinning this anymore, as it has been acknowledged and there are no targets + // active. + assertNotContains("foo/bar"); + } else { + assertContains( + doc("foo/bar", 1, map("foo", "bar"), Document.DocumentState.COMMITTED_MUTATIONS)); + } writeMutation(setMutation("bar/baz", map("bar", "baz"))); assertChanged(doc("bar/baz", 0, map("bar", "baz"), Document.DocumentState.LOCAL_MUTATIONS)); @@ -327,8 +334,9 @@ public void testHandlesDocumentThenSetMutationThenAckThenDocument() { acknowledgeMutation(3); // We haven't seen the remote event yet. - assertChanged(); - assertContains(doc("foo/bar", 2, map("foo", "bar"), Document.DocumentState.LOCAL_MUTATIONS)); + assertChanged(doc("foo/bar", 3, map("foo", "bar"), Document.DocumentState.COMMITTED_MUTATIONS)); + assertContains( + doc("foo/bar", 3, map("foo", "bar"), Document.DocumentState.COMMITTED_MUTATIONS)); applyRemoteEvent( updateRemoteEvent(doc("foo/bar", 3, map("it", "changed")), asList(targetId), emptyList())); @@ -374,9 +382,18 @@ public void testHandlesPatchMutationThenDocumentThenAck() { acknowledgeMutation(2); - assertChanged(); + assertChanged( + doc( + "foo/bar", + 2, + map("foo", "bar", "it", "base"), + Document.DocumentState.COMMITTED_MUTATIONS)); assertContains( - doc("foo/bar", 1, map("foo", "bar", "it", "base"), Document.DocumentState.LOCAL_MUTATIONS)); + doc( + "foo/bar", + 2, + map("foo", "bar", "it", "base"), + Document.DocumentState.COMMITTED_MUTATIONS)); applyRemoteEvent( updateRemoteEvent(