Skip to content

Remove held batch handling from LocalStore #34

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.google.firebase.firestore.core;

import static com.google.firebase.firestore.util.Assert.fail;

import com.google.android.gms.tasks.Task;
import com.google.android.gms.tasks.Tasks;
import com.google.firebase.firestore.FirebaseFirestoreException;
Expand Down Expand Up @@ -41,8 +43,6 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

import static com.google.firebase.firestore.util.Assert.fail;

/**
* Internal transaction object responsible for accumulating the mutations to perform and the base
* versions for any documents read.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import static com.google.firebase.firestore.util.Assert.hardAssert;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;

import android.util.SparseArray;
import com.google.firebase.Timestamp;
Expand All @@ -36,8 +35,6 @@
import com.google.firebase.firestore.remote.TargetChange;
import com.google.firebase.firestore.util.Logger;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -122,17 +119,6 @@ public final class LocalStore {
/** Used to generate targetIds for queries tracked locally. */
private final TargetIdGenerator targetIdGenerator;

/**
* A heldBatchResult is a mutation batch result (from a write acknowledgement) that arrived before
* the watch stream got notified of a snapshot that includes the write. So we "hold" it until the
* watch stream catches up. It ensures that the local write remains visible (latency compensation)
* and doesn't temporarily appear reverted because the watch stream is slower than the write
* stream and so wasn't reflecting it.
*
* <p>NOTE: Eventually we want to move this functionality into the remote store.
*/
private final List<MutationBatchResult> heldBatchResults;

public LocalStore(Persistence persistence, User initialUser) {
hardAssert(
persistence.isStarted(), "LocalStore was passed an unstarted persistence implementation");
Expand All @@ -149,39 +135,10 @@ public LocalStore(Persistence persistence, User initialUser) {
persistence.getReferenceDelegate().setAdditionalReferences(localViewReferences);

targetIds = new SparseArray<>();
heldBatchResults = new ArrayList<>();
}

public void start() {
startMutationQueue();
}

private void startMutationQueue() {
persistence.runTransaction(
"Start MutationQueue",
() -> {
mutationQueue.start();

// If we have any leftover mutation batch results from a prior run, just drop them.
// TODO: We may need to repopulate heldBatchResults or similar instead,
// but that is not straightforward since we're not persisting the write ack versions.
heldBatchResults.clear();

// TODO: This is the only usage of getAllMutationBatchesThroughBatchId().
// Consider removing it in favor of a getAcknowledgedBatches method.
int highestAck = mutationQueue.getHighestAcknowledgedBatchId();
if (highestAck != MutationBatch.UNKNOWN) {
List<MutationBatch> batches =
mutationQueue.getAllMutationBatchesThroughBatchId(highestAck);
if (!batches.isEmpty()) {
// NOTE: This could be more efficient if we had a removeBatchesThroughBatchID, but
// this set should be very small and this code should go away eventually.
for (MutationBatch batch : batches) {
mutationQueue.removeMutationBatch(batch);
}
}
}
});
mutationQueue.start();
}

// PORTING NOTE: no shutdown for LocalStore or persistence components on Android.
Expand All @@ -191,7 +148,7 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> handleUserChange(User user
List<MutationBatch> oldBatches = mutationQueue.getAllMutationBatches();

mutationQueue = persistence.getMutationQueue(user);
startMutationQueue();
mutationQueue.start();

List<MutationBatch> newBatches = mutationQueue.getAllMutationBatches();

Expand Down Expand Up @@ -249,18 +206,11 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> acknowledgeBatch(
return persistence.runTransaction(
"Acknowledge batch",
() -> {
mutationQueue.acknowledgeBatch(batchResult.getBatch(), batchResult.getStreamToken());

Set<DocumentKey> affected;
if (shouldHoldBatchResult(batchResult.getCommitVersion())) {
heldBatchResults.add(batchResult);
affected = Collections.emptySet();
} else {
affected = releaseBatchResults(singletonList(batchResult));
}

MutationBatch batch = batchResult.getBatch();
mutationQueue.acknowledgeBatch(batch, batchResult.getStreamToken());
applyBatchResult(batchResult);
mutationQueue.performConsistencyCheck();
return localDocuments.getDocuments(affected);
return localDocuments.getDocuments(batch.getKeys());
});
}

Expand All @@ -282,9 +232,9 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> rejectBatch(int batchId) {
int lastAcked = mutationQueue.getHighestAcknowledgedBatchId();
hardAssert(batchId > lastAcked, "Acknowledged batches can't be rejected.");

Set<DocumentKey> affectedKeys = removeMutationBatch(toReject);
mutationQueue.removeMutationBatch(toReject);
mutationQueue.performConsistencyCheck();
return localDocuments.getDocuments(affectedKeys);
return localDocuments.getDocuments(toReject.getKeys());
});
}

Expand Down Expand Up @@ -421,10 +371,6 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEve
queryCache.setLastRemoteSnapshotVersion(remoteVersion);
}

Set<DocumentKey> releasedWriteKeys = releaseHeldBatchResults();

// Union the two key sets.
changedDocKeys.addAll(releasedWriteKeys);
return localDocuments.getDocuments(changedDocKeys);
});
}
Expand Down Expand Up @@ -563,12 +509,6 @@ public void releaseQuery(Query query) {
localViewReferences.removeReferencesForId(queryData.getTargetId());
persistence.getReferenceDelegate().removeTarget(queryData);
targetIds.remove(queryData.getTargetId());

// If this was the last watch target, then we won't get any more watch snapshots, so we
// should release any held batch results.
if (targetIds.size() == 0) {
releaseHeldBatchResults();
}
});
}

Expand All @@ -585,70 +525,6 @@ public ImmutableSortedSet<DocumentKey> getRemoteDocumentKeys(int targetId) {
return queryCache.getMatchingKeysForTargetId(targetId);
}

/**
* Releases all the held batch results up to the current remote version received, and applies
* their mutations to the docs in the remote documents cache.
*
* @return the set of keys of docs that were modified by those writes.
*/
private Set<DocumentKey> releaseHeldBatchResults() {
ArrayList<MutationBatchResult> toRelease = new ArrayList<>();
for (MutationBatchResult batchResult : heldBatchResults) {
if (!isRemoteUpToVersion(batchResult.getCommitVersion())) {
break;
}
toRelease.add(batchResult);
}

if (toRelease.isEmpty()) {
return Collections.emptySet();
} else {
heldBatchResults.subList(0, toRelease.size()).clear();
return releaseBatchResults(toRelease);
}
}

private boolean isRemoteUpToVersion(SnapshotVersion snapshotVersion) {
// If there are no watch targets, then we won't get remote snapshots, and are always
// "up-to-date."
return snapshotVersion.compareTo(queryCache.getLastRemoteSnapshotVersion()) <= 0
|| targetIds.size() == 0;
}

private boolean shouldHoldBatchResult(SnapshotVersion snapshotVersion) {
// Check if watcher isn't up to date or prior results are already held.
return !isRemoteUpToVersion(snapshotVersion) || !heldBatchResults.isEmpty();
}

private Set<DocumentKey> releaseBatchResults(List<MutationBatchResult> batchResults) {
ArrayList<MutationBatch> batches = new ArrayList<>(batchResults.size());
// TODO: Call queryEngine.handleDocumentChange() as appropriate.
for (MutationBatchResult batchResult : batchResults) {
applyBatchResult(batchResult);
batches.add(batchResult.getBatch());
}

return removeMutationBatches(batches);
}

private Set<DocumentKey> removeMutationBatch(MutationBatch batch) {
return removeMutationBatches(singletonList(batch));
}

/** Removes the given mutation batches. */
private Set<DocumentKey> removeMutationBatches(List<MutationBatch> batches) {

Set<DocumentKey> affectedDocs = new HashSet<>();
for (MutationBatch batch : batches) {
for (Mutation mutation : batch.getMutations()) {
affectedDocs.add(mutation.getKey());
}
mutationQueue.removeMutationBatch(batch);
}

return affectedDocs;
}

private void applyBatchResult(MutationBatchResult batchResult) {
MutationBatch batch = batchResult.getBatch();
Set<DocumentKey> docKeys = batch.getKeys();
Expand All @@ -671,5 +547,7 @@ private void applyBatchResult(MutationBatchResult batchResult) {
}
}
}

mutationQueue.removeMutationBatch(batch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,15 @@ public void testHandlesAckThenRejectThenRemoteEvent() {

// The last seen version is zero, so this ack must be held.
acknowledgeMutation(1);
assertChanged();
assertContains(doc("foo/bar", 0, map("foo", "bar"), Document.DocumentState.LOCAL_MUTATIONS));
assertChanged(doc("foo/bar", 1, map("foo", "bar"), Document.DocumentState.COMMITTED_MUTATIONS));
if (garbageCollectorIsEager()) {
// Nothing is pinning this anymore, as it has been acknowledged and there are no targets
// active.
assertNotContains("foo/bar");
} else {
assertContains(
doc("foo/bar", 1, map("foo", "bar"), Document.DocumentState.COMMITTED_MUTATIONS));
}

writeMutation(setMutation("bar/baz", map("bar", "baz")));
assertChanged(doc("bar/baz", 0, map("bar", "baz"), Document.DocumentState.LOCAL_MUTATIONS));
Expand Down Expand Up @@ -327,8 +334,9 @@ public void testHandlesDocumentThenSetMutationThenAckThenDocument() {

acknowledgeMutation(3);
// We haven't seen the remote event yet.
assertChanged();
assertContains(doc("foo/bar", 2, map("foo", "bar"), Document.DocumentState.LOCAL_MUTATIONS));
assertChanged(doc("foo/bar", 3, map("foo", "bar"), Document.DocumentState.COMMITTED_MUTATIONS));
assertContains(
doc("foo/bar", 3, map("foo", "bar"), Document.DocumentState.COMMITTED_MUTATIONS));

applyRemoteEvent(
updateRemoteEvent(doc("foo/bar", 3, map("it", "changed")), asList(targetId), emptyList()));
Expand Down Expand Up @@ -374,9 +382,18 @@ public void testHandlesPatchMutationThenDocumentThenAck() {

acknowledgeMutation(2);

assertChanged();
assertChanged(
doc(
"foo/bar",
2,
map("foo", "bar", "it", "base"),
Document.DocumentState.COMMITTED_MUTATIONS));
assertContains(
doc("foo/bar", 1, map("foo", "bar", "it", "base"), Document.DocumentState.LOCAL_MUTATIONS));
doc(
"foo/bar",
2,
map("foo", "bar", "it", "base"),
Document.DocumentState.COMMITTED_MUTATIONS));

applyRemoteEvent(
updateRemoteEvent(
Expand Down