diff --git a/common/api-review/util.api.md b/common/api-review/util.api.md index 75e484edd50..b5dda9eb0d7 100644 --- a/common/api-review/util.api.md +++ b/common/api-review/util.api.md @@ -82,6 +82,11 @@ export function contains(obj: T, key: string): boolean; // @public (undocumented) export function createMockUserToken(token: EmulatorMockTokenOptions, projectId?: string): string; +// Warning: (ae-missing-release-tag) "createObserver" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal) +// +// @public (undocumented) +export function createObserver(nextOrObserver?: NextFn | PartialObserver, error?: ErrorFn, complete?: CompleteFn): Observer; + // Warning: (ae-missing-release-tag) "createSubscribe" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal) // // @public diff --git a/packages/auth/src/core/auth/auth_impl.ts b/packages/auth/src/core/auth/auth_impl.ts index d7308c03fcc..70b31d64737 100644 --- a/packages/auth/src/core/auth/auth_impl.ts +++ b/packages/auth/src/core/auth/auth_impl.ts @@ -39,7 +39,10 @@ import { FirebaseError, getModularInstance, Observer, - Subscribe + Subscribe, + Observable, + PartialObserver, + createObserver } from '@firebase/util'; import { AuthInternal, ConfigInternal } from '../../model/auth'; @@ -84,8 +87,8 @@ export class AuthImpl implements AuthInternal, _FirebaseService { private operations = Promise.resolve(); private persistenceManager?: PersistenceUserManager; private redirectPersistenceManager?: PersistenceUserManager; - private authStateSubscription = new Subscription(this); - private idTokenSubscription = new Subscription(this); + private authStateEmitter = new EmitterStream(); + private idTokenEmitter = new EmitterStream(); private readonly beforeStateQueue = new AuthMiddlewareQueue(this); private redirectUser: UserInternal | null = null; private isProactiveRefreshEnabled = false; @@ -439,12 +442,9 @@ export class AuthImpl implements AuthInternal, _FirebaseService { error?: ErrorFn, completed?: CompleteFn ): Unsubscribe { - return this.registerStateListener( - this.authStateSubscription, - nextOrObserver, - error, - completed - ); + const stream = this.createUserStateStream(this.authStateEmitter); + const observer: Observer = createObserver(nextOrObserver, error, completed); + return stream.subscribe(observer); } beforeAuthStateChanged( @@ -459,12 +459,9 @@ export class AuthImpl implements AuthInternal, _FirebaseService { error?: ErrorFn, completed?: CompleteFn ): Unsubscribe { - return this.registerStateListener( - this.idTokenSubscription, - nextOrObserver, - error, - completed - ); + const stream = this.createUserStateStream(this.idTokenEmitter); + const observer: Observer = createObserver(nextOrObserver, error, completed); + return stream.subscribe(observer); } toJSON(): object { @@ -567,43 +564,35 @@ export class AuthImpl implements AuthInternal, _FirebaseService { return; } - this.idTokenSubscription.next(this.currentUser); + this.idTokenEmitter.emit(this.currentUser); const currentUid = this.currentUser?.uid ?? null; if (this.lastNotifiedUid !== currentUid) { this.lastNotifiedUid = currentUid; - this.authStateSubscription.next(this.currentUser); + this.authStateEmitter.emit(this.currentUser); } } - private registerStateListener( - subscription: Subscription, - nextOrObserver: NextOrObserver, - error?: ErrorFn, - completed?: CompleteFn - ): Unsubscribe { + private createUserStateStream( + emitter: EmitterStream + ): Observable { if (this._deleted) { - return () => {}; + return new NeverStream(); } - const cb = - typeof nextOrObserver === 'function' - ? nextOrObserver - : nextOrObserver.next.bind(nextOrObserver); - const promise = this._isInitialized ? Promise.resolve() : this._initializationPromise; _assert(promise, this, AuthErrorCode.INTERNAL_ERROR); - // The callback needs to be called asynchronously per the spec. - // eslint-disable-next-line @typescript-eslint/no-floating-promises - promise.then(() => cb(this.currentUser)); - if (typeof nextOrObserver === 'function') { - return subscription.addObserver(nextOrObserver, error, completed); - } else { - return subscription.addObserver(nextOrObserver); - } + const user = new PromiseStream( + promise.then(() => this.currentUser) + ); + + return new ConcatStream( + user, + emitter + ); } /** @@ -717,17 +706,101 @@ export function _castAuth(auth: Auth): AuthInternal { return getModularInstance(auth) as AuthInternal; } -/** Helper class to wrap subscriber logic */ -class Subscription { - private observer: Observer | null = null; - readonly addObserver: Subscribe = createSubscribe( - observer => (this.observer = observer) - ); +class NeverStream implements Observable { + constructor() {} + + subscribe( + nextOrObserver?: NextFn | PartialObserver, + error?: ErrorFn, + complete?: CompleteFn + ): Unsubscribe { + return () => {}; + } +} + +class EmitterStream implements Observable { + constructor() { + this.subscribe = createSubscribe( + (o) => { this.observer = o; } + ); + } - constructor(readonly auth: AuthInternal) {} + subscribe: Subscribe; + observer?: Observer; - get next(): NextFn { - _assert(this.observer, this.auth, AuthErrorCode.INTERNAL_ERROR); - return this.observer.next.bind(this.observer); + emit(value: T) { + this.observer!.next(value); } } + +class PromiseStream implements Observable { + constructor(promise: Promise) { + this.promise = promise; + } + + promise: Promise; + + subscribe( + nextOrObserver?: NextFn | PartialObserver, + error?: ErrorFn, + complete?: CompleteFn + ): Unsubscribe { + const observer = createObserver(nextOrObserver, error, complete); + + let isUnsubscribed = false; + + this.promise.then( + (value) => { + if (isUnsubscribed) { return; } + observer.next(value); + observer.complete(); + }, + (error) => { + if (isUnsubscribed) { return; } + observer.error(error); + } + ) + + return () => { + isUnsubscribed = true; + } + } +} + +class ConcatStream implements Observable { + constructor(s0: Observable, s1: Observable) { + this.s0 = s0; + this.s1 = s1; + } + + s0: Observable; + s1: Observable; + + subscribe( + nextOrObserver?: NextFn | PartialObserver, + error?: ErrorFn, + complete?: CompleteFn + ): Unsubscribe { + const observer = createObserver(nextOrObserver, error, complete); + + let unsubscribe0: Unsubscribe; + let unsubscribe1: Unsubscribe | null; + + unsubscribe0 = this.s0.subscribe( + (value: T) => { observer.next(value); }, + (error: Error) => { observer.error(error); }, + () => { + unsubscribe1 = this.s1.subscribe( + (value: T) => { observer.next(value); }, + (error: Error) => { observer.error(error); }, + () => { observer.complete(); } + ); + } + ) + + return () => { + unsubscribe1?.call(undefined); + unsubscribe0(); + } + } +} \ No newline at end of file diff --git a/packages/util/src/subscribe.ts b/packages/util/src/subscribe.ts index 6420c18cca7..ff4d6abb370 100644 --- a/packages/util/src/subscribe.ts +++ b/packages/util/src/subscribe.ts @@ -66,6 +66,51 @@ export function createSubscribe( return proxy.subscribe.bind(proxy); } +export function createObserver( + nextOrObserver?: NextFn | PartialObserver, + error?: ErrorFn, + complete?: CompleteFn +): Observer { + let observer: Observer; + + if ( + nextOrObserver === undefined && + error === undefined && + complete === undefined + ) { + throw new Error('Missing Observer.'); + } + + // Assemble an Observer object when passed as callback functions. + if ( + implementsAnyMethods(nextOrObserver as { [key: string]: unknown }, [ + 'next', + 'error', + 'complete' + ]) + ) { + observer = nextOrObserver as Observer; + } else { + observer = { + next: nextOrObserver as NextFn, + error, + complete + } as Observer; + } + + if (observer.next === undefined) { + observer.next = noop as NextFn; + } + if (observer.error === undefined) { + observer.error = noop as ErrorFn; + } + if (observer.complete === undefined) { + observer.complete = noop as CompleteFn; + } + + return observer; +} + /** * Implement fan-out for any number of Observers attached via a subscribe * function. @@ -130,42 +175,9 @@ class ObserverProxy implements Observer { error?: ErrorFn, complete?: CompleteFn ): Unsubscribe { - let observer: Observer; - - if ( - nextOrObserver === undefined && - error === undefined && - complete === undefined - ) { - throw new Error('Missing Observer.'); - } - - // Assemble an Observer object when passed as callback functions. - if ( - implementsAnyMethods(nextOrObserver as { [key: string]: unknown }, [ - 'next', - 'error', - 'complete' - ]) - ) { - observer = nextOrObserver as Observer; - } else { - observer = { - next: nextOrObserver as NextFn, - error, - complete - } as Observer; - } - - if (observer.next === undefined) { - observer.next = noop as NextFn; - } - if (observer.error === undefined) { - observer.error = noop as ErrorFn; - } - if (observer.complete === undefined) { - observer.complete = noop as CompleteFn; - } + const observer = createObserver( + nextOrObserver, error, complete + ); const unsub = this.unsubscribeOne.bind(this, this.observers!.length);