diff --git a/packages/firestore-types/index.d.ts b/packages/firestore-types/index.d.ts index 2346aebf2eb..9b2bc043cce 100644 --- a/packages/firestore-types/index.d.ts +++ b/packages/firestore-types/index.d.ts @@ -93,9 +93,40 @@ export class FirebaseFirestore { terminate(): Promise; + loadBundle( + bundleData: ArrayBuffer | ReadableStream | string + ): LoadBundleTask; + INTERNAL: { delete: () => Promise }; } +export interface LoadBundleTask { + onProgress( + next?: (progress: LoadBundleTaskProgress) => any, + error?: (error: Error) => any, + complete?: () => void + ): void; + + then( + onFulfilled?: (a: LoadBundleTaskProgress) => T | PromiseLike, + onRejected?: (a: Error) => R | PromiseLike + ): Promise; + + catch( + onRejected: (a: Error) => R | PromiseLike + ): Promise; +} + +export interface LoadBundleTaskProgress { + documentsLoaded: number; + totalDocuments: number; + bytesLoaded: number; + totalBytes: number; + taskState: TaskState; +} + +export type TaskState = 'Error' | 'Running' | 'Success'; + export class GeoPoint { constructor(latitude: number, longitude: number); diff --git a/packages/firestore/src/api/bundle.ts b/packages/firestore/src/api/bundle.ts new file mode 100644 index 00000000000..703a5d06482 --- /dev/null +++ b/packages/firestore/src/api/bundle.ts @@ -0,0 +1,117 @@ +/** + * @license + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as firestore from '@firebase/firestore-types'; +import { Deferred } from '../util/promise'; +import { PartialObserver } from './observer'; +import { debugAssert } from '../util/assert'; + +export class LoadBundleTask + implements + firestore.LoadBundleTask, + PromiseLike { + private _progressObserver: PartialObserver< + firestore.LoadBundleTaskProgress + > = {}; + private _taskCompletionResolver = new Deferred< + firestore.LoadBundleTaskProgress + >(); + + private _lastProgress: firestore.LoadBundleTaskProgress = { + taskState: 'Running', + totalBytes: 0, + totalDocuments: 0, + bytesLoaded: 0, + documentsLoaded: 0 + }; + + onProgress( + next?: (progress: firestore.LoadBundleTaskProgress) => unknown, + error?: (err: Error) => unknown, + complete?: () => void + ): void { + this._progressObserver = { + next, + error, + complete + }; + } + + catch( + onRejected: (a: Error) => R | PromiseLike + ): Promise { + return this._taskCompletionResolver.promise.catch(onRejected); + } + + then( + onFulfilled?: (a: firestore.LoadBundleTaskProgress) => T | PromiseLike, + onRejected?: (a: Error) => R | PromiseLike + ): Promise { + return this._taskCompletionResolver.promise.then(onFulfilled, onRejected); + } + + /** + * Notifies all observers that bundle loading has completed, with a provided + * `LoadBundleTaskProgress` object. + */ + _completeWith(progress: firestore.LoadBundleTaskProgress): void { + debugAssert( + progress.taskState === 'Success', + 'Task is not completed with Success.' + ); + this._updateProgress(progress); + if (this._progressObserver.complete) { + this._progressObserver.complete(); + } + + this._taskCompletionResolver.resolve(progress); + } + + /** + * Notifies all observers that bundle loading has failed, with a provided + * `Error` as the reason. + */ + _failWith(error: Error): void { + this._lastProgress.taskState = 'Error'; + + if (this._progressObserver.next) { + this._progressObserver.next(this._lastProgress); + } + + if (this._progressObserver.error) { + this._progressObserver.error(error); + } + + this._taskCompletionResolver.reject(error); + } + + /** + * Notifies a progress update of loading a bundle. + * @param progress The new progress. + */ + _updateProgress(progress: firestore.LoadBundleTaskProgress): void { + debugAssert( + this._lastProgress.taskState === 'Running', + 'Cannot update progress on a completed or failed task' + ); + + this._lastProgress = progress; + if (this._progressObserver.next) { + this._progressObserver.next(progress); + } + } +} diff --git a/packages/firestore/src/api/database.ts b/packages/firestore/src/api/database.ts index 248bcbc8e53..a7b4af44453 100644 --- a/packages/firestore/src/api/database.ts +++ b/packages/firestore/src/api/database.ts @@ -493,6 +493,13 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService { }; } + loadBundle( + bundleData: ArrayBuffer | ReadableStream | string + ): firestore.LoadBundleTask { + this.ensureClientConfigured(); + return this._firestoreClient!.loadBundle(bundleData); + } + ensureClientConfigured(): FirestoreClient { if (!this._firestoreClient) { // Kick off starting the client but don't actually wait for it. diff --git a/packages/firestore/src/core/bundle.ts b/packages/firestore/src/core/bundle.ts index cb652e8dbb9..55f00ffb39b 100644 --- a/packages/firestore/src/core/bundle.ts +++ b/packages/firestore/src/core/bundle.ts @@ -15,6 +15,7 @@ * limitations under the License. */ +import * as firestore from '@firebase/firestore-types'; import { Query } from './query'; import { SnapshotVersion } from './snapshot_version'; import { @@ -28,6 +29,14 @@ import * as api from '../protos/firestore_proto_api'; import { DocumentKey } from '../model/document_key'; import { MaybeDocument, NoDocument } from '../model/document'; import { debugAssert } from '../util/assert'; +import { + applyBundleDocuments, + LocalStore, + saveNamedQuery +} from '../local/local_store'; +import { SizedBundleElement } from '../util/bundle_reader'; +import { MaybeDocumentMap } from '../model/collections'; +import { BundleMetadata } from '../protos/firestore_bundle_proto'; /** * Represents a Firestore bundle saved by the SDK in its local storage. @@ -58,7 +67,7 @@ export interface NamedQuery { */ interface BundledDocument { metadata: bundleProto.BundledDocumentMetadata; - document: api.Document | undefined; + document?: api.Document; } /** @@ -98,3 +107,127 @@ export class BundleConverter { return fromVersion(time); } } + +/** + * Returns a `LoadBundleTaskProgress` representing the initial progress of + * loading a bundle. + */ +export function bundleInitialProgress( + metadata: BundleMetadata +): firestore.LoadBundleTaskProgress { + return { + taskState: 'Running', + documentsLoaded: 0, + bytesLoaded: 0, + totalDocuments: metadata.totalDocuments!, + totalBytes: metadata.totalBytes! + }; +} + +/** + * Returns a `LoadBundleTaskProgress` representing the progress that the loading + * has succeeded. + */ +export function bundleSuccessProgress( + metadata: BundleMetadata +): firestore.LoadBundleTaskProgress { + return { + taskState: 'Success', + documentsLoaded: metadata.totalDocuments!, + bytesLoaded: metadata.totalBytes!, + totalDocuments: metadata.totalDocuments!, + totalBytes: metadata.totalBytes! + }; +} + +export class BundleLoadResult { + constructor( + readonly progress: firestore.LoadBundleTaskProgress, + readonly changedDocs: MaybeDocumentMap + ) {} +} + +/** + * A class to process the elements from a bundle, load them into local + * storage and provide progress update while loading. + */ +export class BundleLoader { + /** The current progress of loading */ + private progress: firestore.LoadBundleTaskProgress; + /** Batched queries to be saved into storage */ + private queries: bundleProto.NamedQuery[] = []; + /** Batched documents to be saved into storage */ + private documents: BundledDocuments = []; + + constructor( + private metadata: bundleProto.BundleMetadata, + private localStore: LocalStore + ) { + this.progress = bundleInitialProgress(metadata); + } + + /** + * Adds an element from the bundle to the loader. + * + * Returns a new progress if adding the element leads to a new progress, + * otherwise returns null. + */ + addSizedElement( + element: SizedBundleElement + ): firestore.LoadBundleTaskProgress | null { + debugAssert(!element.isBundleMetadata(), 'Unexpected bundle metadata.'); + + this.progress.bytesLoaded += element.byteLength; + + let documentsLoaded = this.progress.documentsLoaded; + + if (element.payload.namedQuery) { + this.queries.push(element.payload.namedQuery); + } else if (element.payload.documentMetadata) { + this.documents.push({ metadata: element.payload.documentMetadata }); + if (!element.payload.documentMetadata.exists) { + ++documentsLoaded; + } + } else if (element.payload.document) { + debugAssert( + this.documents.length > 0 && + this.documents[this.documents.length - 1].metadata.name === + element.payload.document.name, + 'The document being added does not match the stored metadata.' + ); + this.documents[this.documents.length - 1].document = + element.payload.document; + ++documentsLoaded; + } + + if (documentsLoaded !== this.progress.documentsLoaded) { + this.progress.documentsLoaded = documentsLoaded; + return { ...this.progress }; + } + + return null; + } + + /** + * Update the progress to 'Success' and return the updated progress. + */ + async complete(): Promise { + debugAssert( + this.documents[this.documents.length - 1]?.metadata.exists !== true || + !!this.documents[this.documents.length - 1].document, + 'Bundled documents ends with a document metadata and missing document.' + ); + + for (const q of this.queries) { + await saveNamedQuery(this.localStore, q); + } + + const changedDocs = await applyBundleDocuments( + this.localStore, + this.documents + ); + + this.progress.taskState = 'Success'; + return new BundleLoadResult({ ...this.progress }, changedDocs); + } +} diff --git a/packages/firestore/src/core/firestore_client.ts b/packages/firestore/src/core/firestore_client.ts index 1910865521a..533e8bd4a75 100644 --- a/packages/firestore/src/core/firestore_client.ts +++ b/packages/firestore/src/core/firestore_client.ts @@ -15,6 +15,7 @@ * limitations under the License. */ +import * as firestore from '@firebase/firestore-types'; import { CredentialsProvider } from '../api/credentials'; import { User } from '../auth/user'; import { LocalStore } from '../local/local_store'; @@ -26,7 +27,7 @@ import { newDatastore } from '../remote/datastore'; import { RemoteStore } from '../remote/remote_store'; import { AsyncQueue, wrapInUserErrorIfRecoverable } from '../util/async_queue'; import { Code, FirestoreError } from '../util/error'; -import { logDebug } from '../util/log'; +import { logDebug, logWarn } from '../util/log'; import { Deferred } from '../util/promise'; import { EventManager, @@ -34,7 +35,7 @@ import { Observer, QueryListener } from './event_manager'; -import { SyncEngine } from './sync_engine'; +import { SyncEngine, loadBundle } from './sync_engine'; import { View } from './view'; import { SharedClientState } from '../local/shared_client_state'; @@ -47,8 +48,11 @@ import { ComponentProvider, MemoryComponentProvider } from './component_provider'; +import { BundleReader } from '../util/bundle_reader'; +import { LoadBundleTask } from '../api/bundle'; import { newConnection } from '../platform/connection'; import { newSerializer } from '../platform/serializer'; +import { toByteStreamReader } from '../platform/byte_stream_reader'; const LOG_TAG = 'FirestoreClient'; const MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100; @@ -512,4 +516,27 @@ export class FirestoreClient { }); return deferred.promise; } + + loadBundle( + data: ReadableStream | ArrayBuffer | string + ): firestore.LoadBundleTask { + this.verifyNotTerminated(); + + let content: ReadableStream | ArrayBuffer; + if (typeof data === 'string') { + content = new TextEncoder().encode(data); + } else { + content = data; + } + const reader = new BundleReader(toByteStreamReader(content)); + const task = new LoadBundleTask(); + this.asyncQueue.enqueueAndForget(async () => { + loadBundle(this.syncEngine, reader, task); + return task.catch(e => { + logWarn(LOG_TAG, `Loading bundle failed with ${e}`); + }); + }); + + return task; + } } diff --git a/packages/firestore/src/core/sync_engine.ts b/packages/firestore/src/core/sync_engine.ts index a4a289781bb..759af7c2a9b 100644 --- a/packages/firestore/src/core/sync_engine.ts +++ b/packages/firestore/src/core/sync_engine.ts @@ -17,9 +17,11 @@ import { User } from '../auth/user'; import { + hasNewerBundle, ignoreIfPrimaryLeaseLoss, LocalStore, - MultiTabLocalStore + MultiTabLocalStore, + saveBundle } from '../local/local_store'; import { LocalViewChanges } from '../local/local_view_changes'; import { ReferenceSet } from '../local/reference_set'; @@ -36,7 +38,7 @@ import { BATCHID_UNKNOWN, MutationBatchResult } from '../model/mutation_batch'; import { RemoteEvent, TargetChange } from '../remote/remote_event'; import { RemoteStore } from '../remote/remote_store'; import { RemoteSyncer } from '../remote/remote_syncer'; -import { debugAssert, fail, hardAssert } from '../util/assert'; +import { debugAssert, debugCast, fail, hardAssert } from '../util/assert'; import { Code, FirestoreError } from '../util/error'; import { logDebug } from '../util/log'; import { primitiveComparator } from '../util/misc'; @@ -74,7 +76,14 @@ import { import { ViewSnapshot } from './view_snapshot'; import { AsyncQueue, wrapInUserErrorIfRecoverable } from '../util/async_queue'; import { TransactionRunner } from './transaction_runner'; +import { BundleReader } from '../util/bundle_reader'; +import { + BundleLoader, + bundleInitialProgress, + bundleSuccessProgress +} from './bundle'; import { Datastore } from '../remote/datastore'; +import { LoadBundleTask } from '../api/bundle'; const LOG_TAG = 'SyncEngine'; @@ -274,7 +283,7 @@ class SyncEngineImpl implements SyncEngine { private onlineState = OnlineState.Unknown; constructor( - protected localStore: LocalStore, + public localStore: LocalStore, protected remoteStore: RemoteStore, protected datastore: Datastore, // PORTING NOTE: Manages state synchronization in multi-tab environments. @@ -837,7 +846,7 @@ class SyncEngineImpl implements SyncEngine { return this.enqueuedLimboResolutions; } - protected async emitNewSnapsAndNotifyLocalStore( + async emitNewSnapsAndNotifyLocalStore( changes: MaybeDocumentMap, remoteEvent?: RemoteEvent ): Promise { @@ -901,7 +910,7 @@ class SyncEngineImpl implements SyncEngine { await this.localStore.notifyLocalViewChanges(docChangesInAllViews); } - protected assertSubscribed(fnName: string): void { + assertSubscribed(fnName: string): void { debugAssert( this.syncEngineListener !== null, 'Trying to call ' + fnName + ' before calling subscribe().' @@ -1005,7 +1014,7 @@ class MultiTabSyncEngineImpl extends SyncEngineImpl { private _isPrimaryClient: undefined | boolean = undefined; constructor( - protected localStore: MultiTabLocalStore, + public localStore: MultiTabLocalStore, remoteStore: RemoteStore, datastore: Datastore, sharedClientState: SharedClientState, @@ -1371,3 +1380,70 @@ export function newMultiTabSyncEngine( maxConcurrentLimboResolutions ); } + +/** + * Loads a Firestore bundle into the SDK. The returned promise resolves when + * the bundle finished loading. + * + * @param bundleReader Bundle to load into the SDK. + * @param task LoadBundleTask used to update the loading progress to public API. + */ +export function loadBundle( + syncEngine: SyncEngine, + bundleReader: BundleReader, + task: LoadBundleTask +): void { + const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl); + syncEngineImpl.assertSubscribed('loadBundle()'); + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + loadBundleImpl(syncEngineImpl, bundleReader, task); +} + +async function loadBundleImpl( + syncEngine: SyncEngineImpl, + reader: BundleReader, + task: LoadBundleTask +): Promise { + try { + const metadata = await reader.getMetadata(); + const skip = await hasNewerBundle(syncEngine.localStore, metadata); + if (skip) { + await reader.close(); + task._completeWith(bundleSuccessProgress(metadata)); + return; + } + + task._updateProgress(bundleInitialProgress(metadata)); + + const loader = new BundleLoader(metadata, syncEngine.localStore); + let element = await reader.nextElement(); + while (element) { + debugAssert( + !element.payload.metadata, + 'Unexpected BundleMetadata element.' + ); + const progress = await loader.addSizedElement(element); + if (progress) { + task._updateProgress(progress); + } + + element = await reader.nextElement(); + } + + const result = await loader.complete(); + // TODO(b/160876443): This currently raises snapshots with + // `fromCache=false` if users already listen to some queries and bundles + // has newer version. + await syncEngine.emitNewSnapsAndNotifyLocalStore( + result.changedDocs, + /* remoteEvent */ undefined + ); + + // Save metadata, so loading the same bundle will skip. + await saveBundle(syncEngine.localStore, metadata); + task._completeWith(result.progress); + } catch (e) { + task._failWith(e); + } +} diff --git a/packages/firestore/src/local/local_store.ts b/packages/firestore/src/local/local_store.ts index 5bc3217edb0..9e889165111 100644 --- a/packages/firestore/src/local/local_store.ts +++ b/packages/firestore/src/local/local_store.ts @@ -1296,7 +1296,7 @@ export function applyBundleDocuments( }); return localStoreImpl.persistence.runTransaction( 'Apply bundle documents', - 'readwrite-primary', + 'readwrite', txn => { return localStoreImpl .populateDocumentChangeBuffer( @@ -1341,7 +1341,7 @@ export function hasNewerBundle( ); }) .then(cached => { - return !!cached && cached.createTime!.compareTo(currentReadTime) > 0; + return !!cached && cached.createTime!.compareTo(currentReadTime) >= 0; }); } diff --git a/packages/firestore/src/platform/byte_stream_reader.ts b/packages/firestore/src/platform/byte_stream_reader.ts index 8665a97e79b..49aac60e9ff 100644 --- a/packages/firestore/src/platform/byte_stream_reader.ts +++ b/packages/firestore/src/platform/byte_stream_reader.ts @@ -20,10 +20,11 @@ import * as node from './node/byte_stream_reader'; import * as rn from './rn/byte_stream_reader'; import * as browser from './browser/byte_stream_reader'; import { isNode, isReactNative } from '@firebase/util'; +import { DEFAULT_BYTES_PER_READ } from '../util/byte_stream'; export function toByteStreamReader( source: BundleSource, - bytesPerRead: number + bytesPerRead: number = DEFAULT_BYTES_PER_READ ): ReadableStreamReader { if (isNode()) { return node.toByteStreamReader(source, bytesPerRead); diff --git a/packages/firestore/src/util/bundle_reader.ts b/packages/firestore/src/util/bundle_reader.ts index 4d46864ae08..e426fa0c825 100644 --- a/packages/firestore/src/util/bundle_reader.ts +++ b/packages/firestore/src/util/bundle_reader.ts @@ -93,6 +93,10 @@ export class BundleReader { ); } + close(): Promise { + return this.reader.cancel(); + } + /** * Returns the metadata of the bundle. */ diff --git a/packages/firestore/src/util/byte_stream.ts b/packages/firestore/src/util/byte_stream.ts index 3434e410270..a2723a82528 100644 --- a/packages/firestore/src/util/byte_stream.ts +++ b/packages/firestore/src/util/byte_stream.ts @@ -17,6 +17,13 @@ import { debugAssert } from './assert'; +/** + * How many bytes to read each time when `ReadableStreamReader.read()` is + * called. Only applicable for byte streams that we control (e.g. those backed + * by an UInt8Array). + */ +export const DEFAULT_BYTES_PER_READ = 10240; + /** * Builds a `ByteStreamReader` from a UInt8Array. * @param source The data source to use. @@ -25,7 +32,7 @@ import { debugAssert } from './assert'; */ export function toByteStreamReaderHelper( source: Uint8Array, - bytesPerRead: number + bytesPerRead: number = DEFAULT_BYTES_PER_READ ): ReadableStreamReader { debugAssert( bytesPerRead > 0, diff --git a/packages/firestore/test/integration/api_internal/bundle.test.ts b/packages/firestore/test/integration/api_internal/bundle.test.ts new file mode 100644 index 00000000000..b0e18b5b1cf --- /dev/null +++ b/packages/firestore/test/integration/api_internal/bundle.test.ts @@ -0,0 +1,241 @@ +/** + * @license + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as firestore from '@firebase/firestore-types'; +import { expect } from 'chai'; +import { + apiDescribe, + toDataArray, + withAlternateTestDb, + withTestDb +} from '../util/helpers'; +import { DatabaseId } from '../../../src/core/database_info'; +import { key } from '../../util/helpers'; +import { EventsAccumulator } from '../util/events_accumulator'; +import { TestBundleBuilder } from '../../unit/util/bundle_data'; + +function verifySuccessProgress(p: firestore.LoadBundleTaskProgress): void { + expect(p.taskState).to.equal('Success'); + expect(p.bytesLoaded).to.be.equal(p.totalBytes); + expect(p.documentsLoaded).to.equal(p.totalDocuments); +} + +function verifyInProgress( + p: firestore.LoadBundleTaskProgress, + expectedDocuments: number +): void { + expect(p.taskState).to.equal('Running'); + expect(p.bytesLoaded <= p.totalBytes).to.be.true; + expect(p.documentsLoaded <= p.totalDocuments).to.be.true; + expect(p.documentsLoaded).to.equal(expectedDocuments); +} + +apiDescribe('Bundles', (persistence: boolean) => { + const encoder = new TextEncoder(); + const testDocs: { [key: string]: firestore.DocumentData } = { + a: { k: { stringValue: 'a' }, bar: { integerValue: 1 } }, + b: { k: { stringValue: 'b' }, bar: { integerValue: 2 } } + }; + + function bundleWithTestDocs( + db: firestore.FirebaseFirestore + ): TestBundleBuilder { + const a = key('coll-1/a'); + const b = key('coll-1/b'); + const builder = new TestBundleBuilder( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (db as any)._databaseId as DatabaseId + ); + builder.addDocumentMetadata(a, { seconds: 1000, nanos: 9999 }, true); + builder.addDocument( + a, + { seconds: 1, nanos: 9 }, + { seconds: 1, nanos: 9 }, + testDocs.a + ); + builder.addDocumentMetadata(b, { seconds: 1000, nanos: 9999 }, true); + builder.addDocument( + b, + { seconds: 1, nanos: 9 }, + { seconds: 1, nanos: 9 }, + testDocs.b + ); + + return builder; + } + + function verifySnapEqualTestDocs(snap: firestore.QuerySnapshot): void { + expect(toDataArray(snap)).to.deep.equal([ + { k: 'a', bar: 1 }, + { k: 'b', bar: 2 } + ]); + } + + it('load with documents only with on progress and promise interface', () => { + return withTestDb(persistence, async db => { + const builder = bundleWithTestDocs(db); + + const progressEvents: firestore.LoadBundleTaskProgress[] = []; + let completeCalled = false; + const task = db.loadBundle( + builder.build('test-bundle', { seconds: 1001, nanos: 9999 }) + ); + task.onProgress( + progress => { + progressEvents.push(progress); + }, + undefined, + () => { + completeCalled = true; + } + ); + await task; + let fulfillProgress: firestore.LoadBundleTaskProgress; + await task.then(progress => { + fulfillProgress = progress; + }); + + expect(completeCalled).to.be.true; + expect(progressEvents.length).to.equal(4); + verifyInProgress(progressEvents[0], 0); + verifyInProgress(progressEvents[1], 1); + verifyInProgress(progressEvents[2], 2); + verifySuccessProgress(progressEvents[3]); + expect(fulfillProgress!).to.deep.equal(progressEvents[3]); + + // Read from cache. These documents do not exist in backend, so they can + // only be read from cache. + const snap = await db.collection('coll-1').get({ source: 'cache' }); + verifySnapEqualTestDocs(snap); + }); + }); + + it('load with documents with promise interface', () => { + return withTestDb(persistence, async db => { + const builder = bundleWithTestDocs(db); + + const fulfillProgress: firestore.LoadBundleTaskProgress = await db.loadBundle( + builder.build('test-bundle', { seconds: 1001, nanos: 9999 }) + ); + + verifySuccessProgress(fulfillProgress!); + + // Read from cache. These documents do not exist in backend, so they can + // only be read from cache. + const snap = await db.collection('coll-1').get({ source: 'cache' }); + verifySnapEqualTestDocs(snap); + }); + }); + + it('load for a second time skips', () => { + return withTestDb(persistence, async db => { + const builder = bundleWithTestDocs(db); + + await db.loadBundle( + builder.build('test-bundle', { seconds: 1001, nanos: 9999 }) + ); + + let completeCalled = false; + const progressEvents: firestore.LoadBundleTaskProgress[] = []; + const task = db.loadBundle( + encoder.encode( + builder.build('test-bundle', { seconds: 1001, nanos: 9999 }) + ) + ); + task.onProgress( + progress => { + progressEvents.push(progress); + }, + error => {}, + () => { + completeCalled = true; + } + ); + await task; + + expect(completeCalled).to.be.true; + // No loading actually happened in the second `loadBundle` call only the + // success progress is recorded. + expect(progressEvents.length).to.equal(1); + verifySuccessProgress(progressEvents[0]); + + // Read from cache. These documents do not exist in backend, so they can + // only be read from cache. + const snap = await db.collection('coll-1').get({ source: 'cache' }); + verifySnapEqualTestDocs(snap); + }); + }); + + it('load with documents already pulled from backend', () => { + return withTestDb(persistence, async db => { + await db.doc('coll-1/a').set({ k: 'a', bar: 0 }); + await db.doc('coll-1/b').set({ k: 'b', bar: 0 }); + + const accumulator = new EventsAccumulator(); + db.collection('coll-1').onSnapshot(accumulator.storeEvent); + await accumulator.awaitEvent(); + + const builder = bundleWithTestDocs(db); + const progress = await db.loadBundle( + // Testing passing in non-string bundles. + encoder.encode( + builder.build('test-bundle', { seconds: 1001, nanos: 9999 }) + ) + ); + + verifySuccessProgress(progress); + // The test bundle is holding ancient documents, so no events are + // generated as a result. The case where a bundle has newer doc than + // cache can only be tested in spec tests. + await accumulator.assertNoAdditionalEvents(); + + const snap = await db.collection('coll-1').get(); + expect(toDataArray(snap)).to.deep.equal([ + { k: 'a', bar: 0 }, + { k: 'b', bar: 0 } + ]); + }); + }); + + it('load with documents from other projects fails', () => { + return withTestDb(persistence, async db => { + let builder = bundleWithTestDocs(db); + return withAlternateTestDb(persistence, async otherDb => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + await expect( + otherDb.loadBundle( + builder.build('test-bundle', { seconds: 1001, nanos: 9999 }) + ) + ).to.be.rejectedWith('Tried to deserialize key from different project'); + + // Verify otherDb still functions, despite loaded a problematic bundle. + builder = bundleWithTestDocs(otherDb); + const finalProgress = await otherDb.loadBundle( + builder.build('test-bundle', { seconds: 1001, nanos: 9999 }) + ); + verifySuccessProgress(finalProgress); + + // Read from cache. These documents do not exist in backend, so they can + // only be read from cache. + const snap = await otherDb + .collection('coll-1') + .get({ source: 'cache' }); + verifySnapEqualTestDocs(snap); + }); + }); + }); +}); diff --git a/packages/firestore/test/unit/specs/bundle_spec.test.ts b/packages/firestore/test/unit/specs/bundle_spec.test.ts new file mode 100644 index 00000000000..fa7b08d8c45 --- /dev/null +++ b/packages/firestore/test/unit/specs/bundle_spec.test.ts @@ -0,0 +1,373 @@ +/** + * @license + * Copyright 2017 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Query } from '../../../src/core/query'; +import { + doc, + path, + TestSnapshotVersion, + version, + wrapObject +} from '../../util/helpers'; + +import { describeSpec, specTest } from './describe_spec'; +import { client, spec } from './spec_builder'; +import { TestBundleBuilder } from '../util/bundle_data'; +import { + JSON_SERIALIZER, + TEST_DATABASE_ID +} from '../local/persistence_test_helpers'; +import { DocumentKey } from '../../../src/model/document_key'; +import { toVersion } from '../../../src/remote/serializer'; +import { JsonObject } from '../../../src/model/object_value'; + +interface TestBundleDocument { + key: DocumentKey; + readTime: TestSnapshotVersion; + createTime?: TestSnapshotVersion; + updateTime?: TestSnapshotVersion; + content?: JsonObject; +} + +function bundleWithDocument(testDoc: TestBundleDocument): string { + const builder = new TestBundleBuilder(TEST_DATABASE_ID); + builder.addDocumentMetadata( + testDoc.key, + toVersion(JSON_SERIALIZER, version(testDoc.readTime)), + !!testDoc.createTime + ); + if (testDoc.createTime) { + builder.addDocument( + testDoc.key, + toVersion(JSON_SERIALIZER, version(testDoc.createTime)), + toVersion(JSON_SERIALIZER, version(testDoc.updateTime!)), + wrapObject(testDoc.content!).proto.mapValue.fields! + ); + } + return builder.build( + 'test-bundle', + toVersion(JSON_SERIALIZER, version(testDoc.readTime)) + ); +} + +describeSpec('Bundles:', ['no-ios', 'no-android'], () => { + specTest('Newer docs from bundles should overwrite cache', [], () => { + const query1 = Query.atPath(path('collection')); + const docA = doc('collection/a', 1000, { value: 'a' }); + const docAChanged = doc('collection/a', 2999, { value: 'b' }); + + const bundleString = bundleWithDocument({ + key: docA.key, + readTime: 3000, + createTime: 1999, + updateTime: 2999, + content: { value: 'b' } + }); + + return ( + spec() + .userListens(query1) + .watchAcksFull(query1, 1000, docA) + .expectEvents(query1, { added: [docA] }) + // TODO(b/160876443): This currently raises snapshots with + // `fromCache=false` if users already listen to some queries and bundles + // has newer version. + .loadBundle(bundleString) + .expectEvents(query1, { modified: [docAChanged] }) + ); + }); + + specTest( + 'Newer deleted docs from bundles should delete cached docs', + [], + () => { + const query1 = Query.atPath(path('collection')); + const docA = doc('collection/a', 1000, { value: 'a' }); + + const bundleString = bundleWithDocument({ + key: docA.key, + readTime: 3000 + }); + + return spec() + .userListens(query1) + .watchAcksFull(query1, 1000, docA) + .expectEvents(query1, { added: [docA] }) + .loadBundle(bundleString) + .expectEvents(query1, { removed: [docA] }); + } + ); + + specTest('Older deleted docs from bundles should do nothing', [], () => { + const query1 = Query.atPath(path('collection')); + const docA = doc('collection/a', 1000, { value: 'a' }); + + const bundleString = bundleWithDocument({ + key: docA.key, + readTime: 999 + }); + + return ( + spec() + .userListens(query1) + .watchAcksFull(query1, 1000, docA) + .expectEvents(query1, { added: [docA] }) + // No events are expected here. + .loadBundle(bundleString) + ); + }); + + specTest( + 'Newer docs from bundles should raise snapshot only when Watch catches up with acknowledged writes', + [], + () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 250, { value: 'a' }); + + const bundleBeforeMutationAck = bundleWithDocument({ + key: docA.key, + readTime: 500, + createTime: 250, + updateTime: 500, + content: { value: 'b' } + }); + + const bundleAfterMutationAck = bundleWithDocument({ + key: docA.key, + readTime: 1001, + createTime: 250, + updateTime: 1001, + content: { value: 'fromBundle' } + }); + return ( + spec() + // TODO(b/160878667): Figure out what happens when memory eager GC is on + // a bundle is loaded. + .withGCEnabled(false) + .userListens(query) + .watchAcksFull(query, 250, docA) + .expectEvents(query, { + added: [doc('collection/a', 250, { value: 'a' })] + }) + .userPatches('collection/a', { value: 'patched' }) + .expectEvents(query, { + modified: [ + doc( + 'collection/a', + 250, + { value: 'patched' }, + { hasLocalMutations: true } + ) + ], + hasPendingWrites: true + }) + .writeAcks('collection/a', 1000) + // loading bundleBeforeMutationAck will not raise snapshots, because its + // snapshot version is older than the acknowledged mutation. + .loadBundle(bundleBeforeMutationAck) + // loading bundleAfterMutationAck will raise a snapshot, because it is after + // the acknowledged mutation. + .loadBundle(bundleAfterMutationAck) + .expectEvents(query, { + modified: [doc('collection/a', 1001, { value: 'fromBundle' })] + }) + ); + } + ); + + specTest( + 'Newer docs from bundles should keep not raise snapshot if there are unacknowledged writes', + [], + () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 250, { value: 'a' }); + + const bundleString = bundleWithDocument({ + key: docA.key, + readTime: 1001, + createTime: 250, + updateTime: 1001, + content: { value: 'fromBundle' } + }); + + return ( + spec() + .withGCEnabled(false) + .userListens(query) + .watchAcksFull(query, 250, docA) + .expectEvents(query, { + added: [doc('collection/a', 250, { value: 'a' })] + }) + .userPatches('collection/a', { value: 'patched' }) + .expectEvents(query, { + modified: [ + doc( + 'collection/a', + 250, + { value: 'patched' }, + { hasLocalMutations: true } + ) + ], + hasPendingWrites: true + }) + // Loading the bundle will not raise snapshots, because the + // mutation has not been acknowledged. + .loadBundle(bundleString) + ); + } + ); + + specTest('Newer docs from bundles might lead to limbo doc', [], () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 1000, { value: 'a' }); + const bundleString1 = bundleWithDocument({ + key: docA.key, + readTime: 500, + createTime: 250, + updateTime: 500, + content: { value: 'b' } + }); + const limboQuery = Query.atPath(docA.key.path); + + return ( + spec() + .withGCEnabled(false) + .userListens(query) + .watchAcksFull(query, 250) + // Backend tells is there is no such doc. + .expectEvents(query, {}) + // Bundle tells otherwise, leads to limbo. + .loadBundle(bundleString1) + .expectLimboDocs(docA.key) + .expectEvents(query, { + added: [doc('collection/a', 500, { value: 'b' })], + fromCache: true + }) + // .watchAcksFull(limboQuery, 1002, docA1) + .watchAcks(limboQuery) + .watchSends({ affects: [limboQuery] }) + .watchCurrents(limboQuery, 'resume-token-1002') + .watchSnapshots(1002) + .expectLimboDocs() + .expectEvents(query, { + removed: [doc('collection/a', 500, { value: 'b' })], + fromCache: false + }) + ); + }); + + specTest( + 'Load from secondary clients and observe from primary', + ['multi-client'], + () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 250, { value: 'a' }); + const bundleString1 = bundleWithDocument({ + key: docA.key, + readTime: 500, + createTime: 250, + updateTime: 500, + content: { value: 'b' } + }); + + return client(0) + .userListens(query) + .watchAcksFull(query, 250, docA) + .expectEvents(query, { + added: [docA] + }) + .client(1) + .loadBundle(bundleString1) + .client(0) + .becomeVisible(); + // TODO(wuandy): Loading from secondary client does not notify other + // clients for now. We need to fix it and uncomment below. + // .expectEvents(query, { + // modified: [doc('collection/a', 500, { value: 'b' })], + // }) + } + ); + + specTest( + 'Load and observe from same secondary client', + ['multi-client'], + () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 250, { value: 'a' }); + const bundleString1 = bundleWithDocument({ + key: docA.key, + readTime: 500, + createTime: 250, + updateTime: 500, + content: { value: 'b' } + }); + + return client(0) + .userListens(query) + .watchAcksFull(query, 250, docA) + .expectEvents(query, { + added: [docA] + }) + .client(1) + .userListens(query) + .expectEvents(query, { + added: [docA] + }) + .loadBundle(bundleString1) + .expectEvents(query, { + modified: [doc('collection/a', 500, { value: 'b' })] + }); + } + ); + + specTest( + 'Load from primary client and observe from secondary', + ['multi-client'], + () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 250, { value: 'a' }); + const bundleString1 = bundleWithDocument({ + key: docA.key, + readTime: 500, + createTime: 250, + updateTime: 500, + content: { value: 'b' } + }); + + return client(0) + .userListens(query) + .watchAcksFull(query, 250, docA) + .expectEvents(query, { + added: [docA] + }) + .client(1) + .userListens(query) + .expectEvents(query, { + added: [docA] + }) + .client(0) + .loadBundle(bundleString1) + .expectEvents(query, { + modified: [doc('collection/a', 500, { value: 'b' })] + }) + .client(1) + .expectEvents(query, { + modified: [doc('collection/a', 500, { value: 'b' })] + }); + } + ); +}); diff --git a/packages/firestore/test/unit/specs/spec_builder.ts b/packages/firestore/test/unit/specs/spec_builder.ts index af29c65f2dd..1f46c387174 100644 --- a/packages/firestore/test/unit/specs/spec_builder.ts +++ b/packages/firestore/test/unit/specs/spec_builder.ts @@ -354,6 +354,14 @@ export class SpecBuilder { return this; } + loadBundle(bundleContent: string): this { + this.nextStep(); + this.currentStep = { + loadBundle: bundleContent + }; + return this; + } + // PORTING NOTE: Only used by web multi-tab tests. becomeHidden(): this { this.nextStep(); diff --git a/packages/firestore/test/unit/specs/spec_test_runner.ts b/packages/firestore/test/unit/specs/spec_test_runner.ts index 9317443718b..af3780bf265 100644 --- a/packages/firestore/test/unit/specs/spec_test_runner.ts +++ b/packages/firestore/test/unit/specs/spec_test_runner.ts @@ -27,7 +27,7 @@ import { } from '../../../src/core/event_manager'; import { Query } from '../../../src/core/query'; import { SnapshotVersion } from '../../../src/core/snapshot_version'; -import { SyncEngine } from '../../../src/core/sync_engine'; +import { loadBundle, SyncEngine } from '../../../src/core/sync_engine'; import { TargetId } from '../../../src/core/types'; import { ChangeType, @@ -116,12 +116,16 @@ import { SharedWriteTracker } from './spec_test_components'; import { IndexedDbPersistence } from '../../../src/local/indexeddb_persistence'; +import { BundleReader } from '../../../src/util/bundle_reader'; +import { LoadBundleTask } from '../../../src/api/bundle'; import { encodeBase64 } from '../../../src/platform/base64'; import { FakeDocument, SharedFakeWebStorage, testWindow } from '../../util/test_platform'; +import { toByteStreamReader } from '../../../src/platform/byte_stream_reader'; +import { logWarn } from '../../../src/util/log'; const ARBITRARY_SEQUENCE_NUMBER = 2; @@ -311,6 +315,8 @@ abstract class TestRunner { return this.doAddSnapshotsInSyncListener(); } else if ('removeSnapshotsInSyncListener' in step) { return this.doRemoveSnapshotsInSyncListener(); + } else if ('loadBundle' in step) { + return this.doLoadBundle(step.loadBundle!); } else if ('watchAck' in step) { return this.doWatchAck(step.watchAck!); } else if ('watchCurrent' in step) { @@ -448,6 +454,19 @@ abstract class TestRunner { return Promise.resolve(); } + private async doLoadBundle(bundle: string): Promise { + const reader = new BundleReader( + toByteStreamReader(new TextEncoder().encode(bundle)) + ); + const task = new LoadBundleTask(); + return this.queue.enqueue(async () => { + loadBundle(this.syncEngine, reader, task); + await task.catch(e => { + logWarn(`Loading bundle failed with ${e}`); + }); + }); + } + private doMutations(mutations: Mutation[]): Promise { const documentKeys = mutations.map(val => val.key.path.toString()); const syncEngineCallback = new Deferred(); @@ -1252,6 +1271,8 @@ export interface SpecStep { addSnapshotsInSyncListener?: true; /** Unlistens from a SnapshotsInSync event. */ removeSnapshotsInSyncListener?: true; + /** Loads a bundle from a string. */ + loadBundle?: string; /** Ack for a query in the watch stream */ watchAck?: SpecWatchAck; diff --git a/packages/firestore/test/unit/util/bundle.test.ts b/packages/firestore/test/unit/util/bundle.test.ts index 172df106761..9d6527b5fc2 100644 --- a/packages/firestore/test/unit/util/bundle.test.ts +++ b/packages/firestore/test/unit/util/bundle.test.ts @@ -20,8 +20,25 @@ import { BundleReader, SizedBundleElement } from '../../../src/util/bundle_reader'; -import { BundleElement } from '../../../src/protos/firestore_bundle_proto'; import { toByteStreamReader } from '../../../src/platform/byte_stream_reader'; +import { + doc1String, + doc1MetaString, + doc1Meta, + noDocMetaString, + noDocMeta, + doc2MetaString, + doc2Meta, + limitQueryString, + limitQuery, + limitToLastQuery, + limitToLastQueryString, + meta, + metaString, + doc2String, + doc1, + doc2 +} from './bundle_data'; use(chaiAsPromised); @@ -40,12 +57,6 @@ export function byteStreamReaderFromString( return toByteStreamReader(data, bytesPerRead); } -function lengthPrefixedString(o: {}): string { - const str = JSON.stringify(o); - const l = new TextEncoder().encode(str).byteLength; - return `${l}${str}`; -} - // Testing readableStreamFromString() is working as expected. describe('byteStreamReaderFromString()', () => { it('returns a reader stepping readable stream', async () => { @@ -83,100 +94,6 @@ function genericBundleReadingTests(bytesPerRead: number): void { } const encoder = new TextEncoder(); - // Setting up test data. - const meta: BundleElement = { - metadata: { - id: 'test-bundle', - createTime: { seconds: 1577836805, nanos: 6 }, - version: 1, - totalDocuments: 1, - totalBytes: 416 - } - }; - const metaString = lengthPrefixedString(meta); - - const doc1Meta: BundleElement = { - documentMetadata: { - name: - 'projects/test-project/databases/(default)/documents/collectionId/doc1', - readTime: { seconds: 5, nanos: 6 }, - exists: true - } - }; - const doc1MetaString = lengthPrefixedString(doc1Meta); - const doc1: BundleElement = { - document: { - name: - 'projects/test-project/databases/(default)/documents/collectionId/doc1', - createTime: { seconds: 1, nanos: 2000000 }, - updateTime: { seconds: 3, nanos: 4000 }, - fields: { foo: { stringValue: 'value' }, bar: { integerValue: -42 } } - } - }; - const doc1String = lengthPrefixedString(doc1); - - const doc2Meta: BundleElement = { - documentMetadata: { - name: - 'projects/test-project/databases/(default)/documents/collectionId/doc2', - readTime: { seconds: 5, nanos: 6 }, - exists: true - } - }; - const doc2MetaString = lengthPrefixedString(doc2Meta); - const doc2: BundleElement = { - document: { - name: - 'projects/test-project/databases/(default)/documents/collectionId/doc2', - createTime: { seconds: 1, nanos: 2000000 }, - updateTime: { seconds: 3, nanos: 4000 }, - fields: { foo: { stringValue: 'value1' }, bar: { integerValue: 42 } } - } - }; - const doc2String = lengthPrefixedString(doc2); - - const noDocMeta: BundleElement = { - documentMetadata: { - name: - 'projects/test-project/databases/(default)/documents/collectionId/nodoc', - readTime: { seconds: 5, nanos: 6 }, - exists: false - } - }; - const noDocMetaString = lengthPrefixedString(noDocMeta); - - const limitQuery: BundleElement = { - namedQuery: { - name: 'limitQuery', - bundledQuery: { - parent: 'projects/fireeats-97d5e/databases/(default)/documents', - structuredQuery: { - from: [{ collectionId: 'node_3.7.5_7Li7XoCjutvNxwD0tpo9' }], - orderBy: [{ field: { fieldPath: 'sort' }, direction: 'DESCENDING' }], - limit: { 'value': 1 } - }, - limitType: 'FIRST' - }, - readTime: { 'seconds': 1590011379, 'nanos': 191164000 } - } - }; - const limitQueryString = lengthPrefixedString(limitQuery); - const limitToLastQuery: BundleElement = { - namedQuery: { - name: 'limitToLastQuery', - bundledQuery: { - parent: 'projects/fireeats-97d5e/databases/(default)/documents', - structuredQuery: { - from: [{ collectionId: 'node_3.7.5_7Li7XoCjutvNxwD0tpo9' }], - orderBy: [{ field: { fieldPath: 'sort' }, direction: 'ASCENDING' }], - limit: { 'value': 1 } - }, - limitType: 'LAST' - }, - readTime: { 'seconds': 1590011379, 'nanos': 543063000 } - } - }; - const limitToLastQueryString = lengthPrefixedString(limitToLastQuery); async function getAllElements( bundle: BundleReader diff --git a/packages/firestore/test/unit/util/bundle_data.ts b/packages/firestore/test/unit/util/bundle_data.ts new file mode 100644 index 00000000000..3984c2be8d3 --- /dev/null +++ b/packages/firestore/test/unit/util/bundle_data.ts @@ -0,0 +1,213 @@ +/** + * @license + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { + BundledQuery, + BundleElement +} from '../../../src/protos/firestore_bundle_proto'; +import { DatabaseId } from '../../../src/core/database_info'; +import * as api from '../../../src/protos/firestore_proto_api'; +import { Value } from '../../../src/protos/firestore_proto_api'; +import { JsonProtoSerializer, toName } from '../../../src/remote/serializer'; +import { DocumentKey } from '../../../src/model/document_key'; +import { newSerializer } from '../../../src/platform/serializer'; + +function lengthPrefixedString(o: {}): string { + const str = JSON.stringify(o); + const l = new TextEncoder().encode(str).byteLength; + return `${l}${str}`; +} + +export class TestBundleBuilder { + readonly elements: BundleElement[] = []; + private serializer: JsonProtoSerializer; + constructor(private databaseId: DatabaseId) { + this.serializer = newSerializer(databaseId); + } + + addDocumentMetadata( + docKey: DocumentKey, + readTime: api.Timestamp, + exists: boolean + ): TestBundleBuilder { + this.elements.push({ + documentMetadata: { + name: toName(this.serializer, docKey), + readTime, + exists + } + }); + return this; + } + addDocument( + docKey: DocumentKey, + createTime: api.Timestamp, + updateTime: api.Timestamp, + fields: api.ApiClientObjectMap + ): TestBundleBuilder { + this.elements.push({ + document: { + name: toName(this.serializer, docKey), + createTime, + updateTime, + fields + } + }); + return this; + } + addNamedQuery( + name: string, + readTime: api.Timestamp, + bundledQuery: BundledQuery + ): TestBundleBuilder { + this.elements.push({ namedQuery: { name, readTime, bundledQuery } }); + return this; + } + getMetadataElement( + id: string, + createTime: api.Timestamp, + version = 1 + ): BundleElement { + let totalDocuments = 0; + let totalBytes = 0; + const encoder = new TextEncoder(); + for (const element of this.elements) { + if (element.documentMetadata && !element.documentMetadata.exists) { + totalDocuments += 1; + } + if (element.document) { + totalDocuments += 1; + } + totalBytes += encoder.encode(lengthPrefixedString(element)).byteLength; + } + + return { + metadata: { + id, + createTime, + version, + totalDocuments, + totalBytes + } + }; + } + + build(id: string, createTime: api.Timestamp, version = 1): string { + let result = ''; + for (const element of this.elements) { + result += lengthPrefixedString(element); + } + return ( + lengthPrefixedString(this.getMetadataElement(id, createTime, version)) + + result + ); + } +} + +// TODO(wuandy): Ideally, these should use `TestBundleBuilder` above. +export const meta: BundleElement = { + metadata: { + id: 'test-bundle', + createTime: { seconds: 1577836805, nanos: 6 }, + version: 1, + totalDocuments: 1, + totalBytes: 416 + } +}; +export const metaString = lengthPrefixedString(meta); + +export const doc1Meta: BundleElement = { + documentMetadata: { + name: + 'projects/test-project/databases/(default)/documents/collectionId/doc1', + readTime: { seconds: 5, nanos: 6 }, + exists: true + } +}; +export const doc1MetaString = lengthPrefixedString(doc1Meta); +export const doc1: BundleElement = { + document: { + name: + 'projects/test-project/databases/(default)/documents/collectionId/doc1', + createTime: { seconds: 1, nanos: 2000000 }, + updateTime: { seconds: 3, nanos: 4000 }, + fields: { foo: { stringValue: 'value' }, bar: { integerValue: -42 } } + } +}; +export const doc1String = lengthPrefixedString(doc1); + +export const doc2Meta: BundleElement = { + documentMetadata: { + name: + 'projects/test-project/databases/(default)/documents/collectionId/doc2', + readTime: { seconds: 5, nanos: 6 }, + exists: true + } +}; +export const doc2MetaString = lengthPrefixedString(doc2Meta); +export const doc2: BundleElement = { + document: { + name: + 'projects/test-project/databases/(default)/documents/collectionId/doc2', + createTime: { seconds: 1, nanos: 2000000 }, + updateTime: { seconds: 3, nanos: 4000 }, + fields: { foo: { stringValue: 'value1' }, bar: { integerValue: 42 } } + } +}; +export const doc2String = lengthPrefixedString(doc2); + +export const noDocMeta: BundleElement = { + documentMetadata: { + name: + 'projects/test-project/databases/(default)/documents/collectionId/nodoc', + readTime: { seconds: 5, nanos: 6 }, + exists: false + } +}; +export const noDocMetaString = lengthPrefixedString(noDocMeta); + +export const limitQuery: BundleElement = { + namedQuery: { + name: 'limitQuery', + bundledQuery: { + parent: 'projects/fireeats-97d5e/databases/(default)/documents', + structuredQuery: { + from: [{ collectionId: 'node_3.7.5_7Li7XoCjutvNxwD0tpo9' }], + orderBy: [{ field: { fieldPath: 'sort' }, direction: 'DESCENDING' }], + limit: { 'value': 1 } + }, + limitType: 'FIRST' + }, + readTime: { 'seconds': 1590011379, 'nanos': 191164000 } + } +}; +export const limitQueryString = lengthPrefixedString(limitQuery); +export const limitToLastQuery: BundleElement = { + namedQuery: { + name: 'limitToLastQuery', + bundledQuery: { + parent: 'projects/fireeats-97d5e/databases/(default)/documents', + structuredQuery: { + from: [{ collectionId: 'node_3.7.5_7Li7XoCjutvNxwD0tpo9' }], + orderBy: [{ field: { fieldPath: 'sort' }, direction: 'ASCENDING' }], + limit: { 'value': 1 } + }, + limitType: 'LAST' + }, + readTime: { 'seconds': 1590011379, 'nanos': 543063000 } + } +}; +export const limitToLastQueryString = lengthPrefixedString(limitToLastQuery);