Skip to content

Change removeMutationBatch to remove a single batch #1148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 20, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 29 additions & 31 deletions packages/firestore/src/local/indexeddb_mutation_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,44 +473,42 @@ export class IndexedDbMutationQueue implements MutationQueue {
return PersistencePromise.waitFor(promises).next(() => results);
}

removeMutationBatches(
removeMutationBatch(
transaction: PersistenceTransaction,
batches: MutationBatch[]
batch: MutationBatch
): PersistencePromise<void> {
const mutationStore = mutationsStore(transaction);
const indexTxn = documentMutationsStore(transaction);
const promises: Array<PersistencePromise<void>> = [];

for (const batch of batches) {
const range = IDBKeyRange.only(batch.batchId);
let numDeleted = 0;
const removePromise = mutationStore.iterate(
{ range },
(key, value, control) => {
numDeleted++;
return control.delete();
}
);
promises.push(
removePromise.next(() => {
assert(
numDeleted === 1,
'Dangling document-mutation reference found: Missing batch ' +
batch.batchId
);
})
);
for (const mutation of batch.mutations) {
const indexKey = DbDocumentMutation.key(
this.userId,
mutation.key.path,
batch.batchId
const range = IDBKeyRange.only(batch.batchId);
let numDeleted = 0;
const removePromise = mutationStore.iterate(
{ range },
(key, value, control) => {
numDeleted++;
return control.delete();
}
);
promises.push(
removePromise.next(() => {
assert(
numDeleted === 1,
'Dangling document-mutation reference found: Missing batch ' +
batch.batchId
);
this.removeCachedMutationKeys(batch.batchId);
promises.push(indexTxn.delete(indexKey));
if (this.garbageCollector !== null) {
this.garbageCollector.addPotentialGarbageKey(mutation.key);
}
})
);
for (const mutation of batch.mutations) {
const indexKey = DbDocumentMutation.key(
this.userId,
mutation.key.path,
batch.batchId
);
this.removeCachedMutationKeys(batch.batchId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this called in the loop? Couldn't this be called outside the loop on mutations in the batch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this should just be called once. Fixed.

promises.push(indexTxn.delete(indexKey));
if (this.garbageCollector !== null) {
this.garbageCollector.addPotentialGarbageKey(mutation.key);
}
}
return PersistencePromise.waitFor(promises);
Expand Down
17 changes: 9 additions & 8 deletions packages/firestore/src/local/local_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,11 @@ export class LocalStore {
}
})
.next(ackedBatches => {
if (ackedBatches.length > 0) {
return this.mutationQueue.removeMutationBatches(txn, ackedBatches);
} else {
return PersistencePromise.resolve();
}
let p = PersistencePromise.resolve();
ackedBatches.forEach(batch => {
p = p.next(() => this.mutationQueue.removeMutationBatch(txn, batch));
});
return p;
});
}

Expand Down Expand Up @@ -979,16 +979,17 @@ export class LocalStore {
batches: MutationBatch[]
): PersistencePromise<DocumentKeySet> {
let affectedDocs = documentKeySet();

let p = PersistencePromise.resolve();
for (const batch of batches) {
for (const mutation of batch.mutations) {
const key = mutation.key;
affectedDocs = affectedDocs.add(key);
}
p = p.next(() => this.mutationQueue.removeMutationBatch(txn, batch));
}

return this.mutationQueue
.removeMutationBatches(txn, batches)
.next(() => affectedDocs);
return p.next(() => affectedDocs);
}

private applyWriteToRemoteDocuments(
Expand Down
64 changes: 18 additions & 46 deletions packages/firestore/src/local/memory_mutation_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,73 +328,45 @@ export class MemoryMutationQueue implements MutationQueue {
return result;
}

removeMutationBatches(
removeMutationBatch(
transaction: PersistenceTransaction,
batches: MutationBatch[]
batch: MutationBatch
): PersistencePromise<void> {
const batchCount = batches.length;
assert(batchCount > 0, 'Should not remove mutations when none exist.');

const firstBatchId = batches[0].batchId;
const queueCount = this.mutationQueue.length;

// Find the position of the first batch for removal. This need not be the
// first entry in the queue.
const startIndex = this.indexOfExistingBatchId(firstBatchId, 'removed');
const batchIndex = this.indexOfExistingBatchId(batch.batchId, 'removed');
assert(
this.mutationQueue[startIndex].batchId === firstBatchId,
this.mutationQueue[batchIndex].batchId === batch.batchId,
'Removed batches must exist in the queue'
);

// Check that removed batches are contiguous (while excluding tombstones).
let batchIndex = 1;
let queueIndex = startIndex + 1;
while (batchIndex < batchCount && queueIndex < queueCount) {
const batch = this.mutationQueue[queueIndex];
if (batch.isTombstone()) {
queueIndex++;
continue;
}

assert(
batch.batchId === batches[batchIndex].batchId,
'Removed batches must be contiguous in the queue'
);
batchIndex++;
queueIndex++;
}

// Only actually remove batches if removing at the front of the queue.
// Previously rejected batches may have left tombstones in the queue, so
// expand the removal range to include any tombstones.
if (startIndex === 0) {
for (; queueIndex < queueCount; queueIndex++) {
if (batchIndex === 0) {
let queueIndex = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

endIndex? The queueIndex name was never any good and now it's even worse because we're not comparing the batch to remove against the queue contents.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

for (; queueIndex < this.mutationQueue.length; queueIndex++) {
const batch = this.mutationQueue[queueIndex];
if (!batch.isTombstone()) {
break;
}
}
const length = queueIndex - startIndex;
this.mutationQueue.splice(startIndex, length);
this.mutationQueue.splice(0, queueIndex);
} else {
// Mark the tombstones
for (let i = startIndex; i < queueIndex; i++) {
this.mutationQueue[i] = this.mutationQueue[i].toTombstone();
}
this.mutationQueue[batchIndex] = this.mutationQueue[
batchIndex
].toTombstone();
}

let references = this.batchesByDocumentKey;
for (const batch of batches) {
const batchId = batch.batchId;
for (const mutation of batch.mutations) {
const key = mutation.key;
if (this.garbageCollector !== null) {
this.garbageCollector.addPotentialGarbageKey(key);
}

const ref = new DocReference(key, batchId);
references = references.delete(ref);
for (const mutation of batch.mutations) {
const key = mutation.key;
if (this.garbageCollector !== null) {
this.garbageCollector.addPotentialGarbageKey(key);
}

const ref = new DocReference(key, batch.batchId);
references = references.delete(ref);
}
this.batchesByDocumentKey = references;
return PersistencePromise.resolve();
Expand Down
14 changes: 5 additions & 9 deletions packages/firestore/src/local/mutation_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,21 +197,17 @@ export interface MutationQueue extends GarbageSource {
): PersistencePromise<MutationBatch[]>;

/**
* Removes the given mutation batches from the queue. This is useful in two
* Removes the given mutation batch from the queue. This is useful in two
* circumstances:
*
* + Removing applied mutations from the head of the queue
* + Removing rejected mutations from anywhere in the queue
*
* In both cases, the array of mutations to remove must be a contiguous range
* of batchIds. This is most easily accomplished by loading mutations with
* getAllMutationBatchesThroughBatchId()
* + Removing an applied mutation from the head of the queue
* + Removing a rejected mutation from anywhere in the queue
*
* Multi-Tab Note: This operation should only be called by the primary client.
*/
removeMutationBatches(
removeMutationBatch(
transaction: PersistenceTransaction,
batches: MutationBatch[]
batch: MutationBatch
): PersistencePromise<void>;

/**
Expand Down
55 changes: 28 additions & 27 deletions packages/firestore/test/unit/local/mutation_queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ function genericMutationQueueTests(): void {
for (let i = 0; i < holes.length; i++) {
const index = holes[i] - i;
const batch = batches[index];
await mutationQueue.removeMutationBatches([batch]);
await mutationQueue.removeMutationBatch(batch);

batches.splice(index, 1);
removed.push(batch);
Expand All @@ -146,10 +146,10 @@ function genericMutationQueueTests(): void {
const batch2 = await addMutationBatch();
expect(await mutationQueue.countBatches()).to.equal(2);

await mutationQueue.removeMutationBatches([batch2]);
await mutationQueue.removeMutationBatch(batch2);
expect(await mutationQueue.countBatches()).to.equal(1);

await mutationQueue.removeMutationBatches([batch1]);
await mutationQueue.removeMutationBatch(batch1);
expect(await mutationQueue.countBatches()).to.equal(0);
});

Expand Down Expand Up @@ -181,18 +181,18 @@ function genericMutationQueueTests(): void {
batch2.batchId
);

await mutationQueue.removeMutationBatches([batch1]);
await mutationQueue.removeMutationBatch(batch1);
expect(await mutationQueue.getHighestAcknowledgedBatchId()).to.equal(
batch2.batchId
);

await mutationQueue.removeMutationBatches([batch2]);
await mutationQueue.removeMutationBatch(batch2);
expect(await mutationQueue.getHighestAcknowledgedBatchId()).to.equal(
batch2.batchId
);

// Batch 3 never acknowledged.
await mutationQueue.removeMutationBatches([batch3]);
await mutationQueue.removeMutationBatch(batch3);
expect(await mutationQueue.getHighestAcknowledgedBatchId()).to.equal(
batch2.batchId
);
Expand All @@ -206,7 +206,7 @@ function genericMutationQueueTests(): void {
);

await mutationQueue.acknowledgeBatch(batch1, emptyByteString());
await mutationQueue.removeMutationBatches([batch1]);
await mutationQueue.removeMutationBatch(batch1);

expect(await mutationQueue.countBatches()).to.equal(0);
expect(await mutationQueue.getHighestAcknowledgedBatchId()).to.equal(
Expand All @@ -226,7 +226,8 @@ function genericMutationQueueTests(): void {
batch2.batchId
);

await mutationQueue.removeMutationBatches([batch1, batch2]);
await mutationQueue.removeMutationBatch(batch1);
await mutationQueue.removeMutationBatch(batch2);
expect(await mutationQueue.getHighestAcknowledgedBatchId()).to.equal(
batch2.batchId
);
Expand Down Expand Up @@ -461,16 +462,17 @@ function genericMutationQueueTests(): void {
await addMutationBatch('foo/baz')
];

await mutationQueue.removeMutationBatches([batches[0]]);
await mutationQueue.removeMutationBatch(batches[0]);
expectSetToEqual(await mutationQueue.collectGarbage(gc), []);

await mutationQueue.removeMutationBatches([batches[1]]);
await mutationQueue.removeMutationBatch(batches[1]);
expectSetToEqual(await mutationQueue.collectGarbage(gc), [key('foo/ba')]);

await mutationQueue.removeMutationBatches([batches[5]]);
await mutationQueue.removeMutationBatch(batches[5]);
expectSetToEqual(await mutationQueue.collectGarbage(gc), [key('foo/baz')]);

await mutationQueue.removeMutationBatches([batches[2], batches[3]]);
await mutationQueue.removeMutationBatch(batches[2]);
await mutationQueue.removeMutationBatch(batches[3]);
expectSetToEqual(await mutationQueue.collectGarbage(gc), [
key('foo/bar'),
key('foo/bar2')
Expand All @@ -479,7 +481,8 @@ function genericMutationQueueTests(): void {
batches.push(await addMutationBatch('foo/bar/suffix/baz'));
expectSetToEqual(await mutationQueue.collectGarbage(gc), []);

await mutationQueue.removeMutationBatches([batches[4], batches[6]]);
await mutationQueue.removeMutationBatch(batches[4]);
await mutationQueue.removeMutationBatch(batches[6]);
expectSetToEqual(await mutationQueue.collectGarbage(gc), [
key('foo/bar/suffix/baz')
]);
Expand Down Expand Up @@ -509,11 +512,11 @@ function genericMutationQueueTests(): void {
);
});

it('can removeMutationBatches()', async () => {
it('can removeMutationBatch()', async () => {
const batches = await createBatches(10);
const last = batches[batches.length - 1];

await mutationQueue.removeMutationBatches([batches[0]]);
await mutationQueue.removeMutationBatch(batches[0]);
batches.splice(0, 1);
expect(await mutationQueue.countBatches()).to.equal(9);

Expand All @@ -525,11 +528,9 @@ function genericMutationQueueTests(): void {
expectEqualArrays(found, batches);
expect(found.length).to.equal(9);

await mutationQueue.removeMutationBatches([
batches[0],
batches[1],
batches[2]
]);
await mutationQueue.removeMutationBatch(batches[0]);
await mutationQueue.removeMutationBatch(batches[1]);
await mutationQueue.removeMutationBatch(batches[2]);
batches.splice(0, 3);
expect(await mutationQueue.countBatches()).to.equal(6);

Expand All @@ -539,7 +540,7 @@ function genericMutationQueueTests(): void {
expectEqualArrays(found, batches);
expect(found.length).to.equal(6);

await mutationQueue.removeMutationBatches([batches[batches.length - 1]]);
await mutationQueue.removeMutationBatch(batches[batches.length - 1]);
batches.splice(batches.length - 1, 1);
expect(await mutationQueue.countBatches()).to.equal(5);

Expand All @@ -549,11 +550,11 @@ function genericMutationQueueTests(): void {
expectEqualArrays(found, batches);
expect(found.length).to.equal(5);

await mutationQueue.removeMutationBatches([batches[3]]);
await mutationQueue.removeMutationBatch(batches[3]);
batches.splice(3, 1);
expect(await mutationQueue.countBatches()).to.equal(4);

await mutationQueue.removeMutationBatches([batches[1]]);
await mutationQueue.removeMutationBatch(batches[1]);
batches.splice(1, 1);
expect(await mutationQueue.countBatches()).to.equal(3);

Expand All @@ -564,10 +565,10 @@ function genericMutationQueueTests(): void {
expect(found.length).to.equal(3);
expect(await mutationQueue.checkEmpty()).to.equal(false);

await mutationQueue.removeMutationBatches(batches);
found = await mutationQueue.getAllMutationBatchesThroughBatchId(
last.batchId
);
for (const batch of batches) {
await mutationQueue.removeMutationBatch(batch);
}
found = await mutationQueue.getAllMutationBatches();
expectEqualArrays(found, []);
expect(found.length).to.equal(0);
expect(await mutationQueue.checkEmpty()).to.equal(true);
Expand Down
Loading