Skip to content

Remove no longer needed MutationBatch methods #40

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,6 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> rejectBatch(int batchId) {
MutationBatch toReject = mutationQueue.lookupMutationBatch(batchId);
hardAssert(toReject != null, "Attempt to reject nonexistent batch!");

int lastAcked = mutationQueue.getHighestAcknowledgedBatchId();
hardAssert(batchId > lastAcked, "Acknowledged batches can't be rejected.");

mutationQueue.removeMutationBatch(toReject);
mutationQueue.performConsistencyCheck();
return localDocuments.getDocuments(toReject.getKeys());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,6 @@ public int getNextBatchId() {
return nextBatchId;
}

@Override
public int getHighestAcknowledgedBatchId() {
return highestAcknowledgedBatchId;
}

@Override
public void acknowledgeBatch(MutationBatch batch, ByteString streamToken) {
int batchId = batch.getBatchId();
Expand Down Expand Up @@ -216,23 +211,6 @@ public List<MutationBatch> getAllMutationBatches() {
return getAllLiveMutationBatchesBeforeIndex(queue.size());
}

@Override
public List<MutationBatch> getAllMutationBatchesThroughBatchId(int batchId) {
int count = queue.size();

int endIndex = indexOfBatchId(batchId);
if (endIndex < 0) {
endIndex = 0;
} else if (endIndex >= count) {
endIndex = count;
} else {
// The endIndex is in the queue so increment to pull everything in the queue including it.
endIndex += 1;
}

return getAllLiveMutationBatchesBeforeIndex(endIndex);
}

@Override
public List<MutationBatch> getAllMutationBatchesAffectingDocumentKey(DocumentKey documentKey) {
DocumentReference start = new DocumentReference(documentKey, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,6 @@ interface MutationQueue {
*/
int getNextBatchId();

/**
* Returns the highest batchId that has been acknowledged. If no batches have been acknowledged or
* if there are no batches in the queue this can return {@link MutationBatch#UNKNOWN}.
*/
int getHighestAcknowledgedBatchId();

/** Acknowledges the given batch. */
void acknowledgeBatch(MutationBatch batch, ByteString streamToken);

Expand Down Expand Up @@ -85,17 +79,6 @@ interface MutationQueue {
// cheaply, we should replace this.
List<MutationBatch> getAllMutationBatches();

/**
* Finds all mutations with a batchId less than or equal to the given batchId.
*
* <p>Generally the caller should be asking for the next unacknowledged batchId and the number of
* acknowledged batches should be very small when things are functioning well.
*
* @param batchId The batch to search through.
* @return an List containing all batches with matching batchIds.
*/
List<MutationBatch> getAllMutationBatchesThroughBatchId(int batchId);

/**
* Finds all mutation batches that could @em possibly affect the given document key. Not all
* mutations in a batch will necessarily affect the document key, so when looping through the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,6 @@ public int getNextBatchId() {
return nextBatchId;
}

@Override
public int getHighestAcknowledgedBatchId() {
return lastAcknowledgedBatchId;
}

@Override
public void acknowledgeBatch(MutationBatch batch, ByteString streamToken) {
int batchId = batch.getBatchId();
Expand Down Expand Up @@ -267,17 +262,6 @@ public List<MutationBatch> getAllMutationBatches() {
return result;
}

@Override
public List<MutationBatch> getAllMutationBatchesThroughBatchId(int batchId) {
List<MutationBatch> result = new ArrayList<>();
db.query(
"SELECT mutations FROM mutations "
+ "WHERE uid = ? AND batch_id <= ? ORDER BY batch_id ASC")
.binding(uid, batchId)
.forEach(row -> result.add(decodeMutationBatch(row.getBlob(0))));
return result;
}

@Override
public List<MutationBatch> getAllMutationBatchesAffectingDocumentKey(DocumentKey documentKey) {
String path = EncodedPath.encode(documentKey.getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import static com.google.firebase.firestore.testutil.TestUtil.streamToken;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -103,38 +101,6 @@ public void testCountBatches() {
assertTrue(mutationQueue.isEmpty());
}

@Test
public void testAcknowledgeBatchId() {
// Initial state of an empty queue
assertEquals(MutationBatch.UNKNOWN, mutationQueue.getHighestAcknowledgedBatchId());

// Adding mutation batches should not change the highest acked batchId.
MutationBatch batch1 = addMutationBatch();
MutationBatch batch2 = addMutationBatch();
MutationBatch batch3 = addMutationBatch();
assertThat(batch1.getBatchId(), greaterThan(MutationBatch.UNKNOWN));
assertThat(batch2.getBatchId(), greaterThan(batch1.getBatchId()));
assertThat(batch3.getBatchId(), greaterThan(batch2.getBatchId()));

assertEquals(MutationBatch.UNKNOWN, mutationQueue.getHighestAcknowledgedBatchId());

acknowledgeBatch(batch1);
assertEquals(batch1.getBatchId(), mutationQueue.getHighestAcknowledgedBatchId());

acknowledgeBatch(batch2);
assertEquals(batch2.getBatchId(), mutationQueue.getHighestAcknowledgedBatchId());

removeMutationBatches(batch1);
assertEquals(batch2.getBatchId(), mutationQueue.getHighestAcknowledgedBatchId());

removeMutationBatches(batch2);
assertEquals(batch2.getBatchId(), mutationQueue.getHighestAcknowledgedBatchId());

// Batch 3 never acknowledged.
removeMutationBatches(batch3);
assertEquals(batch2.getBatchId(), mutationQueue.getHighestAcknowledgedBatchId());
}

@Test
public void testAcknowledgeThenRemove() {
MutationBatch batch1 = addMutationBatch();
Expand All @@ -147,44 +113,6 @@ public void testAcknowledgeThenRemove() {
});

assertEquals(0, batchCount());
assertEquals(batch1.getBatchId(), mutationQueue.getHighestAcknowledgedBatchId());
}

@Test
public void testHighestAcknowledgedBatchIdNeverExceedsNextBatchId() {
MutationBatch batch1 = addMutationBatch();
MutationBatch batch2 = addMutationBatch();
acknowledgeBatch(batch1);
acknowledgeBatch(batch2);
assertEquals(batch2.getBatchId(), mutationQueue.getHighestAcknowledgedBatchId());

removeMutationBatches(batch1, batch2);
assertEquals(batch2.getBatchId(), mutationQueue.getHighestAcknowledgedBatchId());

// Restart the queue so that nextBatchId will be reset.
mutationQueue = persistence.getMutationQueue(User.UNAUTHENTICATED);
mutationQueue.start();

persistence.runTransaction("Start mutationQueue", () -> mutationQueue.start());

// Verify that on restart with an empty queue, nextBatchId falls to a lower value.
assertThat(mutationQueue.getNextBatchId(), lessThan(batch2.getBatchId()));

// As a result highestAcknowledgedBatchId must also reset lower.
assertEquals(MutationBatch.UNKNOWN, mutationQueue.getHighestAcknowledgedBatchId());

// The mutation queue will reset the next batchId after all mutations are removed so adding
// another mutation will cause a collision.
MutationBatch newBatch = addMutationBatch();
assertEquals(batch1.getBatchId(), newBatch.getBatchId());

// Restart the queue with one unacknowledged batch in it.
persistence.runTransaction("Start mutationQueue", () -> mutationQueue.start());

assertEquals(newBatch.getBatchId() + 1, mutationQueue.getNextBatchId());

// highestAcknowledgedBatchId must still be MutationBatch.UNKNOWN.
assertEquals(MutationBatch.UNKNOWN, mutationQueue.getHighestAcknowledgedBatchId());
}

@Test
Expand Down Expand Up @@ -265,24 +193,6 @@ public void testNextMutationBatchAfterBatchIdSkipsAcknowledgedBatches() {
mutationQueue.getNextMutationBatchAfterBatchId(batches.get(1).getBatchId()));
}

@Test
public void testAllMutationBatchesThroughBatchID() {
List<MutationBatch> batches = createBatches(10);
makeHoles(asList(2, 6, 7), batches);

List<MutationBatch> found;
List<MutationBatch> expected;

found = mutationQueue.getAllMutationBatchesThroughBatchId(batches.get(0).getBatchId() - 1);
assertEquals(emptyList(), found);

for (int i = 0; i < batches.size(); i++) {
found = mutationQueue.getAllMutationBatchesThroughBatchId(batches.get(i).getBatchId());
expected = batches.subList(0, i + 1);
assertEquals(expected, found);
}
}

@Test
public void testAllMutationBatchesAffectingDocumentKey() {
List<Mutation> mutations =
Expand Down Expand Up @@ -442,7 +352,7 @@ public void testRemoveMutationBatches() {

List<MutationBatch> found;

found = mutationQueue.getAllMutationBatchesThroughBatchId(last.getBatchId());
found = mutationQueue.getAllMutationBatches();
assertEquals(batches, found);
assertEquals(9, found.size());

Expand All @@ -452,14 +362,14 @@ public void testRemoveMutationBatches() {
batches.remove(batches.get(0));
assertEquals(6, batchCount());

found = mutationQueue.getAllMutationBatchesThroughBatchId(last.getBatchId());
found = mutationQueue.getAllMutationBatches();
assertEquals(batches, found);
assertEquals(6, found.size());

removeMutationBatches(batches.remove(batches.size() - 1));
assertEquals(5, batchCount());

found = mutationQueue.getAllMutationBatchesThroughBatchId(last.getBatchId());
found = mutationQueue.getAllMutationBatches();
assertEquals(batches, found);
assertEquals(5, found.size());

Expand All @@ -469,12 +379,12 @@ public void testRemoveMutationBatches() {
removeMutationBatches(batches.remove(1));
assertEquals(3, batchCount());

found = mutationQueue.getAllMutationBatchesThroughBatchId(last.getBatchId());
found = mutationQueue.getAllMutationBatches();
assertEquals(batches, found);
assertEquals(3, found.size());

removeMutationBatches(batches.toArray(new MutationBatch[0]));
found = mutationQueue.getAllMutationBatchesThroughBatchId(last.getBatchId());
found = mutationQueue.getAllMutationBatches();
assertEquals(emptyList(), found);
assertEquals(0, found.size());
assertTrue(mutationQueue.isEmpty());
Expand All @@ -496,7 +406,6 @@ public void testStreamToken() {
persistence.runTransaction(
"acknowledgeBatchId", () -> mutationQueue.acknowledgeBatch(batch1, streamToken2));

assertEquals(batch1.getBatchId(), mutationQueue.getHighestAcknowledgedBatchId());
assertEquals(streamToken2, mutationQueue.getLastStreamToken());
}

Expand Down