Skip to content

Commit 6cfb268

Browse files
Make 'handleClientStateEvent()/handleQueryTargetEvent()' idempotent (#2916)
1 parent ed9a7be commit 6cfb268

File tree

4 files changed

+84
-37
lines changed

4 files changed

+84
-37
lines changed

packages/firestore/src/core/sync_engine.ts

+6-4
Original file line numberDiff line numberDiff line change
@@ -1136,10 +1136,12 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
11361136
}
11371137

11381138
for (const targetId of added) {
1139-
debugAssert(
1140-
!this.queriesByTarget.has(targetId),
1141-
'Trying to add an already active target'
1142-
);
1139+
if (this.queriesByTarget.has(targetId)) {
1140+
// A target might have been added in a previous attempt
1141+
logDebug(LOG_TAG, 'Adding an already active target ' + targetId);
1142+
continue;
1143+
}
1144+
11431145
const target = await this.localStore.getTarget(targetId);
11441146
debugAssert(
11451147
!!target,

packages/firestore/src/local/shared_client_state.ts

+42-31
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ import { Platform } from '../platform/platform';
2929
import { hardAssert, debugAssert } from '../util/assert';
3030
import { AsyncQueue } from '../util/async_queue';
3131
import { Code, FirestoreError } from '../util/error';
32-
import { forEach } from '../util/obj';
3332
import { logError, logDebug } from '../util/log';
3433
import { SortedSet } from '../util/sorted_set';
34+
import { SortedMap } from '../util/sorted_map';
35+
import { primitiveComparator } from '../util/misc';
3536
import { isSafeInteger } from '../util/types';
3637
import {
3738
QueryTargetState,
@@ -475,12 +476,14 @@ export class WebStorageSharedClientState implements SharedClientState {
475476
private readonly storage: Storage;
476477
private readonly localClientStorageKey: string;
477478
private readonly sequenceNumberKey: string;
478-
private readonly activeClients: { [key: string]: ClientState } = {};
479479
private readonly storageListener = this.handleWebStorageEvent.bind(this);
480480
private readonly onlineStateKey: string;
481481
private readonly clientStateKeyRe: RegExp;
482482
private readonly mutationBatchKeyRe: RegExp;
483483
private readonly queryTargetKeyRe: RegExp;
484+
private activeClients = new SortedMap<string, ClientState>(
485+
primitiveComparator
486+
);
484487
private started = false;
485488
private currentUser: User;
486489

@@ -519,7 +522,10 @@ export class WebStorageSharedClientState implements SharedClientState {
519522
this.sequenceNumberKey = createWebStorageSequenceNumberKey(
520523
this.persistenceKey
521524
);
522-
this.activeClients[this.localClientId] = new LocalClientState();
525+
this.activeClients = this.activeClients.insert(
526+
this.localClientId,
527+
new LocalClientState()
528+
);
523529

524530
this.clientStateKeyRe = new RegExp(
525531
`^${CLIENT_STATE_KEY_PREFIX}_${escapedPersistenceKey}_([^_]*)$`
@@ -576,7 +582,10 @@ export class WebStorageSharedClientState implements SharedClientState {
576582
storageItem
577583
);
578584
if (clientState) {
579-
this.activeClients[clientState.clientId] = clientState;
585+
this.activeClients = this.activeClients.insert(
586+
clientState.clientId,
587+
clientState
588+
);
580589
}
581590
}
582591
}
@@ -611,24 +620,17 @@ export class WebStorageSharedClientState implements SharedClientState {
611620
}
612621

613622
getAllActiveQueryTargets(): TargetIdSet {
614-
let activeTargets = targetIdSet();
615-
forEach(this.activeClients, (key, value) => {
616-
activeTargets = activeTargets.unionWith(value.activeTargetIds);
617-
});
618-
return activeTargets;
623+
return this.extractActiveQueryTargets(this.activeClients);
619624
}
620625

621626
isActiveQueryTarget(targetId: TargetId): boolean {
622-
// This is not using `obj.forEach` since `forEach` doesn't support early
623-
// return.
624-
for (const clientId in this.activeClients) {
625-
if (this.activeClients.hasOwnProperty(clientId)) {
626-
if (this.activeClients[clientId].activeTargetIds.has(targetId)) {
627-
return true;
628-
}
627+
let found = false;
628+
this.activeClients.forEach((key, value) => {
629+
if (value.activeTargetIds.has(targetId)) {
630+
found = true;
629631
}
630-
}
631-
return false;
632+
});
633+
return found;
632634
}
633635

634636
addPendingMutation(batchId: BatchId): void {
@@ -823,7 +825,7 @@ export class WebStorageSharedClientState implements SharedClientState {
823825
}
824826

825827
private get localClientState(): LocalClientState {
826-
return this.activeClients[this.localClientId] as LocalClientState;
828+
return this.activeClients.get(this.localClientId) as LocalClientState;
827829
}
828830

829831
private persistClientState(): void {
@@ -979,26 +981,23 @@ export class WebStorageSharedClientState implements SharedClientState {
979981
clientId: ClientId,
980982
clientState: RemoteClientState | null
981983
): Promise<void> {
982-
const existingTargets = this.getAllActiveQueryTargets();
983-
984-
if (clientState) {
985-
this.activeClients[clientId] = clientState;
986-
} else {
987-
delete this.activeClients[clientId];
988-
}
984+
const updatedClients = clientState
985+
? this.activeClients.insert(clientId, clientState)
986+
: this.activeClients.remove(clientId);
989987

990-
const newTargets = this.getAllActiveQueryTargets();
988+
const existingTargets = this.extractActiveQueryTargets(this.activeClients);
989+
const newTargets = this.extractActiveQueryTargets(updatedClients);
991990

992991
const addedTargets: TargetId[] = [];
993992
const removedTargets: TargetId[] = [];
994993

995-
newTargets.forEach(async targetId => {
994+
newTargets.forEach(targetId => {
996995
if (!existingTargets.has(targetId)) {
997996
addedTargets.push(targetId);
998997
}
999998
});
1000999

1001-
existingTargets.forEach(async targetId => {
1000+
existingTargets.forEach(targetId => {
10021001
if (!newTargets.has(targetId)) {
10031002
removedTargets.push(targetId);
10041003
}
@@ -1007,7 +1006,9 @@ export class WebStorageSharedClientState implements SharedClientState {
10071006
return this.syncEngine!.applyActiveTargetsChange(
10081007
addedTargets,
10091008
removedTargets
1010-
);
1009+
).then(() => {
1010+
this.activeClients = updatedClients;
1011+
});
10111012
}
10121013

10131014
private handleOnlineStateEvent(onlineState: SharedOnlineState): void {
@@ -1016,10 +1017,20 @@ export class WebStorageSharedClientState implements SharedClientState {
10161017
// IndexedDb. If a client does not update their IndexedDb client state
10171018
// within 5 seconds, it is considered inactive and we don't emit an online
10181019
// state event.
1019-
if (this.activeClients[onlineState.clientId]) {
1020+
if (this.activeClients.get(onlineState.clientId)) {
10201021
this.onlineStateHandler!(onlineState.onlineState);
10211022
}
10221023
}
1024+
1025+
private extractActiveQueryTargets(
1026+
clients: SortedMap<string, ClientState>
1027+
): SortedSet<TargetId> {
1028+
let activeTargets = targetIdSet();
1029+
clients.forEach((kev, value) => {
1030+
activeTargets = activeTargets.unionWith(value.activeTargetIds);
1031+
});
1032+
return activeTargets;
1033+
}
10231034
}
10241035

10251036
function fromWebStorageSequenceNumber(

packages/firestore/test/unit/specs/recovery_spec.test.ts

+35-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ describeSpec(
5555
);
5656

5757
specTest(
58-
'Query raises events in secondary client (with recovery)',
58+
'Query raises events in secondary client (with recovery)',
5959
['multi-client'],
6060
() => {
6161
const query = Query.atPath(path('collection'));
@@ -75,5 +75,39 @@ describeSpec(
7575
.expectEvents(query, {});
7676
}
7777
);
78+
79+
specTest(
80+
'Query is listened to by primary (with recovery)',
81+
['multi-client'],
82+
() => {
83+
const query = Query.atPath(path('collection'));
84+
85+
return (
86+
client(0)
87+
.expectPrimaryState(true)
88+
.failDatabase()
89+
.client(1)
90+
.userListens(query)
91+
.client(0)
92+
// The primary client 0 receives a WebStorage notification about the
93+
// new query, but it cannot load the target from IndexedDB. The
94+
// query will only be listened to once we recover the database.
95+
.recoverDatabase()
96+
.runTimer(TimerId.AsyncQueueRetry)
97+
.expectListen(query)
98+
.failDatabase()
99+
.client(1)
100+
.userUnlistens(query)
101+
.client(0)
102+
// The primary client 0 receives a notification that the query can
103+
// be released, but it can only process the change after we recover
104+
// the database.
105+
.expectActiveTargets({ query })
106+
.recoverDatabase()
107+
.runTimer(TimerId.AsyncQueueRetry)
108+
.expectActiveTargets()
109+
);
110+
}
111+
);
78112
}
79113
);

packages/firestore/test/unit/specs/spec_builder.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ export class SpecBuilder {
446446

447447
/** Overrides the currently expected set of active targets. */
448448
expectActiveTargets(
449-
...targets: Array<{ query: Query; resumeToken: string }>
449+
...targets: Array<{ query: Query; resumeToken?: string }>
450450
): this {
451451
this.assertStep('Active target expectation requires previous step');
452452
const currentStep = this.currentStep!;

0 commit comments

Comments
 (0)