
Commit 9bcb18d

Refactor pendingWrites / write pipeline. (#217)
* Rename receiveListener => listener. This matches JS, and I'm not sure why "receive" was added to the name. The listener is used for received data but also for close notifications, etc.

* Refactor pendingWrites / write pipeline. [Port of firebase/firebase-js-sdk#972]

  No functional changes. Just renames, moves, added comments, etc.

  * pendingWrites => writePipeline
  * canWriteMutations() renamed canAddToWritePipeline()
  * commitBatch() => addToWritePipeline()
  * lastBatchSeen removed, since we can compute it on demand from writePipeline (sketched below).
  * Removed cleanUpWriteStreamState() and instead inlined it in disableNetworkInternal(), since I didn't like the asymmetry with cleanUpWatchStreamState(), which is called every time the stream closes.
  * Lots of comment cleanup.
1 parent 08ee952 commit 9bcb18d
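
Since lastBatchSeen is gone, the ID of the last batch handed to the pipeline is now derived from the tail of the deque whenever the pipeline is refilled. A minimal standalone sketch of that idea (integer batch IDs and UNKNOWN stand in for MutationBatch and MutationBatch.UNKNOWN; this is not the SDK class):

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Toy sketch: the ID of the last batch added to the pipeline can be read off the tail
    // of the deque, so a separate lastBatchSeen field is unnecessary.
    final class WritePipelineIdSketch {
      // Stand-in for MutationBatch.UNKNOWN in the real code.
      static final int UNKNOWN = -1;

      static int lastBatchIdRetrieved(Deque<Integer> writePipeline) {
        return writePipeline.isEmpty() ? UNKNOWN : writePipeline.getLast();
      }

      public static void main(String[] args) {
        Deque<Integer> writePipeline = new ArrayDeque<>();
        System.out.println(lastBatchIdRetrieved(writePipeline)); // -1 (UNKNOWN)
        writePipeline.add(1);
        writePipeline.add(2);
        System.out.println(lastBatchIdRetrieved(writePipeline)); // 2
      }
    }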

5 files changed: 69 additions & 75 deletions


firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java

Lines changed: 8 additions & 8 deletions
@@ -152,7 +152,7 @@ public void run() {
   private State state = State.Initial;
   private ClientCall<ReqT, RespT> call;
   final ExponentialBackoff backoff;
-  CallbackT receiveListener;
+  CallbackT listener;
   StreamObserver streamObserver;
 
   AbstractStream(
@@ -190,7 +190,7 @@ public boolean isOpen() {
   @Override
   public void start(CallbackT listener) {
     workerQueue.verifyIsCurrentThread();
-    hardAssert(receiveListener == null, "Receive listener still set");
+    hardAssert(this.listener == null, "Receive listener still set");
     hardAssert(call == null, "Last call still set");
     hardAssert(idleTimer == null, "Idle timer still set");
 
@@ -201,7 +201,7 @@ public void start(CallbackT listener) {
 
     hardAssert(state == State.Initial, "Already started");
 
-    receiveListener = listener;
+    this.listener = listener;
 
     streamObserver = new StreamObserver();
     call = firestoreChannel.runBidiStreamingRpc(methodDescriptor, streamObserver);
@@ -219,7 +219,7 @@ public void start(CallbackT listener) {
           // in the auth state.
           if (state == State.Auth) {
             state = State.Open;
-            receiveListener.onOpen();
+            this.listener.onOpen();
           }
         });
   }
@@ -238,7 +238,7 @@ public void start(CallbackT listener) {
    * 'State.Stop'.
    *
    * @param finalState the intended state of the stream after closing.
-   * @param status the status to emit to the receiveListener.
+   * @param status the status to emit to the listener.
    */
   private void close(State finalState, Status status) {
     workerQueue.verifyIsCurrentThread();
@@ -292,14 +292,14 @@ private void close(State finalState, Status status) {
       call = null;
     }
 
-    // This state must be assigned before calling receiveListener.onClose to allow the callback to
+    // This state must be assigned before calling listener.onClose to allow the callback to
     // inhibit backoff or otherwise manipulate the state in its non-started state.
     this.state = finalState;
 
     // If the caller explicitly requested a stream stop, don't notify them of a closing stream (it
    // could trigger undesirable recovery logic, etc.).
-    CallbackT receiveListener = this.receiveListener;
-    this.receiveListener = null;
+    CallbackT receiveListener = this.listener;
+    this.listener = null;
     if (finalState != State.Stop) {
       receiveListener.onClose(status);
     }
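
The close() handoff above (assign the final state, clear the listener field, then notify unless the caller asked for Stop) is the pattern worth internalizing. A standalone toy sketch of the same idea, with simplified types that are not the SDK's (this Callback, State, and the println listener are illustrative only):

    // Toy sketch of the close() handoff shown in the diff above: the listener field is
    // cleared before the callback runs, so the stream is observably "not started" inside
    // onClose, and a caller-requested Stop closes silently.
    final class StreamCloseSketch {
      interface Callback {
        void onClose(String status);
      }

      enum State { Initial, Open, Error, Stop }

      private State state = State.Open;
      private Callback listener = status -> System.out.println("closed: " + status);

      void close(State finalState, String status) {
        // Assign the state before notifying so the callback sees the non-started state.
        this.state = finalState;

        // Hand off and clear the field; notify only if this wasn't an explicit stop.
        Callback callback = this.listener;
        this.listener = null;
        if (finalState != State.Stop) {
          callback.onClose(status);
        }
      }

      public static void main(String[] args) {
        new StreamCloseSketch().close(State.Error, "UNAVAILABLE"); // prints "closed: UNAVAILABLE"
        new StreamCloseSketch().close(State.Stop, "OK");           // no callback: explicit stop
      }
    }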

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java

Lines changed: 51 additions & 57 deletions
@@ -40,11 +40,11 @@
 import io.grpc.Status;
 import io.grpc.Status.Code;
 import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Queue;
 
 /**
  * RemoteStore handles all interaction with the backend through a simple, clean interface. This
@@ -129,9 +129,21 @@ public interface RemoteStoreCallback {
   @Nullable private WriteStream writeStream;
   @Nullable private WatchChangeAggregator watchChangeAggregator;
 
-  private int lastBatchSeen;
-
-  private final Queue<MutationBatch> pendingWrites;
+  /**
+   * A list of up to MAX_PENDING_WRITES writes that we have fetched from the LocalStore via
+   * fillWritePipeline() and have or will send to the write stream.
+   *
+   * <p>Whenever writePipeline.length > 0 the RemoteStore will attempt to start or restart the write
+   * stream. When the stream is established the writes in the pipeline will be sent in order.
+   *
+   * <p>Writes remain in writePipeline until they are acknowledged by the backend and thus will
+   * automatically be re-sent if the stream is interrupted / restarted before they're acknowledged.
+   *
+   * <p>Write responses from the backend are linked to their originating request purely based on
+   * order, and so we can just poll() writes from the front of the writePipeline as we receive
+   * responses.
+   */
+  private final Deque<MutationBatch> writePipeline;
 
   public RemoteStore(
       RemoteStoreCallback remoteStoreCallback,
@@ -143,8 +155,7 @@ public RemoteStore(
     this.datastore = datastore;
 
     listenTargets = new HashMap<>();
-    lastBatchSeen = MutationBatch.UNKNOWN;
-    pendingWrites = new ArrayDeque<>();
+    writePipeline = new ArrayDeque<>();
 
     onlineStateTracker =
         new OnlineStateTracker(workerQueue, remoteStoreCallback::handleOnlineStateChange);
@@ -188,7 +199,10 @@ private void disableNetworkInternal() {
     writeStream.stop();
 
     cleanUpWatchStreamState();
-    cleanUpWriteStreamState();
+    if (!writePipeline.isEmpty()) {
+      Logger.debug(LOG_TAG, "Stopping write stream with %d pending writes", writePipeline.size());
+      writePipeline.clear();
+    }
 
     writeStream = null;
     watchStream = null;
@@ -290,7 +304,7 @@ private void sendUnwatchRequest(int targetId) {
    * pending writes.
    */
   private boolean shouldStartWriteStream() {
-    return isNetworkEnabled() && !writeStream.isStarted() && !pendingWrites.isEmpty();
+    return isNetworkEnabled() && !writeStream.isStarted() && !writePipeline.isEmpty();
   }
 
   /**
@@ -301,12 +315,6 @@ private boolean shouldStartWatchStream() {
     return isNetworkEnabled() && !watchStream.isStarted() && !listenTargets.isEmpty();
   }
 
-  private void cleanUpWriteStreamState() {
-    lastBatchSeen = MutationBatch.UNKNOWN;
-    Logger.debug(LOG_TAG, "Stopping write stream with " + pendingWrites.size() + " pending writes");
-    pendingWrites.clear();
-  }
-
   private void cleanUpWatchStreamState() {
     // If the connection is closed then we'll never get a snapshot version for the accumulated
     // changes and so we'll never be able to complete the batch. When we start up again the server
@@ -488,49 +496,46 @@ private void processTargetError(WatchTargetChange targetChange) {
   // Write Stream
 
   /**
-   * Notifies that there are new mutations to process in the queue. This is typically called by
-   * SyncEngine after it has sent mutations to LocalStore.
+   * Attempts to fill our write pipeline with writes from the LocalStore.
    *
-   * <p>In response the remote store will pull mutations from the local store until the datastore
-   * instance reports that it cannot accept further in-progress writes. This mechanism serves to
-   * maintain a pipeline of in-flight requests between the Datastore and the server that applies
-   * them.
+   * <p>Called internally to bootstrap or refill the write pipeline and by SyncEngine whenever there
+   * are new mutations to process.
+   *
+   * <p>Starts the write stream if necessary.
    */
   public void fillWritePipeline() {
     if (isNetworkEnabled()) {
-      while (canWriteMutations()) {
-        MutationBatch batch = localStore.getNextMutationBatch(lastBatchSeen);
+      int lastBatchIdRetrieved =
+          writePipeline.isEmpty() ? MutationBatch.UNKNOWN : writePipeline.getLast().getBatchId();
+      while (canAddToWritePipeline()) {
+        MutationBatch batch = localStore.getNextMutationBatch(lastBatchIdRetrieved);
         if (batch == null) {
          break;
        }
-        commitBatch(batch);
+        addToWritePipeline(batch);
+        lastBatchIdRetrieved = batch.getBatchId();
       }
 
-      if (pendingWrites.isEmpty()) {
-        writeStream.markIdle();
-      }
+      writeStream.markIdle();
     }
   }
 
   /**
-   * Returns true if the backend can accept additional write requests.
-   *
-   * <p>When sending mutations to the write stream (e.g. in fillWritePipeline()), call this method
-   * first to check if more mutations can be sent.
-   *
-   * <p>Currently the only thing that can prevent the backend from accepting write requests is if
-   * there are too many requests already outstanding. As writes complete the backend will be able to
-   * accept more.
+   * Returns true if we can add to the write pipeline (i.e. it is not full and the network is
+   * enabled).
    */
-  private boolean canWriteMutations() {
-    return isNetworkEnabled() && pendingWrites.size() < MAX_PENDING_WRITES;
+  private boolean canAddToWritePipeline() {
+    return isNetworkEnabled() && writePipeline.size() < MAX_PENDING_WRITES;
   }
 
-  private void commitBatch(MutationBatch mutationBatch) {
-    hardAssert(canWriteMutations(), "commitBatch called when mutations can't be written");
-    lastBatchSeen = mutationBatch.getBatchId();
+  /**
+   * Queues additional writes to be sent to the write stream, sending them immediately if the write
+   * stream is established, else starting the write stream if it is not yet started.
+   */
+  private void addToWritePipeline(MutationBatch mutationBatch) {
+    hardAssert(canAddToWritePipeline(), "addToWritePipeline called when pipeline is full");
 
-    pendingWrites.add(mutationBatch);
+    writePipeline.add(mutationBatch);
 
     if (shouldStartWriteStream()) {
       startWriteStream();
@@ -576,19 +581,8 @@ private void handleWriteStreamHandshakeComplete() {
     // Record the stream token.
     localStore.setLastStreamToken(writeStream.getLastStreamToken());
 
-    /*
-     * Drain pending writes.
-     *
-     * Note that at this point pendingWrites contains mutations that have already been accepted by
-     * fillWritePipeline/commitBatch. If the pipeline is full, canWriteMutations will be false,
-     * despite the fact that we actually need to send mutations over.
-     *
-     * This also means that this method indirectly respects the limits imposed by canWriteMutations
-     * since writes can't be added to the pendingWrites array when canWriteMutations is false. If
-     * the limits imposed by canWriteMutations actually protect us from DOSing ourselves then those
-     * limits won't be exceeded here and we'll continue to make progress.
-     */
-    for (MutationBatch batch : pendingWrites) {
+    // Send the write pipeline now that stream is established.
+    for (MutationBatch batch : writePipeline) {
       writeStream.writeMutations(batch.getMutations());
     }
   }
@@ -599,8 +593,8 @@ private void handleWriteStreamHandshakeComplete() {
   private void handleWriteStreamMutationResults(
       SnapshotVersion commitVersion, List<MutationResult> results) {
     // This is a response to a write containing mutations and should be correlated to the first
-    // pending write.
-    MutationBatch batch = pendingWrites.poll();
+    // write in our write pipeline.
+    MutationBatch batch = writePipeline.poll();
 
     MutationBatchResult mutationBatchResult =
         MutationBatchResult.create(batch, commitVersion, results, writeStream.getLastStreamToken());
@@ -617,7 +611,7 @@ private void handleWriteStreamClose(Status status) {
 
     // If the write stream closed due to an error, invoke the error callbacks if there are pending
     // writes.
-    if (!status.isOk() && !pendingWrites.isEmpty()) {
+    if (!status.isOk() && !writePipeline.isEmpty()) {
      // TODO: handle UNAUTHENTICATED status, see go/firestore-client-errors
      if (writeStream.isHandshakeComplete()) {
        // This error affects the actual writes
@@ -658,7 +652,7 @@ private void handleWriteError(Status status) {
     if (Datastore.isPermanentWriteError(status)) {
       // If this was a permanent error, the request itself was the problem so it's not going
       // to succeed if we resend it.
-      MutationBatch batch = pendingWrites.poll();
+      MutationBatch batch = writePipeline.poll();
 
       // In this case it's also unlikely that the server itself is melting down -- this was
       // just a bad request, so inhibit backoff on the next restart
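
The new writePipeline Javadoc above describes the whole lifecycle: fill up to MAX_PENDING_WRITES batches from the LocalStore, send them in order once the stream handshake completes, and poll() from the head as acks arrive because responses are correlated purely by order. A self-contained toy model of that flow (integer batch IDs and println stand in for MutationBatch and the write stream; this is not RemoteStore):

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Toy model of the writePipeline behaviour described in the Javadoc above.
    final class PipelineFlowSketch {
      static final int MAX_PENDING_WRITES = 10;

      public static void main(String[] args) {
        Deque<Integer> writePipeline = new ArrayDeque<>();

        // fillWritePipeline(): pull batches until the pipeline is full or the local store runs dry.
        for (int batchId = 1; batchId <= 3 && writePipeline.size() < MAX_PENDING_WRITES; batchId++) {
          writePipeline.add(batchId);
        }

        // Handshake complete: send everything currently in the pipeline, in order.
        for (int batchId : writePipeline) {
          System.out.println("writeMutations for batch " + batchId);
        }

        // Responses are correlated purely by order, so each ack just poll()s the head.
        int acked = writePipeline.poll();
        System.out.println("acked batch " + acked + "; " + writePipeline.size() + " still pending");
      }
    }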

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/WatchStream.java

Lines changed: 1 addition & 1 deletion
@@ -100,6 +100,6 @@ public void onNext(com.google.firestore.v1beta1.ListenResponse listenResponse) {
 
       WatchChange watchChange = serializer.decodeWatchChange(listenResponse);
       SnapshotVersion snapshotVersion = serializer.decodeVersionFromListenResponse(listenResponse);
-      receiveListener.onWatchChange(snapshotVersion, watchChange);
+      listener.onWatchChange(snapshotVersion, watchChange);
     }
   }

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/WriteStream.java

Lines changed: 2 additions & 2 deletions
@@ -161,7 +161,7 @@ public void onNext(WriteResponse response) {
       // The first response is the handshake response
       handshakeComplete = true;
 
-      receiveListener.onHandshakeComplete();
+      listener.onHandshakeComplete();
     } else {
       // A successful first write response means the stream is healthy,
       // Note, that we could consider a successful handshake healthy, however,
@@ -176,7 +176,7 @@ public void onNext(WriteResponse response) {
         com.google.firestore.v1beta1.WriteResult result = response.getWriteResults(i);
         results.add(serializer.decodeMutationResult(result, commitVersion));
       }
-      receiveListener.onWriteResponse(commitVersion, results);
+      listener.onWriteResponse(commitVersion, results);
     }
   }
 }
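
For context on the two listener callbacks renamed above: the first response on the write stream is always the handshake, and every later response carries results for the oldest unacknowledged write. A simplified, hypothetical sketch of that dispatch (String stands in for WriteResponse, and this Callback interface is illustrative only, not the SDK's WriteStream.Callback):

    // Simplified sketch of the response dispatch in WriteStream.onNext above.
    final class WriteResponseDispatchSketch {
      interface Callback {
        void onHandshakeComplete();

        void onWriteResponse(String results);
      }

      private final Callback listener;
      private boolean handshakeComplete = false;

      WriteResponseDispatchSketch(Callback listener) {
        this.listener = listener;
      }

      void onNext(String response) {
        if (!handshakeComplete) {
          // The first response on the stream is always the handshake response.
          handshakeComplete = true;
          listener.onHandshakeComplete();
        } else {
          // Later responses carry results for the oldest write still in the pipeline.
          listener.onWriteResponse(response);
        }
      }

      public static void main(String[] args) {
        WriteResponseDispatchSketch stream =
            new WriteResponseDispatchSketch(
                new Callback() {
                  @Override
                  public void onHandshakeComplete() {
                    System.out.println("handshake complete");
                  }

                  @Override
                  public void onWriteResponse(String results) {
                    System.out.println("write response: " + results);
                  }
                });
        stream.onNext("handshake");  // -> handshake complete
        stream.onNext("results #1"); // -> write response: results #1
      }
    }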

firebase-firestore/src/test/java/com/google/firebase/firestore/remote/MockDatastore.java

Lines changed: 7 additions & 7 deletions
@@ -55,7 +55,7 @@ private class MockWatchStream extends WatchStream {
     @Override
     public void start(WatchStream.Callback callback) {
       hardAssert(!open, "Trying to start already started watch stream");
-      receiveListener = callback;
+      listener = callback;
       streamObserver = new WatchStream.StreamObserver();
       open = true;
       callback.onOpen();
@@ -105,7 +105,7 @@ public void unwatchTarget(int targetId) {
     /** Injects a stream failure as though it had come from the backend. */
     void failStream(Status status) {
       open = false;
-      receiveListener.onClose(status);
+      listener.onClose(status);
     }
 
     /** Injects a watch change as though it had come from the backend. */
@@ -129,7 +129,7 @@ void writeWatchChange(WatchChange change, SnapshotVersion snapshotVersion) {
           snapshotVersion = SnapshotVersion.NONE;
         }
       }
-      receiveListener.onWatchChange(snapshotVersion, change);
+      listener.onWatchChange(snapshotVersion, change);
     }
   }
 
@@ -146,7 +146,7 @@ private class MockWriteStream extends WriteStream {
     @Override
     public void start(WriteStream.Callback callback) {
       hardAssert(!open, "Trying to start already started write stream");
-      receiveListener = callback;
+      listener = callback;
       streamObserver = new WriteStream.StreamObserver();
       handshakeComplete = false;
       open = true;
@@ -169,7 +169,7 @@ public void writeHandshake() {
       hardAssert(!handshakeComplete, "Handshake already completed");
       writeStreamRequestCount += 1;
       handshakeComplete = true;
-      receiveListener.onHandshakeComplete();
+      listener.onHandshakeComplete();
     }
 
     @Override
@@ -180,13 +180,13 @@ public void writeMutations(List<Mutation> mutations) {
 
     /** Injects a write ack as though it had come from the backend in response to a write. */
     void ackWrite(SnapshotVersion commitVersion, List<MutationResult> results) {
-      receiveListener.onWriteResponse(commitVersion, results);
+      listener.onWriteResponse(commitVersion, results);
     }
 
     /** Injects a stream failure as though it had come from the backend. */
     void failStream(Status status) {
       open = false;
-      receiveListener.onClose(status);
+      listener.onClose(status);
     }
 
     /** Returns a previous write that had been "sent to the backend". */
