Skip to content

Change removeMutationBatch to remove a single batch #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ private void startMutationQueue() {
if (!batches.isEmpty()) {
// NOTE: This could be more efficient if we had a removeBatchesThroughBatchID, but
// this set should be very small and this code should go away eventually.
mutationQueue.removeMutationBatches(batches);
for (MutationBatch batch : batches) {
mutationQueue.removeMutationBatch(batch);
}
}
}
});
Expand Down Expand Up @@ -641,9 +643,9 @@ private Set<DocumentKey> removeMutationBatches(List<MutationBatch> batches) {
for (Mutation mutation : batch.getMutations()) {
affectedDocs.add(mutation.getKey());
}
mutationQueue.removeMutationBatch(batch);
}

mutationQueue.removeMutationBatches(batches);
return affectedDocs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,69 +328,40 @@ private List<MutationBatch> lookupMutationBatches(ImmutableSortedSet<Integer> ba
}

@Override
public void removeMutationBatches(List<MutationBatch> batches) {
int batchCount = batches.size();
hardAssert(batchCount > 0, "Should not remove mutations when none exist.");

int firstBatchId = batches.get(0).getBatchId();

int queueCount = queue.size();

public void removeMutationBatch(MutationBatch batch) {
// Find the position of the first batch for removal. This need not be the first entry in the
// queue.
int startIndex = indexOfExistingBatchId(firstBatchId, "removed");
int batchIndex = indexOfExistingBatchId(batch.getBatchId(), "removed");
hardAssert(
queue.get(startIndex).getBatchId() == firstBatchId,
queue.get(batchIndex).getBatchId() == batch.getBatchId(),
"Removed batches must exist in the queue");

// Check that removed batches are contiguous (while excluding tombstones).
int batchIndex = 1;
int queueIndex = startIndex + 1;
while (batchIndex < batchCount && queueIndex < queueCount) {
MutationBatch batch = queue.get(queueIndex);
if (batch.isTombstone()) {
queueIndex++;
continue;
}

hardAssert(
batch.getBatchId() == batches.get(batchIndex).getBatchId(),
"Removed batches must be contiguous in the queue");
batchIndex++;
queueIndex++;
}

// Only actually remove batches if removing at the front of the queue. Previously rejected
// batches may have left tombstones in the queue, so expand the removal range to include any
// tombstones.
if (startIndex == 0) {
for (; queueIndex < queueCount; queueIndex++) {
MutationBatch batch = queue.get(queueIndex);
if (!batch.isTombstone()) {
if (batchIndex == 0) {
int endIndex = 1;
for (; endIndex < queue.size(); endIndex++) {
MutationBatch currentBatch = queue.get(endIndex);
if (!currentBatch.isTombstone()) {
break;
}
}

queue.subList(startIndex, queueIndex).clear();
queue.subList(batchIndex, endIndex).clear();

} else {
// Mark tombstones
for (int i = startIndex; i < queueIndex; i++) {
queue.set(i, queue.get(i).toTombstone());
}
queue.set(batchIndex, queue.get(batchIndex).toTombstone());
}

// Remove entries from the index too.
ImmutableSortedSet<DocumentReference> references = batchesByDocumentKey;
for (MutationBatch batch : batches) {
int batchId = batch.getBatchId();
for (Mutation mutation : batch.getMutations()) {
DocumentKey key = mutation.getKey();
persistence.getReferenceDelegate().removeMutationReference(key);

DocumentReference reference = new DocumentReference(key, batchId);
references = references.remove(reference);
}
for (Mutation mutation : batch.getMutations()) {
DocumentKey key = mutation.getKey();
persistence.getReferenceDelegate().removeMutationReference(key);

DocumentReference reference = new DocumentReference(key, batch.getBatchId());
references = references.remove(reference);
}
batchesByDocumentKey = references;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,14 @@ List<MutationBatch> getAllMutationBatchesAffectingDocumentKeys(
List<MutationBatch> getAllMutationBatchesAffectingQuery(Query query);

/**
* Removes the given mutation batches from the queue. This is useful in two circumstances:
* Removes the given mutation batch from the queue. This is useful in two circumstances:
*
* <ul>
* <li>Removing applied mutations from the head of the queue
* <li>Removing rejected mutations from anywhere in the queue
* </ul>
*
* <p>In both cases, the array of mutations to remove must be a contiguous range of batchIds. This
* is most easily accomplished by loading mutations with {@link
* #getAllMutationBatchesThroughBatchId}.
*/
void removeMutationBatches(List<MutationBatch> batches);
void removeMutationBatch(MutationBatch batch);

/** Performs a consistency check, examining the mutation queue for any leaks, if possible. */
void performConsistencyCheck();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,23 +421,21 @@ public List<MutationBatch> getAllMutationBatchesAffectingQuery(Query query) {
}

@Override
public void removeMutationBatches(List<MutationBatch> batches) {
public void removeMutationBatch(MutationBatch batch) {
SQLiteStatement mutationDeleter =
db.prepare("DELETE FROM mutations WHERE uid = ? AND batch_id = ?");

SQLiteStatement indexDeleter =
db.prepare("DELETE FROM document_mutations WHERE uid = ? AND path = ? AND batch_id = ?");

for (MutationBatch batch : batches) {
int batchId = batch.getBatchId();
int deleted = db.execute(mutationDeleter, uid, batchId);
hardAssert(deleted != 0, "Mutation batch (%s, %d) did not exist", uid, batch.getBatchId());
int batchId = batch.getBatchId();
int deleted = db.execute(mutationDeleter, uid, batchId);
hardAssert(deleted != 0, "Mutation batch (%s, %d) did not exist", uid, batch.getBatchId());

for (Mutation mutation : batch.getMutations()) {
DocumentKey key = mutation.getKey();
String path = EncodedPath.encode(key.getPath());
db.execute(indexDeleter, uid, path, batchId);
}
for (Mutation mutation : batch.getMutations()) {
DocumentKey key = mutation.getKey();
String path = EncodedPath.encode(key.getPath());
db.execute(indexDeleter, uid, path, batchId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void testAcknowledgeThenRemove() {
name.getMethodName(),
() -> {
mutationQueue.acknowledgeBatch(batch1, WriteStream.EMPTY_STREAM_TOKEN);
mutationQueue.removeMutationBatches(asList(batch1));
mutationQueue.removeMutationBatch(batch1);
});

assertEquals(0, batchCount());
Expand Down Expand Up @@ -536,7 +536,12 @@ private void acknowledgeBatch(MutationBatch batch) {
/** Calls removeMutationBatches on the mutation queue in a new transaction and commits. */
private void removeMutationBatches(MutationBatch... batches) {
persistence.runTransaction(
"Remove mutation batches", () -> mutationQueue.removeMutationBatches(asList(batches)));
"Remove mutation batches",
() -> {
for (MutationBatch batch : batches) {
mutationQueue.removeMutationBatch(batch);
}
});
}

/** Returns the number of mutation batches in the mutation queue. */
Expand Down