Skip to content

Stop persisting last acknowledged batch ID #2243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jan 11, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 7 additions & 33 deletions Firestore/Example/Tests/Local/FSTMutationQueueTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -83,32 +83,30 @@ - (void)testCountBatches {
- (void)testAcknowledgeBatchID {
if ([self isTestBaseClass]) return;

// Initial state of an empty queue
XCTAssertEqual([self.mutationQueue highestAcknowledgedBatchID], kFSTBatchIDUnknown);

// Adding mutation batches should not change the highest acked batchID.
self.persistence.run("testAcknowledgeBatchID", [&]() {
XCTAssertEqual([self batchCount], 0);

FSTMutationBatch *batch1 = [self addMutationBatch];
FSTMutationBatch *batch2 = [self addMutationBatch];
FSTMutationBatch *batch3 = [self addMutationBatch];
XCTAssertGreaterThan(batch1.batchID, kFSTBatchIDUnknown);
XCTAssertGreaterThan(batch2.batchID, batch1.batchID);
XCTAssertGreaterThan(batch3.batchID, batch2.batchID);

XCTAssertEqual([self.mutationQueue highestAcknowledgedBatchID], kFSTBatchIDUnknown);
XCTAssertEqual([self batchCount], 3);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking batchCount is the only replacement for the previous check that I could think of.


[self.mutationQueue acknowledgeBatch:batch1 streamToken:nil];
[self.mutationQueue removeMutationBatch:batch1];
XCTAssertEqual([self batchCount], 2);

[self.mutationQueue acknowledgeBatch:batch2 streamToken:nil];
XCTAssertEqual([self.mutationQueue highestAcknowledgedBatchID], batch2.batchID);
XCTAssertEqual([self batchCount], 2);

[self.mutationQueue removeMutationBatch:batch2];
XCTAssertEqual([self.mutationQueue highestAcknowledgedBatchID], batch2.batchID);
XCTAssertEqual([self batchCount], 1);

// Batch 3 never acknowledged.
[self.mutationQueue removeMutationBatch:batch3];
XCTAssertEqual([self.mutationQueue highestAcknowledgedBatchID], batch2.batchID);
XCTAssertEqual([self batchCount], 0);
});
}

Expand All @@ -122,7 +120,6 @@ - (void)testAcknowledgeThenRemove {
[self.mutationQueue removeMutationBatch:batch1];

XCTAssertEqual([self batchCount], 0);
XCTAssertEqual([self.mutationQueue highestAcknowledgedBatchID], batch1.batchID);
});
}

Expand Down Expand Up @@ -186,28 +183,6 @@ - (void)testNextMutationBatchAfterBatchID {
});
}

- (void)testNextMutationBatchAfterBatchIDSkipsAcknowledgedBatches {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test appears to be no longer relevant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. The mutation queue now contains only unacknowledged writes now.

if ([self isTestBaseClass]) return;

NSMutableArray<FSTMutationBatch *> *batches = self.persistence.run(
"testNextMutationBatchAfterBatchIDSkipsAcknowledgedBatches newBatches",
[&]() -> NSMutableArray<FSTMutationBatch *> * {
NSMutableArray<FSTMutationBatch *> *newBatches = [self createBatches:3];
XCTAssertEqualObjects([self.mutationQueue nextMutationBatchAfterBatchID:kFSTBatchIDUnknown],
newBatches[0]);
return newBatches;
});
self.persistence.run("testNextMutationBatchAfterBatchIDSkipsAcknowledgedBatches", [&]() {
[self.mutationQueue acknowledgeBatch:batches[0] streamToken:nil];
XCTAssertEqualObjects([self.mutationQueue nextMutationBatchAfterBatchID:kFSTBatchIDUnknown],
batches[1]);
XCTAssertEqualObjects([self.mutationQueue nextMutationBatchAfterBatchID:batches[0].batchID],
batches[1]);
XCTAssertEqualObjects([self.mutationQueue nextMutationBatchAfterBatchID:batches[1].batchID],
batches[2]);
});
}

- (void)testAllMutationBatchesAffectingDocumentKey {
if ([self isTestBaseClass]) return;

Expand Down Expand Up @@ -406,7 +381,6 @@ - (void)testStreamToken {
XCTAssertEqualObjects([self.mutationQueue lastStreamToken], streamToken1);

[self.mutationQueue acknowledgeBatch:batch1 streamToken:streamToken2];
XCTAssertEqual(self.mutationQueue.highestAcknowledgedBatchID, batch1.batchID);
XCTAssertEqualObjects([self.mutationQueue lastStreamToken], streamToken2);
});
}
Expand Down
34 changes: 2 additions & 32 deletions Firestore/Source/Local/FSTLevelDBMutationQueue.mm
Original file line number Diff line number Diff line change
Expand Up @@ -115,31 +115,13 @@ - (instancetype)initWithUserID:(std::string)userID
}

- (void)start {
BatchId nextBatchID = [FSTLevelDBMutationQueue loadNextBatchIDFromDB:_db.ptr];
self.nextBatchID = [FSTLevelDBMutationQueue loadNextBatchIDFromDB:_db.ptr];

// On restart, nextBatchId may end up lower than lastAcknowledgedBatchId since it's computed from
// the queue contents, and there may be no mutations in the queue. In this case, we need to reset
// lastAcknowledgedBatchId (which is safe since the queue must be empty).
std::string key = [self keyForCurrentMutationQueue];
FSTPBMutationQueue *metadata = [self metadataForKey:key];
if (!metadata) {
metadata = [FSTPBMutationQueue message];

// proto3's default value for lastAcknowledgedBatchId is zero, but that would consider the first
// entry in the queue to be acknowledged without that acknowledgement actually happening.
metadata.lastAcknowledgedBatchId = kFSTBatchIDUnknown;
} else {
BatchId lastAcked = metadata.lastAcknowledgedBatchId;
if (lastAcked >= nextBatchID) {
HARD_ASSERT([self isEmpty], "Reset nextBatchID is only possible when the queue is empty");
lastAcked = kFSTBatchIDUnknown;

metadata.lastAcknowledgedBatchId = lastAcked;
_db.currentTransaction->Put([self keyForCurrentMutationQueue], metadata);
}
}

self.nextBatchID = nextBatchID;
self.metadata = metadata;
}

Expand Down Expand Up @@ -220,17 +202,8 @@ - (BOOL)isEmpty {
return empty;
}

- (BatchId)highestAcknowledgedBatchID {
return self.metadata.lastAcknowledgedBatchId;
}

- (void)acknowledgeBatch:(FSTMutationBatch *)batch streamToken:(nullable NSData *)streamToken {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this function is now equivalent to setLastStreamToken.

BatchId batchID = batch.batchID;
HARD_ASSERT(batchID > self.highestAcknowledgedBatchID,
"Mutation batchIDs must be acknowledged in order");

FSTPBMutationQueue *metadata = self.metadata;
metadata.lastAcknowledgedBatchId = batchID;
metadata.lastStreamToken = streamToken;

_db.currentTransaction->Put([self keyForCurrentMutationQueue], metadata);
Expand Down Expand Up @@ -304,10 +277,7 @@ - (nullable FSTMutationBatch *)lookupMutationBatch:(BatchId)batchID {
}

- (nullable FSTMutationBatch *)nextMutationBatchAfterBatchID:(BatchId)batchID {
// All batches with batchID <= self.metadata.lastAcknowledgedBatchId have been acknowledged so
// the first unacknowledged batch after batchID will have a batchID larger than both of these
// values.
BatchId nextBatchID = MAX(batchID, self.metadata.lastAcknowledgedBatchId) + 1;
BatchId nextBatchID = batchID + 1;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume this is fine?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.


std::string key = [self mutationKeyForBatchID:nextBatchID];
auto it = _db.currentTransaction->NewIterator();
Expand Down
10 changes: 4 additions & 6 deletions Firestore/Source/Local/FSTLocalStore.mm
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,6 @@ - (MaybeDocumentMap)rejectBatchID:(BatchId)batchID {
FSTMutationBatch *toReject = [self.mutationQueue lookupMutationBatch:batchID];
HARD_ASSERT(toReject, "Attempt to reject nonexistent batch!");

BatchId lastAcked = [self.mutationQueue highestAcknowledgedBatchID];
HARD_ASSERT(batchID > lastAcked, "Acknowledged batches can't be rejected.");

[self.mutationQueue removeMutationBatch:toReject];
[self.mutationQueue performConsistencyCheck];

Expand Down Expand Up @@ -378,9 +375,10 @@ - (nullable FSTMutationBatch *)nextMutationBatchAfterBatchID:(BatchId)batchID {
}

- (nullable FSTMaybeDocument *)readDocument:(const DocumentKey &)key {
return self.persistence.run("ReadDocument", [&]() -> FSTMaybeDocument *_Nullable {
return [self.localDocuments documentForKey:key];
});
return self.persistence.run(
"ReadDocument", [&]() -> FSTMaybeDocument *_Nullable {
return [self.localDocuments documentForKey:key];
});
}

- (FSTQueryData *)allocateQuery:(FSTQuery *)query {
Expand Down
20 changes: 2 additions & 18 deletions Firestore/Source/Local/FSTMemoryMutationQueue.mm
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ @interface FSTMemoryMutationQueue ()
/** The next value to use when assigning sequential IDs to each mutation batch. */
@property(nonatomic, assign) BatchId nextBatchID;

/** The highest acknowledged mutation in the queue. */
@property(nonatomic, assign) BatchId highestAcknowledgedBatchID;

/**
* The last received stream token from the server, used to acknowledge which responses the client
* has processed. Stream tokens are opaque checkpoint markers whose only real value is their
Expand All @@ -94,7 +91,6 @@ - (instancetype)initWithPersistence:(FSTMemoryPersistence *)persistence {
_queue = [NSMutableArray array];

_nextBatchID = 1;
_highestAcknowledgedBatchID = kFSTBatchIDUnknown;
}
return self;
}
Expand All @@ -105,13 +101,10 @@ - (void)start {
// Note: The queue may be shutdown / started multiple times, since we maintain the queue for the
// duration of the app session in case a user logs out / back in. To behave like the
// LevelDB-backed MutationQueue (and accommodate tests that expect as much), we reset nextBatchID
// and highestAcknowledgedBatchID if the queue is empty.
// if the queue is empty.
if (self.isEmpty) {
self.nextBatchID = 1;
self.highestAcknowledgedBatchID = kFSTBatchIDUnknown;
}
HARD_ASSERT(self.highestAcknowledgedBatchID < self.nextBatchID,
"highestAcknowledgedBatchID must be less than the nextBatchID");
}

- (BOOL)isEmpty {
Expand All @@ -120,16 +113,10 @@ - (BOOL)isEmpty {
return self.queue.count == 0;
}

- (BatchId)highestAcknowledgedBatchID {
return _highestAcknowledgedBatchID;
}

- (void)acknowledgeBatch:(FSTMutationBatch *)batch streamToken:(nullable NSData *)streamToken {
NSMutableArray<FSTMutationBatch *> *queue = self.queue;

BatchId batchID = batch.batchID;
HARD_ASSERT(batchID > self.highestAcknowledgedBatchID,
"Mutation batchIDs must be acknowledged in order");

NSInteger batchIndex = [self indexOfExistingBatchID:batchID action:@"acknowledged"];
HARD_ASSERT(batchIndex == 0, "Can only acknowledge the first batch in the mutation queue");
Expand All @@ -139,7 +126,6 @@ - (void)acknowledgeBatch:(FSTMutationBatch *)batch streamToken:(nullable NSData
HARD_ASSERT(batchID == check.batchID, "Queue ordering failure: expected batch %s, got batch %s",
batchID, check.batchID);

self.highestAcknowledgedBatchID = batchID;
self.lastStreamToken = streamToken;
}

Expand Down Expand Up @@ -186,9 +172,7 @@ - (nullable FSTMutationBatch *)lookupMutationBatch:(BatchId)batchID {
- (nullable FSTMutationBatch *)nextMutationBatchAfterBatchID:(BatchId)batchID {
NSMutableArray<FSTMutationBatch *> *queue = self.queue;

// All batches with batchID <= self.highestAcknowledgedBatchID have been acknowledged so the
// first unacknowledged batch after batchID will have a batchID larger than both of these values.
BatchId nextBatchID = MAX(batchID, self.highestAcknowledgedBatchID) + 1;
BatchId nextBatchID = batchID + 1;

// The requested batchID may still be out of range so normalize it to the start of the queue.
NSInteger rawIndex = [self indexOfBatchID:nextBatchID];
Expand Down
10 changes: 0 additions & 10 deletions Firestore/Source/Local/FSTMutationQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,12 @@ NS_ASSUME_NONNULL_BEGIN
/**
* Starts the mutation queue, performing any initial reads that might be required to establish
* invariants, etc.
*
* After starting, the mutation queue must guarantee that the highestAcknowledgedBatchID is less
* than nextBatchID. This prevents the local store from creating new batches that the mutation
* queue would consider erroneously acknowledged.
*/
- (void)start;

/** Returns YES if this queue contains no mutation batches. */
- (BOOL)isEmpty;

/**
* Returns the highest batchID that has been acknowledged. If no batches have been acknowledged
* or if there are no batches in the queue this can return kFSTBatchIDUnknown.
*/
- (firebase::firestore::model::BatchId)highestAcknowledgedBatchID;

/** Acknowledges the given batch. */
- (void)acknowledgeBatch:(FSTMutationBatch *)batch streamToken:(nullable NSData *)streamToken;

Expand Down