Skip to content

Apply range-filters in get when probing active-listener cache #4408

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changeset/small-icons-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
'@firebase/database': patch
---
Fixed an issue with `Query.get()` where Query filters are not applied to data in some cases.
4 changes: 2 additions & 2 deletions packages/database/src/core/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,8 @@ export class Repo {
*/
getValue(query: Query): Promise<DataSnapshot> {
// Only active queries are cached. There is no persisted cache.
const cached = this.serverSyncTree_.calcCompleteEventCache(query.path);
if (!cached.isEmpty()) {
const cached = this.serverSyncTree_.getServerValue(query);
if (cached != null) {
return Promise.resolve(
new DataSnapshot(
cached,
Expand Down
47 changes: 38 additions & 9 deletions packages/database/src/core/SyncPoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,24 @@ export class SyncPoint {
return events;
}
}

/**
* Add an event callback for the specified query.
* Get a view for the specified query.
*
* @param serverCache Complete server cache, if we have it.
* @param query The query to return a view for
* @param writesCache
* @param serverCache
* @param serverCacheComplete
* @return Events to raise.
*/
addEventRegistration(
getView(
query: Query,
eventRegistration: EventRegistration,
writesCache: WriteTreeRef,
serverCache: Node | null,
serverCacheComplete: boolean
): Event[] {
): View {
const queryId = query.queryIdentifier();
let view = this.views.get(queryId);
const view = this.views.get(queryId);
if (!view) {
// TODO: make writesCache take flag for complete server node
let eventCache = writesCache.calcCompleteEventCache(
Expand All @@ -128,10 +130,37 @@ export class SyncPoint {
new CacheNode(eventCache, eventCacheComplete, false),
new CacheNode(serverCache, serverCacheComplete, false)
);
view = new View(query, viewCache);
this.views.set(queryId, view);
return new View(query, viewCache);
}
return view;
}

/**
* Add an event callback for the specified query.
*
* @param query
* @param eventRegistration
* @param writesCache
* @param serverCache Complete server cache, if we have it.
* @param serverCacheComplete
* @return Events to raise.
*/
addEventRegistration(
query: Query,
eventRegistration: EventRegistration,
writesCache: WriteTreeRef,
serverCache: Node | null,
serverCacheComplete: boolean
): Event[] {
const view = this.getView(
query,
writesCache,
serverCache,
serverCacheComplete
);
if (!this.views.has(query.queryIdentifier())) {
this.views.set(query.queryIdentifier(), view);
}
// This is guaranteed to exist now, we just created anything that was missing
view.addEventRegistration(eventRegistration);
return view.getInitialEvents(eventRegistration);
Expand Down
33 changes: 33 additions & 0 deletions packages/database/src/core/SyncTree.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { Node } from './snap/Node';
import { Event } from './view/Event';
import { EventRegistration } from './view/EventRegistration';
import { View } from './view/View';
import { CacheNode } from './view/CacheNode';

/**
* @typedef {{
Expand Down Expand Up @@ -501,6 +502,38 @@ export class SyncTree {
);
}

getServerValue(query: Query): Node | null {
const path = query.path;
let serverCache: Node | null = null;
// Any covering writes will necessarily be at the root, so really all we need to find is the server cache.
// Consider optimizing this once there's a better understanding of what actual behavior will be.
this.syncPointTree_.foreachOnPath(path, (pathToSyncPoint, sp) => {
const relativePath = Path.relativePath(pathToSyncPoint, path);
serverCache = serverCache || sp.getCompleteServerCache(relativePath);
});
let syncPoint = this.syncPointTree_.get(path);
if (!syncPoint) {
syncPoint = new SyncPoint();
this.syncPointTree_ = this.syncPointTree_.set(path, syncPoint);
} else {
serverCache = serverCache || syncPoint.getCompleteServerCache(Path.Empty);
}
const serverCacheComplete = serverCache != null;
const serverCacheNode: CacheNode | null = serverCacheComplete
? new CacheNode(serverCache, true, false)
: null;
const writesCache: WriteTreeRef | null = this.pendingWriteTree_.childWrites(
query.path
);
const view: View = syncPoint.getView(
query,
writesCache,
serverCacheComplete ? serverCacheNode.getNode() : ChildrenNode.EMPTY_NODE,
serverCacheComplete
);
return view.getCompleteNode();
}

/**
* This collapses multiple unfiltered views into a single view, since we only need a single
* listener for them.
Expand Down
4 changes: 4 additions & 0 deletions packages/database/src/core/view/View.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ export class View {
return this.viewCache_.getServerCache().getNode();
}

getCompleteNode(): Node | null {
return this.viewCache_.getCompleteEventSnap();
}

getCompleteServerCache(path: Path): Node | null {
const cache = this.viewCache_.getCompleteServerSnap();
if (cache) {
Expand Down
188 changes: 179 additions & 9 deletions packages/database/test/query.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1399,6 +1399,40 @@ describe('Query Tests', () => {
expect(Object.values(snap.val())).to.deep.equal([snap.val()[childOne.key]]);
});

it('Ensure startAfter on key index works with overlapping listener', async () => {
const node = getRandomNode() as Reference;
const childOne = node.push();
const childTwo = node.push();
// Create a server synced and a latency-compensated write
await childOne.set(1);
childTwo.set(2);
const ea = EventAccumulatorFactory.waitsForCount(1);
node.on('value', snap => {
ea.addEvent(snap.val());
});
await ea.promise;
const snap = await node.orderByKey().startAfter(childOne.key).get();
expect(Object.keys(snap.val())).to.deep.equal([childTwo.key]);
expect(Object.values(snap.val())).to.deep.equal([snap.val()[childTwo.key]]);
});

it('Ensure endBefore on key index works with overlapping listener', async () => {
const node = getRandomNode() as Reference;
const childOne = node.push();
const childTwo = node.push();
// Create a server synced and a latency-compensated write
await childOne.set(1);
childTwo.set(2);
const ea = EventAccumulatorFactory.waitsForCount(1);
node.on('value', snap => {
ea.addEvent(snap.val());
});
await ea.promise;
const snap = await node.orderByKey().endBefore(childTwo.key).get();
expect(Object.keys(snap.val())).to.deep.equal([childOne.key]);
expect(Object.values(snap.val())).to.deep.equal([snap.val()[childOne.key]]);
});

it('Ensure startAt / endAt with priority works.', async () => {
const node = getRandomNode() as Reference;

Expand Down Expand Up @@ -3120,14 +3154,28 @@ describe('Query Tests', () => {
expect((await node.get()).val()).to.equal(null);
});

it('get at non-empty root returns correct value', async () => {
it('get at node returns correct value', async () => {
const node = getRandomNode() as Reference;
const expected = { foo: 'a', bar: 'b' };
await node.set(expected);
const snapshot = await node.get();
expect(snapshot.val()).to.deep.equal(expected);
});

it('get for child returns correct value', async () => {
const node = getRandomNode() as Reference;
await node.set({ foo: 'a', bar: 'b', baz: 'c' });
const snapshot = await node.child('baz').get();
expect(snapshot.val()).to.deep.equal('c');
});

it('get for parent returns correct value', async () => {
const node = getRandomNode() as Reference;
const child = node.child('child');
await child.set(1);
expect((await node.get()).val()).to.deep.equal({ child: 1 });
});

it('get for removed node returns correct value', async () => {
const node = getRandomNode() as Reference;
const expected = { foo: 'a', bar: 'b' };
Expand All @@ -3140,7 +3188,7 @@ describe('Query Tests', () => {
expect(snapshot.val()).to.be.null;
});

it('get while offline is rejected', async () => {
it('get for missing node while offline is rejected', async () => {
const node = getRandomNode() as Reference;
node.database.goOffline();
try {
Expand All @@ -3150,13 +3198,7 @@ describe('Query Tests', () => {
}
});

it('get returns the latest value', async () => {
const node = getRandomNode() as Reference;
await node.set({ foo: 'bar' });
expect((await node.get()).val()).to.deep.equal({ foo: 'bar' });
});

it('get reads from cache if database is not connected', async () => {
it('get reads node from cache when not connected', async () => {
const node = getRandomNode() as Reference;
const node2 = getFreshRepo(node.path);
try {
Expand All @@ -3178,6 +3220,134 @@ describe('Query Tests', () => {
}
});

it('get reads child node from cache when not connected', async () => {
const node = getRandomNode() as Reference;
const node2 = getFreshRepo(node.path);
try {
await node2.set({ foo: 'bar' });
const onSnapshot = await new Promise((resolve, _) => {
node.on('value', snap => {
resolve(snap);
});
});
node.database.goOffline();
const getSnapshot = await node.child('foo').get();
// node's cache dropped here.
node.off();
expect(getSnapshot.val()).to.deep.equal('bar');
} finally {
node.database.goOnline();
}
});

it('get reads parent node from cache when not connected', async () => {
const node = getRandomNode() as Reference;
const node2 = getFreshRepo(node.path);
try {
await node2.set({ foo: 'bar' });
await node2.child('baz').set(1);
const onSnapshot = await new Promise((resolve, _) => {
node.on('value', snap => {
resolve(snap);
});
});
node.database.goOffline();
const getSnapshot = await node.get();
// node's cache dropped here.
node.off();
expect(getSnapshot.val()).to.deep.equal({ foo: 'bar', baz: 1 });
} finally {
node.database.goOnline();
}
});

it('get with pending node writes when not connected', async () => {
const node = getRandomNode() as Reference;
const node2 = getFreshRepo(node.path);
try {
await node2.set({ foo: 'bar' });
const onSnapshot = await new Promise((resolve, _) => {
node.on('value', snap => {
resolve(snap);
});
});
node.database.goOffline();
node.set({ foo: 'baz' });
const getSnapshot = await node.get();
// node's cache dropped here.
node.off();
expect(getSnapshot.val()).to.deep.equal({ foo: 'baz' });
} finally {
node.database.goOnline();
}
});

it('get with pending child writes when not connected', async () => {
const node = getRandomNode() as Reference;
const node2 = getFreshRepo(node.path);
try {
await node2.set({ foo: 'bar' });
const onSnapshot = await new Promise((resolve, _) => {
node.on('value', snap => {
resolve(snap);
});
});
node.database.goOffline();
node.child('baz').set(true);
const getSnapshot = await node.get();
// node's cache dropped here.
node.off();
expect(getSnapshot.val()).to.deep.equal({ foo: 'bar', baz: true });
} finally {
node.database.goOnline();
}
});

it('get with pending parent writes when not connected', async () => {
const node = getRandomNode() as Reference;
const node2 = getFreshRepo(node.path);
try {
await node2.set({ foo: 'bar' });
const onSnapshot = await new Promise((resolve, _) => {
node.on('value', snap => {
resolve(snap);
});
});
node.database.goOffline();
node.set({ foo: 'baz' });
const getSnapshot = await node.child('foo').get();
// node's cache dropped here.
node.off();
expect(getSnapshot.val()).to.deep.equal('baz');
} finally {
node.database.goOnline();
}
});

it('get with pending writes', async () => {
const node = getRandomNode() as Reference;
node.database.goOffline();
try {
node.set({ foo: 'bar' });
const snap = await node.get();
expect(snap.val()).to.deep.equal({ foo: 'bar' });
} finally {
node.database.goOnline();
}
});

it('get child of pending writes', async () => {
const node = getRandomNode() as Reference;
node.database.goOffline();
try {
node.set({ foo: 'bar' });
const snap = await node.child('foo').get();
expect(snap.val()).to.deep.equal('bar');
} finally {
node.database.goOnline();
}
});

it('get does not cache sibling data', async () => {
const reader = getRandomNode() as Reference;
const writer = getFreshRepo(reader.path);
Expand Down