Skip to content

Add Healthy state to streams. #5622

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 16, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 51 additions & 6 deletions packages/firestore/src/remote/persistent_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ const enum PersistentStreamState {
*/
Open,

/**
* The stream is healthy and has been connected for more than 10 seconds. We
* therefore assume that the credentials we passed were valid. Both
* isStarted() and isOpen() will return true.
*/
Healthy,

/**
* The stream encountered an error. The next start attempt will back off.
* While in this state isStarted() will return false.
Expand Down Expand Up @@ -132,6 +139,9 @@ export interface PersistentStreamListener {
/** The time a stream stays open after it is marked idle. */
const IDLE_TIMEOUT_MS = 60 * 1000;

/** The time a stream stays open until we consider it healthy. */
const HEALTHY_TIMEOUT_MS = 10 * 1000;

/**
* A PersistentStream is an abstract base class that represents a streaming RPC
* to the Firestore backend. It's built on top of the connections own support
Expand Down Expand Up @@ -178,6 +188,7 @@ export abstract class PersistentStream<
private closeCount = 0;

private idleTimer: DelayedOperation<void> | null = null;
private healthCheck: DelayedOperation<void> | null = null;
private stream: Stream<SendType, ReceiveType> | null = null;

protected backoff: ExponentialBackoff;
Expand All @@ -186,6 +197,7 @@ export abstract class PersistentStream<
private queue: AsyncQueue,
connectionTimerId: TimerId,
private idleTimerId: TimerId,
private healthTimerId: TimerId,
protected connection: Connection,
private credentialsProvider: CredentialsProvider,
protected listener: ListenerType
Expand All @@ -203,8 +215,8 @@ export abstract class PersistentStream<
isStarted(): boolean {
return (
this.state === PersistentStreamState.Starting ||
this.state === PersistentStreamState.Open ||
this.state === PersistentStreamState.Backoff
this.state === PersistentStreamState.Backoff ||
this.isOpen()
);
}

Expand All @@ -213,7 +225,10 @@ export abstract class PersistentStream<
* called) and the stream is ready for outbound requests.
*/
isOpen(): boolean {
return this.state === PersistentStreamState.Open;
return (
this.state === PersistentStreamState.Open ||
this.state === PersistentStreamState.Healthy
);
}

/**
Expand Down Expand Up @@ -311,6 +326,14 @@ export abstract class PersistentStream<
}
}

/** Cancels the health check delayed operation. */
private cancelHealthCheck(): void {
if (this.healthCheck) {
this.healthCheck.cancel();
this.healthCheck = null;
}
}

/**
* Closes the stream and cleans up as necessary:
*
Expand All @@ -336,6 +359,7 @@ export abstract class PersistentStream<

// Cancel any outstanding timers (they're guaranteed not to execute).
this.cancelIdleCheck();
this.cancelHealthCheck();
this.backoff.cancel();

// Invalidates any stream-related callbacks (e.g. from auth or the
Expand All @@ -352,9 +376,17 @@ export abstract class PersistentStream<
'Using maximum backoff delay to prevent overloading the backend.'
);
this.backoff.resetToMax();
} else if (error && error.code === Code.UNAUTHENTICATED) {
// "unauthenticated" error means the token was rejected. Try force refreshing it in case it
// just expired.
} else if (
error &&
error.code === Code.UNAUTHENTICATED &&
this.state !== PersistentStreamState.Healthy
) {
// "unauthenticated" error means the token was rejected. This should rarely
// happen since both Auth and AppCheck ensure a sufficient TTL when we
// request a token. If a user manually resets their system clock this can
// fail, however. In this case, we should get a Code.UNAUTHENTICATED error
// before we received the first message and we need to invalidate the token
// to ensure that we fetch a new token.
this.credentialsProvider.invalidateToken();
}

Expand Down Expand Up @@ -443,6 +475,17 @@ export abstract class PersistentStream<
this.stream = this.startRpc(token);
this.stream.onOpen(() => {
dispatchIfNotClosed(() => {
debugAssert(this.healthCheck === null, 'Expected healthCheck to be null');
this.healthCheck = this.queue.enqueueAfterDelay(
this.healthTimerId,
HEALTHY_TIMEOUT_MS,
() => {
if (this.isOpen()) {
this.state = PersistentStreamState.Healthy;
}
return Promise.resolve();
}
);
debugAssert(
this.state === PersistentStreamState.Starting,
'Expected stream to be in state Starting, but was ' + this.state
Expand Down Expand Up @@ -559,6 +602,7 @@ export class PersistentListenStream extends PersistentStream<
queue,
TimerId.ListenStreamConnectionBackoff,
TimerId.ListenStreamIdle,
TimerId.HealthCheckTimeout,
connection,
credentials,
listener
Expand Down Expand Up @@ -667,6 +711,7 @@ export class PersistentWriteStream extends PersistentStream<
queue,
TimerId.WriteStreamConnectionBackoff,
TimerId.WriteStreamIdle,
TimerId.HealthCheckTimeout,
connection,
credentials,
listener
Expand Down
6 changes: 4 additions & 2 deletions packages/firestore/src/util/async_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@ export const enum TimerId {
All = 'all',

/**
* The following 4 timers are used in persistent_stream.ts for the listen and
* The following 5 timers are used in persistent_stream.ts for the listen and
* write streams. The "Idle" timer is used to close the stream due to
* inactivity. The "ConnectionBackoff" timer is used to restart a stream once
* the appropriate backoff delay has elapsed.
* the appropriate backoff delay has elapsed. The health check is used to mark
* a stream healthy if it has not received an error during its initial setup.
*/
ListenStreamIdle = 'listen_stream_idle',
ListenStreamConnectionBackoff = 'listen_stream_connection_backoff',
WriteStreamIdle = 'write_stream_idle',
WriteStreamConnectionBackoff = 'write_stream_connection_backoff',
HealthCheckTimeout = 'health_check_timeout',

/**
* A timer used in online_state_tracker.ts to transition from
Expand Down
18 changes: 18 additions & 0 deletions packages/firestore/test/integration/remote/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,24 @@ describe('Write Stream', () => {
});
});

it('token is not invalidated once the stream is healthy', () => {
const credentials = new MockCredentialsProvider();

return withTestWriteStream(async (writeStream, streamListener, queue) => {
await streamListener.awaitCallback('open');

await queue.runAllDelayedOperationsUntil(TimerId.HealthCheckTimeout);

// Simulate callback from GRPC with an unauthenticated error -- this should
// NOT invalidate the token.
await writeStream.handleStreamClose(
new FirestoreError(Code.UNAUTHENTICATED, '')
);
await streamListener.awaitCallback('close');
expect(credentials.observedStates).to.deep.equal(['getToken']);
}, credentials);
});

export async function withTestWriteStream(
fn: (
writeStream: PersistentWriteStream,
Expand Down