diff --git a/packages/firestore/src/core/sync_engine.ts b/packages/firestore/src/core/sync_engine.ts index caea989346a..438a2cb9d3d 100644 --- a/packages/firestore/src/core/sync_engine.ts +++ b/packages/firestore/src/core/sync_engine.ts @@ -1138,10 +1138,12 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { } for (const targetId of added) { - debugAssert( - !this.queriesByTarget.has(targetId), - 'Trying to add an already active target' - ); + if (this.queriesByTarget.has(targetId)) { + // A target might have been added in a previous attempt + logDebug(LOG_TAG, 'Adding an already active target ' + targetId); + continue; + } + const target = await this.localStore.getTarget(targetId); debugAssert( !!target, diff --git a/packages/firestore/src/local/shared_client_state.ts b/packages/firestore/src/local/shared_client_state.ts index c6dd4d6501b..dba0aa6998b 100644 --- a/packages/firestore/src/local/shared_client_state.ts +++ b/packages/firestore/src/local/shared_client_state.ts @@ -29,9 +29,10 @@ import { Platform } from '../platform/platform'; import { hardAssert, debugAssert } from '../util/assert'; import { AsyncQueue } from '../util/async_queue'; import { Code, FirestoreError } from '../util/error'; -import { forEach } from '../util/obj'; import { logError, logDebug } from '../util/log'; import { SortedSet } from '../util/sorted_set'; +import { SortedMap } from '../util/sorted_map'; +import { primitiveComparator } from '../util/misc'; import { isSafeInteger } from '../util/types'; import { QueryTargetState, @@ -475,12 +476,14 @@ export class WebStorageSharedClientState implements SharedClientState { private readonly storage: Storage; private readonly localClientStorageKey: string; private readonly sequenceNumberKey: string; - private readonly activeClients: { [key: string]: ClientState } = {}; private readonly storageListener = this.handleWebStorageEvent.bind(this); private readonly onlineStateKey: string; private readonly clientStateKeyRe: RegExp; private readonly mutationBatchKeyRe: RegExp; private readonly queryTargetKeyRe: RegExp; + private activeClients = new SortedMap( + primitiveComparator + ); private started = false; private currentUser: User; @@ -519,7 +522,10 @@ export class WebStorageSharedClientState implements SharedClientState { this.sequenceNumberKey = createWebStorageSequenceNumberKey( this.persistenceKey ); - this.activeClients[this.localClientId] = new LocalClientState(); + this.activeClients = this.activeClients.insert( + this.localClientId, + new LocalClientState() + ); this.clientStateKeyRe = new RegExp( `^${CLIENT_STATE_KEY_PREFIX}_${escapedPersistenceKey}_([^_]*)$` @@ -576,7 +582,10 @@ export class WebStorageSharedClientState implements SharedClientState { storageItem ); if (clientState) { - this.activeClients[clientState.clientId] = clientState; + this.activeClients = this.activeClients.insert( + clientState.clientId, + clientState + ); } } } @@ -611,24 +620,17 @@ export class WebStorageSharedClientState implements SharedClientState { } getAllActiveQueryTargets(): TargetIdSet { - let activeTargets = targetIdSet(); - forEach(this.activeClients, (key, value) => { - activeTargets = activeTargets.unionWith(value.activeTargetIds); - }); - return activeTargets; + return this.extractActiveQueryTargets(this.activeClients); } isActiveQueryTarget(targetId: TargetId): boolean { - // This is not using `obj.forEach` since `forEach` doesn't support early - // return. - for (const clientId in this.activeClients) { - if (this.activeClients.hasOwnProperty(clientId)) { - if (this.activeClients[clientId].activeTargetIds.has(targetId)) { - return true; - } + let found = false; + this.activeClients.forEach((key, value) => { + if (value.activeTargetIds.has(targetId)) { + found = true; } - } - return false; + }); + return found; } addPendingMutation(batchId: BatchId): void { @@ -823,7 +825,7 @@ export class WebStorageSharedClientState implements SharedClientState { } private get localClientState(): LocalClientState { - return this.activeClients[this.localClientId] as LocalClientState; + return this.activeClients.get(this.localClientId) as LocalClientState; } private persistClientState(): void { @@ -979,26 +981,23 @@ export class WebStorageSharedClientState implements SharedClientState { clientId: ClientId, clientState: RemoteClientState | null ): Promise { - const existingTargets = this.getAllActiveQueryTargets(); - - if (clientState) { - this.activeClients[clientId] = clientState; - } else { - delete this.activeClients[clientId]; - } + const updatedClients = clientState + ? this.activeClients.insert(clientId, clientState) + : this.activeClients.remove(clientId); - const newTargets = this.getAllActiveQueryTargets(); + const existingTargets = this.extractActiveQueryTargets(this.activeClients); + const newTargets = this.extractActiveQueryTargets(updatedClients); const addedTargets: TargetId[] = []; const removedTargets: TargetId[] = []; - newTargets.forEach(async targetId => { + newTargets.forEach(targetId => { if (!existingTargets.has(targetId)) { addedTargets.push(targetId); } }); - existingTargets.forEach(async targetId => { + existingTargets.forEach(targetId => { if (!newTargets.has(targetId)) { removedTargets.push(targetId); } @@ -1007,7 +1006,9 @@ export class WebStorageSharedClientState implements SharedClientState { return this.syncEngine!.applyActiveTargetsChange( addedTargets, removedTargets - ); + ).then(() => { + this.activeClients = updatedClients; + }); } private handleOnlineStateEvent(onlineState: SharedOnlineState): void { @@ -1016,10 +1017,20 @@ export class WebStorageSharedClientState implements SharedClientState { // IndexedDb. If a client does not update their IndexedDb client state // within 5 seconds, it is considered inactive and we don't emit an online // state event. - if (this.activeClients[onlineState.clientId]) { + if (this.activeClients.get(onlineState.clientId)) { this.onlineStateHandler!(onlineState.onlineState); } } + + private extractActiveQueryTargets( + clients: SortedMap + ): SortedSet { + let activeTargets = targetIdSet(); + clients.forEach((kev, value) => { + activeTargets = activeTargets.unionWith(value.activeTargetIds); + }); + return activeTargets; + } } function fromWebStorageSequenceNumber( diff --git a/packages/firestore/test/unit/specs/recovery_spec.test.ts b/packages/firestore/test/unit/specs/recovery_spec.test.ts index b5dcf024479..5aadd6976bf 100644 --- a/packages/firestore/test/unit/specs/recovery_spec.test.ts +++ b/packages/firestore/test/unit/specs/recovery_spec.test.ts @@ -42,7 +42,7 @@ describeSpec( // Client 1 has received the WebStorage notification that the write // has been acknowledged, but failed to process the change. Hence, // we did not get a user callback. We schedule the first retry and - // make sure that it also does not get processed until + // make sure that it also does not get processed until // `recoverDatabase` is called. .runTimer(TimerId.AsyncQueueRetry) .recoverDatabase() @@ -55,7 +55,7 @@ describeSpec( ); specTest( - 'Query raises events in secondary client (with recovery)', + 'Query raises events in secondary client (with recovery)', ['multi-client'], () => { const query = Query.atPath(path('collection')); @@ -75,5 +75,39 @@ describeSpec( .expectEvents(query, {}); } ); + + specTest( + 'Query is listened to by primary (with recovery)', + ['multi-client'], + () => { + const query = Query.atPath(path('collection')); + + return ( + client(0) + .expectPrimaryState(true) + .failDatabase() + .client(1) + .userListens(query) + .client(0) + // The primary client 0 receives a WebStorage notification about the + // new query, but it cannot load the target from IndexedDB. The + // query will only be listened to once we recover the database. + .recoverDatabase() + .runTimer(TimerId.AsyncQueueRetry) + .expectListen(query) + .failDatabase() + .client(1) + .userUnlistens(query) + .client(0) + // The primary client 0 receives a notification that the query can + // be released, but it can only process the change after we recover + // the database. + .expectActiveTargets({ query }) + .recoverDatabase() + .runTimer(TimerId.AsyncQueueRetry) + .expectActiveTargets() + ); + } + ); } ); diff --git a/packages/firestore/test/unit/specs/spec_builder.ts b/packages/firestore/test/unit/specs/spec_builder.ts index 49056e74658..e8063b15d6e 100644 --- a/packages/firestore/test/unit/specs/spec_builder.ts +++ b/packages/firestore/test/unit/specs/spec_builder.ts @@ -446,7 +446,7 @@ export class SpecBuilder { /** Overrides the currently expected set of active targets. */ expectActiveTargets( - ...targets: Array<{ query: Query; resumeToken: string }> + ...targets: Array<{ query: Query; resumeToken?: string }> ): this { this.assertStep('Active target expectation requires previous step'); const currentStep = this.currentStep!;