Skip to content

Refactor pendingWrites / write pipeline. #1699

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Firestore/Example/Tests/Integration/FSTDatastoreTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
NS_ASSUME_NONNULL_BEGIN

@interface FSTRemoteStore (Tests)
- (void)commitBatch:(FSTMutationBatch *)batch;
- (void)addBatchToWritePipeline:(FSTMutationBatch *)batch;
@end

#pragma mark - FSTRemoteStoreEventCapture
Expand Down Expand Up @@ -224,7 +224,7 @@ - (void)testStreamingWrite {
localWriteTime:[FIRTimestamp timestamp]
mutations:@[ mutation ]];
[_testWorkerQueue dispatchAsync:^{
[_remoteStore commitBatch:batch];
[_remoteStore addBatchToWritePipeline:batch];
}];

[self awaitExpectations];
Expand Down
120 changes: 62 additions & 58 deletions Firestore/Source/Remote/FSTRemoteStore.mm
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ @interface FSTRemoteStore () <FSTWatchStreamDelegate, FSTWriteStreamDelegate>
@property(nonatomic, strong, readonly)
NSMutableDictionary<FSTBoxedTargetID *, FSTQueryData *> *listenTargets;

@property(nonatomic, assign) FSTBatchID lastBatchSeen;

@property(nonatomic, strong, readonly) FSTOnlineStateTracker *onlineStateTracker;

@property(nonatomic, strong, nullable) FSTWatchChangeAggregator *watchChangeAggregator;
Expand All @@ -95,11 +93,20 @@ @interface FSTRemoteStore () <FSTWatchStreamDelegate, FSTWriteStreamDelegate>
@property(nonatomic, strong, nullable) FSTWriteStream *writeStream;

/**
* A FIFO queue of in-flight writes. This is in-flight from the point of view of the caller of
* writeMutations, not from the point of view from the Datastore itself. In particular, these
* requests may not have been sent to the Datastore server if the write stream is not yet running.
* A list of up to kMaxPendingWrites writes that we have fetched from the LocalStore via
* fillWritePipeline and have or will send to the write stream.
*
* Whenever writePipeline is not empty, the RemoteStore will attempt to start or restart the write
* stream. When the stream is established, the writes in the pipeline will be sent in order.
*
* Writes remain in writePipeline until they are acknowledged by the backend and thus will
* automatically be re-sent if the stream is interrupted / restarted before they're acknowledged.
*
* Write responses from the backend are linked to their originating request purely based on
* order, and so we can just remove writes from the front of the writePipeline as we receive
* responses.
*/
@property(nonatomic, strong, readonly) NSMutableArray<FSTMutationBatch *> *pendingWrites;
@property(nonatomic, strong, readonly) NSMutableArray<FSTMutationBatch *> *writePipeline;
@end

@implementation FSTRemoteStore
Expand All @@ -112,8 +119,7 @@ - (instancetype)initWithLocalStore:(FSTLocalStore *)localStore
_datastore = datastore;
_listenTargets = [NSMutableDictionary dictionary];

_lastBatchSeen = kFSTBatchIDUnknown;
_pendingWrites = [NSMutableArray array];
_writePipeline = [NSMutableArray array];
_onlineStateTracker = [[FSTOnlineStateTracker alloc] initWithWorkerDispatchQueue:queue];
}
return self;
Expand Down Expand Up @@ -178,7 +184,12 @@ - (void)disableNetworkInternal {
[self.writeStream stop];

[self cleanUpWatchStreamState];
[self cleanUpWriteStreamState];

if (self.writePipeline.count > 0) {
LOG_DEBUG("Stopping write stream with %lu pending writes",
(unsigned long)self.writePipeline.count);
[self.writePipeline removeAllObjects];
}

self.writeStream = nil;
self.watchStream = nil;
Expand Down Expand Up @@ -418,7 +429,7 @@ - (nullable FSTQueryData *)queryDataForTarget:(FSTBoxedTargetID *)targetID {
* pending writes.
*/
- (BOOL)shouldStartWriteStream {
return [self isNetworkEnabled] && ![self.writeStream isStarted] && self.pendingWrites.count > 0;
return [self isNetworkEnabled] && ![self.writeStream isStarted] && self.writePipeline.count > 0;
}

- (void)startWriteStream {
Expand All @@ -428,48 +439,50 @@ - (void)startWriteStream {
[self.writeStream startWithDelegate:self];
}

- (void)cleanUpWriteStreamState {
self.lastBatchSeen = kFSTBatchIDUnknown;
LOG_DEBUG("Stopping write stream with %s pending writes", [self.pendingWrites count]);
[self.pendingWrites removeAllObjects];
}

/**
* Attempts to fill our write pipeline with writes from the LocalStore.
*
* Called internally to bootstrap or refill the write pipeline and by SyncEngine whenever there
* are new mutations to process.
*
* Starts the write stream if necessary.
*/
- (void)fillWritePipeline {
if ([self isNetworkEnabled]) {
while ([self canWriteMutations]) {
FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen];
if (!batch) {
break;
FSTBatchID lastBatchIDRetrieved =
self.writePipeline.count == 0 ? kFSTBatchIDUnknown : self.writePipeline.lastObject.batchID;
while ([self canAddToWritePipeline]) {
FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:lastBatchIDRetrieved];
if (!batch) {
if (self.writePipeline.count == 0) {
[self.writeStream markIdle];
}
[self commitBatch:batch];
break;
}
[self addBatchToWritePipeline:batch];
lastBatchIDRetrieved = batch.batchID;
}

if ([self.pendingWrites count] == 0) {
[self.writeStream markIdle];
}
if ([self shouldStartWriteStream]) {
[self startWriteStream];
}
}

/**
* Returns YES if the backend can accept additional write requests.
*
* When sending mutations to the write stream (e.g. in -fillWritePipeline), call this method first
* to check if more mutations can be sent.
*
* Currently the only thing that can prevent the backend from accepting write requests is if
* there are too many requests already outstanding. As writes complete the backend will be able
* to accept more.
* Returns YES if we can add to the write pipeline (i.e. it is not full and the network is enabled).
*/
- (BOOL)canWriteMutations {
return [self isNetworkEnabled] && self.pendingWrites.count < kMaxPendingWrites;
- (BOOL)canAddToWritePipeline {
return [self isNetworkEnabled] && self.writePipeline.count < kMaxPendingWrites;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename isNetworkEnabled to canUseNetwork like in the other ports?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but that happened in a later PR that I'm not porting (yet).

}

/** Given mutations to commit, actually commits them to the backend. */
- (void)commitBatch:(FSTMutationBatch *)batch {
HARD_ASSERT([self canWriteMutations], "commitBatch called when mutations can't be written");
self.lastBatchSeen = batch.batchID;
/**
* Queues additional writes to be sent to the write stream, sending them immediately if the write
* stream is established, else starting the write stream if it is not yet started.
*/
- (void)addBatchToWritePipeline:(FSTMutationBatch *)batch {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I remember the few bits about naming on iOS correctly, then this method name should end in its argument type. I can't come up with any sane suggestions, and since this is a fine C++ name, we may be able to ignore this if you also can't think of a name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't see a good way of doing that and we have other similarly-named methods already (e.g. applyChangesToDocuments:(FSTViewDocumentChanges *)docChanges;)

HARD_ASSERT([self canAddToWritePipeline],
"addBatchToWritePipeline called when mutations can't be written");

[self.pendingWrites addObject:batch];
[self.writePipeline addObject:batch];

if ([self shouldStartWriteStream]) {
[self startWriteStream];
Expand All @@ -490,17 +503,8 @@ - (void)writeStreamDidCompleteHandshake {
// Record the stream token.
[self.localStore setLastStreamToken:self.writeStream.lastStreamToken];

// Drain any pending writes.
//
// Note that at this point pendingWrites contains mutations that have already been accepted by
// fillWritePipeline/commitBatch. If the pipeline is full, canWriteMutations will be NO, despite
// the fact that we actually need to send mutations over.
//
// This also means that this method indirectly respects the limits imposed by canWriteMutations
// since writes can't be added to the pendingWrites array when canWriteMutations is NO. If the
// limits imposed by canWriteMutations actually protect us from DOSing ourselves then those limits
// won't be exceeded here and we'll continue to make progress.
for (FSTMutationBatch *write in self.pendingWrites) {
// Send the write pipeline now that the stream is established.
for (FSTMutationBatch *write in self.writePipeline) {
[self.writeStream writeMutations:write.mutations];
}
}
Expand All @@ -509,10 +513,10 @@ - (void)writeStreamDidCompleteHandshake {
- (void)writeStreamDidReceiveResponseWithVersion:(const SnapshotVersion &)commitVersion
mutationResults:(NSArray<FSTMutationResult *> *)results {
// This is a response to a write containing mutations and should be correlated to the first
// pending write.
NSMutableArray *pendingWrites = self.pendingWrites;
FSTMutationBatch *batch = pendingWrites[0];
[pendingWrites removeObjectAtIndex:0];
// write in our write pipeline.
NSMutableArray *writePipeline = self.writePipeline;
FSTMutationBatch *batch = writePipeline[0];
[writePipeline removeObjectAtIndex:0];

FSTMutationBatchResult *batchResult =
[FSTMutationBatchResult resultWithBatch:batch
Expand All @@ -535,7 +539,7 @@ - (void)writeStreamWasInterruptedWithError:(nullable NSError *)error {

// If the write stream closed due to an error, invoke the error callbacks if there are pending
// writes.
if (error != nil && self.pendingWrites.count > 0) {
if (error != nil && self.writePipeline.count > 0) {
if (self.writeStream.handshakeComplete) {
// This error affects the actual writes.
[self handleWriteError:error];
Expand Down Expand Up @@ -572,8 +576,8 @@ - (void)handleWriteError:(NSError *)error {

// If this was a permanent error, the request itself was the problem so it's not going to
// succeed if we resend it.
FSTMutationBatch *batch = self.pendingWrites[0];
[self.pendingWrites removeObjectAtIndex:0];
FSTMutationBatch *batch = self.writePipeline[0];
[self.writePipeline removeObjectAtIndex:0];

// In this case it's also unlikely that the server itself is melting down--this was just a
// bad request so inhibit backoff on the next restart.
Expand Down