Skip to content

Commit 5b3e9fe

Browse files
author
Michael Lehenbauer
committed
Refactor pendingWrites / write pipeline.
[Port of firebase/firebase-js-sdk#972] No functional changes. Just renames, moves, added comments, etc. * pendingWrites => writePipeline * canWriteMutations renamed canAddToWritePipeline * commitBatch => addBatchToWritePipeline * lastBatchSeen removed since we can compute it on demand from writePipeline. * Removed cleanUpWriteStreamState and instead inlined it in disableNetworkInternal since I didn't like the non-symmetry with cleanUpWatchStreamState which is called every time the stream closes. * Misc. comment cleanup.
1 parent 270b1cd commit 5b3e9fe

File tree

2 files changed

+64
-60
lines changed

2 files changed

+64
-60
lines changed

Firestore/Example/Tests/Integration/FSTDatastoreTests.mm

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
NS_ASSUME_NONNULL_BEGIN
5656

5757
@interface FSTRemoteStore (Tests)
58-
- (void)commitBatch:(FSTMutationBatch *)batch;
58+
- (void)addToWritePipeline:(FSTMutationBatch *)batch;
5959
@end
6060

6161
#pragma mark - FSTRemoteStoreEventCapture
@@ -224,7 +224,7 @@ - (void)testStreamingWrite {
224224
localWriteTime:[FIRTimestamp timestamp]
225225
mutations:@[ mutation ]];
226226
[_testWorkerQueue dispatchAsync:^{
227-
[_remoteStore commitBatch:batch];
227+
[_remoteStore addToWritePipeline:batch];
228228
}];
229229

230230
[self awaitExpectations];

Firestore/Source/Remote/FSTRemoteStore.mm

Lines changed: 62 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,6 @@ @interface FSTRemoteStore () <FSTWatchStreamDelegate, FSTWriteStreamDelegate>
9797
* is reached the targetId is removed from the map to free the memory).
9898
*/
9999

100-
@property(nonatomic, assign) FSTBatchID lastBatchSeen;
101-
102100
@property(nonatomic, strong, readonly) FSTOnlineStateTracker *onlineStateTracker;
103101

104102
@property(nonatomic, strong, nullable) FSTWatchChangeAggregator *watchChangeAggregator;
@@ -109,11 +107,20 @@ @interface FSTRemoteStore () <FSTWatchStreamDelegate, FSTWriteStreamDelegate>
109107
@property(nonatomic, strong, nullable) FSTWriteStream *writeStream;
110108

111109
/**
112-
* A FIFO queue of in-flight writes. This is in-flight from the point of view of the caller of
113-
* writeMutations, not from the point of view from the Datastore itself. In particular, these
114-
* requests may not have been sent to the Datastore server if the write stream is not yet running.
110+
* A list of up to kMaxPendingWrites writes that we have fetched from the LocalStore via
111+
* fillWritePipeline and have or will send to the write stream.
112+
*
113+
* Whenever writePipeline is not empty the RemoteStore will attempt to start or restart the write
114+
* stream. When the stream is established the writes in the pipeline will be sent in order.
115+
*
116+
* Writes remain in writePipeline until they are acknowledged by the backend and thus will
117+
* automatically be re-sent if the stream is interrupted / restarted before they're acknowledged.
118+
*
119+
* Write responses from the backend are linked to their originating request purely based on
120+
* order, and so we can just remove writes from the front of the writePipeline as we receive
121+
* responses.
115122
*/
116-
@property(nonatomic, strong, readonly) NSMutableArray<FSTMutationBatch *> *pendingWrites;
123+
@property(nonatomic, strong, readonly) NSMutableArray<FSTMutationBatch *> *writePipeline;
117124
@end
118125

119126
@implementation FSTRemoteStore
@@ -126,8 +133,7 @@ - (instancetype)initWithLocalStore:(FSTLocalStore *)localStore
126133
_datastore = datastore;
127134
_listenTargets = [NSMutableDictionary dictionary];
128135

129-
_lastBatchSeen = kFSTBatchIDUnknown;
130-
_pendingWrites = [NSMutableArray array];
136+
_writePipeline = [NSMutableArray array];
131137
_onlineStateTracker = [[FSTOnlineStateTracker alloc] initWithWorkerDispatchQueue:queue];
132138
}
133139
return self;
@@ -192,7 +198,12 @@ - (void)disableNetworkInternal {
192198
[self.writeStream stop];
193199

194200
[self cleanUpWatchStreamState];
195-
[self cleanUpWriteStreamState];
201+
202+
if (self.writePipeline.count > 0) {
203+
LOG_DEBUG("Stopping write stream with %lu pending writes",
204+
(unsigned long)self.writePipeline.count);
205+
[self.writePipeline removeAllObjects];
206+
}
196207

197208
self.writeStream = nil;
198209
self.watchStream = nil;
@@ -432,7 +443,7 @@ - (nullable FSTQueryData *)queryDataForTarget:(FSTBoxedTargetID *)targetID {
432443
* pending writes.
433444
*/
434445
- (BOOL)shouldStartWriteStream {
435-
return [self isNetworkEnabled] && ![self.writeStream isStarted] && self.pendingWrites.count > 0;
446+
return [self isNetworkEnabled] && ![self.writeStream isStarted] && self.writePipeline.count > 0;
436447
}
437448

438449
- (void)startWriteStream {
@@ -442,48 +453,50 @@ - (void)startWriteStream {
442453
[self.writeStream startWithDelegate:self];
443454
}
444455

445-
- (void)cleanUpWriteStreamState {
446-
self.lastBatchSeen = kFSTBatchIDUnknown;
447-
LOG_DEBUG("Stopping write stream with %s pending writes", [self.pendingWrites count]);
448-
[self.pendingWrites removeAllObjects];
449-
}
450-
456+
/**
457+
* Attempts to fill our write pipeline with writes from the LocalStore.
458+
*
459+
* Called internally to bootstrap or refill the write pipeline and by SyncEngine whenever there
460+
* are new mutations to process.
461+
*
462+
* Starts the write stream if necessary.
463+
*/
451464
- (void)fillWritePipeline {
452-
if ([self isNetworkEnabled]) {
453-
while ([self canWriteMutations]) {
454-
FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen];
455-
if (!batch) {
456-
break;
465+
FSTBatchID lastBatchIDRetrieved =
466+
self.writePipeline.count == 0 ? kFSTBatchIDUnknown : self.writePipeline.lastObject.batchID;
467+
while ([self canAddToWritePipeline]) {
468+
FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:lastBatchIDRetrieved];
469+
if (!batch) {
470+
if (self.writePipeline.count == 0) {
471+
[self.writeStream markIdle];
457472
}
458-
[self commitBatch:batch];
473+
break;
459474
}
475+
[self addBatchToWritePipeline:batch];
476+
lastBatchIDRetrieved = batch.batchID;
477+
}
460478

461-
if ([self.pendingWrites count] == 0) {
462-
[self.writeStream markIdle];
463-
}
479+
if ([self shouldStartWriteStream]) {
480+
[self startWriteStream];
464481
}
465482
}
466483

467484
/**
468-
* Returns YES if the backend can accept additional write requests.
469-
*
470-
* When sending mutations to the write stream (e.g. in -fillWritePipeline), call this method first
471-
* to check if more mutations can be sent.
472-
*
473-
* Currently the only thing that can prevent the backend from accepting write requests is if
474-
* there are too many requests already outstanding. As writes complete the backend will be able
475-
* to accept more.
485+
* Returns YES if we can add to the write pipeline (i.e. it is not full and the network is enabled).
476486
*/
477-
- (BOOL)canWriteMutations {
478-
return [self isNetworkEnabled] && self.pendingWrites.count < kMaxPendingWrites;
487+
- (BOOL)canAddToWritePipeline {
488+
return [self isNetworkEnabled] && self.writePipeline.count < kMaxPendingWrites;
479489
}
480490

481-
/** Given mutations to commit, actually commits them to the backend. */
482-
- (void)commitBatch:(FSTMutationBatch *)batch {
483-
HARD_ASSERT([self canWriteMutations], "commitBatch called when mutations can't be written");
484-
self.lastBatchSeen = batch.batchID;
491+
/**
492+
* Queues additional writes to be sent to the write stream, sending them immediately if the write
493+
* stream is established, else starting the write stream if it is not yet started.
494+
*/
495+
- (void)addBatchToWritePipeline:(FSTMutationBatch *)batch {
496+
HARD_ASSERT([self canAddToWritePipeline],
497+
"addBatchToWritePipeline called when mutations can't be written");
485498

486-
[self.pendingWrites addObject:batch];
499+
[self.writePipeline addObject:batch];
487500

488501
if ([self shouldStartWriteStream]) {
489502
[self startWriteStream];
@@ -504,17 +517,8 @@ - (void)writeStreamDidCompleteHandshake {
504517
// Record the stream token.
505518
[self.localStore setLastStreamToken:self.writeStream.lastStreamToken];
506519

507-
// Drain any pending writes.
508-
//
509-
// Note that at this point pendingWrites contains mutations that have already been accepted by
510-
// fillWritePipeline/commitBatch. If the pipeline is full, canWriteMutations will be NO, despite
511-
// the fact that we actually need to send mutations over.
512-
//
513-
// This also means that this method indirectly respects the limits imposed by canWriteMutations
514-
// since writes can't be added to the pendingWrites array when canWriteMutations is NO. If the
515-
// limits imposed by canWriteMutations actually protect us from DOSing ourselves then those limits
516-
// won't be exceeded here and we'll continue to make progress.
517-
for (FSTMutationBatch *write in self.pendingWrites) {
520+
// Send the write pipeline now that the stream is established.
521+
for (FSTMutationBatch *write in self.writePipeline) {
518522
[self.writeStream writeMutations:write.mutations];
519523
}
520524
}
@@ -523,10 +527,10 @@ - (void)writeStreamDidCompleteHandshake {
523527
- (void)writeStreamDidReceiveResponseWithVersion:(const SnapshotVersion &)commitVersion
524528
mutationResults:(NSArray<FSTMutationResult *> *)results {
525529
// This is a response to a write containing mutations and should be correlated to the first
526-
// pending write.
527-
NSMutableArray *pendingWrites = self.pendingWrites;
528-
FSTMutationBatch *batch = pendingWrites[0];
529-
[pendingWrites removeObjectAtIndex:0];
530+
// write in our write pipeline.
531+
NSMutableArray *writePipeline = self.writePipeline;
532+
FSTMutationBatch *batch = writePipeline[0];
533+
[writePipeline removeObjectAtIndex:0];
530534

531535
FSTMutationBatchResult *batchResult =
532536
[FSTMutationBatchResult resultWithBatch:batch
@@ -549,7 +553,7 @@ - (void)writeStreamWasInterruptedWithError:(nullable NSError *)error {
549553

550554
// If the write stream closed due to an error, invoke the error callbacks if there are pending
551555
// writes.
552-
if (error != nil && self.pendingWrites.count > 0) {
556+
if (error != nil && self.writePipeline.count > 0) {
553557
if (self.writeStream.handshakeComplete) {
554558
// This error affects the actual writes.
555559
[self handleWriteError:error];
@@ -586,8 +590,8 @@ - (void)handleWriteError:(NSError *)error {
586590

587591
// If this was a permanent error, the request itself was the problem so it's not going to
588592
// succeed if we resend it.
589-
FSTMutationBatch *batch = self.pendingWrites[0];
590-
[self.pendingWrites removeObjectAtIndex:0];
593+
FSTMutationBatch *batch = self.writePipeline[0];
594+
[self.writePipeline removeObjectAtIndex:0];
591595

592596
// In this case it's also unlikely that the server itself is melting down--this was just a
593597
// bad request so inhibit backoff on the next restart.

0 commit comments

Comments
 (0)