Skip to content

Implement IndexedDB LRU Reference Delegate, add LRU tests #1224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Sep 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
eab8740
Implement IndexedDB LRU Reference Delegate, add LRU tests
Sep 12, 2018
6818502
Lint cleanup
Sep 12, 2018
d37c2c4
[AUTOMATED]: Prettier Code Styling
Sep 12, 2018
11cad0d
Add garbagecollector property, reorder indexeddb query cache construc…
Sep 17, 2018
3114ce7
LiveTargets -> ActiveTargets, fix comments on ReferenceDelegate
Sep 17, 2018
5718c24
[AUTOMATED]: Prettier Code Styling
Sep 17, 2018
baa9fa8
Methods reordered to match android
Sep 17, 2018
ed3c84d
Remove forEachOrphanedDocument from query cache, move to reference de…
Sep 17, 2018
9d68610
Fix comment typo and reflow, move mutationQueuesContainKey to indexed…
Sep 17, 2018
cba7bf7
[AUTOMATED]: Prettier Code Styling
Sep 17, 2018
f88817a
Only pass reference delegate to indexeddb query cache, not whole pers…
Sep 17, 2018
a79a941
[AUTOMATED]: Prettier Code Styling
Sep 17, 2018
be943c5
iterateAsync -> iterateSerial
Sep 17, 2018
16eaaca
onLimboDocumentUpdated -> updateLimboDocument
Sep 17, 2018
e25162c
Reorder lru delegate methods to match android
Sep 17, 2018
279e14a
Regroup tests a little
Sep 17, 2018
34f7967
A little test cleanup
Sep 17, 2018
801694c
additionalReferences -> inMemoryPins
Sep 18, 2018
34f62fb
Using SortedSet instead of a PriorityQueue (#1233)
Sep 19, 2018
3b748b1
Drop sentinel row and sentinel key store. Add sequence number to targ…
Sep 19, 2018
11b0baf
Merge
Sep 19, 2018
366418e
Lint
Sep 19, 2018
fcbb366
Comments and flip conditional
Sep 19, 2018
c922dce
Compare to undefined, not typeof undefined
Sep 19, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 42 additions & 13 deletions packages/firestore/src/local/indexeddb_mutation_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -525,19 +525,7 @@ export class IndexedDbMutationQueue implements MutationQueue {
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<boolean> {
const indexKey = DbDocumentMutation.prefixForPath(this.userId, key.path);
const encodedPath = indexKey[1];
const startRange = IDBKeyRange.lowerBound(indexKey);
let containsKey = false;
return documentMutationsStore(txn)
.iterate({ range: startRange, keysOnly: true }, (key, value, control) => {
const [userID, keyPath, /*batchID*/ _] = key;
if (userID === this.userId && keyPath === encodedPath) {
containsKey = true;
}
control.done();
})
.next(() => containsKey);
return mutationQueueContainsKey(txn, this.userId, key);
}

// PORTING NOTE: Multi-tab only (state is held in memory in other clients).
Expand All @@ -560,6 +548,47 @@ export class IndexedDbMutationQueue implements MutationQueue {
}
}

/**
* @return true if the mutation queue for the given user contains a pending mutation for the given key.
*/
function mutationQueueContainsKey(
txn: PersistenceTransaction,
userId: string,
key: DocumentKey
): PersistencePromise<boolean> {
const indexKey = DbDocumentMutation.prefixForPath(userId, key.path);
const encodedPath = indexKey[1];
const startRange = IDBKeyRange.lowerBound(indexKey);
let containsKey = false;
return documentMutationsStore(txn)
.iterate({ range: startRange, keysOnly: true }, (key, value, control) => {
const [userID, keyPath, /*batchID*/ _] = key;
if (userID === userId && keyPath === encodedPath) {
containsKey = true;
}
control.done();
})
.next(() => containsKey);
}

/** Returns true if any mutation queue contains the given document. */
export function mutationQueuesContainKey(
txn: PersistenceTransaction,
docKey: DocumentKey
): PersistencePromise<boolean> {
let found = false;
return mutationQueuesStore(txn)
.iterateSerial(userId => {
return mutationQueueContainsKey(txn, userId, docKey).next(containsKey => {
if (containsKey) {
found = true;
}
return PersistencePromise.resolve(!containsKey);
});
})
.next(() => found);
}

/**
* Delete a mutation batch and the associated document mutations.
* @return A PersistencePromise of the document mutations that were removed.
Expand Down
250 changes: 243 additions & 7 deletions packages/firestore/src/local/indexeddb_persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import { assert, fail } from '../util/assert';
import { Code, FirestoreError } from '../util/error';
import * as log from '../util/log';

import { IndexedDbMutationQueue } from './indexeddb_mutation_queue';
import {
IndexedDbMutationQueue,
mutationQueuesContainKey
} from './indexeddb_mutation_queue';
import {
IndexedDbQueryCache,
getHighestListenSequenceNumber
getHighestListenSequenceNumber,
documentTargetStore
} from './indexeddb_query_cache';
import { IndexedDbRemoteDocumentCache } from './indexeddb_remote_document_cache';
import {
Expand All @@ -35,24 +39,35 @@ import {
DbPrimaryClientKey,
SCHEMA_VERSION,
DbTargetGlobal,
SchemaConverter
SchemaConverter,
DbTargetDocument
} from './indexeddb_schema';
import { LocalSerializer } from './local_serializer';
import { MutationQueue } from './mutation_queue';
import {
Persistence,
PersistenceTransaction,
PrimaryStateListener
PrimaryStateListener,
ReferenceDelegate
} from './persistence';
import { PersistencePromise } from './persistence_promise';
import { QueryCache } from './query_cache';
import { RemoteDocumentCache } from './remote_document_cache';
import { SimpleDb, SimpleDbStore, SimpleDbTransaction } from './simple_db';
import { Platform } from '../platform/platform';
import { AsyncQueue, TimerId } from '../util/async_queue';
import { ClientId } from './shared_client_state';
import { CancelablePromise } from '../util/promise';
import { ListenSequence, SequenceNumberSyncer } from '../core/listen_sequence';
import { ReferenceSet } from './reference_set';
import { QueryData } from './query_data';
import { DocumentKey } from '../model/document_key';
import { decode, encode, EncodedResourcePath } from './encoded_resource_path';
import {
ActiveTargets,
LruDelegate,
LruGarbageCollector
} from './lru_garbage_collector';
import { ListenSequenceNumber, TargetId } from '../core/types';

const LOG_TAG = 'IndexedDbPersistence';

Expand Down Expand Up @@ -247,6 +262,7 @@ export class IndexedDbPersistence implements Persistence {
private remoteDocumentCache: IndexedDbRemoteDocumentCache;
private readonly webStorage: Storage;
private listenSequence: ListenSequence;
readonly referenceDelegate: IndexedDbLruDelegate;

// Note that `multiClientParams` must be present to enable multi-client support while multi-tab
// is still experimental. When multi-client is switched to always on, `multiClientParams` will
Expand All @@ -265,11 +281,15 @@ export class IndexedDbPersistence implements Persistence {
UNSUPPORTED_PLATFORM_ERROR_MSG
);
}
this.referenceDelegate = new IndexedDbLruDelegate(this);
this.dbName = persistenceKey + IndexedDbPersistence.MAIN_DATABASE;
this.serializer = new LocalSerializer(serializer);
this.document = platform.document;
this.allowTabSynchronization = multiClientParams !== undefined;
this.queryCache = new IndexedDbQueryCache(this.serializer);
this.queryCache = new IndexedDbQueryCache(
this.referenceDelegate,
this.serializer
);
this.remoteDocumentCache = new IndexedDbRemoteDocumentCache(
this.serializer,
/*keepDocumentChangeLog=*/ this.allowTabSynchronization
Expand Down Expand Up @@ -694,7 +714,7 @@ export class IndexedDbPersistence implements Persistence {
return IndexedDbMutationQueue.forUser(user, this.serializer);
}

getQueryCache(): QueryCache {
getQueryCache(): IndexedDbQueryCache {
assert(
this.started,
'Cannot initialize QueryCache before persistence is started.'
Expand Down Expand Up @@ -1026,3 +1046,219 @@ function clientMetadataStore(
DbClientMetadata.store
);
}

/** Provides LRU functionality for IndexedDB persistence. */
export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate {
private inMemoryPins: ReferenceSet | null;

readonly garbageCollector: LruGarbageCollector;

constructor(private readonly db: IndexedDbPersistence) {
this.garbageCollector = new LruGarbageCollector(this);
}

getTargetCount(txn: PersistenceTransaction): PersistencePromise<number> {
return this.db.getQueryCache().getQueryCount(txn);
}

forEachTarget(
txn: PersistenceTransaction,
f: (q: QueryData) => void
): PersistencePromise<void> {
return this.db.getQueryCache().forEachTarget(txn, f);
}

forEachOrphanedDocumentSequenceNumber(
txn: PersistenceTransaction,
f: (sequenceNumber: ListenSequenceNumber) => void
): PersistencePromise<void> {
return this.forEachOrphanedDocument(txn, (docKey, sequenceNumber) =>
f(sequenceNumber)
);
}

setInMemoryPins(inMemoryPins: ReferenceSet): void {
this.inMemoryPins = inMemoryPins;
}

addReference(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
return writeSentinelKey(txn, key);
}

removeReference(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
return writeSentinelKey(txn, key);
}

removeTargets(
txn: PersistenceTransaction,
upperBound: ListenSequenceNumber,
activeTargetIds: ActiveTargets
): PersistencePromise<number> {
return this.db
.getQueryCache()
.removeTargets(txn, upperBound, activeTargetIds);
}

removeMutationReference(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
return writeSentinelKey(txn, key);
}

/**
* Returns true if anything would prevent this document from being garbage
* collected, given that the document in question is not present in any
* targets and has a sequence number less than or equal to the upper bound for
* the collection run.
*/
private isPinned(
txn: PersistenceTransaction,
docKey: DocumentKey
): PersistencePromise<boolean> {
return this.inMemoryPins!.containsKey(txn, docKey).next(isPinned => {
if (isPinned) {
return PersistencePromise.resolve(true);
} else {
return mutationQueuesContainKey(txn, docKey);
}
});
}

removeOrphanedDocuments(
txn: PersistenceTransaction,
upperBound: ListenSequenceNumber
): PersistencePromise<number> {
let count = 0;
const promises: Array<PersistencePromise<void>> = [];
const iteration = this.forEachOrphanedDocument(
txn,
(docKey, sequenceNumber) => {
if (sequenceNumber <= upperBound) {
const p = this.isPinned(txn, docKey).next(isPinned => {
if (!isPinned) {
count++;
return this.removeOrphanedDocument(txn, docKey);
}
});
promises.push(p);
}
}
);
// Wait for iteration first to make sure we have a chance to add all of the
// removal promises to the array.
return iteration
.next(() => PersistencePromise.waitFor(promises))
.next(() => count);
}

/**
* Clears a document from the cache. The document is assumed to be orphaned, so target-document
* associations are not queried. We remove it from the remote document cache, as well as remove
* its sentinel row.
*/
private removeOrphanedDocument(
txn: PersistenceTransaction,
docKey: DocumentKey
): PersistencePromise<void> {
return PersistencePromise.waitFor([
documentTargetStore(txn).delete(sentinelKey(docKey)),
this.db.getRemoteDocumentCache().removeEntry(txn, docKey)
]);
}

removeTarget(
txn: PersistenceTransaction,
queryData: QueryData
): PersistencePromise<void> {
const updated = queryData.copy({
sequenceNumber: txn.currentSequenceNumber
});
return this.db.getQueryCache().updateQueryData(txn, updated);
}

updateLimboDocument(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
return writeSentinelKey(txn, key);
}

/**
* Call provided function for each document in the cache that is 'orphaned'. Orphaned
* means not a part of any target, so the only entry in the target-document index for
* that document will be the sentinel row (targetId 0), which will also have the sequence
* number for the last time the document was accessed.
*/
private forEachOrphanedDocument(
txn: PersistenceTransaction,
f: (docKey: DocumentKey, sequenceNumber: ListenSequenceNumber) => void
): PersistencePromise<void> {
const store = documentTargetStore(txn);
let nextToReport: ListenSequenceNumber = ListenSequence.INVALID;
let nextPath: EncodedResourcePath;
return store
.iterate(
{
index: DbTargetDocument.documentTargetsIndex
},
([targetId, docKey], { path, sequenceNumber }) => {
if (targetId === 0) {
// if nextToReport is valid, report it, this is a new key so the
// last one must not be a member of any targets.
if (nextToReport !== ListenSequence.INVALID) {
f(new DocumentKey(decode(nextPath)), nextToReport);
}
// set nextToReport to be this sequence number. It's the next one we
// might report, if we don't find any targets for this document.
// Note that the sequence number must be defined when the targetId
// is 0.
nextToReport = sequenceNumber!;
nextPath = path;
} else {
// set nextToReport to be invalid, we know we don't need to report
// this one since we found a target for it.
nextToReport = ListenSequence.INVALID;
}
}
)
.next(() => {
// Since we report sequence numbers after getting to the next key, we
// need to check if the last key we iterated over was an orphaned
// document and report it.
if (nextToReport !== ListenSequence.INVALID) {
f(new DocumentKey(decode(nextPath)), nextToReport);
}
});
}
}

function sentinelKey(key: DocumentKey): [TargetId, EncodedResourcePath] {
return [0, encode(key.path)];
}

/**
* @return A value suitable for writing a sentinel row in the target-document
* store.
*/
function sentinelRow(
key: DocumentKey,
sequenceNumber: ListenSequenceNumber
): DbTargetDocument {
return new DbTargetDocument(0, encode(key.path), sequenceNumber);
}

function writeSentinelKey(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
return documentTargetStore(txn).put(
sentinelRow(key, txn.currentSequenceNumber)
);
}
Loading