-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Refactor pendingWrites / write pipeline. #1699
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -83,8 +83,6 @@ @interface FSTRemoteStore () <FSTWatchStreamDelegate, FSTWriteStreamDelegate> | |
@property(nonatomic, strong, readonly) | ||
NSMutableDictionary<FSTBoxedTargetID *, FSTQueryData *> *listenTargets; | ||
|
||
@property(nonatomic, assign) FSTBatchID lastBatchSeen; | ||
|
||
@property(nonatomic, strong, readonly) FSTOnlineStateTracker *onlineStateTracker; | ||
|
||
@property(nonatomic, strong, nullable) FSTWatchChangeAggregator *watchChangeAggregator; | ||
|
@@ -95,11 +93,20 @@ @interface FSTRemoteStore () <FSTWatchStreamDelegate, FSTWriteStreamDelegate> | |
@property(nonatomic, strong, nullable) FSTWriteStream *writeStream; | ||
|
||
/** | ||
* A FIFO queue of in-flight writes. This is in-flight from the point of view of the caller of | ||
* writeMutations, not from the point of view from the Datastore itself. In particular, these | ||
* requests may not have been sent to the Datastore server if the write stream is not yet running. | ||
* A list of up to kMaxPendingWrites writes that we have fetched from the LocalStore via | ||
* fillWritePipeline and have or will send to the write stream. | ||
* | ||
* Whenever writePipeline is not empty, the RemoteStore will attempt to start or restart the write | ||
* stream. When the stream is established, the writes in the pipeline will be sent in order. | ||
* | ||
* Writes remain in writePipeline until they are acknowledged by the backend and thus will | ||
* automatically be re-sent if the stream is interrupted / restarted before they're acknowledged. | ||
* | ||
* Write responses from the backend are linked to their originating request purely based on | ||
* order, and so we can just remove writes from the front of the writePipeline as we receive | ||
* responses. | ||
*/ | ||
@property(nonatomic, strong, readonly) NSMutableArray<FSTMutationBatch *> *pendingWrites; | ||
@property(nonatomic, strong, readonly) NSMutableArray<FSTMutationBatch *> *writePipeline; | ||
@end | ||
|
||
@implementation FSTRemoteStore | ||
|
@@ -112,8 +119,7 @@ - (instancetype)initWithLocalStore:(FSTLocalStore *)localStore | |
_datastore = datastore; | ||
_listenTargets = [NSMutableDictionary dictionary]; | ||
|
||
_lastBatchSeen = kFSTBatchIDUnknown; | ||
_pendingWrites = [NSMutableArray array]; | ||
_writePipeline = [NSMutableArray array]; | ||
_onlineStateTracker = [[FSTOnlineStateTracker alloc] initWithWorkerDispatchQueue:queue]; | ||
} | ||
return self; | ||
|
@@ -178,7 +184,12 @@ - (void)disableNetworkInternal { | |
[self.writeStream stop]; | ||
|
||
[self cleanUpWatchStreamState]; | ||
[self cleanUpWriteStreamState]; | ||
|
||
if (self.writePipeline.count > 0) { | ||
LOG_DEBUG("Stopping write stream with %lu pending writes", | ||
(unsigned long)self.writePipeline.count); | ||
[self.writePipeline removeAllObjects]; | ||
} | ||
|
||
self.writeStream = nil; | ||
self.watchStream = nil; | ||
|
@@ -418,7 +429,7 @@ - (nullable FSTQueryData *)queryDataForTarget:(FSTBoxedTargetID *)targetID { | |
* pending writes. | ||
*/ | ||
- (BOOL)shouldStartWriteStream { | ||
return [self isNetworkEnabled] && ![self.writeStream isStarted] && self.pendingWrites.count > 0; | ||
return [self isNetworkEnabled] && ![self.writeStream isStarted] && self.writePipeline.count > 0; | ||
} | ||
|
||
- (void)startWriteStream { | ||
|
@@ -428,48 +439,50 @@ - (void)startWriteStream { | |
[self.writeStream startWithDelegate:self]; | ||
} | ||
|
||
- (void)cleanUpWriteStreamState { | ||
self.lastBatchSeen = kFSTBatchIDUnknown; | ||
LOG_DEBUG("Stopping write stream with %s pending writes", [self.pendingWrites count]); | ||
[self.pendingWrites removeAllObjects]; | ||
} | ||
|
||
/** | ||
* Attempts to fill our write pipeline with writes from the LocalStore. | ||
* | ||
* Called internally to bootstrap or refill the write pipeline and by SyncEngine whenever there | ||
* are new mutations to process. | ||
* | ||
* Starts the write stream if necessary. | ||
*/ | ||
- (void)fillWritePipeline { | ||
if ([self isNetworkEnabled]) { | ||
while ([self canWriteMutations]) { | ||
FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen]; | ||
if (!batch) { | ||
break; | ||
FSTBatchID lastBatchIDRetrieved = | ||
self.writePipeline.count == 0 ? kFSTBatchIDUnknown : self.writePipeline.lastObject.batchID; | ||
while ([self canAddToWritePipeline]) { | ||
FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:lastBatchIDRetrieved]; | ||
if (!batch) { | ||
if (self.writePipeline.count == 0) { | ||
[self.writeStream markIdle]; | ||
} | ||
[self commitBatch:batch]; | ||
break; | ||
} | ||
[self addBatchToWritePipeline:batch]; | ||
lastBatchIDRetrieved = batch.batchID; | ||
} | ||
|
||
if ([self.pendingWrites count] == 0) { | ||
[self.writeStream markIdle]; | ||
} | ||
if ([self shouldStartWriteStream]) { | ||
[self startWriteStream]; | ||
} | ||
} | ||
|
||
/** | ||
* Returns YES if the backend can accept additional write requests. | ||
* | ||
* When sending mutations to the write stream (e.g. in -fillWritePipeline), call this method first | ||
* to check if more mutations can be sent. | ||
* | ||
* Currently the only thing that can prevent the backend from accepting write requests is if | ||
* there are too many requests already outstanding. As writes complete the backend will be able | ||
* to accept more. | ||
* Returns YES if we can add to the write pipeline (i.e. it is not full and the network is enabled). | ||
*/ | ||
- (BOOL)canWriteMutations { | ||
return [self isNetworkEnabled] && self.pendingWrites.count < kMaxPendingWrites; | ||
- (BOOL)canAddToWritePipeline { | ||
return [self isNetworkEnabled] && self.writePipeline.count < kMaxPendingWrites; | ||
} | ||
|
||
/** Given mutations to commit, actually commits them to the backend. */ | ||
- (void)commitBatch:(FSTMutationBatch *)batch { | ||
HARD_ASSERT([self canWriteMutations], "commitBatch called when mutations can't be written"); | ||
self.lastBatchSeen = batch.batchID; | ||
/** | ||
* Queues additional writes to be sent to the write stream, sending them immediately if the write | ||
* stream is established, else starting the write stream if it is not yet started. | ||
*/ | ||
- (void)addBatchToWritePipeline:(FSTMutationBatch *)batch { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I remember the few bits about naming on iOS correctly, then this method name should end in its argument type. I can't come up with any sane suggestions, and since this is a fine C++ name, we may be able to ignore this if you also can't think of a name. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I don't see a good way of doing that and we have other similarly-named methods already (e.g. |
||
HARD_ASSERT([self canAddToWritePipeline], | ||
"addBatchToWritePipeline called when mutations can't be written"); | ||
|
||
[self.pendingWrites addObject:batch]; | ||
[self.writePipeline addObject:batch]; | ||
|
||
if ([self shouldStartWriteStream]) { | ||
[self startWriteStream]; | ||
|
@@ -490,17 +503,8 @@ - (void)writeStreamDidCompleteHandshake { | |
// Record the stream token. | ||
[self.localStore setLastStreamToken:self.writeStream.lastStreamToken]; | ||
|
||
// Drain any pending writes. | ||
// | ||
// Note that at this point pendingWrites contains mutations that have already been accepted by | ||
// fillWritePipeline/commitBatch. If the pipeline is full, canWriteMutations will be NO, despite | ||
// the fact that we actually need to send mutations over. | ||
// | ||
// This also means that this method indirectly respects the limits imposed by canWriteMutations | ||
// since writes can't be added to the pendingWrites array when canWriteMutations is NO. If the | ||
// limits imposed by canWriteMutations actually protect us from DOSing ourselves then those limits | ||
// won't be exceeded here and we'll continue to make progress. | ||
for (FSTMutationBatch *write in self.pendingWrites) { | ||
// Send the write pipeline now that the stream is established. | ||
for (FSTMutationBatch *write in self.writePipeline) { | ||
[self.writeStream writeMutations:write.mutations]; | ||
} | ||
} | ||
|
@@ -509,10 +513,10 @@ - (void)writeStreamDidCompleteHandshake { | |
- (void)writeStreamDidReceiveResponseWithVersion:(const SnapshotVersion &)commitVersion | ||
mutationResults:(NSArray<FSTMutationResult *> *)results { | ||
// This is a response to a write containing mutations and should be correlated to the first | ||
// pending write. | ||
NSMutableArray *pendingWrites = self.pendingWrites; | ||
FSTMutationBatch *batch = pendingWrites[0]; | ||
[pendingWrites removeObjectAtIndex:0]; | ||
// write in our write pipeline. | ||
NSMutableArray *writePipeline = self.writePipeline; | ||
FSTMutationBatch *batch = writePipeline[0]; | ||
[writePipeline removeObjectAtIndex:0]; | ||
|
||
FSTMutationBatchResult *batchResult = | ||
[FSTMutationBatchResult resultWithBatch:batch | ||
|
@@ -535,7 +539,7 @@ - (void)writeStreamWasInterruptedWithError:(nullable NSError *)error { | |
|
||
// If the write stream closed due to an error, invoke the error callbacks if there are pending | ||
// writes. | ||
if (error != nil && self.pendingWrites.count > 0) { | ||
if (error != nil && self.writePipeline.count > 0) { | ||
if (self.writeStream.handshakeComplete) { | ||
// This error affects the actual writes. | ||
[self handleWriteError:error]; | ||
|
@@ -572,8 +576,8 @@ - (void)handleWriteError:(NSError *)error { | |
|
||
// If this was a permanent error, the request itself was the problem so it's not going to | ||
// succeed if we resend it. | ||
FSTMutationBatch *batch = self.pendingWrites[0]; | ||
[self.pendingWrites removeObjectAtIndex:0]; | ||
FSTMutationBatch *batch = self.writePipeline[0]; | ||
[self.writePipeline removeObjectAtIndex:0]; | ||
|
||
// In this case it's also unlikely that the server itself is melting down--this was just a | ||
// bad request so inhibit backoff on the next restart. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we rename
isNetworkEnabled
tocanUseNetwork
like in the other ports?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but that happened in a later PR that I'm not porting (yet).