Skip to content

Commit 46da093

Browse files
authored
Refactor stream (#8376)
* Refactor stream * Pretty
1 parent 14b7720 commit 46da093

File tree

1 file changed

+57
-33
lines changed

1 file changed

+57
-33
lines changed

packages/firestore/src/remote/persistent_stream.ts

Lines changed: 57 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,11 @@ export abstract class PersistentStream<
212212
this.backoff = new ExponentialBackoff(queue, connectionTimerId);
213213
}
214214

215+
/**
216+
* Count of response messages received.
217+
*/
218+
protected responseCount: number = 0;
219+
215220
/**
216221
* Returns true if start() has been called and no error has occurred. True
217222
* indicates the stream is open or in the process of opening (which
@@ -246,6 +251,7 @@ export abstract class PersistentStream<
246251
* When start returns, isStarted() will return true.
247252
*/
248253
start(): void {
254+
this.responseCount = 0;
249255
if (this.state === PersistentStreamState.Error) {
250256
this.performBackoff();
251257
return;
@@ -429,11 +435,18 @@ export abstract class PersistentStream<
429435
): Stream<SendType, ReceiveType>;
430436

431437
/**
432-
* Called after the stream has received a message. The function will be
433-
* called on the right queue and must return a Promise.
438+
* Called when the stream receives first message.
439+
* The function will be called on the right queue and must return a Promise.
440+
* @param message - The message received from the stream.
441+
*/
442+
protected abstract onFirst(message: ReceiveType): Promise<void>;
443+
444+
/**
445+
* Called on subsequent messages after the stream has received first message.
446+
* The function will be called on the right queue and must return a Promise.
434447
* @param message - The message received from the stream.
435448
*/
436-
protected abstract onMessage(message: ReceiveType): Promise<void>;
449+
protected abstract onNext(message: ReceiveType): Promise<void>;
437450

438451
private auth(): void {
439452
debugAssert(
@@ -522,7 +535,11 @@ export abstract class PersistentStream<
522535
});
523536
this.stream.onMessage((msg: ReceiveType) => {
524537
dispatchIfNotClosed(() => {
525-
return this.onMessage(msg);
538+
if (++this.responseCount === 1) {
539+
return this.onFirst(msg);
540+
} else {
541+
return this.onNext(msg);
542+
}
526543
});
527544
});
528545
}
@@ -643,7 +660,11 @@ export class PersistentListenStream extends PersistentStream<
643660
);
644661
}
645662

646-
protected onMessage(watchChangeProto: ProtoListenResponse): Promise<void> {
663+
protected onFirst(watchChangeProto: ProtoListenResponse): Promise<void> {
664+
return this.onNext(watchChangeProto);
665+
}
666+
667+
protected onNext(watchChangeProto: ProtoListenResponse): Promise<void> {
647668
// A successful response means the stream is healthy
648669
this.backoff.reset();
649670

@@ -723,8 +744,6 @@ export class PersistentWriteStream extends PersistentStream<
723744
ProtoWriteResponse,
724745
WriteStreamListener
725746
> {
726-
private handshakeComplete_ = false;
727-
728747
constructor(
729748
queue: AsyncQueue,
730749
connection: Connection,
@@ -760,18 +779,17 @@ export class PersistentWriteStream extends PersistentStream<
760779
* the stream is ready to accept mutations.
761780
*/
762781
get handshakeComplete(): boolean {
763-
return this.handshakeComplete_;
782+
return this.responseCount > 0;
764783
}
765784

766785
// Override of PersistentStream.start
767786
start(): void {
768-
this.handshakeComplete_ = false;
769787
this.lastStreamToken = undefined;
770788
super.start();
771789
}
772790

773791
protected tearDown(): void {
774-
if (this.handshakeComplete_) {
792+
if (this.handshakeComplete) {
775793
this.writeMutations([]);
776794
}
777795
}
@@ -787,35 +805,41 @@ export class PersistentWriteStream extends PersistentStream<
787805
);
788806
}
789807

790-
protected onMessage(responseProto: ProtoWriteResponse): Promise<void> {
808+
protected onFirst(responseProto: ProtoWriteResponse): Promise<void> {
809+
// Always capture the last stream token.
810+
hardAssert(
811+
!!responseProto.streamToken,
812+
'Got a write handshake response without a stream token'
813+
);
814+
this.lastStreamToken = responseProto.streamToken;
815+
816+
// The first response is always the handshake response
817+
hardAssert(
818+
!responseProto.writeResults || responseProto.writeResults.length === 0,
819+
'Got mutation results for handshake'
820+
);
821+
return this.listener!.onHandshakeComplete();
822+
}
823+
824+
protected onNext(responseProto: ProtoWriteResponse): Promise<void> {
791825
// Always capture the last stream token.
792826
hardAssert(
793827
!!responseProto.streamToken,
794828
'Got a write response without a stream token'
795829
);
796830
this.lastStreamToken = responseProto.streamToken;
797831

798-
if (!this.handshakeComplete_) {
799-
// The first response is always the handshake response
800-
hardAssert(
801-
!responseProto.writeResults || responseProto.writeResults.length === 0,
802-
'Got mutation results for handshake'
803-
);
804-
this.handshakeComplete_ = true;
805-
return this.listener!.onHandshakeComplete();
806-
} else {
807-
// A successful first write response means the stream is healthy,
808-
// Note, that we could consider a successful handshake healthy, however,
809-
// the write itself might be causing an error we want to back off from.
810-
this.backoff.reset();
832+
// A successful first write response means the stream is healthy,
833+
// Note, that we could consider a successful handshake healthy, however,
834+
// the write itself might be causing an error we want to back off from.
835+
this.backoff.reset();
811836

812-
const results = fromWriteResults(
813-
responseProto.writeResults,
814-
responseProto.commitTime
815-
);
816-
const commitVersion = fromVersion(responseProto.commitTime!);
817-
return this.listener!.onMutationResult(commitVersion, results);
818-
}
837+
const results = fromWriteResults(
838+
responseProto.writeResults,
839+
responseProto.commitTime
840+
);
841+
const commitVersion = fromVersion(responseProto.commitTime!);
842+
return this.listener!.onMutationResult(commitVersion, results);
819843
}
820844

821845
/**
@@ -825,7 +849,7 @@ export class PersistentWriteStream extends PersistentStream<
825849
*/
826850
writeHandshake(): void {
827851
debugAssert(this.isOpen(), 'Writing handshake requires an opened stream');
828-
debugAssert(!this.handshakeComplete_, 'Handshake already completed');
852+
debugAssert(!this.handshakeComplete, 'Handshake already completed');
829853
debugAssert(
830854
!this.lastStreamToken,
831855
'Stream token should be empty during handshake'
@@ -841,7 +865,7 @@ export class PersistentWriteStream extends PersistentStream<
841865
writeMutations(mutations: Mutation[]): void {
842866
debugAssert(this.isOpen(), 'Writing mutations requires an opened stream');
843867
debugAssert(
844-
this.handshakeComplete_,
868+
this.handshakeComplete,
845869
'Handshake must be complete before writing mutations'
846870
);
847871
debugAssert(

0 commit comments

Comments
 (0)