diff --git a/packages/firestore/src/protos/firestore_proto_api.d.ts b/packages/firestore/src/protos/firestore_proto_api.d.ts index 3b3c8c48a5d..a85bc282bca 100644 --- a/packages/firestore/src/protos/firestore_proto_api.d.ts +++ b/packages/firestore/src/protos/firestore_proto_api.d.ts @@ -184,7 +184,7 @@ export declare namespace firestoreV1ApiClientInterfaces { interface Document { name?: string; fields?: ApiClientObjectMap; - createTime?: string; + createTime?: Timestamp; updateTime?: Timestamp; } interface DocumentChange { diff --git a/packages/firestore/src/util/bundle_reader.ts b/packages/firestore/src/util/bundle_reader.ts new file mode 100644 index 00000000000..c8e7435953b --- /dev/null +++ b/packages/firestore/src/util/bundle_reader.ts @@ -0,0 +1,243 @@ +/** + * @license + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + BundleElement, + BundleMetadata +} from '../protos/firestore_bundle_proto'; +import { Deferred } from './promise'; + +/** + * A complete element in the bundle stream, together with the byte length it + * occupies in the stream. + */ +export class SizedBundleElement { + constructor( + public readonly payload: BundleElement, + // How many bytes this element takes to store in the bundle. + public readonly byteLength: number + ) {} + + isBundleMetadata(): boolean { + return 'metadata' in this.payload; + } +} + +/** + * Create a `ReadableStream` from a underlying buffer. + * + * @param data: Underlying buffer. + * @param bytesPerRead: How many bytes to read from the underlying buffer from + * each read through the stream. + */ +export function toReadableStream( + data: Uint8Array | ArrayBuffer, + bytesPerRead = 10240 +): ReadableStream { + let readFrom = 0; + return new ReadableStream({ + start(controller) {}, + async pull(controller): Promise { + controller.enqueue(data.slice(readFrom, readFrom + bytesPerRead)); + readFrom += bytesPerRead; + if (readFrom >= data.byteLength) { + controller.close(); + } + } + }); +} + +/** + * A class representing a bundle. + * + * Takes a bundle stream or buffer, and presents abstractions to read bundled + * elements out of the underlying content. + */ +export class BundleReader { + /** Cached bundle metadata. */ + private metadata: Deferred = new Deferred(); + /** The reader instance of the given ReadableStream. */ + private reader: ReadableStreamDefaultReader; + /** + * Internal buffer to hold bundle content, accumulating incomplete element + * content. + */ + private buffer: Uint8Array = new Uint8Array(); + /** The decoder used to parse binary data into strings. */ + private textDecoder = new TextDecoder('utf-8'); + + constructor( + private bundleStream: + | ReadableStream + | Uint8Array + | ArrayBuffer + ) { + if ( + bundleStream instanceof Uint8Array || + bundleStream instanceof ArrayBuffer + ) { + this.bundleStream = toReadableStream(bundleStream); + } + this.reader = (this.bundleStream as ReadableStream).getReader(); + + // Read the metadata (which is the first element). + this.nextElementImpl().then( + element => { + if (element && element.isBundleMetadata()) { + this.metadata.resolve(element.payload.metadata!); + } else { + this.metadata.reject( + new Error(`The first element of the bundle is not a metadata, it is + ${JSON.stringify(element?.payload)}`) + ); + } + }, + error => this.metadata.reject(error) + ); + } + + /** + * Returns the metadata of the bundle. + */ + async getMetadata(): Promise { + return this.metadata.promise; + } + + /** + * Returns the next BundleElement (together with its byte size in the bundle) + * that has not been read from underlying ReadableStream. Returns null if we + * have reached the end of the stream. + */ + async nextElement(): Promise { + // Makes sure metadata is read before proceeding. + await this.getMetadata(); + return this.nextElementImpl(); + } + + /** + * Reads from the head of internal buffer, and pulling more data from + * underlying stream if a complete element cannot be found, until an + * element(including the prefixed length and the JSON string) is found. + * + * Once a complete element is read, it is dropped from internal buffer. + * + * Returns either the bundled element, or null if we have reached the end of + * the stream. + */ + private async nextElementImpl(): Promise { + const lengthBuffer = await this.readLength(); + if (lengthBuffer === null) { + return null; + } + + const lengthString = this.textDecoder.decode(lengthBuffer); + const length = Number(lengthString); + if (isNaN(length)) { + this.raiseError(`length string (${lengthString}) is not valid number`); + } + + const jsonString = await this.readJsonString(length); + + return new SizedBundleElement( + JSON.parse(jsonString), + lengthBuffer.length + length + ); + } + + /** First index of '{' from the underlying buffer. */ + private indexOfOpenBracket(): number { + return this.buffer.findIndex(v => v === '{'.charCodeAt(0)); + } + + /** + * Reads from the beginning of the internal buffer, until the first '{', and + * return the content. + * + * If reached end of the stream, returns a null. + */ + private async readLength(): Promise { + while (this.indexOfOpenBracket() < 0) { + const done = await this.pullMoreDataToBuffer(); + if (done) { + break; + } + } + + // Broke out of the loop because underlying stream is closed, and there + // happens to be no more data to process. + if (this.buffer.length === 0) { + return null; + } + + const position = this.indexOfOpenBracket(); + // Broke out of the loop because underlying stream is closed, but still + // cannot find an open bracket. + if (position < 0) { + this.raiseError( + 'Reached the end of bundle when a length string is expected.' + ); + } + + const result = this.buffer.slice(0, position); + // Update the internal buffer to drop the read length. + this.buffer = this.buffer.slice(position); + return result; + } + + /** + * Reads from a specified position from the internal buffer, for a specified + * number of bytes, pulling more data from the underlying stream if needed. + * + * Returns a string decoded from the read bytes. + */ + private async readJsonString(length: number): Promise { + while (this.buffer.length < length) { + const done = await this.pullMoreDataToBuffer(); + if (done) { + this.raiseError('Reached the end of bundle when more is expected.'); + } + } + + const result = this.textDecoder.decode(this.buffer.slice(0, length)); + // Update the internal buffer to drop the read json string. + this.buffer = this.buffer.slice(length); + return result; + } + + private raiseError(message: string): void { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.reader.cancel('Invalid bundle format.'); + throw new Error(message); + } + + /** + * Pulls more data from underlying stream to internal buffer. + * Returns a boolean indicating whether the stream is finished. + */ + private async pullMoreDataToBuffer(): Promise { + const result = await this.reader.read(); + if (!result.done) { + const newBuffer = new Uint8Array( + this.buffer.length + result.value.length + ); + newBuffer.set(this.buffer); + newBuffer.set(result.value, this.buffer.length); + this.buffer = newBuffer; + } + return result.done; + } +} diff --git a/packages/firestore/test/unit/util/bundle.test.ts b/packages/firestore/test/unit/util/bundle.test.ts new file mode 100644 index 00000000000..981d3ad7289 --- /dev/null +++ b/packages/firestore/test/unit/util/bundle.test.ts @@ -0,0 +1,342 @@ +/** + * @license + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { expect } from 'chai'; +import { + BundleReader, + SizedBundleElement, + toReadableStream +} from '../../../src/util/bundle_reader'; +import { isNode } from '../../util/test_platform'; +import { BundleElement } from '../../../src/protos/firestore_bundle_proto'; + +function readableStreamFromString( + content: string, + bytesPerRead: number +): ReadableStream { + return toReadableStream(new TextEncoder().encode(content), bytesPerRead); +} + +function lengthPrefixedString(o: {}): string { + const str = JSON.stringify(o); + const l = new TextEncoder().encode(str).byteLength; + return `${l}${str}`; +} + +// eslint-disable-next-line no-restricted-properties +(isNode() ? describe.skip : describe)('readableStreamFromString()', () => { + it('returns stepping readable stream', async () => { + const encoder = new TextEncoder(); + const s = readableStreamFromString('0123456789', 4); + const r = s.getReader(); + + let result = await r.read(); + expect(result.value).to.deep.equal(encoder.encode('0123')); + expect(result.done).to.be.false; + + result = await r.read(); + expect(result.value).to.deep.equal(encoder.encode('4567')); + expect(result.done).to.be.false; + + result = await r.read(); + expect(result.value).to.deep.equal(encoder.encode('89')); + expect(result.done).to.be.false; + + result = await r.read(); + expect(result.value).to.be.undefined; + expect(result.done).to.be.true; + }); +}); + +// eslint-disable-next-line no-restricted-properties +(isNode() ? describe.skip : describe)('Bundle ', () => { + genericBundleReadingTests(1); + genericBundleReadingTests(4); + genericBundleReadingTests(64); + genericBundleReadingTests(1024); +}); + +function genericBundleReadingTests(bytesPerRead: number): void { + const encoder = new TextEncoder(); + // Setting up test data. + const meta: BundleElement = { + metadata: { + id: 'test-bundle', + createTime: { seconds: 1577836805, nanos: 6 }, + version: 1, + totalDocuments: 1, + totalBytes: 416 + } + }; + const metaString = lengthPrefixedString(meta); + + const doc1Meta: BundleElement = { + documentMetadata: { + name: + 'projects/test-project/databases/(default)/documents/collectionId/doc1', + readTime: { seconds: 5, nanos: 6 }, + exists: true + } + }; + const doc1MetaString = lengthPrefixedString(doc1Meta); + const doc1: BundleElement = { + document: { + name: + 'projects/test-project/databases/(default)/documents/collectionId/doc1', + createTime: { seconds: 1, nanos: 2000000 }, + updateTime: { seconds: 3, nanos: 4000 }, + fields: { foo: { stringValue: 'value' }, bar: { integerValue: -42 } } + } + }; + const doc1String = lengthPrefixedString(doc1); + + const doc2Meta: BundleElement = { + documentMetadata: { + name: + 'projects/test-project/databases/(default)/documents/collectionId/doc2', + readTime: { seconds: 5, nanos: 6 }, + exists: true + } + }; + const doc2MetaString = lengthPrefixedString(doc2Meta); + const doc2: BundleElement = { + document: { + name: + 'projects/test-project/databases/(default)/documents/collectionId/doc2', + createTime: { seconds: 1, nanos: 2000000 }, + updateTime: { seconds: 3, nanos: 4000 }, + fields: { foo: { stringValue: 'value1' }, bar: { integerValue: 42 } } + } + }; + const doc2String = lengthPrefixedString(doc2); + + const noDocMeta: BundleElement = { + documentMetadata: { + name: + 'projects/test-project/databases/(default)/documents/collectionId/nodoc', + readTime: { seconds: 5, nanos: 6 }, + exists: false + } + }; + const noDocMetaString = lengthPrefixedString(noDocMeta); + + const limitQuery: BundleElement = { + namedQuery: { + name: 'limitQuery', + bundledQuery: { + parent: 'projects/fireeats-97d5e/databases/(default)/documents', + structuredQuery: { + from: [{ collectionId: 'node_3.7.5_7Li7XoCjutvNxwD0tpo9' }], + orderBy: [{ field: { fieldPath: 'sort' }, direction: 'DESCENDING' }], + limit: { 'value': 1 } + }, + limitType: 'FIRST' + }, + readTime: { 'seconds': 1590011379, 'nanos': 191164000 } + } + }; + const limitQueryString = lengthPrefixedString(limitQuery); + const limitToLastQuery: BundleElement = { + namedQuery: { + name: 'limitToLastQuery', + bundledQuery: { + parent: 'projects/fireeats-97d5e/databases/(default)/documents', + structuredQuery: { + from: [{ collectionId: 'node_3.7.5_7Li7XoCjutvNxwD0tpo9' }], + orderBy: [{ field: { fieldPath: 'sort' }, direction: 'ASCENDING' }], + limit: { 'value': 1 } + }, + limitType: 'LAST' + }, + readTime: { 'seconds': 1590011379, 'nanos': 543063000 } + } + }; + const limitToLastQueryString = lengthPrefixedString(limitToLastQuery); + + async function getAllElements( + bundle: BundleReader + ): Promise { + const result: SizedBundleElement[] = []; + while (true) { + const sizedElement = await bundle.nextElement(); + if (sizedElement === null) { + break; + } + if (!sizedElement.isBundleMetadata()) { + result.push(sizedElement); + } + } + + return Promise.resolve(result); + } + + function verifySizedElement( + element: SizedBundleElement, + payload: unknown, + payloadString: string + ): void { + expect(element.payload).to.deep.equal(payload); + expect(element.byteLength).to.equal( + encoder.encode(payloadString).byteLength + ); + } + + async function generateBundleAndParse( + bundleString: string, + bytesPerRead: number, + validMeta = false + ): Promise { + const bundleStream = readableStreamFromString(bundleString, bytesPerRead); + const bundle = new BundleReader(bundleStream); + + if (!validMeta) { + await expect(await bundle.getMetadata()).should.be.rejected; + } else { + expect(await bundle.getMetadata()).to.deep.equal(meta.metadata); + } + + await getAllElements(bundle); + } + + it('reads with query and doc with bytesPerRead ' + bytesPerRead, async () => { + const bundleStream = readableStreamFromString( + metaString + + limitQueryString + + limitToLastQueryString + + doc1MetaString + + doc1String, + bytesPerRead + ); + const bundle = new BundleReader(bundleStream); + + expect(await bundle.getMetadata()).to.deep.equal(meta.metadata); + + const actual = await getAllElements(bundle); + expect(actual.length).to.equal(4); + verifySizedElement(actual[0], limitQuery, limitQueryString); + verifySizedElement(actual[1], limitToLastQuery, limitToLastQueryString); + verifySizedElement(actual[2], doc1Meta, doc1MetaString); + verifySizedElement(actual[3], doc1, doc1String); + }); + + it( + 'reads with unexpected orders with bytesPerRead ' + bytesPerRead, + async () => { + const bundleStream = readableStreamFromString( + metaString + + doc1MetaString + + doc1String + + limitQueryString + + doc2MetaString + + doc2String, + bytesPerRead + ); + const bundle = new BundleReader(bundleStream); + + const actual = await getAllElements(bundle); + expect(actual.length).to.equal(5); + verifySizedElement(actual[0], doc1Meta, doc1MetaString); + verifySizedElement(actual[1], doc1, doc1String); + verifySizedElement(actual[2], limitQuery, limitQueryString); + verifySizedElement(actual[3], doc2Meta, doc2MetaString); + verifySizedElement(actual[4], doc2, doc2String); + + // Reading metadata after other elements should also work. + expect(await bundle.getMetadata()).to.deep.equal(meta.metadata); + } + ); + + it( + 'reads without named query with bytesPerRead ' + bytesPerRead, + async () => { + const bundleStream = readableStreamFromString( + metaString + doc1MetaString + doc1String, + bytesPerRead + ); + const bundle = new BundleReader(bundleStream); + + expect(await bundle.getMetadata()).to.deep.equal(meta.metadata); + + const actual = await getAllElements(bundle); + expect(actual.length).to.equal(2); + verifySizedElement(actual[0], doc1Meta, doc1MetaString); + verifySizedElement(actual[1], doc1, doc1String); + } + ); + + it('reads with deleted doc with bytesPerRead ' + bytesPerRead, async () => { + const bundleStream = readableStreamFromString( + metaString + noDocMetaString + doc1MetaString + doc1String, + bytesPerRead + ); + const bundle = new BundleReader(bundleStream); + + expect(await bundle.getMetadata()).to.deep.equal(meta.metadata); + + const actual = await getAllElements(bundle); + expect(actual.length).to.equal(3); + verifySizedElement(actual[0], noDocMeta, noDocMetaString); + verifySizedElement(actual[1], doc1Meta, doc1MetaString); + verifySizedElement(actual[2], doc1, doc1String); + }); + + it( + 'reads without documents or query with bytesPerRead ' + bytesPerRead, + async () => { + const bundleStream = readableStreamFromString(metaString, bytesPerRead); + const bundle = new BundleReader(bundleStream); + + expect(await bundle.getMetadata()).to.deep.equal(meta.metadata); + + const actual = await getAllElements(bundle); + expect(actual.length).to.equal(0); + } + ); + + it( + 'throws with ill-formatted bundle with bytesPerRead ' + bytesPerRead, + async () => { + await expect( + generateBundleAndParse('metadata: "no length prefix"', bytesPerRead) + ).to.be.rejectedWith( + 'Reached the end of bundle when a length string is expected.' + ); + + await expect( + generateBundleAndParse('{metadata: "no length prefix"}', bytesPerRead) + ).to.be.rejectedWith('Unexpected end of JSON input'); + + await expect( + generateBundleAndParse( + metaString + 'invalid-string', + bytesPerRead, + true + ) + ).to.be.rejectedWith( + 'Reached the end of bundle when a length string is expected.' + ); + + await expect( + generateBundleAndParse('1' + metaString, bytesPerRead) + ).to.be.rejectedWith('Reached the end of bundle when more is expected.'); + + // First element is not BundleMetadata. + await expect( + generateBundleAndParse(doc1MetaString + doc1String, bytesPerRead) + ).to.be.rejectedWith('The first element of the bundle is not a metadata'); + } + ); +}