From 125b6d87090032770111381cf54d57264168960e Mon Sep 17 00:00:00 2001 From: Ehsan Nasiri Date: Fri, 15 Oct 2021 11:57:58 -0500 Subject: [PATCH 1/7] Add Healthy state to streams. --- .../firestore/src/remote/persistent_stream.ts | 54 ++++++++++++++++--- packages/firestore/src/util/async_queue.ts | 6 ++- .../test/integration/remote/stream.test.ts | 20 +++++++ 3 files changed, 72 insertions(+), 8 deletions(-) diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index 3e69daf19e0..d7100e981ac 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -97,6 +97,13 @@ const enum PersistentStreamState { */ Open, + /** + * The stream is healthy and has been connected for more than 10 seconds. We + * therefore assume that the credentials we passed were valid. Both + * isStarted() and isOpen() will return true. + */ + Healthy, + /** * The stream encountered an error. The next start attempt will back off. * While in this state isStarted() will return false. @@ -132,6 +139,9 @@ export interface PersistentStreamListener { /** The time a stream stays open after it is marked idle. */ const IDLE_TIMEOUT_MS = 60 * 1000; +/** The time a stream stays open until we consider it healthy. */ +const HEALTHY_TIMEOUT_MS = 10 * 1000; + /** * A PersistentStream is an abstract base class that represents a streaming RPC * to the Firestore backend. It's built on top of the connections own support @@ -178,6 +188,7 @@ export abstract class PersistentStream< private closeCount = 0; private idleTimer: DelayedOperation | null = null; + private healthCheck: DelayedOperation | null = null; private stream: Stream | null = null; protected backoff: ExponentialBackoff; @@ -186,6 +197,7 @@ export abstract class PersistentStream< private queue: AsyncQueue, connectionTimerId: TimerId, private idleTimerId: TimerId, + private healthTimerId: TimerId, protected connection: Connection, private credentialsProvider: CredentialsProvider, protected listener: ListenerType @@ -203,8 +215,8 @@ export abstract class PersistentStream< isStarted(): boolean { return ( this.state === PersistentStreamState.Starting || - this.state === PersistentStreamState.Open || - this.state === PersistentStreamState.Backoff + this.state === PersistentStreamState.Backoff || + this.isOpen() ); } @@ -213,7 +225,8 @@ export abstract class PersistentStream< * called) and the stream is ready for outbound requests. */ isOpen(): boolean { - return this.state === PersistentStreamState.Open; + return this.state === PersistentStreamState.Open || + this.state === PersistentStreamState.Healthy; } /** @@ -291,6 +304,7 @@ export abstract class PersistentStream< /** Sends a message to the underlying stream. */ protected sendRequest(msg: SendType): void { this.cancelIdleCheck(); + this.cancelHealthCheck(); this.stream!.send(msg); } @@ -311,6 +325,14 @@ export abstract class PersistentStream< } } + /** Cancels the health check delayed operation. */ + private cancelHealthCheck(): void { + if (this.healthCheck) { + this.healthCheck.cancel(); + this.healthCheck = null; + } + } + /** * Closes the stream and cleans up as necessary: * @@ -352,9 +374,14 @@ export abstract class PersistentStream< 'Using maximum backoff delay to prevent overloading the backend.' ); this.backoff.resetToMax(); - } else if (error && error.code === Code.UNAUTHENTICATED) { - // "unauthenticated" error means the token was rejected. Try force refreshing it in case it - // just expired. + } else if (error && error.code === Code.UNAUTHENTICATED && + this.state !== PersistentStreamState.Healthy) { + // "unauthenticated" error means the token was rejected. This should rarely + // happen since both Auth and AppCheck ensure a sufficient TTL when we + // request a token. If a user manually resets their system clock this can + // fail, however. In this case, we should get a Code.UNAUTHENTICATED error + // before we received the first message and we need to invalidate the token + // to ensure that we fetch a new token. this.credentialsProvider.invalidateToken(); } @@ -450,6 +477,19 @@ export abstract class PersistentStream< this.state = PersistentStreamState.Open; return this.listener!.onOpen(); }); + + if (this.healthCheck === null) { + this.healthCheck = this.queue.enqueueAfterDelay( + this.healthTimerId, + HEALTHY_TIMEOUT_MS, + () => { + if (this.isOpen()) { + this.state = PersistentStreamState.Healthy; + } + return Promise.resolve(); + } + ); + } }); this.stream.onClose((error?: FirestoreError) => { dispatchIfNotClosed(() => { @@ -559,6 +599,7 @@ export class PersistentListenStream extends PersistentStream< queue, TimerId.ListenStreamConnectionBackoff, TimerId.ListenStreamIdle, + TimerId.HealthCheckTimeout, connection, credentials, listener @@ -667,6 +708,7 @@ export class PersistentWriteStream extends PersistentStream< queue, TimerId.WriteStreamConnectionBackoff, TimerId.WriteStreamIdle, + TimerId.HealthCheckTimeout, connection, credentials, listener diff --git a/packages/firestore/src/util/async_queue.ts b/packages/firestore/src/util/async_queue.ts index 24f07e77f9b..23c0159745a 100644 --- a/packages/firestore/src/util/async_queue.ts +++ b/packages/firestore/src/util/async_queue.ts @@ -38,15 +38,17 @@ export const enum TimerId { All = 'all', /** - * The following 4 timers are used in persistent_stream.ts for the listen and + * The following 5 timers are used in persistent_stream.ts for the listen and * write streams. The "Idle" timer is used to close the stream due to * inactivity. The "ConnectionBackoff" timer is used to restart a stream once - * the appropriate backoff delay has elapsed. + * the appropriate backoff delay has elapsed. The health check is used to mark + * a stream healthy if it has not received an error during its initial setup. */ ListenStreamIdle = 'listen_stream_idle', ListenStreamConnectionBackoff = 'listen_stream_connection_backoff', WriteStreamIdle = 'write_stream_idle', WriteStreamConnectionBackoff = 'write_stream_connection_backoff', + HealthCheckTimeout = 'health_check_timeout', /** * A timer used in online_state_tracker.ts to transition from diff --git a/packages/firestore/test/integration/remote/stream.test.ts b/packages/firestore/test/integration/remote/stream.test.ts index ae812b63c07..5a6cc53b36a 100644 --- a/packages/firestore/test/integration/remote/stream.test.ts +++ b/packages/firestore/test/integration/remote/stream.test.ts @@ -274,6 +274,26 @@ describe('Write Stream', () => { }); }); +it('token is not invalidated once the stream is healthy', () => { + const credentials = new MockCredentialsProvider(); + + return withTestWriteStream(async (writeStream, streamListener, queue) => { + await streamListener.awaitCallback('open'); + + await queue.runAllDelayedOperationsUntil(TimerId.HealthCheckTimeout); + + // Simulate callback from GRPC with an unauthenticated error -- this should + // NOT invalidate the token. + await writeStream.handleStreamClose( + new FirestoreError(Code.UNAUTHENTICATED, '') + ); + await streamListener.awaitCallback('close'); + expect(credentials.observedStates).to.deep.equal([ + 'getToken' + ]); + }, credentials); +}); + export async function withTestWriteStream( fn: ( writeStream: PersistentWriteStream, From 4a8a7df89a7f45723577d93fcf3a0b75515a75c3 Mon Sep 17 00:00:00 2001 From: Ehsan Nasiri Date: Fri, 15 Oct 2021 12:05:34 -0500 Subject: [PATCH 2/7] Add changelog. --- packages/firestore/CHANGELOG.md | 3 +++ packages/firestore/src/remote/persistent_stream.ts | 13 +++++++++---- .../test/integration/remote/stream.test.ts | 4 +--- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/packages/firestore/CHANGELOG.md b/packages/firestore/CHANGELOG.md index 4d8da6ca40b..c52950edb42 100644 --- a/packages/firestore/CHANGELOG.md +++ b/packages/firestore/CHANGELOG.md @@ -1,5 +1,8 @@ # @firebase/firestore +# Unreleased +- [added] Added Healthy state to streams. + ## 3.1.1 ### Patch Changes diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index d7100e981ac..e0ae2d0553d 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -225,8 +225,10 @@ export abstract class PersistentStream< * called) and the stream is ready for outbound requests. */ isOpen(): boolean { - return this.state === PersistentStreamState.Open || - this.state === PersistentStreamState.Healthy; + return ( + this.state === PersistentStreamState.Open || + this.state === PersistentStreamState.Healthy + ); } /** @@ -374,8 +376,11 @@ export abstract class PersistentStream< 'Using maximum backoff delay to prevent overloading the backend.' ); this.backoff.resetToMax(); - } else if (error && error.code === Code.UNAUTHENTICATED && - this.state !== PersistentStreamState.Healthy) { + } else if ( + error && + error.code === Code.UNAUTHENTICATED && + this.state !== PersistentStreamState.Healthy + ) { // "unauthenticated" error means the token was rejected. This should rarely // happen since both Auth and AppCheck ensure a sufficient TTL when we // request a token. If a user manually resets their system clock this can diff --git a/packages/firestore/test/integration/remote/stream.test.ts b/packages/firestore/test/integration/remote/stream.test.ts index 5a6cc53b36a..2793872287b 100644 --- a/packages/firestore/test/integration/remote/stream.test.ts +++ b/packages/firestore/test/integration/remote/stream.test.ts @@ -288,9 +288,7 @@ it('token is not invalidated once the stream is healthy', () => { new FirestoreError(Code.UNAUTHENTICATED, '') ); await streamListener.awaitCallback('close'); - expect(credentials.observedStates).to.deep.equal([ - 'getToken' - ]); + expect(credentials.observedStates).to.deep.equal(['getToken']); }, credentials); }); From 29a07600f271fb8f3e6194ec9c5009baf72e2821 Mon Sep 17 00:00:00 2001 From: Ehsan Date: Fri, 15 Oct 2021 12:07:59 -0500 Subject: [PATCH 3/7] Create fresh-otters-eat.md --- .changeset/fresh-otters-eat.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fresh-otters-eat.md diff --git a/.changeset/fresh-otters-eat.md b/.changeset/fresh-otters-eat.md new file mode 100644 index 00000000000..c9cf993958d --- /dev/null +++ b/.changeset/fresh-otters-eat.md @@ -0,0 +1,5 @@ +--- +"@firebase/firestore": patch +--- + +Add Healthy state to streams. From 47b1a262da2c241cdcaaad0b7bbcf0f2f036b3d5 Mon Sep 17 00:00:00 2001 From: Ehsan Nasiri Date: Fri, 15 Oct 2021 12:51:39 -0500 Subject: [PATCH 4/7] Remove changelog/changeset. --- .changeset/fresh-otters-eat.md | 5 ----- packages/firestore/CHANGELOG.md | 3 --- 2 files changed, 8 deletions(-) delete mode 100644 .changeset/fresh-otters-eat.md diff --git a/.changeset/fresh-otters-eat.md b/.changeset/fresh-otters-eat.md deleted file mode 100644 index c9cf993958d..00000000000 --- a/.changeset/fresh-otters-eat.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"@firebase/firestore": patch ---- - -Add Healthy state to streams. diff --git a/packages/firestore/CHANGELOG.md b/packages/firestore/CHANGELOG.md index c52950edb42..4d8da6ca40b 100644 --- a/packages/firestore/CHANGELOG.md +++ b/packages/firestore/CHANGELOG.md @@ -1,8 +1,5 @@ # @firebase/firestore -# Unreleased -- [added] Added Healthy state to streams. - ## 3.1.1 ### Patch Changes From e1ce0056476180fca52efd556cf35964efc6dec8 Mon Sep 17 00:00:00 2001 From: Ehsan Nasiri Date: Fri, 15 Oct 2021 15:15:44 -0500 Subject: [PATCH 5/7] Cancel healthCheck in `close`, not in `sendRequest`. --- packages/firestore/src/remote/persistent_stream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index e0ae2d0553d..0792754b69b 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -306,7 +306,6 @@ export abstract class PersistentStream< /** Sends a message to the underlying stream. */ protected sendRequest(msg: SendType): void { this.cancelIdleCheck(); - this.cancelHealthCheck(); this.stream!.send(msg); } @@ -360,6 +359,7 @@ export abstract class PersistentStream< // Cancel any outstanding timers (they're guaranteed not to execute). this.cancelIdleCheck(); + this.cancelHealthCheck(); this.backoff.cancel(); // Invalidates any stream-related callbacks (e.g. from auth or the From 7989d10d86bf403bc914987ccf28bba96484a856 Mon Sep 17 00:00:00 2001 From: Ehsan Nasiri Date: Fri, 15 Oct 2021 16:39:03 -0500 Subject: [PATCH 6/7] Perform everything in `dispatchIfNotClosed`. --- .../firestore/src/remote/persistent_stream.ts | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index 0792754b69b..3f8df36030f 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -475,15 +475,7 @@ export abstract class PersistentStream< this.stream = this.startRpc(token); this.stream.onOpen(() => { dispatchIfNotClosed(() => { - debugAssert( - this.state === PersistentStreamState.Starting, - 'Expected stream to be in state Starting, but was ' + this.state - ); - this.state = PersistentStreamState.Open; - return this.listener!.onOpen(); - }); - - if (this.healthCheck === null) { + debugAssert(this.healthCheck === null, 'Expected healthCheck to be null'); this.healthCheck = this.queue.enqueueAfterDelay( this.healthTimerId, HEALTHY_TIMEOUT_MS, @@ -494,7 +486,13 @@ export abstract class PersistentStream< return Promise.resolve(); } ); - } + debugAssert( + this.state === PersistentStreamState.Starting, + 'Expected stream to be in state Starting, but was ' + this.state + ); + this.state = PersistentStreamState.Open; + return this.listener!.onOpen(); + }); }); this.stream.onClose((error?: FirestoreError) => { dispatchIfNotClosed(() => { From 5461e28330257c1fb6b5aa593fac21a0b1b91a44 Mon Sep 17 00:00:00 2001 From: Ehsan Nasiri Date: Fri, 15 Oct 2021 16:57:57 -0500 Subject: [PATCH 7/7] perform the healthCheck assignment after checking the stream state. --- .../firestore/src/remote/persistent_stream.ts | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index 3f8df36030f..3a684e8c543 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -475,7 +475,15 @@ export abstract class PersistentStream< this.stream = this.startRpc(token); this.stream.onOpen(() => { dispatchIfNotClosed(() => { - debugAssert(this.healthCheck === null, 'Expected healthCheck to be null'); + debugAssert( + this.state === PersistentStreamState.Starting, + 'Expected stream to be in state Starting, but was ' + this.state + ); + this.state = PersistentStreamState.Open; + debugAssert( + this.healthCheck === null, + 'Expected healthCheck to be null' + ); this.healthCheck = this.queue.enqueueAfterDelay( this.healthTimerId, HEALTHY_TIMEOUT_MS, @@ -486,11 +494,6 @@ export abstract class PersistentStream< return Promise.resolve(); } ); - debugAssert( - this.state === PersistentStreamState.Starting, - 'Expected stream to be in state Starting, but was ' + this.state - ); - this.state = PersistentStreamState.Open; return this.listener!.onOpen(); }); });