Skip to content

Using SortedSet instead of a PriorityQueue #1233

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 19, 2018
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 31 additions & 118 deletions packages/firestore/src/local/lru_garbage_collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import { PersistenceTransaction } from './persistence';
import { PersistencePromise } from './persistence_promise';
import { ListenSequenceNumber } from '../core/types';
import { ListenSequence } from '../core/listen_sequence';
import { assert } from '../util/assert';
import { AnyJs } from '../util/misc';
import { AnyJs, primitiveComparator } from '../util/misc';
import { SortedSet } from '../util/sorted_set';

/**
* Persistence layers intending to use LRU Garbage collection should have reference delegates that
Expand Down Expand Up @@ -77,139 +77,52 @@ export type ActiveTargets = {
[id: number]: AnyJs;
};

/**
* A selective port of `java.util.PriorityQueue`
* {@see <a href="https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/util/PriorityQueue.java">PriorityQueue.java</a>}
* The queue does not grow and must have an initial capacity when it is constructed. Additionally, elements may only be
* `poll()`'d and cannot be removed from any other position in the queue.
*/
class PriorityQueue<T> {
private _size = 0;
get size(): number {
return this._size;
}
private readonly queue: T[];
constructor(
private readonly capacity: number,
private readonly comparator: (a: T, b: T) => number
) {
assert(capacity > 0, 'Capacity must be greater than 0');
this.queue = new Array<T>(capacity);
}

add(elem: T): void {
assert(this._size + 1 <= this.capacity, 'Queue is over capacity');
if (this._size === 0) {
this.queue[0] = elem;
this._size = 1;
} else {
this.siftUp(elem);
}
}

poll(): T | null {
if (this._size === 0) {
return null;
}
const result = this.queue[0];
const newSize = --this._size;
const last = this.queue[newSize];
delete this.queue[newSize];
if (newSize !== 0) {
this.siftDown(last);
}
return result;
}

peek(): T | null {
if (this._size > 0) {
return this.queue[0];
}
return null;
}

private siftUp(elem: T): void {
let k = this._size;
while (k > 0) {
const parent = (k - 1) >>> 1;
const toCheck = this.queue[parent];
const comp = this.comparator(elem, toCheck);
if (comp >= 0) {
break;
}
this.queue[k] = toCheck;
k = parent;
}
this.queue[k] = elem;
this._size++;
}

private siftDown(lastElem: T): void {
let k = 0;
const half = this._size >>> 1;
while (k < half) {
let child = (k << 1) + 1;
let toCheck = this.queue[child];
const right = child + 1;
if (
right < this._size &&
this.comparator(toCheck, this.queue[right]) > 0
) {
toCheck = this.queue[right];
child = right;
}
if (this.comparator(lastElem, toCheck) <= 0) {
break;
}
this.queue[k] = toCheck;
k = child;
}
this.queue[k] = lastElem;
// The type and comparator for the items contained in the SortedSet used in
// place of a priority queue for the RollingSequenceNumberBuffer.
type BufferEntry = [ListenSequenceNumber, number];
function bufferEntryComparator(a: BufferEntry, b: BufferEntry): number {
const seqCmp = primitiveComparator(a[0], b[0]);
if (seqCmp === 0) {
// This order doesn't matter, but we can bias against churn by sorting
// entries created earlier as less than newer entries.
return primitiveComparator(a[1], b[1]);
} else {
return seqCmp;
}
}

/**
* Used to calculate the nth sequence number. Keeps a rolling buffer of the lowest n values passed
* to `addElement`, and finally reports the largest of them in `maxValue`.
* Used to calculate the nth sequence number. Keeps a rolling buffer of the
* lowest n values passed to `addElement`, and finally reports the largest of
* them in `maxValue`.
*/
class RollingSequenceNumberBuffer {
private queue: PriorityQueue<ListenSequenceNumber>;
private buffer: SortedSet<BufferEntry> = new SortedSet<BufferEntry>(
bufferEntryComparator
);

// Invert the comparison because we want to keep the smallest values.
private static COMPARATOR: (
a: ListenSequenceNumber,
b: ListenSequenceNumber
) => number = (a, b) => {
if (b < a) {
return -1;
} else if (b === a) {
return 0;
}
return 1;
};
private previousIndex = 0;

constructor(private readonly maxElements: number) {}

constructor(private readonly maxElements: number) {
this.queue = new PriorityQueue(
maxElements,
RollingSequenceNumberBuffer.COMPARATOR
);
private nextIndex(): number {
return ++this.previousIndex;
}

addElement(sequenceNumber: ListenSequenceNumber): void {
if (this.queue.size < this.maxElements) {
this.queue.add(sequenceNumber);
const entry: BufferEntry = [sequenceNumber, this.nextIndex()];
if (this.buffer.size < this.maxElements) {
this.buffer = this.buffer.add(entry);
} else {
// Note: use first because we have inverted the comparison
const highestValue = this.queue.peek()!;
if (sequenceNumber < highestValue) {
this.queue.poll();
this.queue.add(sequenceNumber);
const highestValue = this.buffer.last()!;
if (bufferEntryComparator(entry, highestValue) < 0) {
this.buffer = this.buffer.delete(highestValue).add(entry);
}
}
}

get maxValue(): ListenSequenceNumber {
return this.queue.peek()!;
return this.buffer.last()![0];
}
}

Expand Down