diff --git a/Firestore/Example/Tests/Integration/FSTDatastoreTests.mm b/Firestore/Example/Tests/Integration/FSTDatastoreTests.mm index 873b9487967..756b4ed6eb6 100644 --- a/Firestore/Example/Tests/Integration/FSTDatastoreTests.mm +++ b/Firestore/Example/Tests/Integration/FSTDatastoreTests.mm @@ -55,7 +55,7 @@ NS_ASSUME_NONNULL_BEGIN @interface FSTRemoteStore (Tests) -- (void)commitBatch:(FSTMutationBatch *)batch; +- (void)addBatchToWritePipeline:(FSTMutationBatch *)batch; @end #pragma mark - FSTRemoteStoreEventCapture @@ -224,7 +224,7 @@ - (void)testStreamingWrite { localWriteTime:[FIRTimestamp timestamp] mutations:@[ mutation ]]; [_testWorkerQueue dispatchAsync:^{ - [_remoteStore commitBatch:batch]; + [_remoteStore addBatchToWritePipeline:batch]; }]; [self awaitExpectations]; diff --git a/Firestore/Source/Remote/FSTRemoteStore.mm b/Firestore/Source/Remote/FSTRemoteStore.mm index e1c0caa4c32..ce7f4807ef3 100644 --- a/Firestore/Source/Remote/FSTRemoteStore.mm +++ b/Firestore/Source/Remote/FSTRemoteStore.mm @@ -83,8 +83,6 @@ @interface FSTRemoteStore () @property(nonatomic, strong, readonly) NSMutableDictionary *listenTargets; -@property(nonatomic, assign) FSTBatchID lastBatchSeen; - @property(nonatomic, strong, readonly) FSTOnlineStateTracker *onlineStateTracker; @property(nonatomic, strong, nullable) FSTWatchChangeAggregator *watchChangeAggregator; @@ -95,11 +93,20 @@ @interface FSTRemoteStore () @property(nonatomic, strong, nullable) FSTWriteStream *writeStream; /** - * A FIFO queue of in-flight writes. This is in-flight from the point of view of the caller of - * writeMutations, not from the point of view from the Datastore itself. In particular, these - * requests may not have been sent to the Datastore server if the write stream is not yet running. + * A list of up to kMaxPendingWrites writes that we have fetched from the LocalStore via + * fillWritePipeline and have or will send to the write stream. + * + * Whenever writePipeline is not empty, the RemoteStore will attempt to start or restart the write + * stream. When the stream is established, the writes in the pipeline will be sent in order. + * + * Writes remain in writePipeline until they are acknowledged by the backend and thus will + * automatically be re-sent if the stream is interrupted / restarted before they're acknowledged. + * + * Write responses from the backend are linked to their originating request purely based on + * order, and so we can just remove writes from the front of the writePipeline as we receive + * responses. */ -@property(nonatomic, strong, readonly) NSMutableArray *pendingWrites; +@property(nonatomic, strong, readonly) NSMutableArray *writePipeline; @end @implementation FSTRemoteStore @@ -112,8 +119,7 @@ - (instancetype)initWithLocalStore:(FSTLocalStore *)localStore _datastore = datastore; _listenTargets = [NSMutableDictionary dictionary]; - _lastBatchSeen = kFSTBatchIDUnknown; - _pendingWrites = [NSMutableArray array]; + _writePipeline = [NSMutableArray array]; _onlineStateTracker = [[FSTOnlineStateTracker alloc] initWithWorkerDispatchQueue:queue]; } return self; @@ -178,7 +184,12 @@ - (void)disableNetworkInternal { [self.writeStream stop]; [self cleanUpWatchStreamState]; - [self cleanUpWriteStreamState]; + + if (self.writePipeline.count > 0) { + LOG_DEBUG("Stopping write stream with %lu pending writes", + (unsigned long)self.writePipeline.count); + [self.writePipeline removeAllObjects]; + } self.writeStream = nil; self.watchStream = nil; @@ -418,7 +429,7 @@ - (nullable FSTQueryData *)queryDataForTarget:(FSTBoxedTargetID *)targetID { * pending writes. */ - (BOOL)shouldStartWriteStream { - return [self isNetworkEnabled] && ![self.writeStream isStarted] && self.pendingWrites.count > 0; + return [self isNetworkEnabled] && ![self.writeStream isStarted] && self.writePipeline.count > 0; } - (void)startWriteStream { @@ -428,48 +439,50 @@ - (void)startWriteStream { [self.writeStream startWithDelegate:self]; } -- (void)cleanUpWriteStreamState { - self.lastBatchSeen = kFSTBatchIDUnknown; - LOG_DEBUG("Stopping write stream with %s pending writes", [self.pendingWrites count]); - [self.pendingWrites removeAllObjects]; -} - +/** + * Attempts to fill our write pipeline with writes from the LocalStore. + * + * Called internally to bootstrap or refill the write pipeline and by SyncEngine whenever there + * are new mutations to process. + * + * Starts the write stream if necessary. + */ - (void)fillWritePipeline { - if ([self isNetworkEnabled]) { - while ([self canWriteMutations]) { - FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen]; - if (!batch) { - break; + FSTBatchID lastBatchIDRetrieved = + self.writePipeline.count == 0 ? kFSTBatchIDUnknown : self.writePipeline.lastObject.batchID; + while ([self canAddToWritePipeline]) { + FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:lastBatchIDRetrieved]; + if (!batch) { + if (self.writePipeline.count == 0) { + [self.writeStream markIdle]; } - [self commitBatch:batch]; + break; } + [self addBatchToWritePipeline:batch]; + lastBatchIDRetrieved = batch.batchID; + } - if ([self.pendingWrites count] == 0) { - [self.writeStream markIdle]; - } + if ([self shouldStartWriteStream]) { + [self startWriteStream]; } } /** - * Returns YES if the backend can accept additional write requests. - * - * When sending mutations to the write stream (e.g. in -fillWritePipeline), call this method first - * to check if more mutations can be sent. - * - * Currently the only thing that can prevent the backend from accepting write requests is if - * there are too many requests already outstanding. As writes complete the backend will be able - * to accept more. + * Returns YES if we can add to the write pipeline (i.e. it is not full and the network is enabled). */ -- (BOOL)canWriteMutations { - return [self isNetworkEnabled] && self.pendingWrites.count < kMaxPendingWrites; +- (BOOL)canAddToWritePipeline { + return [self isNetworkEnabled] && self.writePipeline.count < kMaxPendingWrites; } -/** Given mutations to commit, actually commits them to the backend. */ -- (void)commitBatch:(FSTMutationBatch *)batch { - HARD_ASSERT([self canWriteMutations], "commitBatch called when mutations can't be written"); - self.lastBatchSeen = batch.batchID; +/** + * Queues additional writes to be sent to the write stream, sending them immediately if the write + * stream is established, else starting the write stream if it is not yet started. + */ +- (void)addBatchToWritePipeline:(FSTMutationBatch *)batch { + HARD_ASSERT([self canAddToWritePipeline], + "addBatchToWritePipeline called when mutations can't be written"); - [self.pendingWrites addObject:batch]; + [self.writePipeline addObject:batch]; if ([self shouldStartWriteStream]) { [self startWriteStream]; @@ -490,17 +503,8 @@ - (void)writeStreamDidCompleteHandshake { // Record the stream token. [self.localStore setLastStreamToken:self.writeStream.lastStreamToken]; - // Drain any pending writes. - // - // Note that at this point pendingWrites contains mutations that have already been accepted by - // fillWritePipeline/commitBatch. If the pipeline is full, canWriteMutations will be NO, despite - // the fact that we actually need to send mutations over. - // - // This also means that this method indirectly respects the limits imposed by canWriteMutations - // since writes can't be added to the pendingWrites array when canWriteMutations is NO. If the - // limits imposed by canWriteMutations actually protect us from DOSing ourselves then those limits - // won't be exceeded here and we'll continue to make progress. - for (FSTMutationBatch *write in self.pendingWrites) { + // Send the write pipeline now that the stream is established. + for (FSTMutationBatch *write in self.writePipeline) { [self.writeStream writeMutations:write.mutations]; } } @@ -509,10 +513,10 @@ - (void)writeStreamDidCompleteHandshake { - (void)writeStreamDidReceiveResponseWithVersion:(const SnapshotVersion &)commitVersion mutationResults:(NSArray *)results { // This is a response to a write containing mutations and should be correlated to the first - // pending write. - NSMutableArray *pendingWrites = self.pendingWrites; - FSTMutationBatch *batch = pendingWrites[0]; - [pendingWrites removeObjectAtIndex:0]; + // write in our write pipeline. + NSMutableArray *writePipeline = self.writePipeline; + FSTMutationBatch *batch = writePipeline[0]; + [writePipeline removeObjectAtIndex:0]; FSTMutationBatchResult *batchResult = [FSTMutationBatchResult resultWithBatch:batch @@ -535,7 +539,7 @@ - (void)writeStreamWasInterruptedWithError:(nullable NSError *)error { // If the write stream closed due to an error, invoke the error callbacks if there are pending // writes. - if (error != nil && self.pendingWrites.count > 0) { + if (error != nil && self.writePipeline.count > 0) { if (self.writeStream.handshakeComplete) { // This error affects the actual writes. [self handleWriteError:error]; @@ -572,8 +576,8 @@ - (void)handleWriteError:(NSError *)error { // If this was a permanent error, the request itself was the problem so it's not going to // succeed if we resend it. - FSTMutationBatch *batch = self.pendingWrites[0]; - [self.pendingWrites removeObjectAtIndex:0]; + FSTMutationBatch *batch = self.writePipeline[0]; + [self.writePipeline removeObjectAtIndex:0]; // In this case it's also unlikely that the server itself is melting down--this was just a // bad request so inhibit backoff on the next restart.