Skip to content

Update query-document mapping from bundles. #3620

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions packages/firestore/exp/src/api/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import {
indexedDbStoragePrefix
} from '../../../src/local/indexeddb_persistence';
import { LoadBundleTask } from '../../../src/api/bundle';
import { Query } from '../../../lite';
import {
getLocalStore,
getPersistence,
Expand Down Expand Up @@ -332,9 +331,15 @@ export function loadBundle(
const firestoreImpl = cast(firestore, Firestore);
const resultTask = new LoadBundleTask();
// eslint-disable-next-line @typescript-eslint/no-floating-promises
getSyncEngine(firestoreImpl).then(syncEngine =>
enqueueLoadBundle(firestoreImpl._queue, syncEngine, bundleData, resultTask)
);
getSyncEngine(firestoreImpl).then(async syncEngine => {
enqueueLoadBundle(
(await firestoreImpl._getConfiguration()).databaseInfo.databaseId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This style of code is pretty hard to debug (a lot of the magic happens inline). Do you mind extracting a local variable for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

firestoreImpl._queue,
syncEngine,
bundleData,
resultTask
);
});

return resultTask;
}
Expand Down
47 changes: 38 additions & 9 deletions packages/firestore/src/core/bundle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ import {
saveNamedQuery
} from '../local/local_store';
import { SizedBundleElement } from '../util/bundle_reader';
import { MaybeDocumentMap } from '../model/collections';
import {
documentKeySet,
DocumentKeySet,
MaybeDocumentMap
} from '../model/collections';
import { BundleMetadata } from '../protos/firestore_bundle_proto';

/**
Expand Down Expand Up @@ -79,7 +83,7 @@ export type BundledDocuments = BundledDocument[];
* Helper to convert objects from bundles to model objects in the SDK.
*/
export class BundleConverter {
constructor(private serializer: JsonProtoSerializer) {}
constructor(private readonly serializer: JsonProtoSerializer) {}

toDocumentKey(name: string): DocumentKey {
return fromName(this.serializer, name);
Expand Down Expand Up @@ -161,7 +165,8 @@ export class BundleLoader {

constructor(
private metadata: bundleProto.BundleMetadata,
private localStore: LocalStore
private localStore: LocalStore,
private serializer: JsonProtoSerializer
) {
this.progress = bundleInitialProgress(metadata);
}
Expand Down Expand Up @@ -208,6 +213,28 @@ export class BundleLoader {
return null;
}

private getQueryDocumentMapping(
documents: BundledDocuments
): Map<string, DocumentKeySet> {
const queryDocumentMap = new Map<string, DocumentKeySet>();
const bundleConverter = new BundleConverter(this.serializer);
for (const bundleDoc of documents) {
if (bundleDoc.metadata.queries) {
const documentKey = bundleConverter.toDocumentKey(
bundleDoc.metadata.name!
);
for (const queryName of bundleDoc.metadata.queries) {
const documentKeys = (
queryDocumentMap.get(queryName) || documentKeySet()
).add(documentKey);
queryDocumentMap.set(queryName, documentKeys);
}
}
}

return queryDocumentMap;
}

/**
* Update the progress to 'Success' and return the updated progress.
*/
Expand All @@ -218,16 +245,18 @@ export class BundleLoader {
'Bundled documents ends with a document metadata and missing document.'
);

for (const q of this.queries) {
await saveNamedQuery(this.localStore, q);
}

const changedDocs = await applyBundleDocuments(
const changedDocuments = await applyBundleDocuments(
this.localStore,
this.documents
);

const queryDocumentMap = this.getQueryDocumentMapping(this.documents);

for (const q of this.queries) {
await saveNamedQuery(this.localStore, q, queryDocumentMap.get(q.name!));
}

this.progress.taskState = 'Success';
return new BundleLoadResult({ ...this.progress }, changedDocs);
return new BundleLoadResult({ ...this.progress }, changedDocuments);
}
}
16 changes: 11 additions & 5 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ import { TransactionRunner } from './transaction_runner';
import { Datastore } from '../remote/datastore';
import { BundleReader } from '../util/bundle_reader';
import { LoadBundleTask } from '../api/bundle';
import { newTextEncoder } from '../platform/serializer';
import { newSerializer, newTextEncoder } from '../platform/serializer';
import { toByteStreamReader } from '../platform/byte_stream_reader';
import { NamedQuery } from './bundle';
import { JsonProtoSerializer } from '../remote/serializer';

const LOG_TAG = 'FirestoreClient';
export const MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100;
Expand Down Expand Up @@ -537,7 +538,10 @@ export class FirestoreClient {
): void {
this.verifyNotTerminated();

const reader = createBundleReader(data);
const reader = createBundleReader(
data,
newSerializer(this.databaseInfo.databaseId)
);
this.asyncQueue.enqueueAndForget(async () => {
loadBundle(this.syncEngine, reader, resultTask);
return resultTask.catch(e => {
Expand Down Expand Up @@ -798,24 +802,26 @@ export function enqueueExecuteQueryViaSnapshotListener(
}

function createBundleReader(
data: ReadableStream<Uint8Array> | ArrayBuffer | string
data: ReadableStream<Uint8Array> | ArrayBuffer | string,
serializer: JsonProtoSerializer
): BundleReader {
let content: ReadableStream<Uint8Array> | ArrayBuffer;
if (typeof data === 'string') {
content = newTextEncoder().encode(data);
} else {
content = data;
}
return new BundleReader(toByteStreamReader(content));
return new BundleReader(toByteStreamReader(content), serializer);
}

export function enqueueLoadBundle(
databaseId: DatabaseId,
asyncQueue: AsyncQueue,
syncEngine: SyncEngine,
data: ReadableStream<Uint8Array> | ArrayBuffer | string,
resultTask: LoadBundleTask
): void {
const reader = createBundleReader(data);
const reader = createBundleReader(data, newSerializer(databaseId));
asyncQueue.enqueueAndForget(async () => {
loadBundle(syncEngine, reader, resultTask);
return resultTask.catch(e => {
Expand Down
6 changes: 5 additions & 1 deletion packages/firestore/src/core/sync_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,11 @@ async function loadBundleImpl(

task._updateProgress(bundleInitialProgress(metadata));

const loader = new BundleLoader(metadata, syncEngine.localStore);
const loader = new BundleLoader(
metadata,
syncEngine.localStore,
reader.serializer
);
let element = await reader.nextElement();
while (element) {
debugAssert(
Expand Down
48 changes: 34 additions & 14 deletions packages/firestore/src/local/local_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1373,7 +1373,8 @@ export function getNamedQuery(
*/
export async function saveNamedQuery(
localStore: LocalStore,
query: bundleProto.NamedQuery
query: bundleProto.NamedQuery,
documents: DocumentKeySet = documentKeySet()
): Promise<void> {
// Allocate a target for the named query such that it can be resumed
// from associated read time if users use it to listen.
Expand All @@ -1383,27 +1384,46 @@ export async function saveNamedQuery(
const allocated = await localStore.allocateTarget(
queryToTarget(fromBundledQuery(query.bundledQuery!))
);

const localStoreImpl = debugCast(localStore, LocalStoreImpl);
return localStoreImpl.persistence.runTransaction(
'Save named query',
'readwrite',
transaction => {
// Update allocated target's read time, if the bundle's read time is newer.
let updateReadTime = PersistencePromise.resolve();
const readTime = fromVersion(query.readTime!);
if (allocated.snapshotVersion.compareTo(readTime) < 0) {
const newTargetData = allocated.withResumeToken(
ByteString.EMPTY_BYTE_STRING,
readTime
);
updateReadTime = localStoreImpl.targetCache.updateTargetData(
transaction,
newTargetData
);
// Simply save the query itself if it is older than what the SDK already
// has.
if (allocated.snapshotVersion.compareTo(readTime) >= 0) {
return localStoreImpl.bundleCache.saveNamedQuery(transaction, query);
}
return updateReadTime.next(() =>
localStoreImpl.bundleCache.saveNamedQuery(transaction, query)

// Update existing target data because the query from the bundle is newer.
const newTargetData = allocated.withResumeToken(
ByteString.EMPTY_BYTE_STRING,
readTime
);
localStoreImpl.targetDataByTarget = localStoreImpl.targetDataByTarget.insert(
newTargetData.targetId,
newTargetData
);
return localStoreImpl.targetCache
.updateTargetData(transaction, newTargetData)
.next(() =>
localStoreImpl.targetCache.removeMatchingKeysForTargetId(
transaction,
allocated.targetId
)
)
.next(() =>
localStoreImpl.targetCache.addMatchingKeys(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should only do this if you updated the readTime above. On top of that, you need to also remove all existing keys, otherwise your mapping will not match the backend's mapping.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree to both.

transaction,
documents,
allocated.targetId
)
)
.next(() =>
localStoreImpl.bundleCache.saveNamedQuery(transaction, query)
);
}
);
}
3 changes: 3 additions & 0 deletions packages/firestore/src/protos/firestore/bundle.proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ message BundledDocumentMetadata {

// Whether the document exists.
bool exists = 3;

// The names of the queries in this bundle that this document matches to.
repeated string queries = 4;
}

// Metadata describing the bundle file/stream.
Expand Down
3 changes: 3 additions & 0 deletions packages/firestore/src/protos/firestore_bundle_proto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ export interface BundledDocumentMetadata {

/** BundledDocumentMetadata exists */
exists?: boolean | null;

/** The names of the queries in this bundle that this document matches to. */
queries?: string[];
}

/** Properties of a BundleMetadata. */
Expand Down
14 changes: 11 additions & 3 deletions packages/firestore/src/util/bundle_reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { Deferred } from './promise';
import { debugAssert } from './assert';
import { toByteStreamReader } from '../platform/byte_stream_reader';
import { newTextDecoder } from '../platform/serializer';
import { JsonProtoSerializer } from '../remote/serializer';

/**
* A complete element in the bundle stream, together with the byte length it
Expand Down Expand Up @@ -70,13 +71,20 @@ export class BundleReader {
/** The decoder used to parse binary data into strings. */
private textDecoder: TextDecoder;

static fromBundleSource(source: BundleSource): BundleReader {
return new BundleReader(toByteStreamReader(source, BYTES_PER_READ));
static fromBundleSource(
source: BundleSource,
serializer: JsonProtoSerializer
): BundleReader {
return new BundleReader(
toByteStreamReader(source, BYTES_PER_READ),
serializer
);
}

constructor(
/** The reader to read from underlying binary bundle data source. */
private reader: ReadableStreamReader<Uint8Array>
private reader: ReadableStreamReader<Uint8Array>,
readonly serializer: JsonProtoSerializer
) {
this.textDecoder = newTextDecoder();
// Read the metadata (which is the first element).
Expand Down
Loading