Commit 63e1414

Refactor pendingWrites / write pipeline. (#972)
No functional changes. Just renames, moves, added comments, etc.

* pendingWrites => writePipeline
* canWriteMutations() renamed canAddToWritePipeline()
* commit() => addToWritePipeline()
* lastBatchSeen removed since we can compute it on demand from writePipeline.
* Removed cleanUpWriteStreamState() and instead inlined it in disableNetworkInternal() since I didn't like the non-symmetry with cleanUpWatchStreamState() which is called every time the stream closes.
* Lots of comment cleanup.
1 parent 27d2778 commit 63e1414
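
Note on the lastBatchSeen removal: the id of the last batch handed to the stream is now derived from the tail of writePipeline instead of being tracked in a separate field. A minimal standalone sketch of that derivation, using simplified stand-in types (the real change is in the fillWritePipeline() hunk below):

// Simplified stand-ins for the types used in remote_store.ts.
type BatchId = number;
const BATCHID_UNKNOWN: BatchId = -1;
interface MutationBatch { batchId: BatchId; }

// With lastBatchSeen gone, the most recently enqueued batch id is computed
// on demand from the pipeline, falling back to BATCHID_UNKNOWN when empty.
function lastBatchIdRetrieved(writePipeline: MutationBatch[]): BatchId {
  return writePipeline.length > 0
    ? writePipeline[writePipeline.length - 1].batchId
    : BATCHID_UNKNOWN;
}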

File tree

1 file changed (+64, -66 lines)

packages/firestore/src/remote/remote_store.ts

+64 -66
@@ -17,7 +17,7 @@
 import { User } from '../auth/user';
 import { SnapshotVersion } from '../core/snapshot_version';
 import { Transaction } from '../core/transaction';
-import { BatchId, OnlineState, TargetId } from '../core/types';
+import { OnlineState, TargetId } from '../core/types';
 import { LocalStore } from '../local/local_store';
 import { QueryData, QueryPurpose } from '../local/query_data';
 import { MutationResult } from '../model/mutation';
@@ -77,8 +77,24 @@ const MAX_PENDING_WRITES = 10;
  * - acking mutations to the SyncEngine once they are accepted or rejected.
  */
 export class RemoteStore implements TargetMetadataProvider {
-  private pendingWrites: MutationBatch[] = [];
-  private lastBatchSeen: BatchId = BATCHID_UNKNOWN;
+  /**
+   * A list of up to MAX_PENDING_WRITES writes that we have fetched from the
+   * LocalStore via fillWritePipeline() and have or will send to the write
+   * stream.
+   *
+   * Whenever writePipeline.length > 0 the RemoteStore will attempt to start or
+   * restart the write stream. When the stream is established the writes in the
+   * pipeline will be sent in order.
+   *
+   * Writes remain in writePipeline until they are acknowledged by the backend
+   * and thus will automatically be re-sent if the stream is interrupted /
+   * restarted before they're acknowledged.
+   *
+   * Write responses from the backend are linked to their originating request
+   * purely based on order, and so we can just shift() writes from the front of
+   * the writePipeline as we receive responses.
+   */
+  private writePipeline: MutationBatch[] = [];

   /**
    * A mapping of watched targets that the client cares about tracking and the
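
The new field comment hinges on one invariant: the backend acknowledges writes in the order they were sent, so no ids are needed to correlate responses. A toy, self-contained illustration of that FIFO correlation (hypothetical helper names, not part of the commit):

// Toy model of the order-based correlation described in the comment above.
const pipeline: Array<{ batchId: number }> = [];

function sendBatch(batchId: number): void {
  pipeline.push({ batchId }); // requests go out in pipeline order
}

function onWriteResponse(): number {
  // The response carries no batch id; position alone identifies the batch,
  // so the front of the pipeline is always the one being acknowledged.
  return pipeline.shift()!.batchId;
}

sendBatch(1);
sendBatch(2);
onWriteResponse(); // => 1
onWriteResponse(); // => 2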
@@ -177,7 +193,16 @@ export class RemoteStore implements TargetMetadataProvider {
     this.writeStream.stop();

     this.cleanUpWatchStreamState();
-    this.cleanUpWriteStreamState();
+
+    log.debug(
+      LOG_TAG,
+      'Stopping write stream with ' +
+        this.writePipeline.length +
+        ' pending writes'
+    );
+    // TODO(mikelehen): We only actually need to clear the write pipeline if
+    // this is being called as part of handleUserChange(). Consider reworking.
+    this.writePipeline = [];

     this.writeStream = null;
     this.watchStream = null;
@@ -439,72 +464,61 @@ export class RemoteStore implements TargetMetadataProvider {
     return promiseChain;
   }

-  cleanUpWriteStreamState(): void {
-    this.lastBatchSeen = BATCHID_UNKNOWN;
-    log.debug(
-      LOG_TAG,
-      'Stopping write stream with ' +
-        this.pendingWrites.length +
-        ' pending writes'
-    );
-    this.pendingWrites = [];
-  }
-
   /**
-   * Notifies that there are new mutations to process in the queue. This is
-   * typically called by SyncEngine after it has sent mutations to LocalStore.
+   * Attempts to fill our write pipeline with writes from the LocalStore.
+   *
+   * Called internally to bootstrap or refill the write pipeline and by
+   * SyncEngine whenever there are new mutations to process.
+   *
+   * Starts the write stream if necessary.
    */
   async fillWritePipeline(): Promise<void> {
-    if (this.canWriteMutations()) {
+    if (this.canAddToWritePipeline()) {
+      const lastBatchIdRetrieved =
+        this.writePipeline.length > 0
+          ? this.writePipeline[this.writePipeline.length - 1].batchId
+          : BATCHID_UNKNOWN;
       return this.localStore
-        .nextMutationBatch(this.lastBatchSeen)
+        .nextMutationBatch(lastBatchIdRetrieved)
         .then(batch => {
           if (batch === null) {
-            if (this.pendingWrites.length === 0) {
+            if (this.writePipeline.length === 0) {
               this.writeStream.markIdle();
             }
           } else {
-            this.commit(batch);
+            this.addToWritePipeline(batch);
             return this.fillWritePipeline();
           }
         });
     }
   }

   /**
-   * Returns true if the backend can accept additional write requests.
-   *
-   * When sending mutations to the write stream (e.g. in fillWritePipeline),
-   * call this method first to check if more mutations can be sent.
-   *
-   * Currently the only thing that can prevent the backend from accepting
-   * write requests is if there are too many requests already outstanding. As
-   * writes complete the backend will be able to accept more.
+   * Returns true if we can add to the write pipeline (i.e. it is not full and
+   * the network is enabled).
    */
-  canWriteMutations(): boolean {
+  private canAddToWritePipeline(): boolean {
     return (
-      this.isNetworkEnabled() && this.pendingWrites.length < MAX_PENDING_WRITES
+      this.isNetworkEnabled() && this.writePipeline.length < MAX_PENDING_WRITES
     );
   }

   // For testing
   outstandingWrites(): number {
-    return this.pendingWrites.length;
+    return this.writePipeline.length;
   }

   /**
-   * Given mutations to commit, actually commits them to the Datastore. Note
-   * that this does *not* return a Promise specifically because the AsyncQueue
-   * should not block operations for this.
+   * Queues additional writes to be sent to the write stream, sending them
+   * immediately if the write stream is established, else starting the write
+   * stream if it is not yet started.
    */
-  private commit(batch: MutationBatch): void {
+  private addToWritePipeline(batch: MutationBatch): void {
     assert(
-      this.canWriteMutations(),
-      "commit called when batches can't be written"
+      this.canAddToWritePipeline(),
+      'addToWritePipeline called when pipeline is full'
     );
-    this.lastBatchSeen = batch.batchId;
-
-    this.pendingWrites.push(batch);
+    this.writePipeline.push(batch);

     if (this.shouldStartWriteStream()) {
       this.startWriteStream();
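
Taken together, canAddToWritePipeline() and the recursive fillWritePipeline() form a bounded refill loop: batches are pulled from local storage until either there are none left or the pipeline holds MAX_PENDING_WRITES entries. A stripped-down, synchronous sketch of that loop (an in-memory array stands in for LocalStore, and the stream is omitted):

// Stripped-down refill loop mirroring the shape of fillWritePipeline() above.
const MAX_PENDING_WRITES = 10;
const localBatches: number[] = [1, 2, 3]; // stand-in for LocalStore contents
const pipeline: number[] = [];

function canAddToPipeline(): boolean {
  return pipeline.length < MAX_PENDING_WRITES; // network check omitted
}

function fillPipeline(): void {
  while (canAddToPipeline() && localBatches.length > 0) {
    // The real code asks LocalStore for the batch after the last one
    // retrieved; here we simply take the next queued batch.
    pipeline.push(localBatches.shift()!);
  }
  // If nothing was added and nothing is in flight, the real code marks the
  // write stream idle instead.
}

fillPipeline();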
@@ -517,7 +531,7 @@ export class RemoteStore implements TargetMetadataProvider {
     return (
       this.isNetworkEnabled() &&
       !this.writeStream.isStarted() &&
-      this.pendingWrites.length > 0
+      this.writePipeline.length > 0
     );
   }

@@ -543,20 +557,8 @@ export class RemoteStore implements TargetMetadataProvider {
     return this.localStore
       .setLastStreamToken(this.writeStream.lastStreamToken)
       .then(() => {
-        // Drain any pending writes.
-        //
-        // Note that at this point pendingWrites contains mutations that
-        // have already been accepted by fillWritePipeline/commitBatch. If
-        // the pipeline is full, canWriteMutations will be false, despite
-        // the fact that we actually need to send mutations over.
-        //
-        // This also means that this method indirectly respects the limits
-        // imposed by canWriteMutations since writes can't be added to the
-        // pendingWrites array when canWriteMutations is false. If the
-        // limits imposed by canWriteMutations actually protect us from
-        // DOSing ourselves then those limits won't be exceeded here and
-        // we'll continue to make progress.
-        for (const batch of this.pendingWrites) {
+        // Send the write pipeline now that the stream is established.
+        for (const batch of this.writePipeline) {
           this.writeStream.writeMutations(batch.mutations);
         }
       });
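
Because batches stay in writePipeline until they are acknowledged, the handshake handler above can simply replay the whole pipeline whenever the stream is (re)established, so nothing is lost across an interruption. A compact sketch of that replay-on-open step (writeMutations is a stand-in for the stream call):

// Replay-on-open sketch: every unacknowledged batch is written again, in
// order, once the stream handshake completes.
interface Batch { mutations: string[]; }

function onHandshakeComplete(
  writePipeline: Batch[],
  writeMutations: (mutations: string[]) => void
): void {
  for (const batch of writePipeline) {
    writeMutations(batch.mutations); // batches remain queued until acked
  }
}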
@@ -567,12 +569,12 @@
     results: MutationResult[]
   ): Promise<void> {
     // This is a response to a write containing mutations and should be
-    // correlated to the first pending write.
+    // correlated to the first write in our write pipeline.
     assert(
-      this.pendingWrites.length > 0,
-      'Got result for empty pending writes'
+      this.writePipeline.length > 0,
+      'Got result for empty write pipeline'
     );
-    const batch = this.pendingWrites.shift()!;
+    const batch = this.writePipeline.shift()!;
     const success = MutationBatchResult.from(
       batch,
       commitVersion,
@@ -594,11 +596,7 @@ export class RemoteStore implements TargetMetadataProvider {

     // If the write stream closed due to an error, invoke the error callbacks if
     // there are pending writes.
-    if (error && this.pendingWrites.length > 0) {
-      assert(
-        !!error,
-        'We have pending writes, but the write stream closed without an error'
-      );
+    if (error && this.writePipeline.length > 0) {
       // A promise that is resolved after we processed the error
       let errorHandling: Promise<void>;
       if (this.writeStream.handshakeComplete) {
@@ -644,7 +642,7 @@ export class RemoteStore implements TargetMetadataProvider {
     if (isPermanentError(error.code)) {
       // This was a permanent error, the request itself was the problem
       // so it's not going to succeed if we resend it.
-      const batch = this.pendingWrites.shift()!;
+      const batch = this.writePipeline.shift()!;

       // In this case it's also unlikely that the server itself is melting
       // down -- this was just a bad request so inhibit backoff on the next
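
The error handling above distinguishes permanent from transient failures: a permanent error removes and rejects the front batch (resending it would fail again), while a transient error leaves the pipeline untouched so the writes are retried once the stream reconnects. A rough sketch of that branch (isPermanent and rejectBatch are simplified stand-ins):

// Rough sketch of the error branch: permanent errors drop and reject the
// front batch; transient errors keep the pipeline intact for a later retry.
interface PendingBatch { batchId: number; }

function handleWriteError(
  writePipeline: PendingBatch[],
  isPermanent: boolean,
  rejectBatch: (batchId: number) => void
): void {
  if (isPermanent && writePipeline.length > 0) {
    const batch = writePipeline.shift()!; // resending would fail again
    rejectBatch(batch.batchId);
  }
  // Transient error: batches stay queued and are re-sent when the stream
  // is re-established.
}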
