Skip to content

Commit 97c499c

Browse files
authored
Refactor pendingWrites / write pipeline. (#1699)
[Port of firebase/firebase-js-sdk#972] No functional changes. Just renames, moves, added comments, etc. * pendingWrites => writePipeline * canWriteMutations renamed canAddToWritePipeline * commitBatch => addBatchToWritePipeline * lastBatchSeen removed since we can compute it on demand from writePipeline. * Removed cleanUpWriteStreamState and instead inlined it in disableNetworkInternal since I didn't like the non-symmetry with cleanUpWatchStreamState which is called every time the stream closes. * Misc. comment cleanup.
1 parent bb1a448 commit 97c499c

File tree

2 files changed

+64
-60
lines changed

2 files changed

+64
-60
lines changed

Firestore/Example/Tests/Integration/FSTDatastoreTests.mm

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
NS_ASSUME_NONNULL_BEGIN
5656

5757
@interface FSTRemoteStore (Tests)
58-
- (void)commitBatch:(FSTMutationBatch *)batch;
58+
- (void)addBatchToWritePipeline:(FSTMutationBatch *)batch;
5959
@end
6060

6161
#pragma mark - FSTRemoteStoreEventCapture
@@ -224,7 +224,7 @@ - (void)testStreamingWrite {
224224
localWriteTime:[FIRTimestamp timestamp]
225225
mutations:@[ mutation ]];
226226
[_testWorkerQueue dispatchAsync:^{
227-
[_remoteStore commitBatch:batch];
227+
[_remoteStore addBatchToWritePipeline:batch];
228228
}];
229229

230230
[self awaitExpectations];

Firestore/Source/Remote/FSTRemoteStore.mm

Lines changed: 62 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@ @interface FSTRemoteStore () <FSTWatchStreamDelegate, FSTWriteStreamDelegate>
8383
@property(nonatomic, strong, readonly)
8484
NSMutableDictionary<FSTBoxedTargetID *, FSTQueryData *> *listenTargets;
8585

86-
@property(nonatomic, assign) FSTBatchID lastBatchSeen;
87-
8886
@property(nonatomic, strong, readonly) FSTOnlineStateTracker *onlineStateTracker;
8987

9088
@property(nonatomic, strong, nullable) FSTWatchChangeAggregator *watchChangeAggregator;
@@ -95,11 +93,20 @@ @interface FSTRemoteStore () <FSTWatchStreamDelegate, FSTWriteStreamDelegate>
9593
@property(nonatomic, strong, nullable) FSTWriteStream *writeStream;
9694

9795
/**
98-
* A FIFO queue of in-flight writes. This is in-flight from the point of view of the caller of
99-
* writeMutations, not from the point of view from the Datastore itself. In particular, these
100-
* requests may not have been sent to the Datastore server if the write stream is not yet running.
96+
* A list of up to kMaxPendingWrites writes that we have fetched from the LocalStore via
97+
* fillWritePipeline and have or will send to the write stream.
98+
*
99+
* Whenever writePipeline is not empty, the RemoteStore will attempt to start or restart the write
100+
* stream. When the stream is established, the writes in the pipeline will be sent in order.
101+
*
102+
* Writes remain in writePipeline until they are acknowledged by the backend and thus will
103+
* automatically be re-sent if the stream is interrupted / restarted before they're acknowledged.
104+
*
105+
* Write responses from the backend are linked to their originating request purely based on
106+
* order, and so we can just remove writes from the front of the writePipeline as we receive
107+
* responses.
101108
*/
102-
@property(nonatomic, strong, readonly) NSMutableArray<FSTMutationBatch *> *pendingWrites;
109+
@property(nonatomic, strong, readonly) NSMutableArray<FSTMutationBatch *> *writePipeline;
103110
@end
104111

105112
@implementation FSTRemoteStore
@@ -112,8 +119,7 @@ - (instancetype)initWithLocalStore:(FSTLocalStore *)localStore
112119
_datastore = datastore;
113120
_listenTargets = [NSMutableDictionary dictionary];
114121

115-
_lastBatchSeen = kFSTBatchIDUnknown;
116-
_pendingWrites = [NSMutableArray array];
122+
_writePipeline = [NSMutableArray array];
117123
_onlineStateTracker = [[FSTOnlineStateTracker alloc] initWithWorkerDispatchQueue:queue];
118124
}
119125
return self;
@@ -178,7 +184,12 @@ - (void)disableNetworkInternal {
178184
[self.writeStream stop];
179185

180186
[self cleanUpWatchStreamState];
181-
[self cleanUpWriteStreamState];
187+
188+
if (self.writePipeline.count > 0) {
189+
LOG_DEBUG("Stopping write stream with %lu pending writes",
190+
(unsigned long)self.writePipeline.count);
191+
[self.writePipeline removeAllObjects];
192+
}
182193

183194
self.writeStream = nil;
184195
self.watchStream = nil;
@@ -418,7 +429,7 @@ - (nullable FSTQueryData *)queryDataForTarget:(FSTBoxedTargetID *)targetID {
418429
* pending writes.
419430
*/
420431
- (BOOL)shouldStartWriteStream {
421-
return [self isNetworkEnabled] && ![self.writeStream isStarted] && self.pendingWrites.count > 0;
432+
return [self isNetworkEnabled] && ![self.writeStream isStarted] && self.writePipeline.count > 0;
422433
}
423434

424435
- (void)startWriteStream {
@@ -428,48 +439,50 @@ - (void)startWriteStream {
428439
[self.writeStream startWithDelegate:self];
429440
}
430441

431-
- (void)cleanUpWriteStreamState {
432-
self.lastBatchSeen = kFSTBatchIDUnknown;
433-
LOG_DEBUG("Stopping write stream with %s pending writes", [self.pendingWrites count]);
434-
[self.pendingWrites removeAllObjects];
435-
}
436-
442+
/**
443+
* Attempts to fill our write pipeline with writes from the LocalStore.
444+
*
445+
* Called internally to bootstrap or refill the write pipeline and by SyncEngine whenever there
446+
* are new mutations to process.
447+
*
448+
* Starts the write stream if necessary.
449+
*/
437450
- (void)fillWritePipeline {
438-
if ([self isNetworkEnabled]) {
439-
while ([self canWriteMutations]) {
440-
FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen];
441-
if (!batch) {
442-
break;
451+
FSTBatchID lastBatchIDRetrieved =
452+
self.writePipeline.count == 0 ? kFSTBatchIDUnknown : self.writePipeline.lastObject.batchID;
453+
while ([self canAddToWritePipeline]) {
454+
FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:lastBatchIDRetrieved];
455+
if (!batch) {
456+
if (self.writePipeline.count == 0) {
457+
[self.writeStream markIdle];
443458
}
444-
[self commitBatch:batch];
459+
break;
445460
}
461+
[self addBatchToWritePipeline:batch];
462+
lastBatchIDRetrieved = batch.batchID;
463+
}
446464

447-
if ([self.pendingWrites count] == 0) {
448-
[self.writeStream markIdle];
449-
}
465+
if ([self shouldStartWriteStream]) {
466+
[self startWriteStream];
450467
}
451468
}
452469

453470
/**
454-
* Returns YES if the backend can accept additional write requests.
455-
*
456-
* When sending mutations to the write stream (e.g. in -fillWritePipeline), call this method first
457-
* to check if more mutations can be sent.
458-
*
459-
* Currently the only thing that can prevent the backend from accepting write requests is if
460-
* there are too many requests already outstanding. As writes complete the backend will be able
461-
* to accept more.
471+
* Returns YES if we can add to the write pipeline (i.e. it is not full and the network is enabled).
462472
*/
463-
- (BOOL)canWriteMutations {
464-
return [self isNetworkEnabled] && self.pendingWrites.count < kMaxPendingWrites;
473+
- (BOOL)canAddToWritePipeline {
474+
return [self isNetworkEnabled] && self.writePipeline.count < kMaxPendingWrites;
465475
}
466476

467-
/** Given mutations to commit, actually commits them to the backend. */
468-
- (void)commitBatch:(FSTMutationBatch *)batch {
469-
HARD_ASSERT([self canWriteMutations], "commitBatch called when mutations can't be written");
470-
self.lastBatchSeen = batch.batchID;
477+
/**
478+
* Queues additional writes to be sent to the write stream, sending them immediately if the write
479+
* stream is established, else starting the write stream if it is not yet started.
480+
*/
481+
- (void)addBatchToWritePipeline:(FSTMutationBatch *)batch {
482+
HARD_ASSERT([self canAddToWritePipeline],
483+
"addBatchToWritePipeline called when mutations can't be written");
471484

472-
[self.pendingWrites addObject:batch];
485+
[self.writePipeline addObject:batch];
473486

474487
if ([self shouldStartWriteStream]) {
475488
[self startWriteStream];
@@ -490,17 +503,8 @@ - (void)writeStreamDidCompleteHandshake {
490503
// Record the stream token.
491504
[self.localStore setLastStreamToken:self.writeStream.lastStreamToken];
492505

493-
// Drain any pending writes.
494-
//
495-
// Note that at this point pendingWrites contains mutations that have already been accepted by
496-
// fillWritePipeline/commitBatch. If the pipeline is full, canWriteMutations will be NO, despite
497-
// the fact that we actually need to send mutations over.
498-
//
499-
// This also means that this method indirectly respects the limits imposed by canWriteMutations
500-
// since writes can't be added to the pendingWrites array when canWriteMutations is NO. If the
501-
// limits imposed by canWriteMutations actually protect us from DOSing ourselves then those limits
502-
// won't be exceeded here and we'll continue to make progress.
503-
for (FSTMutationBatch *write in self.pendingWrites) {
506+
// Send the write pipeline now that the stream is established.
507+
for (FSTMutationBatch *write in self.writePipeline) {
504508
[self.writeStream writeMutations:write.mutations];
505509
}
506510
}
@@ -509,10 +513,10 @@ - (void)writeStreamDidCompleteHandshake {
509513
- (void)writeStreamDidReceiveResponseWithVersion:(const SnapshotVersion &)commitVersion
510514
mutationResults:(NSArray<FSTMutationResult *> *)results {
511515
// This is a response to a write containing mutations and should be correlated to the first
512-
// pending write.
513-
NSMutableArray *pendingWrites = self.pendingWrites;
514-
FSTMutationBatch *batch = pendingWrites[0];
515-
[pendingWrites removeObjectAtIndex:0];
516+
// write in our write pipeline.
517+
NSMutableArray *writePipeline = self.writePipeline;
518+
FSTMutationBatch *batch = writePipeline[0];
519+
[writePipeline removeObjectAtIndex:0];
516520

517521
FSTMutationBatchResult *batchResult =
518522
[FSTMutationBatchResult resultWithBatch:batch
@@ -535,7 +539,7 @@ - (void)writeStreamWasInterruptedWithError:(nullable NSError *)error {
535539

536540
// If the write stream closed due to an error, invoke the error callbacks if there are pending
537541
// writes.
538-
if (error != nil && self.pendingWrites.count > 0) {
542+
if (error != nil && self.writePipeline.count > 0) {
539543
if (self.writeStream.handshakeComplete) {
540544
// This error affects the actual writes.
541545
[self handleWriteError:error];
@@ -572,8 +576,8 @@ - (void)handleWriteError:(NSError *)error {
572576

573577
// If this was a permanent error, the request itself was the problem so it's not going to
574578
// succeed if we resend it.
575-
FSTMutationBatch *batch = self.pendingWrites[0];
576-
[self.pendingWrites removeObjectAtIndex:0];
579+
FSTMutationBatch *batch = self.writePipeline[0];
580+
[self.writePipeline removeObjectAtIndex:0];
577581

578582
// In this case it's also unlikely that the server itself is melting down--this was just a
579583
// bad request so inhibit backoff on the next restart.

0 commit comments

Comments
 (0)