diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/Transaction.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/Transaction.java
index 961ff82b53f..1986fef30d8 100644
--- a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/Transaction.java
+++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/Transaction.java
@@ -14,6 +14,8 @@
package com.google.firebase.firestore.core;
+import static com.google.firebase.firestore.util.Assert.fail;
+
import com.google.android.gms.tasks.Task;
import com.google.android.gms.tasks.Tasks;
import com.google.firebase.firestore.FirebaseFirestoreException;
@@ -41,8 +43,6 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
-import static com.google.firebase.firestore.util.Assert.fail;
-
/**
* Internal transaction object responsible for accumulating the mutations to perform and the base
* versions for any documents read.
diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java
index 0529b72167f..a2b34dc7396 100644
--- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java
+++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java
@@ -16,7 +16,6 @@
import static com.google.firebase.firestore.util.Assert.hardAssert;
import static java.util.Arrays.asList;
-import static java.util.Collections.singletonList;
import android.util.SparseArray;
import com.google.firebase.Timestamp;
@@ -36,8 +35,6 @@
import com.google.firebase.firestore.remote.TargetChange;
import com.google.firebase.firestore.util.Logger;
import com.google.protobuf.ByteString;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -122,17 +119,6 @@ public final class LocalStore {
/** Used to generate targetIds for queries tracked locally. */
private final TargetIdGenerator targetIdGenerator;
- /**
- * A heldBatchResult is a mutation batch result (from a write acknowledgement) that arrived before
- * the watch stream got notified of a snapshot that includes the write. So we "hold" it until the
- * watch stream catches up. It ensures that the local write remains visible (latency compensation)
- * and doesn't temporarily appear reverted because the watch stream is slower than the write
- * stream and so wasn't reflecting it.
- *
- *
NOTE: Eventually we want to move this functionality into the remote store.
- */
- private final List heldBatchResults;
-
public LocalStore(Persistence persistence, User initialUser) {
hardAssert(
persistence.isStarted(), "LocalStore was passed an unstarted persistence implementation");
@@ -149,39 +135,10 @@ public LocalStore(Persistence persistence, User initialUser) {
persistence.getReferenceDelegate().setAdditionalReferences(localViewReferences);
targetIds = new SparseArray<>();
- heldBatchResults = new ArrayList<>();
}
public void start() {
- startMutationQueue();
- }
-
- private void startMutationQueue() {
- persistence.runTransaction(
- "Start MutationQueue",
- () -> {
- mutationQueue.start();
-
- // If we have any leftover mutation batch results from a prior run, just drop them.
- // TODO: We may need to repopulate heldBatchResults or similar instead,
- // but that is not straightforward since we're not persisting the write ack versions.
- heldBatchResults.clear();
-
- // TODO: This is the only usage of getAllMutationBatchesThroughBatchId().
- // Consider removing it in favor of a getAcknowledgedBatches method.
- int highestAck = mutationQueue.getHighestAcknowledgedBatchId();
- if (highestAck != MutationBatch.UNKNOWN) {
- List batches =
- mutationQueue.getAllMutationBatchesThroughBatchId(highestAck);
- if (!batches.isEmpty()) {
- // NOTE: This could be more efficient if we had a removeBatchesThroughBatchID, but
- // this set should be very small and this code should go away eventually.
- for (MutationBatch batch : batches) {
- mutationQueue.removeMutationBatch(batch);
- }
- }
- }
- });
+ mutationQueue.start();
}
// PORTING NOTE: no shutdown for LocalStore or persistence components on Android.
@@ -191,7 +148,7 @@ public ImmutableSortedMap handleUserChange(User user
List oldBatches = mutationQueue.getAllMutationBatches();
mutationQueue = persistence.getMutationQueue(user);
- startMutationQueue();
+ mutationQueue.start();
List newBatches = mutationQueue.getAllMutationBatches();
@@ -249,18 +206,11 @@ public ImmutableSortedMap acknowledgeBatch(
return persistence.runTransaction(
"Acknowledge batch",
() -> {
- mutationQueue.acknowledgeBatch(batchResult.getBatch(), batchResult.getStreamToken());
-
- Set affected;
- if (shouldHoldBatchResult(batchResult.getCommitVersion())) {
- heldBatchResults.add(batchResult);
- affected = Collections.emptySet();
- } else {
- affected = releaseBatchResults(singletonList(batchResult));
- }
-
+ MutationBatch batch = batchResult.getBatch();
+ mutationQueue.acknowledgeBatch(batch, batchResult.getStreamToken());
+ applyBatchResult(batchResult);
mutationQueue.performConsistencyCheck();
- return localDocuments.getDocuments(affected);
+ return localDocuments.getDocuments(batch.getKeys());
});
}
@@ -282,9 +232,9 @@ public ImmutableSortedMap rejectBatch(int batchId) {
int lastAcked = mutationQueue.getHighestAcknowledgedBatchId();
hardAssert(batchId > lastAcked, "Acknowledged batches can't be rejected.");
- Set affectedKeys = removeMutationBatch(toReject);
+ mutationQueue.removeMutationBatch(toReject);
mutationQueue.performConsistencyCheck();
- return localDocuments.getDocuments(affectedKeys);
+ return localDocuments.getDocuments(toReject.getKeys());
});
}
@@ -421,10 +371,6 @@ public ImmutableSortedMap applyRemoteEvent(RemoteEve
queryCache.setLastRemoteSnapshotVersion(remoteVersion);
}
- Set releasedWriteKeys = releaseHeldBatchResults();
-
- // Union the two key sets.
- changedDocKeys.addAll(releasedWriteKeys);
return localDocuments.getDocuments(changedDocKeys);
});
}
@@ -563,12 +509,6 @@ public void releaseQuery(Query query) {
localViewReferences.removeReferencesForId(queryData.getTargetId());
persistence.getReferenceDelegate().removeTarget(queryData);
targetIds.remove(queryData.getTargetId());
-
- // If this was the last watch target, then we won't get any more watch snapshots, so we
- // should release any held batch results.
- if (targetIds.size() == 0) {
- releaseHeldBatchResults();
- }
});
}
@@ -585,70 +525,6 @@ public ImmutableSortedSet getRemoteDocumentKeys(int targetId) {
return queryCache.getMatchingKeysForTargetId(targetId);
}
- /**
- * Releases all the held batch results up to the current remote version received, and applies
- * their mutations to the docs in the remote documents cache.
- *
- * @return the set of keys of docs that were modified by those writes.
- */
- private Set releaseHeldBatchResults() {
- ArrayList toRelease = new ArrayList<>();
- for (MutationBatchResult batchResult : heldBatchResults) {
- if (!isRemoteUpToVersion(batchResult.getCommitVersion())) {
- break;
- }
- toRelease.add(batchResult);
- }
-
- if (toRelease.isEmpty()) {
- return Collections.emptySet();
- } else {
- heldBatchResults.subList(0, toRelease.size()).clear();
- return releaseBatchResults(toRelease);
- }
- }
-
- private boolean isRemoteUpToVersion(SnapshotVersion snapshotVersion) {
- // If there are no watch targets, then we won't get remote snapshots, and are always
- // "up-to-date."
- return snapshotVersion.compareTo(queryCache.getLastRemoteSnapshotVersion()) <= 0
- || targetIds.size() == 0;
- }
-
- private boolean shouldHoldBatchResult(SnapshotVersion snapshotVersion) {
- // Check if watcher isn't up to date or prior results are already held.
- return !isRemoteUpToVersion(snapshotVersion) || !heldBatchResults.isEmpty();
- }
-
- private Set releaseBatchResults(List batchResults) {
- ArrayList batches = new ArrayList<>(batchResults.size());
- // TODO: Call queryEngine.handleDocumentChange() as appropriate.
- for (MutationBatchResult batchResult : batchResults) {
- applyBatchResult(batchResult);
- batches.add(batchResult.getBatch());
- }
-
- return removeMutationBatches(batches);
- }
-
- private Set removeMutationBatch(MutationBatch batch) {
- return removeMutationBatches(singletonList(batch));
- }
-
- /** Removes the given mutation batches. */
- private Set removeMutationBatches(List batches) {
-
- Set affectedDocs = new HashSet<>();
- for (MutationBatch batch : batches) {
- for (Mutation mutation : batch.getMutations()) {
- affectedDocs.add(mutation.getKey());
- }
- mutationQueue.removeMutationBatch(batch);
- }
-
- return affectedDocs;
- }
-
private void applyBatchResult(MutationBatchResult batchResult) {
MutationBatch batch = batchResult.getBatch();
Set docKeys = batch.getKeys();
@@ -671,5 +547,7 @@ private void applyBatchResult(MutationBatchResult batchResult) {
}
}
}
+
+ mutationQueue.removeMutationBatch(batch);
}
}
diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalStoreTestCase.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalStoreTestCase.java
index 816ab038f8c..8d4f96580c2 100644
--- a/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalStoreTestCase.java
+++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalStoreTestCase.java
@@ -255,8 +255,15 @@ public void testHandlesAckThenRejectThenRemoteEvent() {
// The last seen version is zero, so this ack must be held.
acknowledgeMutation(1);
- assertChanged();
- assertContains(doc("foo/bar", 0, map("foo", "bar"), Document.DocumentState.LOCAL_MUTATIONS));
+ assertChanged(doc("foo/bar", 1, map("foo", "bar"), Document.DocumentState.COMMITTED_MUTATIONS));
+ if (garbageCollectorIsEager()) {
+ // Nothing is pinning this anymore, as it has been acknowledged and there are no targets
+ // active.
+ assertNotContains("foo/bar");
+ } else {
+ assertContains(
+ doc("foo/bar", 1, map("foo", "bar"), Document.DocumentState.COMMITTED_MUTATIONS));
+ }
writeMutation(setMutation("bar/baz", map("bar", "baz")));
assertChanged(doc("bar/baz", 0, map("bar", "baz"), Document.DocumentState.LOCAL_MUTATIONS));
@@ -327,8 +334,9 @@ public void testHandlesDocumentThenSetMutationThenAckThenDocument() {
acknowledgeMutation(3);
// We haven't seen the remote event yet.
- assertChanged();
- assertContains(doc("foo/bar", 2, map("foo", "bar"), Document.DocumentState.LOCAL_MUTATIONS));
+ assertChanged(doc("foo/bar", 3, map("foo", "bar"), Document.DocumentState.COMMITTED_MUTATIONS));
+ assertContains(
+ doc("foo/bar", 3, map("foo", "bar"), Document.DocumentState.COMMITTED_MUTATIONS));
applyRemoteEvent(
updateRemoteEvent(doc("foo/bar", 3, map("it", "changed")), asList(targetId), emptyList()));
@@ -374,9 +382,18 @@ public void testHandlesPatchMutationThenDocumentThenAck() {
acknowledgeMutation(2);
- assertChanged();
+ assertChanged(
+ doc(
+ "foo/bar",
+ 2,
+ map("foo", "bar", "it", "base"),
+ Document.DocumentState.COMMITTED_MUTATIONS));
assertContains(
- doc("foo/bar", 1, map("foo", "bar", "it", "base"), Document.DocumentState.LOCAL_MUTATIONS));
+ doc(
+ "foo/bar",
+ 2,
+ map("foo", "bar", "it", "base"),
+ Document.DocumentState.COMMITTED_MUTATIONS));
applyRemoteEvent(
updateRemoteEvent(