Skip to content

Implement IndexedDB LRU Reference Delegate, add LRU tests #1224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Sep 19, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
eab8740
Implement IndexedDB LRU Reference Delegate, add LRU tests
Sep 12, 2018
6818502
Lint cleanup
Sep 12, 2018
d37c2c4
[AUTOMATED]: Prettier Code Styling
Sep 12, 2018
11cad0d
Add garbagecollector property, reorder indexeddb query cache construc…
Sep 17, 2018
3114ce7
LiveTargets -> ActiveTargets, fix comments on ReferenceDelegate
Sep 17, 2018
5718c24
[AUTOMATED]: Prettier Code Styling
Sep 17, 2018
baa9fa8
Methods reordered to match android
Sep 17, 2018
ed3c84d
Remove forEachOrphanedDocument from query cache, move to reference de…
Sep 17, 2018
9d68610
Fix comment typo and reflow, move mutationQueuesContainKey to indexed…
Sep 17, 2018
cba7bf7
[AUTOMATED]: Prettier Code Styling
Sep 17, 2018
f88817a
Only pass reference delegate to indexeddb query cache, not whole pers…
Sep 17, 2018
a79a941
[AUTOMATED]: Prettier Code Styling
Sep 17, 2018
be943c5
iterateAsync -> iterateSerial
Sep 17, 2018
16eaaca
onLimboDocumentUpdated -> updateLimboDocument
Sep 17, 2018
e25162c
Reorder lru delegate methods to match android
Sep 17, 2018
279e14a
Regroup tests a little
Sep 17, 2018
34f7967
A little test cleanup
Sep 17, 2018
801694c
additionalReferences -> inMemoryPins
Sep 18, 2018
34f62fb
Using SortedSet instead of a PriorityQueue (#1233)
Sep 19, 2018
3b748b1
Drop sentinel row and sentinel key store. Add sequence number to targ…
Sep 19, 2018
11b0baf
Merge
Sep 19, 2018
366418e
Lint
Sep 19, 2018
fcbb366
Comments and flip conditional
Sep 19, 2018
c922dce
Compare to undefined, not typeof undefined
Sep 19, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 22 additions & 46 deletions packages/firestore/src/local/indexeddb_persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import {
} from './indexeddb_mutation_queue';
import {
IndexedDbQueryCache,
getHighestListenSequenceNumber
getHighestListenSequenceNumber,
documentTargetStore
} from './indexeddb_query_cache';
import { IndexedDbRemoteDocumentCache } from './indexeddb_remote_document_cache';
import {
Expand Down Expand Up @@ -1049,7 +1050,7 @@ function clientMetadataStore(

/** Provides LRU functionality for IndexedDB persistence. */
export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate {
private additionalReferences: ReferenceSet | null;
private inMemoryPins: ReferenceSet | null;

readonly garbageCollector: LruGarbageCollector;

Expand Down Expand Up @@ -1078,7 +1079,7 @@ export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate {
}

setInMemoryPins(inMemoryPins: ReferenceSet): void {
this.additionalReferences = inMemoryPins;
this.inMemoryPins = inMemoryPins;
}

addReference(
Expand Down Expand Up @@ -1122,15 +1123,13 @@ export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate {
txn: PersistenceTransaction,
docKey: DocumentKey
): PersistencePromise<boolean> {
return this.additionalReferences!.containsKey(txn, docKey).next(
isPinned => {
if (isPinned) {
return PersistencePromise.resolve(true);
} else {
return mutationQueuesContainKey(txn, docKey);
}
return this.inMemoryPins!.containsKey(txn, docKey).next(isPinned => {
if (isPinned) {
return PersistencePromise.resolve(true);
} else {
return mutationQueuesContainKey(txn, docKey);
}
);
});
}

removeOrphanedDocuments(
Expand Down Expand Up @@ -1170,7 +1169,7 @@ export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate {
docKey: DocumentKey
): PersistencePromise<void> {
return PersistencePromise.waitFor([
sentinelKeyStore(txn).delete(sentinelKey(docKey)),
documentTargetStore(txn).delete(sentinelKey(docKey)),
this.db.getRemoteDocumentCache().removeEntry(txn, docKey)
]);
}
Expand Down Expand Up @@ -1202,7 +1201,7 @@ export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate {
txn: PersistenceTransaction,
f: (docKey: DocumentKey, sequenceNumber: ListenSequenceNumber) => void
): PersistencePromise<void> {
const store = sentinelKeyStore(txn);
const store = documentTargetStore(txn);
let nextToReport: ListenSequenceNumber = ListenSequence.INVALID;
let nextPath: EncodedResourcePath;
return store
Expand All @@ -1219,7 +1218,9 @@ export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate {
}
// set nextToReport to be this sequence number. It's the next one we
// might report, if we don't find any targets for this document.
nextToReport = sequenceNumber;
// Note that the sequence number must be defined when the targetId
// is 0.
nextToReport = sequenceNumber!;
nextPath = path;
} else {
// set nextToReport to be invalid, we know we don't need to report
Expand All @@ -1239,51 +1240,26 @@ export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate {
}
}

/**
* `SentinelRow` describes the schema of rows in the DbTargetDocument store that
* have TargetId === 0. The sequence number is an approximation of a last-used value
* for the document identified by the path portion of the key.
*/
type SentinelRow = {
targetId: TargetId;
path: EncodedResourcePath;
sequenceNumber: ListenSequenceNumber;
};

/**
* The sentinel key store is the same as the DbTargetDocument store, but allows for
* reading and writing sequence numbers as part of the value stored.
*/
function sentinelKeyStore(
txn: PersistenceTransaction
): SimpleDbStore<DbTargetDocumentKey, SentinelRow> {
return IndexedDbPersistence.getStore<DbTargetDocumentKey, SentinelRow>(
txn,
DbTargetDocument.store
);
}

function sentinelKey(key: DocumentKey): [TargetId, EncodedResourcePath] {
return [0, encode(key.path)];
}

/**
* @return A value suitable for writing in the sentinel key store.
* @return A value suitable for writing a sentinel row in the target-document
* store.
*/
function sentinelRow(
key: DocumentKey,
sequenceNumber: ListenSequenceNumber
): SentinelRow {
return {
targetId: 0,
path: encode(key.path),
sequenceNumber
};
): DbTargetDocument {
return new DbTargetDocument(0, encode(key.path), sequenceNumber);
}

function writeSentinelKey(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
return sentinelKeyStore(txn).put(sentinelRow(key, txn.currentSequenceNumber));
return documentTargetStore(txn).put(
sentinelRow(key, txn.currentSequenceNumber)
);
}
2 changes: 1 addition & 1 deletion packages/firestore/src/local/indexeddb_query_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ export function getHighestListenSequenceNumber(
/**
* Helper to get a typed SimpleDbStore for the document target object store.
*/
function documentTargetStore(
export function documentTargetStore(
txn: PersistenceTransaction
): SimpleDbStore<DbTargetDocumentKey, DbTargetDocument> {
return IndexedDbPersistence.getStore<DbTargetDocumentKey, DbTargetDocument>(
Expand Down
28 changes: 21 additions & 7 deletions packages/firestore/src/local/indexeddb_schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import * as api from '../protos/firestore_proto_api';
import { BatchId } from '../core/types';
import { BatchId, ListenSequenceNumber } from '../core/types';
import { TargetId } from '../core/types';
import { ResourcePath } from '../model/path';
import { assert } from '../util/assert';
Expand Down Expand Up @@ -573,9 +573,14 @@ export class DbTarget {
export type DbTargetDocumentKey = [TargetId, EncodedResourcePath];

/**
* An object representing an association between a target and a document.
* Stored in the targetDocument object store to store the documents tracked by a
* particular target.
* An object representing an association between a target and a document, or a
* sentinel row marking the last sequence number at which a document was used.
* Each document cached must have a corresponding sentinel row before lru
* garbage collection is enabled.
*
* The target associations and sentinel rows are co-located so that orphaned
* documents and their sequence numbers can be identified efficiently via a scan
* of this store.
*/
export class DbTargetDocument {
/** Name of the IndexedDb object store. */
Expand All @@ -592,14 +597,23 @@ export class DbTargetDocument {

constructor(
/**
* The targetId identifying a target.
* The targetId identifying a target or 0 for a sentinel row.
*/
public targetId: TargetId,
/**
* The path to the document, as encoded in the key.
*/
public path: EncodedResourcePath
) {}
public path: EncodedResourcePath,
/**
*
*/
public sequenceNumber?: ListenSequenceNumber
) {
assert(
(targetId !== 0) !== (typeof sequenceNumber !== 'undefined'),
'A target-document row must either have targetId == 0 and a defined sequence number, or a non-zero targetId and no sequence number'
);
}
}

/**
Expand Down
158 changes: 40 additions & 118 deletions packages/firestore/src/local/lru_garbage_collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import { PersistenceTransaction } from './persistence';
import { PersistencePromise } from './persistence_promise';
import { ListenSequenceNumber } from '../core/types';
import { ListenSequence } from '../core/listen_sequence';
import { assert } from '../util/assert';
import { AnyJs } from '../util/misc';
import { AnyJs, primitiveComparator } from '../util/misc';
import { SortedSet } from '../util/sorted_set';

/**
* Persistence layers intending to use LRU Garbage collection should have reference delegates that
Expand Down Expand Up @@ -77,139 +77,61 @@ export type ActiveTargets = {
[id: number]: AnyJs;
};

/**
* A selective port of `java.util.PriorityQueue`
* {@see <a href="https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/util/PriorityQueue.java">PriorityQueue.java</a>}
* The queue does not grow and must have an initial capacity when it is constructed. Additionally, elements may only be
* `poll()`'d and cannot be removed from any other position in the queue.
*/
class PriorityQueue<T> {
private _size = 0;
get size(): number {
return this._size;
}
private readonly queue: T[];
constructor(
private readonly capacity: number,
private readonly comparator: (a: T, b: T) => number
) {
assert(capacity > 0, 'Capacity must be greater than 0');
this.queue = new Array<T>(capacity);
}

add(elem: T): void {
assert(this._size + 1 <= this.capacity, 'Queue is over capacity');
if (this._size === 0) {
this.queue[0] = elem;
this._size = 1;
} else {
this.siftUp(elem);
}
}

poll(): T | null {
if (this._size === 0) {
return null;
}
const result = this.queue[0];
const newSize = --this._size;
const last = this.queue[newSize];
delete this.queue[newSize];
if (newSize !== 0) {
this.siftDown(last);
}
return result;
}

peek(): T | null {
if (this._size > 0) {
return this.queue[0];
}
return null;
}

private siftUp(elem: T): void {
let k = this._size;
while (k > 0) {
const parent = (k - 1) >>> 1;
const toCheck = this.queue[parent];
const comp = this.comparator(elem, toCheck);
if (comp >= 0) {
break;
}
this.queue[k] = toCheck;
k = parent;
}
this.queue[k] = elem;
this._size++;
}

private siftDown(lastElem: T): void {
let k = 0;
const half = this._size >>> 1;
while (k < half) {
let child = (k << 1) + 1;
let toCheck = this.queue[child];
const right = child + 1;
if (
right < this._size &&
this.comparator(toCheck, this.queue[right]) > 0
) {
toCheck = this.queue[right];
child = right;
}
if (this.comparator(lastElem, toCheck) <= 0) {
break;
}
this.queue[k] = toCheck;
k = child;
}
this.queue[k] = lastElem;
// The type and comparator for the items contained in the SortedSet used in
// place of a priority queue for the RollingSequenceNumberBuffer.
type BufferEntry = [ListenSequenceNumber, number];
function bufferEntryComparator(
[aSequence, aIndex]: BufferEntry,
[bSequence, bIndex]: BufferEntry
): number {
const seqCmp = primitiveComparator(aSequence, bSequence);
if (seqCmp === 0) {
// This order doesn't matter, but we can bias against churn by sorting
// entries created earlier as less than newer entries.
return primitiveComparator(aIndex, bIndex);
} else {
return seqCmp;
}
}

/**
* Used to calculate the nth sequence number. Keeps a rolling buffer of the lowest n values passed
* to `addElement`, and finally reports the largest of them in `maxValue`.
* Used to calculate the nth sequence number. Keeps a rolling buffer of the
* lowest n values passed to `addElement`, and finally reports the largest of
* them in `maxValue`.
*/
class RollingSequenceNumberBuffer {
private queue: PriorityQueue<ListenSequenceNumber>;
private buffer: SortedSet<BufferEntry> = new SortedSet<BufferEntry>(
bufferEntryComparator
);

// Invert the comparison because we want to keep the smallest values.
private static COMPARATOR: (
a: ListenSequenceNumber,
b: ListenSequenceNumber
) => number = (a, b) => {
if (b < a) {
return -1;
} else if (b === a) {
return 0;
}
return 1;
};
private previousIndex = 0;

constructor(private readonly maxElements: number) {}

constructor(private readonly maxElements: number) {
this.queue = new PriorityQueue(
maxElements,
RollingSequenceNumberBuffer.COMPARATOR
);
private nextIndex(): number {
return ++this.previousIndex;
}

addElement(sequenceNumber: ListenSequenceNumber): void {
if (this.queue.size < this.maxElements) {
this.queue.add(sequenceNumber);
const entry: BufferEntry = [sequenceNumber, this.nextIndex()];
if (this.buffer.size < this.maxElements) {
this.buffer = this.buffer.add(entry);
} else {
// Note: use first because we have inverted the comparison
const highestValue = this.queue.peek()!;
if (sequenceNumber < highestValue) {
this.queue.poll();
this.queue.add(sequenceNumber);
const highestValue = this.buffer.last()!;
if (bufferEntryComparator(entry, highestValue) < 0) {
this.buffer = this.buffer.delete(highestValue).add(entry);
}
}
}

get maxValue(): ListenSequenceNumber {
return this.queue.peek()!;
// Guaranteed to be non-empty. If we decide we are not collecting any
// sequence numbers, nthSequenceNumber below short-circuits. If we have
// decided that we are collecting n sequence numbers, it's because n is some
// percentage of the existing sequence numbers. That means we should never
// be in a situation where we are collecting sequence numbers but don't
// actually have any.
return this.buffer.last()![0];
}
}

Expand Down