Skip to content

Enable bundle reader for Node. #3167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jun 10, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions packages/firestore/src/platform/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import { Connection } from '../remote/connection';
import { JsonProtoSerializer } from '../remote/serializer';
import { fail } from '../util/assert';
import { ConnectivityMonitor } from './../remote/connectivity_monitor';
import { BundleSource } from '../util/bundle_reader';
import { validatePositiveNumber } from '../util/input_validation';

/**
* Provides a common interface to load anything platform dependent, e.g.
Expand Down Expand Up @@ -50,6 +52,18 @@ export interface Platform {
*/
randomBytes(nBytes: number): Uint8Array;

/**
* Builds a `ByteStreamReader` from a data source.
* @param source The data source to use.
* @param bytesPerRead How many bytes each `read()` from the returned reader
* will read. It is ignored if the passed in source does not provide
* such control(example: ReadableStream).
*/
toByteStreamReader(
source: BundleSource,
bytesPerRead: number
): ByteStreamReader;

/** The Platform's 'window' implementation or null if not available. */
readonly window: Window | null;

Expand All @@ -60,6 +74,57 @@ export interface Platform {
readonly base64Available: boolean;
}

/**
* An interface compatible with Web's ReadableStream.getReader() return type.
*
* This can be used as an abstraction to mimic `ReadableStream` where it is not
* available.
*/
export interface ByteStreamReader {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you take a look at using the type ReadableStreamReader directly?

interface ReadableStreamReader<R = any> {
    cancel(): Promise<void>;
    read(): Promise<ReadableStreamReadResult<R>>;
    releaseLock(): void;
}

(from lib.dom.d.ts)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the document, it seems ReadableStreamDefaultReader is the interface? There are several differences.

  1. releaseLock() is not added there because it is not used. I dont have a strong preference though.
  2. The DOM one is more generic, but given we only intend to use it to read bundle source which is ArrayBuffer | ReadableStream<UInt8Array> I think it is fine?
  3. cancel takes a string because ReadableStreamDefaultReader takes a reason for its cancel (https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader/cancel).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we would be better off if we re-used existing types. It makes our code smaller, easier to understand for outsiders and is more future proof. You are already using most of the idioms of the existing API, and re-using the type would be a good explanation of why you are doing this ("citing the source").

The change here is actually pretty simple: https://gist.github.com/schmidt-sebastian/62cabbbcab7f78f3a6e1aee7809f1b73
If we need a more complicated type, we could keep ByteStreamReader but make it:

export type ByteStreamReader = ReadableStreamReader<ArrayBuffer | ReadableStream<UInt8Array>>

As for 3: Right now, we ignore the reason for cancelling. Could we just make it part of the error message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

I had the assumption that ReadableStreamReader will not be available for Node, turns out the interface does exist with d.ts. Thanks for the patch!

read(): Promise<ByteStreamReadResult>;
cancel(reason?: string): Promise<void>;
}

/**
* An interface compatible with ReadableStreamReadResult<UInt8Array>.
*/
export interface ByteStreamReadResult {
done: boolean;
value?: Uint8Array;
}

/**
* Builds a `ByteStreamReader` from a UInt8Array.
* @param source The data source to use.
* @param bytesPerRead How many bytes each `read()` from the returned reader
* will read.
*/
export function toByteStreamReader(
source: Uint8Array,
bytesPerRead: number
): ByteStreamReader {
validatePositiveNumber('toByteStreamReader', 2, bytesPerRead);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is not a user facing method, this should be a debugAssert, which will get removed in the final build and drops the dependency on validatePositiveNumber in code that doesn't otherwise depend on it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

let readFrom = 0;
const reader: ByteStreamReader = {
async read(): Promise<ByteStreamReadResult> {
if (readFrom < source.byteLength) {
const result = {
value: source.slice(readFrom, readFrom + bytesPerRead),
done: false
};
readFrom += bytesPerRead;
return result;
}

return { value: undefined, done: true };
},

async cancel(reason?: string): Promise<void> {}
};

return reader;
}

/**
* Provides singleton helpers where setup code can inject a platform at runtime.
* setPlatform needs to be set before Firestore is used and must be set exactly
Expand All @@ -74,6 +139,13 @@ export class PlatformSupport {
PlatformSupport.platform = platform;
}

/**
* Forcing to set the platform instance, testing only!
*/
private static _forceSetPlatform(platform: Platform): void {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can drop this now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

PlatformSupport.platform = platform;
}

static getPlatform(): Platform {
if (!PlatformSupport.platform) {
fail('Platform not set');
Expand Down
28 changes: 27 additions & 1 deletion packages/firestore/src/platform_browser/browser_platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
*/

import { DatabaseId, DatabaseInfo } from '../core/database_info';
import { Platform } from '../platform/platform';
import {
ByteStreamReader,
Platform,
toByteStreamReader
} from '../platform/platform';
import { Connection } from '../remote/connection';
import { JsonProtoSerializer } from '../remote/serializer';
import { ConnectivityMonitor } from './../remote/connectivity_monitor';
Expand All @@ -25,6 +29,7 @@ import { NoopConnectivityMonitor } from '../remote/connectivity_monitor_noop';
import { BrowserConnectivityMonitor } from './browser_connectivity_monitor';
import { WebChannelConnection } from './webchannel_connection';
import { debugAssert } from '../util/assert';
import { BundleSource } from '../util/bundle_reader';

// Implements the Platform API for browsers and some browser-like environments
// (including ReactNative).
Expand Down Expand Up @@ -93,4 +98,25 @@ export class BrowserPlatform implements Platform {
}
return bytes;
}

/**
* On web, a `ReadableStream` is wrapped around by a `ByteStreamReader`.
*/
toByteStreamReader(
source: BundleSource,
bytesPerRead: number
): ByteStreamReader {
if (source instanceof Uint8Array) {
return toByteStreamReader(source, bytesPerRead);
}
if (source instanceof ArrayBuffer) {
return toByteStreamReader(new Uint8Array(source), bytesPerRead);
}
if (source instanceof ReadableStream) {
return source.getReader();
}
throw new Error(
'Source of `toByteStreamReader` has to be a ArrayBuffer or ReadableStream'
);
}
}
25 changes: 24 additions & 1 deletion packages/firestore/src/platform_node/node_platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ import { randomBytes } from 'crypto';
import { inspect } from 'util';

import { DatabaseId, DatabaseInfo } from '../core/database_info';
import { Platform } from '../platform/platform';
import {
ByteStreamReader,
Platform,
toByteStreamReader
} from '../platform/platform';
import { Connection } from '../remote/connection';
import { JsonProtoSerializer } from '../remote/serializer';
import { Code, FirestoreError } from '../util/error';
Expand All @@ -29,6 +33,7 @@ import { NoopConnectivityMonitor } from './../remote/connectivity_monitor_noop';
import { GrpcConnection } from './grpc_connection';
import { loadProtos } from './load_protos';
import { debugAssert } from '../util/assert';
import { invalidClassError } from '../util/input_validation';

export class NodePlatform implements Platform {
readonly base64Available = true;
Expand Down Expand Up @@ -83,4 +88,22 @@ export class NodePlatform implements Platform {

return randomBytes(nBytes);
}

/**
* On Node, only supported data source is a `Uint8Array` for now.
*/
toByteStreamReader(
source: Uint8Array,
bytesPerRead: number
): ByteStreamReader {
if (!(source instanceof Uint8Array)) {
throw invalidClassError(
'NodePlatform.toByteStreamReader',
'Uint8Array',
1,
source
);
}
return toByteStreamReader(source, bytesPerRead);
}
}
59 changes: 23 additions & 36 deletions packages/firestore/src/util/bundle_reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import {
BundleMetadata
} from '../protos/firestore_bundle_proto';
import { Deferred } from './promise';
import { ByteStreamReader, PlatformSupport } from '../platform/platform';
import { debugAssert } from './assert';

/**
* A complete element in the bundle stream, together with the byte length it
Expand All @@ -37,29 +39,19 @@ export class SizedBundleElement {
}
}

export type BundleSource =
| ReadableStream<Uint8Array>
| ArrayBuffer
| Uint8Array;

/**
* Create a `ReadableStream` from a underlying buffer.
* When applicable, how many bytets to read from the underlying data source
* each time.
*
* @param data: Underlying buffer.
* @param bytesPerRead: How many bytes to read from the underlying buffer from
* each read through the stream.
* It is not application when we don't really have control, for example, when
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/application/applicable

But it could just be: "Not applicable for ReadableStreams."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* source is a ReadableStream.
*/
export function toReadableStream(
data: Uint8Array | ArrayBuffer,
bytesPerRead = 10240
): ReadableStream<Uint8Array | ArrayBuffer> {
let readFrom = 0;
return new ReadableStream({
start(controller) {},
async pull(controller): Promise<void> {
controller.enqueue(data.slice(readFrom, readFrom + bytesPerRead));
readFrom += bytesPerRead;
if (readFrom >= data.byteLength) {
controller.close();
}
}
});
}
const BYTES_PER_READ = 10240;

/**
* A class representing a bundle.
Expand All @@ -70,8 +62,6 @@ export function toReadableStream(
export class BundleReader {
/** Cached bundle metadata. */
private metadata: Deferred<BundleMetadata> = new Deferred<BundleMetadata>();
/** The reader instance of the given ReadableStream. */
private reader: ReadableStreamDefaultReader;
/**
* Internal buffer to hold bundle content, accumulating incomplete element
* content.
Expand All @@ -80,20 +70,16 @@ export class BundleReader {
/** The decoder used to parse binary data into strings. */
private textDecoder = new TextDecoder('utf-8');

static fromBundleSource(source: BundleSource): BundleReader {
return new BundleReader(
PlatformSupport.getPlatform().toByteStreamReader(source, BYTES_PER_READ)
);
}

constructor(
private bundleStream:
| ReadableStream<Uint8Array | ArrayBuffer>
| Uint8Array
| ArrayBuffer
/** The reader to read from underlying binary bundle data source. */
private reader: ByteStreamReader
) {
if (
bundleStream instanceof Uint8Array ||
bundleStream instanceof ArrayBuffer
) {
this.bundleStream = toReadableStream(bundleStream);
}
this.reader = (this.bundleStream as ReadableStream).getReader();

// Read the metadata (which is the first element).
this.nextElementImpl().then(
element => {
Expand Down Expand Up @@ -231,11 +217,12 @@ export class BundleReader {
private async pullMoreDataToBuffer(): Promise<boolean> {
const result = await this.reader.read();
if (!result.done) {
debugAssert(!!result.value, 'Read undefined when "done" is false.');
const newBuffer = new Uint8Array(
this.buffer.length + result.value.length
this.buffer.length + result.value!.length
);
newBuffer.set(this.buffer);
newBuffer.set(result.value, this.buffer.length);
newBuffer.set(result.value!, this.buffer.length);
this.buffer = newBuffer;
}
return result.done;
Expand Down
29 changes: 28 additions & 1 deletion packages/firestore/test/unit/platform/platform.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,37 @@
*/

import { expect } from 'chai';
import { PlatformSupport } from '../../../src/platform/platform';
import {
PlatformSupport,
toByteStreamReader
} from '../../../src/platform/platform';

describe('Platform', () => {
it('can load the platform at runtime', () => {
expect(PlatformSupport.getPlatform()).to.exist;
});

it('toByteStreamReader() steps underlying data', async () => {
const encoder = new TextEncoder();
const r = toByteStreamReader(
encoder.encode('0123456789'),
/* bytesPerRead */ 4
);

let result = await r.read();
expect(result.value).to.deep.equal(encoder.encode('0123'));
expect(result.done).to.be.false;

result = await r.read();
expect(result.value).to.deep.equal(encoder.encode('4567'));
expect(result.done).to.be.false;

result = await r.read();
expect(result.value).to.deep.equal(encoder.encode('89'));
expect(result.done).to.be.false;

result = await r.read();
expect(result.value).to.be.undefined;
expect(result.done).to.be.true;
});
});
Loading