Skip to content

Commit abb3b3d

Browse files
authored
Add Healthy state to streams. (#5622)
* Add Healthy state to streams. * Add changelog. * Create fresh-otters-eat.md * Remove changelog/changeset. * Cancel healthCheck in `close`, not in `sendRequest`. * Perform everything in `dispatchIfNotClosed`. * perform the healthCheck assignment after checking the stream state.
1 parent 192ac86 commit abb3b3d

File tree

3 files changed

+76
-8
lines changed

3 files changed

+76
-8
lines changed

packages/firestore/src/remote/persistent_stream.ts

+54-6
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ const enum PersistentStreamState {
9797
*/
9898
Open,
9999

100+
/**
101+
* The stream is healthy and has been connected for more than 10 seconds. We
102+
* therefore assume that the credentials we passed were valid. Both
103+
* isStarted() and isOpen() will return true.
104+
*/
105+
Healthy,
106+
100107
/**
101108
* The stream encountered an error. The next start attempt will back off.
102109
* While in this state isStarted() will return false.
@@ -132,6 +139,9 @@ export interface PersistentStreamListener {
132139
/** The time a stream stays open after it is marked idle. */
133140
const IDLE_TIMEOUT_MS = 60 * 1000;
134141

142+
/** The time a stream stays open until we consider it healthy. */
143+
const HEALTHY_TIMEOUT_MS = 10 * 1000;
144+
135145
/**
136146
* A PersistentStream is an abstract base class that represents a streaming RPC
137147
* to the Firestore backend. It's built on top of the connections own support
@@ -178,6 +188,7 @@ export abstract class PersistentStream<
178188
private closeCount = 0;
179189

180190
private idleTimer: DelayedOperation<void> | null = null;
191+
private healthCheck: DelayedOperation<void> | null = null;
181192
private stream: Stream<SendType, ReceiveType> | null = null;
182193

183194
protected backoff: ExponentialBackoff;
@@ -186,6 +197,7 @@ export abstract class PersistentStream<
186197
private queue: AsyncQueue,
187198
connectionTimerId: TimerId,
188199
private idleTimerId: TimerId,
200+
private healthTimerId: TimerId,
189201
protected connection: Connection,
190202
private credentialsProvider: CredentialsProvider,
191203
protected listener: ListenerType
@@ -203,8 +215,8 @@ export abstract class PersistentStream<
203215
isStarted(): boolean {
204216
return (
205217
this.state === PersistentStreamState.Starting ||
206-
this.state === PersistentStreamState.Open ||
207-
this.state === PersistentStreamState.Backoff
218+
this.state === PersistentStreamState.Backoff ||
219+
this.isOpen()
208220
);
209221
}
210222

@@ -213,7 +225,10 @@ export abstract class PersistentStream<
213225
* called) and the stream is ready for outbound requests.
214226
*/
215227
isOpen(): boolean {
216-
return this.state === PersistentStreamState.Open;
228+
return (
229+
this.state === PersistentStreamState.Open ||
230+
this.state === PersistentStreamState.Healthy
231+
);
217232
}
218233

219234
/**
@@ -311,6 +326,14 @@ export abstract class PersistentStream<
311326
}
312327
}
313328

329+
/** Cancels the health check delayed operation. */
330+
private cancelHealthCheck(): void {
331+
if (this.healthCheck) {
332+
this.healthCheck.cancel();
333+
this.healthCheck = null;
334+
}
335+
}
336+
314337
/**
315338
* Closes the stream and cleans up as necessary:
316339
*
@@ -336,6 +359,7 @@ export abstract class PersistentStream<
336359

337360
// Cancel any outstanding timers (they're guaranteed not to execute).
338361
this.cancelIdleCheck();
362+
this.cancelHealthCheck();
339363
this.backoff.cancel();
340364

341365
// Invalidates any stream-related callbacks (e.g. from auth or the
@@ -352,9 +376,17 @@ export abstract class PersistentStream<
352376
'Using maximum backoff delay to prevent overloading the backend.'
353377
);
354378
this.backoff.resetToMax();
355-
} else if (error && error.code === Code.UNAUTHENTICATED) {
356-
// "unauthenticated" error means the token was rejected. Try force refreshing it in case it
357-
// just expired.
379+
} else if (
380+
error &&
381+
error.code === Code.UNAUTHENTICATED &&
382+
this.state !== PersistentStreamState.Healthy
383+
) {
384+
// "unauthenticated" error means the token was rejected. This should rarely
385+
// happen since both Auth and AppCheck ensure a sufficient TTL when we
386+
// request a token. If a user manually resets their system clock this can
387+
// fail, however. In this case, we should get a Code.UNAUTHENTICATED error
388+
// before we received the first message and we need to invalidate the token
389+
// to ensure that we fetch a new token.
358390
this.credentialsProvider.invalidateToken();
359391
}
360392

@@ -448,6 +480,20 @@ export abstract class PersistentStream<
448480
'Expected stream to be in state Starting, but was ' + this.state
449481
);
450482
this.state = PersistentStreamState.Open;
483+
debugAssert(
484+
this.healthCheck === null,
485+
'Expected healthCheck to be null'
486+
);
487+
this.healthCheck = this.queue.enqueueAfterDelay(
488+
this.healthTimerId,
489+
HEALTHY_TIMEOUT_MS,
490+
() => {
491+
if (this.isOpen()) {
492+
this.state = PersistentStreamState.Healthy;
493+
}
494+
return Promise.resolve();
495+
}
496+
);
451497
return this.listener!.onOpen();
452498
});
453499
});
@@ -559,6 +605,7 @@ export class PersistentListenStream extends PersistentStream<
559605
queue,
560606
TimerId.ListenStreamConnectionBackoff,
561607
TimerId.ListenStreamIdle,
608+
TimerId.HealthCheckTimeout,
562609
connection,
563610
credentials,
564611
listener
@@ -667,6 +714,7 @@ export class PersistentWriteStream extends PersistentStream<
667714
queue,
668715
TimerId.WriteStreamConnectionBackoff,
669716
TimerId.WriteStreamIdle,
717+
TimerId.HealthCheckTimeout,
670718
connection,
671719
credentials,
672720
listener

packages/firestore/src/util/async_queue.ts

+4-2
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,17 @@ export const enum TimerId {
3838
All = 'all',
3939

4040
/**
41-
* The following 4 timers are used in persistent_stream.ts for the listen and
41+
* The following 5 timers are used in persistent_stream.ts for the listen and
4242
* write streams. The "Idle" timer is used to close the stream due to
4343
* inactivity. The "ConnectionBackoff" timer is used to restart a stream once
44-
* the appropriate backoff delay has elapsed.
44+
* the appropriate backoff delay has elapsed. The health check is used to mark
45+
* a stream healthy if it has not received an error during its initial setup.
4546
*/
4647
ListenStreamIdle = 'listen_stream_idle',
4748
ListenStreamConnectionBackoff = 'listen_stream_connection_backoff',
4849
WriteStreamIdle = 'write_stream_idle',
4950
WriteStreamConnectionBackoff = 'write_stream_connection_backoff',
51+
HealthCheckTimeout = 'health_check_timeout',
5052

5153
/**
5254
* A timer used in online_state_tracker.ts to transition from

packages/firestore/test/integration/remote/stream.test.ts

+18
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,24 @@ describe('Write Stream', () => {
274274
});
275275
});
276276

277+
it('token is not invalidated once the stream is healthy', () => {
278+
const credentials = new MockCredentialsProvider();
279+
280+
return withTestWriteStream(async (writeStream, streamListener, queue) => {
281+
await streamListener.awaitCallback('open');
282+
283+
await queue.runAllDelayedOperationsUntil(TimerId.HealthCheckTimeout);
284+
285+
// Simulate callback from GRPC with an unauthenticated error -- this should
286+
// NOT invalidate the token.
287+
await writeStream.handleStreamClose(
288+
new FirestoreError(Code.UNAUTHENTICATED, '')
289+
);
290+
await streamListener.awaitCallback('close');
291+
expect(credentials.observedStates).to.deep.equal(['getToken']);
292+
}, credentials);
293+
});
294+
277295
export async function withTestWriteStream(
278296
fn: (
279297
writeStream: PersistentWriteStream,

0 commit comments

Comments
 (0)