Skip to content

Update query-document mapping from bundles. #3620

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions packages/firestore/src/core/bundle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ import {
saveNamedQuery
} from '../local/local_store';
import { SizedBundleElement } from '../util/bundle_reader';
import { MaybeDocumentMap } from '../model/collections';
import {
documentKeySet,
DocumentKeySet,
MaybeDocumentMap
} from '../model/collections';
import { BundleMetadata } from '../protos/firestore_bundle_proto';

/**
Expand Down Expand Up @@ -208,6 +212,30 @@ export class BundleLoader {
return null;
}

private getQueryDocumentMapping(
documents: BundledDocuments
): Map<string, DocumentKeySet> {
const queryDocumentMap = new Map<string, DocumentKeySet>();
const bundleConverter = new BundleConverter(
this.localStore.getSerializer()
);
for (const bundleDoc of documents) {
if (bundleDoc.metadata.queries) {
const documentKey = bundleConverter.toDocumentKey(
bundleDoc.metadata.name!
);
for (const queryName of bundleDoc.metadata.queries) {
const documentKeys = (
queryDocumentMap.get(queryName) || documentKeySet()
).add(documentKey);
queryDocumentMap.set(queryName, documentKeys);
}
}
}

return queryDocumentMap;
}

/**
* Update the progress to 'Success' and return the updated progress.
*/
Expand All @@ -218,16 +246,18 @@ export class BundleLoader {
'Bundled documents ends with a document metadata and missing document.'
);

for (const q of this.queries) {
await saveNamedQuery(this.localStore, q);
}

const changedDocs = await applyBundleDocuments(
const changedDocuments = await applyBundleDocuments(
this.localStore,
this.documents
);

const queryDocumentMap = this.getQueryDocumentMapping(this.documents);

for (const q of this.queries) {
await saveNamedQuery(this.localStore, q, queryDocumentMap.get(q.name!));
}

this.progress.taskState = 'Success';
return new BundleLoadResult({ ...this.progress }, changedDocs);
return new BundleLoadResult({ ...this.progress }, changedDocuments);
}
}
33 changes: 28 additions & 5 deletions packages/firestore/src/local/local_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ export interface LocalStore {
executeQuery(query: Query, usePreviousResults: boolean): Promise<QueryResult>;

collectGarbage(garbageCollector: LruGarbageCollector): Promise<LruResults>;

/** Returns the serializer associated with the local store. */
getSerializer(): JsonProtoSerializer;
}

/**
Expand Down Expand Up @@ -1083,6 +1086,10 @@ class LocalStoreImpl implements LocalStore {
txn => garbageCollector.collect(txn, this.targetDataByTarget)
);
}

getSerializer(): JsonProtoSerializer {
return this.serializer;
}
}

export function newLocalStore(
Expand Down Expand Up @@ -1373,7 +1380,8 @@ export function getNamedQuery(
*/
export async function saveNamedQuery(
localStore: LocalStore,
query: bundleProto.NamedQuery
query: bundleProto.NamedQuery,
documents: DocumentKeySet = documentKeySet()
): Promise<void> {
// Allocate a target for the named query such that it can be resumed
// from associated read time if users use it to listen.
Expand All @@ -1389,19 +1397,34 @@ export async function saveNamedQuery(
'readwrite',
transaction => {
// Update allocated target's read time, if the bundle's read time is newer.
let updateReadTime = PersistencePromise.resolve();
let readTimeUpdated = PersistencePromise.resolve();
const readTime = fromVersion(query.readTime!);
if (allocated.snapshotVersion.compareTo(readTime) < 0) {
const newTargetData = allocated.withResumeToken(
ByteString.EMPTY_BYTE_STRING,
readTime
);
updateReadTime = localStoreImpl.targetCache.updateTargetData(
transaction,
readTimeUpdated = localStoreImpl.targetCache
.updateTargetData(transaction, newTargetData)
.next(() =>
localStoreImpl.targetCache.removeMatchingKeysForTargetId(
transaction,
allocated.targetId
)
)
.next(() =>
localStoreImpl.targetCache.addMatchingKeys(
transaction,
documents,
allocated.targetId
)
);
localStoreImpl.targetDataByTarget = localStoreImpl.targetDataByTarget.insert(
newTargetData.targetId,
newTargetData
);
}
return updateReadTime.next(() =>
return readTimeUpdated.next(() =>
localStoreImpl.bundleCache.saveNamedQuery(transaction, query)
);
}
Expand Down
3 changes: 3 additions & 0 deletions packages/firestore/src/protos/firestore/bundle.proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ message BundledDocumentMetadata {

// Whether the document exists.
bool exists = 3;

// The names of the queries in this bundle that this document matches to.
repeated string queries = 4;
}

// Metadata describing the bundle file/stream.
Expand Down
3 changes: 3 additions & 0 deletions packages/firestore/src/protos/firestore_bundle_proto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ export interface BundledDocumentMetadata {

/** BundledDocumentMetadata exists */
exists?: boolean | null;

/** The names of the queries in this bundle that this document matches to. */
queries?: string[];
}

/** Properties of a BundleMetadata. */
Expand Down
99 changes: 95 additions & 4 deletions packages/firestore/test/unit/local/local_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import { LocalViewChanges } from '../../../src/local/local_view_changes';
import { Persistence } from '../../../src/local/persistence';
import { SimpleQueryEngine } from '../../../src/local/simple_query_engine';
import {
DocumentKeySet,
documentKeySet,
MaybeDocumentMap
} from '../../../src/model/collections';
Expand Down Expand Up @@ -91,6 +92,7 @@ import {
query,
setMutation,
TestBundledDocuments,
TestNamedQuery,
TestSnapshotVersion,
transformMutation,
unknownDoc,
Expand Down Expand Up @@ -137,7 +139,7 @@ class LocalStoreTester {
| RemoteEvent
| LocalViewChanges
| TestBundledDocuments
| bundleProto.NamedQuery
| TestNamedQuery
): LocalStoreTester {
if (op instanceof Mutation) {
return this.afterMutations([op]);
Expand Down Expand Up @@ -188,17 +190,21 @@ class LocalStoreTester {

this.promiseChain = this.promiseChain
.then(() => applyBundleDocuments(this.localStore, documents))
.then((result: MaybeDocumentMap) => {
.then(result => {
this.lastChanges = result;
});
return this;
}

afterNamedQuery(namedQuery: bundleProto.NamedQuery): LocalStoreTester {
afterNamedQuery(testQuery: TestNamedQuery): LocalStoreTester {
this.prepareNextStep();

this.promiseChain = this.promiseChain.then(() =>
saveNamedQuery(this.localStore, namedQuery)
saveNamedQuery(
this.localStore,
testQuery.namedQuery,
testQuery.matchingDocuments
)
);
return this;
}
Expand Down Expand Up @@ -432,6 +438,29 @@ class LocalStoreTester {
return this;
}

toHaveQueryDocumentMapping(
persistence: Persistence,
targetId: TargetId,
expectedKeys: DocumentKeySet
): LocalStoreTester {
this.promiseChain = this.promiseChain.then(() => {
return persistence.runTransaction(
'toHaveQueryDocumentMapping',
'readonly',
transaction => {
return persistence
.getTargetCache()
.getMatchingKeysForTargetId(transaction, targetId)
.next(matchedKeys => {
expect(matchedKeys.isEqual(expectedKeys)).to.be.true;
});
}
);
});

return this;
}

toHaveNewerBundle(
metadata: bundleProto.BundleMetadata,
expected: boolean
Expand Down Expand Up @@ -1706,6 +1735,68 @@ function genericLocalStoreTests(
.finish();
});

it('loading named queries allocates targets and updates target document mapping', async () => {
const expectedQueryDocumentMap = new Map([
['query-1', documentKeySet(key('foo1/bar'))],
['query-2', documentKeySet(key('foo2/bar'))]
]);
const version1 = SnapshotVersion.fromTimestamp(Timestamp.fromMillis(10000));
const version2 = SnapshotVersion.fromTimestamp(Timestamp.fromMillis(20000));

return expectLocalStore()
.after(
bundledDocuments(
[doc('foo1/bar', 1, { sum: 1337 }), doc('foo2/bar', 2, { sum: 42 })],
[['query-1'], ['query-2']]
)
)
.toReturnChanged(
doc('foo1/bar', 1, { sum: 1337 }),
doc('foo2/bar', 2, { sum: 42 })
)
.toContain(doc('foo1/bar', 1, { sum: 1337 }))
.toContain(doc('foo2/bar', 2, { sum: 42 }))
.after(
namedQuery(
'query-1',
query('foo1'),
/* limitType */ 'FIRST',
version1,
expectedQueryDocumentMap.get('query-1')
)
)
.toHaveNamedQuery({
name: 'query-1',
query: query('foo1'),
readTime: version1
})
.toHaveQueryDocumentMapping(
persistence,
/*targetId*/ 2,
/*expectedKeys*/ documentKeySet(key('foo1/bar'))
)
.after(
namedQuery(
'query-2',
query('foo2'),
/* limitType */ 'FIRST',
version2,
expectedQueryDocumentMap.get('query-2')
)
)
.toHaveNamedQuery({
name: 'query-2',
query: query('foo2'),
readTime: version2
})
.toHaveQueryDocumentMapping(
persistence,
/*targetId*/ 4,
/*expectedKeys*/ documentKeySet(key('foo2/bar'))
)
.finish();
});

it('handles saving and loading limit to last queries', async () => {
const now = Timestamp.now();
return expectLocalStore()
Expand Down
39 changes: 26 additions & 13 deletions packages/firestore/test/util/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -445,14 +445,16 @@ export class TestBundledDocuments {
}

export function bundledDocuments(
documents: MaybeDocument[]
documents: MaybeDocument[],
queryNames?: string[][]
): TestBundledDocuments {
const result = documents.map(d => {
const result = documents.map((d, index) => {
return {
metadata: {
name: toName(JSON_SERIALIZER, d.key),
readTime: toVersion(JSON_SERIALIZER, d.version),
exists: d instanceof Document
exists: d instanceof Document,
queries: queryNames ? queryNames[index] : undefined
},
document:
d instanceof Document ? toDocument(JSON_SERIALIZER, d) : undefined
Expand All @@ -462,21 +464,32 @@ export function bundledDocuments(
return new TestBundledDocuments(result);
}

export class TestNamedQuery {
constructor(
public namedQuery: bundleProto.NamedQuery,
public matchingDocuments: DocumentKeySet
) {}
}

export function namedQuery(
name: string,
query: Query,
limitType: bundleProto.LimitType,
readTime: SnapshotVersion
): bundleProto.NamedQuery {
readTime: SnapshotVersion,
matchingDocuments: DocumentKeySet = documentKeySet()
): TestNamedQuery {
return {
name,
readTime: toTimestamp(JSON_SERIALIZER, readTime.toTimestamp()),
bundledQuery: {
parent: toQueryTarget(JSON_SERIALIZER, queryToTarget(query)).parent,
limitType,
structuredQuery: toQueryTarget(JSON_SERIALIZER, queryToTarget(query))
.structuredQuery
}
namedQuery: {
name,
readTime: toTimestamp(JSON_SERIALIZER, readTime.toTimestamp()),
bundledQuery: {
parent: toQueryTarget(JSON_SERIALIZER, queryToTarget(query)).parent,
limitType,
structuredQuery: toQueryTarget(JSON_SERIALIZER, queryToTarget(query))
.structuredQuery
}
},
matchingDocuments
};
}

Expand Down