Skip to content

Enable bundle reader for Node. #3167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jun 10, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions packages/firestore/src/platform/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import { Connection } from '../remote/connection';
import { JsonProtoSerializer } from '../remote/serializer';
import { fail } from '../util/assert';
import { ConnectivityMonitor } from './../remote/connectivity_monitor';
import { BundleSource } from '../util/bundle_reader';
import { validatePositiveNumber } from '../util/input_validation';

/**
* Provides a common interface to load anything platform dependent, e.g.
Expand Down Expand Up @@ -50,6 +52,18 @@ export interface Platform {
*/
randomBytes(nBytes: number): Uint8Array;

/**
* Builds a `ByteStreamReader` from a data source.
* @param source The data source to use.
* @param bytesPerRead How many bytes each `read()` from the returned reader
* will read. It is ignored if the passed in source does not provide
* such control(example: ReadableStream).
*/
toByteStreamReader(
source: BundleSource,
bytesPerRead: number
): ReadableStreamReader<Uint8Array>;

/** The Platform's 'window' implementation or null if not available. */
readonly window: Window | null;

Expand All @@ -60,6 +74,37 @@ export interface Platform {
readonly base64Available: boolean;
}

/**
* Builds a `ByteStreamReader` from a UInt8Array.
* @param source The data source to use.
* @param bytesPerRead How many bytes each `read()` from the returned reader
* will read.
*/
export function toByteStreamReader(
source: Uint8Array,
bytesPerRead: number
): ReadableStreamReader<Uint8Array> {
validatePositiveNumber('toByteStreamReader', 2, bytesPerRead);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is not a user facing method, this should be a debugAssert, which will get removed in the final build and drops the dependency on validatePositiveNumber in code that doesn't otherwise depend on it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

let readFrom = 0;
const reader: ReadableStreamReader<Uint8Array> = {
async read(): Promise<ReadableStreamReadResult<Uint8Array>> {
if (readFrom < source.byteLength) {
const result = {
value: source.slice(readFrom, readFrom + bytesPerRead),
done: false
};
readFrom += bytesPerRead;
return result;
}

return { value: undefined, done: true };
},
async cancel(): Promise<void> {},
releaseLock() {}
};
return reader;
}

/**
* Provides singleton helpers where setup code can inject a platform at runtime.
* setPlatform needs to be set before Firestore is used and must be set exactly
Expand Down
24 changes: 23 additions & 1 deletion packages/firestore/src/platform_browser/browser_platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/

import { DatabaseId, DatabaseInfo } from '../core/database_info';
import { Platform } from '../platform/platform';
import { Platform, toByteStreamReader } from '../platform/platform';
import { Connection } from '../remote/connection';
import { JsonProtoSerializer } from '../remote/serializer';
import { ConnectivityMonitor } from './../remote/connectivity_monitor';
Expand All @@ -25,6 +25,7 @@ import { NoopConnectivityMonitor } from '../remote/connectivity_monitor_noop';
import { BrowserConnectivityMonitor } from './browser_connectivity_monitor';
import { WebChannelConnection } from './webchannel_connection';
import { debugAssert } from '../util/assert';
import { BundleSource } from '../util/bundle_reader';

// Implements the Platform API for browsers and some browser-like environments
// (including ReactNative).
Expand Down Expand Up @@ -93,4 +94,25 @@ export class BrowserPlatform implements Platform {
}
return bytes;
}

/**
* On web, a `ReadableStream` is wrapped around by a `ByteStreamReader`.
*/
toByteStreamReader(
source: BundleSource,
bytesPerRead: number
): ReadableStreamReader<Uint8Array> {
if (source instanceof Uint8Array) {
return toByteStreamReader(source, bytesPerRead);
}
if (source instanceof ArrayBuffer) {
return toByteStreamReader(new Uint8Array(source), bytesPerRead);
}
if (source instanceof ReadableStream) {
return source.getReader();
}
throw new Error(
'Source of `toByteStreamReader` has to be a ArrayBuffer or ReadableStream'
);
}
}
21 changes: 20 additions & 1 deletion packages/firestore/src/platform_node/node_platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { randomBytes } from 'crypto';
import { inspect } from 'util';

import { DatabaseId, DatabaseInfo } from '../core/database_info';
import { Platform } from '../platform/platform';
import { Platform, toByteStreamReader } from '../platform/platform';
import { Connection } from '../remote/connection';
import { JsonProtoSerializer } from '../remote/serializer';
import { Code, FirestoreError } from '../util/error';
Expand All @@ -29,6 +29,7 @@ import { NoopConnectivityMonitor } from './../remote/connectivity_monitor_noop';
import { GrpcConnection } from './grpc_connection';
import { loadProtos } from './load_protos';
import { debugAssert } from '../util/assert';
import { invalidClassError } from '../util/input_validation';

export class NodePlatform implements Platform {
readonly base64Available = true;
Expand Down Expand Up @@ -83,4 +84,22 @@ export class NodePlatform implements Platform {

return randomBytes(nBytes);
}

/**
* On Node, only supported data source is a `Uint8Array` for now.
*/
toByteStreamReader(
source: Uint8Array,
bytesPerRead: number
): ReadableStreamReader<Uint8Array> {
if (!(source instanceof Uint8Array)) {
throw invalidClassError(
'NodePlatform.toByteStreamReader',
'Uint8Array',
1,
source
);
}
return toByteStreamReader(source, bytesPerRead);
}
}
62 changes: 24 additions & 38 deletions packages/firestore/src/util/bundle_reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import {
BundleMetadata
} from '../protos/firestore_bundle_proto';
import { Deferred } from './promise';
import { PlatformSupport } from '../platform/platform';
import { debugAssert } from './assert';

/**
* A complete element in the bundle stream, together with the byte length it
Expand All @@ -37,29 +39,18 @@ export class SizedBundleElement {
}
}

export type BundleSource =
| ReadableStream<Uint8Array>
| ArrayBuffer
| Uint8Array;

/**
* Create a `ReadableStream` from a underlying buffer.
* When applicable, how many bytes to read from the underlying data source
* each time.
*
* @param data: Underlying buffer.
* @param bytesPerRead: How many bytes to read from the underlying buffer from
* each read through the stream.
* Not applicable for ReadableStreams.
*/
export function toReadableStream(
data: Uint8Array | ArrayBuffer,
bytesPerRead = 10240
): ReadableStream<Uint8Array | ArrayBuffer> {
let readFrom = 0;
return new ReadableStream({
start(controller) {},
async pull(controller): Promise<void> {
controller.enqueue(data.slice(readFrom, readFrom + bytesPerRead));
readFrom += bytesPerRead;
if (readFrom >= data.byteLength) {
controller.close();
}
}
});
}
const BYTES_PER_READ = 10240;

/**
* A class representing a bundle.
Expand All @@ -70,8 +61,6 @@ export function toReadableStream(
export class BundleReader {
/** Cached bundle metadata. */
private metadata: Deferred<BundleMetadata> = new Deferred<BundleMetadata>();
/** The reader instance of the given ReadableStream. */
private reader: ReadableStreamDefaultReader;
/**
* Internal buffer to hold bundle content, accumulating incomplete element
* content.
Expand All @@ -80,20 +69,16 @@ export class BundleReader {
/** The decoder used to parse binary data into strings. */
private textDecoder = new TextDecoder('utf-8');

static fromBundleSource(source: BundleSource): BundleReader {
return new BundleReader(
PlatformSupport.getPlatform().toByteStreamReader(source, BYTES_PER_READ)
);
}

constructor(
private bundleStream:
| ReadableStream<Uint8Array | ArrayBuffer>
| Uint8Array
| ArrayBuffer
/** The reader to read from underlying binary bundle data source. */
private reader: ReadableStreamReader<Uint8Array>
) {
if (
bundleStream instanceof Uint8Array ||
bundleStream instanceof ArrayBuffer
) {
this.bundleStream = toReadableStream(bundleStream);
}
this.reader = (this.bundleStream as ReadableStream).getReader();

// Read the metadata (which is the first element).
this.nextElementImpl().then(
element => {
Expand Down Expand Up @@ -220,8 +205,8 @@ export class BundleReader {

private raiseError(message: string): void {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.reader.cancel('Invalid bundle format.');
throw new Error(message);
this.reader.cancel();
throw new Error(`Invalid bundle format: ${message}`);
}

/**
Expand All @@ -231,11 +216,12 @@ export class BundleReader {
private async pullMoreDataToBuffer(): Promise<boolean> {
const result = await this.reader.read();
if (!result.done) {
debugAssert(!!result.value, 'Read undefined when "done" is false.');
const newBuffer = new Uint8Array(
this.buffer.length + result.value.length
this.buffer.length + result.value!.length
);
newBuffer.set(this.buffer);
newBuffer.set(result.value, this.buffer.length);
newBuffer.set(result.value!, this.buffer.length);
this.buffer = newBuffer;
}
return result.done;
Expand Down
29 changes: 28 additions & 1 deletion packages/firestore/test/unit/platform/platform.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,37 @@
*/

import { expect } from 'chai';
import { PlatformSupport } from '../../../src/platform/platform';
import {
PlatformSupport,
toByteStreamReader
} from '../../../src/platform/platform';

describe('Platform', () => {
it('can load the platform at runtime', () => {
expect(PlatformSupport.getPlatform()).to.exist;
});

it('toByteStreamReader() steps underlying data', async () => {
const encoder = new TextEncoder();
const r = toByteStreamReader(
encoder.encode('0123456789'),
/* bytesPerRead */ 4
);

let result = await r.read();
expect(result.value).to.deep.equal(encoder.encode('0123'));
expect(result.done).to.be.false;

result = await r.read();
expect(result.value).to.deep.equal(encoder.encode('4567'));
expect(result.done).to.be.false;

result = await r.read();
expect(result.value).to.deep.equal(encoder.encode('89'));
expect(result.done).to.be.false;

result = await r.read();
expect(result.value).to.be.undefined;
expect(result.done).to.be.true;
});
});
Loading