Skip to content

Commit 125b6d8

Browse files
committed
Add Healthy state to streams.
1 parent ba40cde commit 125b6d8

File tree

3 files changed

+72
-8
lines changed

3 files changed

+72
-8
lines changed

packages/firestore/src/remote/persistent_stream.ts

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ const enum PersistentStreamState {
9797
*/
9898
Open,
9999

100+
/**
101+
* The stream is healthy and has been connected for more than 10 seconds. We
102+
* therefore assume that the credentials we passed were valid. Both
103+
* isStarted() and isOpen() will return true.
104+
*/
105+
Healthy,
106+
100107
/**
101108
* The stream encountered an error. The next start attempt will back off.
102109
* While in this state isStarted() will return false.
@@ -132,6 +139,9 @@ export interface PersistentStreamListener {
132139
/** The time a stream stays open after it is marked idle. */
133140
const IDLE_TIMEOUT_MS = 60 * 1000;
134141

142+
/** The time a stream stays open until we consider it healthy. */
143+
const HEALTHY_TIMEOUT_MS = 10 * 1000;
144+
135145
/**
136146
* A PersistentStream is an abstract base class that represents a streaming RPC
137147
* to the Firestore backend. It's built on top of the connections own support
@@ -178,6 +188,7 @@ export abstract class PersistentStream<
178188
private closeCount = 0;
179189

180190
private idleTimer: DelayedOperation<void> | null = null;
191+
private healthCheck: DelayedOperation<void> | null = null;
181192
private stream: Stream<SendType, ReceiveType> | null = null;
182193

183194
protected backoff: ExponentialBackoff;
@@ -186,6 +197,7 @@ export abstract class PersistentStream<
186197
private queue: AsyncQueue,
187198
connectionTimerId: TimerId,
188199
private idleTimerId: TimerId,
200+
private healthTimerId: TimerId,
189201
protected connection: Connection,
190202
private credentialsProvider: CredentialsProvider,
191203
protected listener: ListenerType
@@ -203,8 +215,8 @@ export abstract class PersistentStream<
203215
isStarted(): boolean {
204216
return (
205217
this.state === PersistentStreamState.Starting ||
206-
this.state === PersistentStreamState.Open ||
207-
this.state === PersistentStreamState.Backoff
218+
this.state === PersistentStreamState.Backoff ||
219+
this.isOpen()
208220
);
209221
}
210222

@@ -213,7 +225,8 @@ export abstract class PersistentStream<
213225
* called) and the stream is ready for outbound requests.
214226
*/
215227
isOpen(): boolean {
216-
return this.state === PersistentStreamState.Open;
228+
return this.state === PersistentStreamState.Open ||
229+
this.state === PersistentStreamState.Healthy;
217230
}
218231

219232
/**
@@ -291,6 +304,7 @@ export abstract class PersistentStream<
291304
/** Sends a message to the underlying stream. */
292305
protected sendRequest(msg: SendType): void {
293306
this.cancelIdleCheck();
307+
this.cancelHealthCheck();
294308
this.stream!.send(msg);
295309
}
296310

@@ -311,6 +325,14 @@ export abstract class PersistentStream<
311325
}
312326
}
313327

328+
/** Cancels the health check delayed operation. */
329+
private cancelHealthCheck(): void {
330+
if (this.healthCheck) {
331+
this.healthCheck.cancel();
332+
this.healthCheck = null;
333+
}
334+
}
335+
314336
/**
315337
* Closes the stream and cleans up as necessary:
316338
*
@@ -352,9 +374,14 @@ export abstract class PersistentStream<
352374
'Using maximum backoff delay to prevent overloading the backend.'
353375
);
354376
this.backoff.resetToMax();
355-
} else if (error && error.code === Code.UNAUTHENTICATED) {
356-
// "unauthenticated" error means the token was rejected. Try force refreshing it in case it
357-
// just expired.
377+
} else if (error && error.code === Code.UNAUTHENTICATED &&
378+
this.state !== PersistentStreamState.Healthy) {
379+
// "unauthenticated" error means the token was rejected. This should rarely
380+
// happen since both Auth and AppCheck ensure a sufficient TTL when we
381+
// request a token. If a user manually resets their system clock this can
382+
// fail, however. In this case, we should get a Code.UNAUTHENTICATED error
383+
// before we received the first message and we need to invalidate the token
384+
// to ensure that we fetch a new token.
358385
this.credentialsProvider.invalidateToken();
359386
}
360387

@@ -450,6 +477,19 @@ export abstract class PersistentStream<
450477
this.state = PersistentStreamState.Open;
451478
return this.listener!.onOpen();
452479
});
480+
481+
if (this.healthCheck === null) {
482+
this.healthCheck = this.queue.enqueueAfterDelay(
483+
this.healthTimerId,
484+
HEALTHY_TIMEOUT_MS,
485+
() => {
486+
if (this.isOpen()) {
487+
this.state = PersistentStreamState.Healthy;
488+
}
489+
return Promise.resolve();
490+
}
491+
);
492+
}
453493
});
454494
this.stream.onClose((error?: FirestoreError) => {
455495
dispatchIfNotClosed(() => {
@@ -559,6 +599,7 @@ export class PersistentListenStream extends PersistentStream<
559599
queue,
560600
TimerId.ListenStreamConnectionBackoff,
561601
TimerId.ListenStreamIdle,
602+
TimerId.HealthCheckTimeout,
562603
connection,
563604
credentials,
564605
listener
@@ -667,6 +708,7 @@ export class PersistentWriteStream extends PersistentStream<
667708
queue,
668709
TimerId.WriteStreamConnectionBackoff,
669710
TimerId.WriteStreamIdle,
711+
TimerId.HealthCheckTimeout,
670712
connection,
671713
credentials,
672714
listener

packages/firestore/src/util/async_queue.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,17 @@ export const enum TimerId {
3838
All = 'all',
3939

4040
/**
41-
* The following 4 timers are used in persistent_stream.ts for the listen and
41+
* The following 5 timers are used in persistent_stream.ts for the listen and
4242
* write streams. The "Idle" timer is used to close the stream due to
4343
* inactivity. The "ConnectionBackoff" timer is used to restart a stream once
44-
* the appropriate backoff delay has elapsed.
44+
* the appropriate backoff delay has elapsed. The health check is used to mark
45+
* a stream healthy if it has not received an error during its initial setup.
4546
*/
4647
ListenStreamIdle = 'listen_stream_idle',
4748
ListenStreamConnectionBackoff = 'listen_stream_connection_backoff',
4849
WriteStreamIdle = 'write_stream_idle',
4950
WriteStreamConnectionBackoff = 'write_stream_connection_backoff',
51+
HealthCheckTimeout = 'health_check_timeout',
5052

5153
/**
5254
* A timer used in online_state_tracker.ts to transition from

packages/firestore/test/integration/remote/stream.test.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,26 @@ describe('Write Stream', () => {
274274
});
275275
});
276276

277+
it('token is not invalidated once the stream is healthy', () => {
278+
const credentials = new MockCredentialsProvider();
279+
280+
return withTestWriteStream(async (writeStream, streamListener, queue) => {
281+
await streamListener.awaitCallback('open');
282+
283+
await queue.runAllDelayedOperationsUntil(TimerId.HealthCheckTimeout);
284+
285+
// Simulate callback from GRPC with an unauthenticated error -- this should
286+
// NOT invalidate the token.
287+
await writeStream.handleStreamClose(
288+
new FirestoreError(Code.UNAUTHENTICATED, '')
289+
);
290+
await streamListener.awaitCallback('close');
291+
expect(credentials.observedStates).to.deep.equal([
292+
'getToken'
293+
]);
294+
}, credentials);
295+
});
296+
277297
export async function withTestWriteStream(
278298
fn: (
279299
writeStream: PersistentWriteStream,

0 commit comments

Comments
 (0)