
Fix MutationQueue issue resulting in re-sending acknowledged writes. #559


Merged 3 commits on Mar 12, 2018
3 changes: 3 additions & 0 deletions packages/firestore/CHANGELOG.md
@@ -3,6 +3,9 @@
neither succeeds nor fails within 10 seconds, the SDK will consider itself
"offline", causing get() calls to resolve with cached results, rather than
continuing to wait.
- [fixed] Fixed a potential race condition after calling `enableNetwork()` that
could result in a "Mutation batchIDs must be acknowledged in order" assertion
crash.

# 0.3.2
- [fixed] Fixed a regression in Firebase JS release 4.9.0 that could in certain
12 changes: 9 additions & 3 deletions packages/firestore/src/local/indexeddb_mutation_queue.ts
@@ -255,14 +255,20 @@ export class IndexedDbMutationQueue implements MutationQueue {
transaction: PersistenceTransaction,
batchId: BatchId
): PersistencePromise<MutationBatch | null> {
-const range = IDBKeyRange.lowerBound(this.keyForBatchId(batchId + 1));
+// All batches with batchId <= this.metadata.lastAcknowledgedBatchId have
+// been acknowledged so the first unacknowledged batch after batchID will
+// have a batchID larger than both of these values.
+const nextBatchId =
+  Math.max(batchId, this.metadata.lastAcknowledgedBatchId) + 1;
+
+const range = IDBKeyRange.lowerBound(this.keyForBatchId(nextBatchId));
let foundBatch: MutationBatch | null = null;
return mutationsStore(transaction)
.iterate({ range }, (key, dbBatch, control) => {
if (dbBatch.userId === this.userId) {
assert(
-  dbBatch.batchId > batchId,
-  'Should have found mutation after ' + batchId
+  dbBatch.batchId >= nextBatchId,
+  'Should have found mutation after ' + nextBatchId
);
foundBatch = this.serializer.fromDbMutationBatch(dbBatch);
}
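To make the lookup rule above concrete, here is a minimal standalone sketch of the same idea: start scanning strictly after both the requested batchId and the last acknowledged batchId, so an acknowledged batch can never be returned again. `SimpleBatch`, `SimpleQueue`, and the array-backed storage are hypothetical stand-ins, not the SDK's IndexedDB-backed types, and `BATCHID_UNKNOWN` is assumed to be -1.

```ts
// Hypothetical, simplified model of the lookup (not the real IndexedDbMutationQueue).
interface SimpleBatch {
  batchId: number;
}

class SimpleQueue {
  constructor(
    private batches: SimpleBatch[], // kept sorted by batchId, as in the real queue
    private lastAcknowledgedBatchId: number
  ) {}

  getNextMutationBatchAfterBatchId(batchId: number): SimpleBatch | null {
    // Acknowledged batches all have batchId <= lastAcknowledgedBatchId, so the
    // first candidate must be strictly larger than both values.
    const nextBatchId = Math.max(batchId, this.lastAcknowledgedBatchId) + 1;
    return this.batches.find(b => b.batchId >= nextBatchId) || null;
  }
}

// With batch 1 already acknowledged, asking for the batch after -1 (BATCHID_UNKNOWN)
// returns batch 2 instead of re-surfacing the acknowledged batch 1.
const queue = new SimpleQueue([{ batchId: 1 }, { batchId: 2 }], /* lastAcknowledged= */ 1);
console.log(queue.getNextMutationBatchAfterBatchId(-1)); // { batchId: 2 }
```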
4 changes: 2 additions & 2 deletions packages/firestore/src/local/memory_mutation_queue.ts
@@ -182,11 +182,11 @@ export class MemoryMutationQueue implements MutationQueue {
// All batches with batchId <= this.highestAcknowledgedBatchId have been
// acknowledged so the first unacknowledged batch after batchID will have a
// batchID larger than both of these values.
-batchId = Math.max(batchId + 1, this.highestAcknowledgedBatchId);
+const nextBatchId = Math.max(batchId, this.highestAcknowledgedBatchId) + 1;

// The requested batchId may still be out of range so normalize it to the
// start of the queue.
-const rawIndex = this.indexOfBatchId(batchId);
+const rawIndex = this.indexOfBatchId(nextBatchId);
let index = rawIndex < 0 ? 0 : rawIndex;

// Finally return the first non-tombstone batch.
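The one-character move of the `+ 1` matters: a quick standalone check (plain arithmetic, not SDK code) shows that the old expression could land exactly on the highest acknowledged batch, which is what allowed an already-acknowledged write to be picked up and re-sent, while the new expression is always strictly greater than both inputs.

```ts
// Assume BATCHID_UNKNOWN is -1 (the "start from the beginning" sentinel) and
// batch 1 has already been acknowledged.
const batchId = -1;
const highestAcknowledgedBatchId = 1;

// Old: can return the acknowledged batch id itself.
const oldNext = Math.max(batchId + 1, highestAcknowledgedBatchId); // 1 (already acked)

// New: strictly greater than both inputs, so the scan can never start on an
// acknowledged batch.
const newNext = Math.max(batchId, highestAcknowledgedBatchId) + 1; // 2

console.log(oldNext, newNext); // 1 2
```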
21 changes: 21 additions & 0 deletions packages/firestore/test/unit/local/mutation_queue.test.ts
@@ -40,6 +40,7 @@ import {

import * as persistenceHelpers from './persistence_test_helpers';
import { TestMutationQueue } from './test_mutation_queue';
import { addEqualityMatcher } from '../../util/equality_matcher';

let persistence: Persistence;
let mutationQueue: TestMutationQueue;
@@ -120,6 +121,8 @@ describe('IndexedDbMutationQueue', () => {
* implementations.
*/
function genericMutationQueueTests() {
addEqualityMatcher();

beforeEach(() => {
mutationQueue = new TestMutationQueue(
persistence,
@@ -324,6 +327,24 @@ function genericMutationQueueTests() {
expect(notFound).to.be.null;
});

it('getNextMutationBatchAfterBatchId() skips acknowledged batches', async () => {
const batches = await createBatches(3);
expect(
await mutationQueue.getNextMutationBatchAfterBatchId(BATCHID_UNKNOWN)
).to.deep.equal(batches[0]);

await mutationQueue.acknowledgeBatch(batches[0], emptyByteString());
expect(
await mutationQueue.getNextMutationBatchAfterBatchId(BATCHID_UNKNOWN)
).to.deep.equal(batches[1]);
expect(
await mutationQueue.getNextMutationBatchAfterBatchId(batches[0].batchId)
).to.deep.equal(batches[1]);
expect(
await mutationQueue.getNextMutationBatchAfterBatchId(batches[1].batchId)
).to.deep.equal(batches[2]);
});

it('can getAllMutationBatchesThroughBatchId()', async () => {
const batches = await createBatches(10);
await makeHolesInBatches([2, 6, 7], batches);
6 changes: 3 additions & 3 deletions packages/firestore/test/unit/specs/spec_test_runner.ts
@@ -825,17 +825,17 @@ abstract class TestRunner {
private validateStateExpectations(expectation: StateExpectation): void {
if (expectation) {
if ('numOutstandingWrites' in expectation) {
-expect(this.remoteStore.outstandingWrites()).to.deep.equal(
+expect(this.remoteStore.outstandingWrites()).to.equal(
expectation.numOutstandingWrites
);
}
if ('writeStreamRequestCount' in expectation) {
-expect(this.connection.writeStreamRequestCount).to.deep.equal(
+expect(this.connection.writeStreamRequestCount).to.equal(
expectation.writeStreamRequestCount
);
}
if ('watchStreamRequestCount' in expectation) {
-expect(this.connection.watchStreamRequestCount).to.deep.equal(
+expect(this.connection.watchStreamRequestCount).to.equal(
expectation.watchStreamRequestCount
);
}
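For context on the assertion change above: these three expectations compare primitive numbers, where chai's `equal` (a strict `===` check) states the intent directly, while `deep.equal` performs structural comparison and is only needed for objects and arrays. A small illustration of the distinction (plain chai usage, not SDK code):

```ts
import { expect } from 'chai';

const writeStreamRequestCount = 3;

// Strict equality is the natural check for a primitive counter.
expect(writeStreamRequestCount).to.equal(3);

// Deep equality only adds value for structured data.
expect({ requests: 3 }).to.deep.equal({ requests: 3 });
```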
53 changes: 53 additions & 0 deletions packages/firestore/test/unit/specs/write_spec.test.ts
@@ -394,6 +394,59 @@ describeSpec('Writes:', [], () => {
);
});

specTest(
'Held writes are not re-sent after disable/enable network.',
[],
() => {
const query = Query.atPath(path('collection'));
const docALocal = doc(
'collection/a',
0,
{ v: 1 },
{ hasLocalMutations: true }
);
const docA = doc('collection/a', 1000, { v: 1 });

return (
spec()
.userListens(query)
.watchAcksFull(query, 500)
.expectEvents(query, {})
.userSets('collection/a', { v: 1 })
.expectEvents(query, {
hasPendingWrites: true,
added: [docALocal]
})
// ack write but without a watch event.
.writeAcks(1000)

// handshake + write = 2 requests
.expectWriteStreamRequestCount(2)

.disableNetwork()
.expectEvents(query, {
hasPendingWrites: true,
fromCache: true
})

// handshake + write + close = 3 requests
Contributor:
This comment is a bit too terse. Is this happening as a result of disabling the network, or something else? If so, that's odd, because I would have thought that nothing happens when we disable the network. This would be a little clearer if you expected the write stream request count to be 2 above; then this would show that all we did after that was close.

Contributor Author:
Good point. Added the requestCount == 2 expectation prior to the close. FWIW, disabling the network sends an additional no-op write (…). I don't know the full context of that, but I assume it's intentional (probably so the backend knows we got all the acks and can GC state).

.expectWriteStreamRequestCount(3)

.enableNetwork()
.expectActiveTargets({ query, resumeToken: 'resume-token-500' })

// acked write should /not/ have been resent, so count should still be 3
.expectWriteStreamRequestCount(3)

// Finally watch catches up.
.watchAcksFull(query, 2000, docA)
.expectEvents(query, {
metadata: [docA]
})
);
}
);

specTest(
'Held writes are released when there are no queries left.',
[],