Skip to content

Commit 9493faa

Browse files
author
Greg Soltis
authored
Add sequence numbers and sync them (#1172)
* Add sequence numbers and sync them * Lint and comments * FirestoreClient and ListenSequence review feedback * Make webStorage non-optional * Return a promise of removeddocs from removeMutationBatch * [AUTOMATED]: Prettier Code Styling * Rename sequence number parsing function * [AUTOMATED]: Prettier Code Styling * IndexedDb startup refactoring to throw proper errors early * Renamed tests * Refactor Syncer interface for sequence numbers to conform to other examples * [AUTOMATED]: Prettier Code Styling * Shorten the comment * Drop 'only' * Add comment re multiClientParams * SharedClientState existing doesn't imply multiple clients * Remove start() from Persistence interface (#1179) * WIP on switching to static constructors * Remove start() from Persistence interface, use static helpers for IndexedDbPersistence construction * [AUTOMATED]: Prettier Code Styling * Mark start() private * Remove unused import * Use explicit type, switch async to Promise.resolve * Export the type * Condense a few lines * Use persistence key to scope sequence numbers to a project * [AUTOMATED]: Prettier Code Styling * Fix test to use new sequence number key * updateSequenceNumber -> writeSequenceNumber * Add comments to interface, switch to assert for sequenceNumberHandler * [AUTOMATED]: Prettier Code Styling
1 parent ac7f9ce commit 9493faa

15 files changed

+518
-186
lines changed

packages/firestore/src/core/firestore_client.ts

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -301,17 +301,7 @@ export class FirestoreClient {
301301
useProto3Json: true
302302
});
303303

304-
return Promise.resolve().then(() => {
305-
const persistence: IndexedDbPersistence = new IndexedDbPersistence(
306-
storagePrefix,
307-
this.clientId,
308-
this.platform,
309-
this.asyncQueue,
310-
serializer,
311-
settings.experimentalTabSynchronization
312-
);
313-
this.persistence = persistence;
314-
304+
return Promise.resolve().then(async () => {
315305
if (
316306
settings.experimentalTabSynchronization &&
317307
!WebStorageSharedClientState.isAvailable(this.platform)
@@ -322,16 +312,32 @@ export class FirestoreClient {
322312
);
323313
}
324314

325-
this.sharedClientState = settings.experimentalTabSynchronization
326-
? new WebStorageSharedClientState(
327-
this.asyncQueue,
328-
this.platform,
329-
storagePrefix,
330-
this.clientId,
331-
user
332-
)
333-
: new MemorySharedClientState();
334-
return persistence.start();
315+
if (settings.experimentalTabSynchronization) {
316+
this.sharedClientState = new WebStorageSharedClientState(
317+
this.asyncQueue,
318+
this.platform,
319+
storagePrefix,
320+
this.clientId,
321+
user
322+
);
323+
this.persistence = await IndexedDbPersistence.createMultiClientIndexedDbPersistence(
324+
storagePrefix,
325+
this.clientId,
326+
this.platform,
327+
this.asyncQueue,
328+
serializer,
329+
{ sequenceNumberSyncer: this.sharedClientState }
330+
);
331+
} else {
332+
this.sharedClientState = new MemorySharedClientState();
333+
this.persistence = await IndexedDbPersistence.createIndexedDbPersistence(
334+
storagePrefix,
335+
this.clientId,
336+
this.platform,
337+
this.asyncQueue,
338+
serializer
339+
);
340+
}
335341
});
336342
}
337343

@@ -344,7 +350,7 @@ export class FirestoreClient {
344350
this.garbageCollector = new EagerGarbageCollector();
345351
this.persistence = new MemoryPersistence(this.clientId);
346352
this.sharedClientState = new MemorySharedClientState();
347-
return this.persistence.start();
353+
return Promise.resolve();
348354
}
349355

350356
/**
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/**
2+
* Copyright 2018 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { ListenSequenceNumber } from './types';
18+
19+
/**
20+
* `SequenceNumberSyncer` defines the methods required to keep multiple instances of a `ListenSequence` in sync.
21+
*/
22+
export interface SequenceNumberSyncer {
23+
// Notify the syncer that a new sequence number has been used.
24+
writeSequenceNumber(sequenceNumber: ListenSequenceNumber): void;
25+
// Setting this property allows the syncer to notify when a sequence number has been used, and
26+
// and lets the ListenSequence adjust its internal previous value accordingly.
27+
sequenceNumberHandler:
28+
| ((sequenceNumber: ListenSequenceNumber) => void)
29+
| null;
30+
}
31+
32+
/**
33+
* `ListenSequence` is a monotonic sequence. It is initialized with a minimum value to
34+
* exceed. All subsequent calls to next will return increasing values. If provided with a
35+
* `SequenceNumberSyncer`, it will additionally bump its next value when told of a new value, as well as write out
36+
* sequence numbers that it produces via `next()`.
37+
*/
38+
export class ListenSequence {
39+
static readonly INVALID: ListenSequenceNumber = -1;
40+
41+
private writeNewSequenceNumber?: (
42+
newSequenceNumber: ListenSequenceNumber
43+
) => void;
44+
45+
constructor(
46+
private previousValue: ListenSequenceNumber,
47+
sequenceNumberSyncer?: SequenceNumberSyncer
48+
) {
49+
if (sequenceNumberSyncer) {
50+
sequenceNumberSyncer.sequenceNumberHandler = sequenceNumber =>
51+
this.setPreviousValue(sequenceNumber);
52+
this.writeNewSequenceNumber = sequenceNumber =>
53+
sequenceNumberSyncer.writeSequenceNumber(sequenceNumber);
54+
}
55+
}
56+
57+
private setPreviousValue(
58+
externalPreviousValue: ListenSequenceNumber
59+
): ListenSequenceNumber {
60+
this.previousValue = Math.max(externalPreviousValue, this.previousValue);
61+
return this.previousValue;
62+
}
63+
64+
next(): ListenSequenceNumber {
65+
const nextValue = ++this.previousValue;
66+
if (this.writeNewSequenceNumber) {
67+
this.writeNewSequenceNumber(nextValue);
68+
}
69+
return nextValue;
70+
}
71+
}

packages/firestore/src/core/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ export type BatchId = number;
2626
*/
2727
export type TargetId = number;
2828

29+
export type ListenSequenceNumber = number;
30+
2931
// TODO(b/35918695): In GRPC / node, tokens are Uint8Array. In WebChannel,
3032
// they're strings. We should probably (de-)serialize to a common internal type.
3133
export type ProtoByteString = Uint8Array | string;

packages/firestore/src/local/indexeddb_mutation_queue.ts

Lines changed: 63 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,11 @@ import { LocalSerializer } from './local_serializer';
4141
import { MutationQueue } from './mutation_queue';
4242
import { PersistenceTransaction } from './persistence';
4343
import { PersistencePromise } from './persistence_promise';
44-
import { SimpleDbStore } from './simple_db';
45-
import { IndexedDbPersistence } from './indexeddb_persistence';
44+
import { SimpleDbStore, SimpleDbTransaction } from './simple_db';
45+
import {
46+
IndexedDbPersistence,
47+
IndexedDbTransaction
48+
} from './indexeddb_persistence';
4649

4750
/** A mutation queue for a specific user, backed by IndexedDB. */
4851
export class IndexedDbMutationQueue implements MutationQueue {
@@ -479,41 +482,18 @@ export class IndexedDbMutationQueue implements MutationQueue {
479482
transaction: PersistenceTransaction,
480483
batch: MutationBatch
481484
): PersistencePromise<void> {
482-
const mutationStore = mutationsStore(transaction);
483-
const indexTxn = documentMutationsStore(transaction);
484-
const promises: Array<PersistencePromise<void>> = [];
485-
486-
const range = IDBKeyRange.only(batch.batchId);
487-
let numDeleted = 0;
488-
const removePromise = mutationStore.iterate(
489-
{ range },
490-
(key, value, control) => {
491-
numDeleted++;
492-
return control.delete();
493-
}
494-
);
495-
promises.push(
496-
removePromise.next(() => {
497-
assert(
498-
numDeleted === 1,
499-
'Dangling document-mutation reference found: Missing batch ' +
500-
batch.batchId
501-
);
502-
})
503-
);
504-
for (const mutation of batch.mutations) {
505-
const indexKey = DbDocumentMutation.key(
506-
this.userId,
507-
mutation.key.path,
508-
batch.batchId
509-
);
510-
promises.push(indexTxn.delete(indexKey));
485+
return removeMutationBatch(
486+
(transaction as IndexedDbTransaction).simpleDbTransaction,
487+
this.userId,
488+
batch
489+
).next(removedDocuments => {
490+
this.removeCachedMutationKeys(batch.batchId);
511491
if (this.garbageCollector !== null) {
512-
this.garbageCollector.addPotentialGarbageKey(mutation.key);
492+
for (const key of removedDocuments) {
493+
this.garbageCollector.addPotentialGarbageKey(key);
494+
}
513495
}
514-
}
515-
this.removeCachedMutationKeys(batch.batchId);
516-
return PersistencePromise.waitFor(promises);
496+
});
517497
}
518498

519499
removeCachedMutationKeys(batchId: BatchId): void {
@@ -598,6 +578,54 @@ export class IndexedDbMutationQueue implements MutationQueue {
598578
}
599579
}
600580

581+
/**
582+
* Delete a mutation batch and the associated document mutations.
583+
* @return A PersistencePromise of the document mutations that were removed.
584+
*/
585+
export function removeMutationBatch(
586+
txn: SimpleDbTransaction,
587+
userId: string,
588+
batch: MutationBatch
589+
): PersistencePromise<DocumentKey[]> {
590+
const mutationStore = txn.store<DbMutationBatchKey, DbMutationBatch>(
591+
DbMutationBatch.store
592+
);
593+
const indexTxn = txn.store<DbDocumentMutationKey, DbDocumentMutation>(
594+
DbDocumentMutation.store
595+
);
596+
const promises: Array<PersistencePromise<void>> = [];
597+
598+
const range = IDBKeyRange.only(batch.batchId);
599+
let numDeleted = 0;
600+
const removePromise = mutationStore.iterate(
601+
{ range },
602+
(key, value, control) => {
603+
numDeleted++;
604+
return control.delete();
605+
}
606+
);
607+
promises.push(
608+
removePromise.next(() => {
609+
assert(
610+
numDeleted === 1,
611+
'Dangling document-mutation reference found: Missing batch ' +
612+
batch.batchId
613+
);
614+
})
615+
);
616+
const removedDocuments: DocumentKey[] = [];
617+
for (const mutation of batch.mutations) {
618+
const indexKey = DbDocumentMutation.key(
619+
userId,
620+
mutation.key.path,
621+
batch.batchId
622+
);
623+
promises.push(indexTxn.delete(indexKey));
624+
removedDocuments.push(mutation.key);
625+
}
626+
return PersistencePromise.waitFor(promises).next(() => removedDocuments);
627+
}
628+
601629
function convertStreamToken(token: ProtoByteString): string {
602630
if (token instanceof Uint8Array) {
603631
// TODO(b/78771403): Convert tokens to strings during deserialization

0 commit comments

Comments
 (0)