Skip to content

Commit 2f179bc

Browse files
authored
Enable bundle reader for Node. (#3167)
1 parent b0c8299 commit 2f179bc

File tree

7 files changed

+182
-76
lines changed

7 files changed

+182
-76
lines changed

packages/firestore/src/platform/platform.ts

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
import { DatabaseId, DatabaseInfo } from '../core/database_info';
1919
import { Connection } from '../remote/connection';
2020
import { JsonProtoSerializer } from '../remote/serializer';
21-
import { fail } from '../util/assert';
21+
import { debugAssert, fail } from '../util/assert';
2222
import { ConnectivityMonitor } from './../remote/connectivity_monitor';
23+
import { BundleSource } from '../util/bundle_reader';
2324

2425
/**
2526
* Provides a common interface to load anything platform dependent, e.g.
@@ -50,6 +51,18 @@ export interface Platform {
5051
*/
5152
randomBytes(nBytes: number): Uint8Array;
5253

54+
/**
55+
* Builds a `ByteStreamReader` from a data source.
56+
* @param source The data source to use.
57+
* @param bytesPerRead How many bytes each `read()` from the returned reader
58+
* will read. It is ignored if the passed in source does not provide
59+
* such control(example: ReadableStream).
60+
*/
61+
toByteStreamReader(
62+
source: BundleSource,
63+
bytesPerRead: number
64+
): ReadableStreamReader<Uint8Array>;
65+
5366
/** The Platform's 'window' implementation or null if not available. */
5467
readonly window: Window | null;
5568

@@ -60,6 +73,40 @@ export interface Platform {
6073
readonly base64Available: boolean;
6174
}
6275

76+
/**
77+
* Builds a `ByteStreamReader` from a UInt8Array.
78+
* @param source The data source to use.
79+
* @param bytesPerRead How many bytes each `read()` from the returned reader
80+
* will read.
81+
*/
82+
export function toByteStreamReader(
83+
source: Uint8Array,
84+
bytesPerRead: number
85+
): ReadableStreamReader<Uint8Array> {
86+
debugAssert(
87+
bytesPerRead > 0,
88+
`toByteStreamReader expects positive bytesPerRead, but got ${bytesPerRead}`
89+
);
90+
let readFrom = 0;
91+
const reader: ReadableStreamReader<Uint8Array> = {
92+
async read(): Promise<ReadableStreamReadResult<Uint8Array>> {
93+
if (readFrom < source.byteLength) {
94+
const result = {
95+
value: source.slice(readFrom, readFrom + bytesPerRead),
96+
done: false
97+
};
98+
readFrom += bytesPerRead;
99+
return result;
100+
}
101+
102+
return { value: undefined, done: true };
103+
},
104+
async cancel(): Promise<void> {},
105+
releaseLock() {}
106+
};
107+
return reader;
108+
}
109+
63110
/**
64111
* Provides singleton helpers where setup code can inject a platform at runtime.
65112
* setPlatform needs to be set before Firestore is used and must be set exactly

packages/firestore/src/platform_browser/browser_platform.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717

1818
import { DatabaseId, DatabaseInfo } from '../core/database_info';
19-
import { Platform } from '../platform/platform';
19+
import { Platform, toByteStreamReader } from '../platform/platform';
2020
import { Connection } from '../remote/connection';
2121
import { JsonProtoSerializer } from '../remote/serializer';
2222
import { ConnectivityMonitor } from './../remote/connectivity_monitor';
@@ -25,6 +25,7 @@ import { NoopConnectivityMonitor } from '../remote/connectivity_monitor_noop';
2525
import { BrowserConnectivityMonitor } from './browser_connectivity_monitor';
2626
import { WebChannelConnection } from './webchannel_connection';
2727
import { debugAssert } from '../util/assert';
28+
import { BundleSource } from '../util/bundle_reader';
2829

2930
// Implements the Platform API for browsers and some browser-like environments
3031
// (including ReactNative).
@@ -93,4 +94,25 @@ export class BrowserPlatform implements Platform {
9394
}
9495
return bytes;
9596
}
97+
98+
/**
99+
* On web, a `ReadableStream` is wrapped around by a `ByteStreamReader`.
100+
*/
101+
toByteStreamReader(
102+
source: BundleSource,
103+
bytesPerRead: number
104+
): ReadableStreamReader<Uint8Array> {
105+
if (source instanceof Uint8Array) {
106+
return toByteStreamReader(source, bytesPerRead);
107+
}
108+
if (source instanceof ArrayBuffer) {
109+
return toByteStreamReader(new Uint8Array(source), bytesPerRead);
110+
}
111+
if (source instanceof ReadableStream) {
112+
return source.getReader();
113+
}
114+
throw new Error(
115+
'Source of `toByteStreamReader` has to be a ArrayBuffer or ReadableStream'
116+
);
117+
}
96118
}

packages/firestore/src/platform_node/node_platform.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import { randomBytes } from 'crypto';
1919
import { inspect } from 'util';
2020

2121
import { DatabaseId, DatabaseInfo } from '../core/database_info';
22-
import { Platform } from '../platform/platform';
22+
import { Platform, toByteStreamReader } from '../platform/platform';
2323
import { Connection } from '../remote/connection';
2424
import { JsonProtoSerializer } from '../remote/serializer';
2525
import { Code, FirestoreError } from '../util/error';
@@ -29,6 +29,7 @@ import { NoopConnectivityMonitor } from './../remote/connectivity_monitor_noop';
2929
import { GrpcConnection } from './grpc_connection';
3030
import { loadProtos } from './load_protos';
3131
import { debugAssert } from '../util/assert';
32+
import { invalidClassError } from '../util/input_validation';
3233

3334
export class NodePlatform implements Platform {
3435
readonly base64Available = true;
@@ -83,4 +84,22 @@ export class NodePlatform implements Platform {
8384

8485
return randomBytes(nBytes);
8586
}
87+
88+
/**
89+
* On Node, only supported data source is a `Uint8Array` for now.
90+
*/
91+
toByteStreamReader(
92+
source: Uint8Array,
93+
bytesPerRead: number
94+
): ReadableStreamReader<Uint8Array> {
95+
if (!(source instanceof Uint8Array)) {
96+
throw invalidClassError(
97+
'NodePlatform.toByteStreamReader',
98+
'Uint8Array',
99+
1,
100+
source
101+
);
102+
}
103+
return toByteStreamReader(source, bytesPerRead);
104+
}
86105
}

packages/firestore/src/util/bundle_reader.ts

Lines changed: 24 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import {
2020
BundleMetadata
2121
} from '../protos/firestore_bundle_proto';
2222
import { Deferred } from './promise';
23+
import { PlatformSupport } from '../platform/platform';
24+
import { debugAssert } from './assert';
2325

2426
/**
2527
* A complete element in the bundle stream, together with the byte length it
@@ -37,29 +39,18 @@ export class SizedBundleElement {
3739
}
3840
}
3941

42+
export type BundleSource =
43+
| ReadableStream<Uint8Array>
44+
| ArrayBuffer
45+
| Uint8Array;
46+
4047
/**
41-
* Create a `ReadableStream` from a underlying buffer.
48+
* When applicable, how many bytes to read from the underlying data source
49+
* each time.
4250
*
43-
* @param data: Underlying buffer.
44-
* @param bytesPerRead: How many bytes to read from the underlying buffer from
45-
* each read through the stream.
51+
* Not applicable for ReadableStreams.
4652
*/
47-
export function toReadableStream(
48-
data: Uint8Array | ArrayBuffer,
49-
bytesPerRead = 10240
50-
): ReadableStream<Uint8Array | ArrayBuffer> {
51-
let readFrom = 0;
52-
return new ReadableStream({
53-
start(controller) {},
54-
async pull(controller): Promise<void> {
55-
controller.enqueue(data.slice(readFrom, readFrom + bytesPerRead));
56-
readFrom += bytesPerRead;
57-
if (readFrom >= data.byteLength) {
58-
controller.close();
59-
}
60-
}
61-
});
62-
}
53+
const BYTES_PER_READ = 10240;
6354

6455
/**
6556
* A class representing a bundle.
@@ -70,8 +61,6 @@ export function toReadableStream(
7061
export class BundleReader {
7162
/** Cached bundle metadata. */
7263
private metadata: Deferred<BundleMetadata> = new Deferred<BundleMetadata>();
73-
/** The reader instance of the given ReadableStream. */
74-
private reader: ReadableStreamDefaultReader;
7564
/**
7665
* Internal buffer to hold bundle content, accumulating incomplete element
7766
* content.
@@ -80,20 +69,16 @@ export class BundleReader {
8069
/** The decoder used to parse binary data into strings. */
8170
private textDecoder = new TextDecoder('utf-8');
8271

72+
static fromBundleSource(source: BundleSource): BundleReader {
73+
return new BundleReader(
74+
PlatformSupport.getPlatform().toByteStreamReader(source, BYTES_PER_READ)
75+
);
76+
}
77+
8378
constructor(
84-
private bundleStream:
85-
| ReadableStream<Uint8Array | ArrayBuffer>
86-
| Uint8Array
87-
| ArrayBuffer
79+
/** The reader to read from underlying binary bundle data source. */
80+
private reader: ReadableStreamReader<Uint8Array>
8881
) {
89-
if (
90-
bundleStream instanceof Uint8Array ||
91-
bundleStream instanceof ArrayBuffer
92-
) {
93-
this.bundleStream = toReadableStream(bundleStream);
94-
}
95-
this.reader = (this.bundleStream as ReadableStream).getReader();
96-
9782
// Read the metadata (which is the first element).
9883
this.nextElementImpl().then(
9984
element => {
@@ -220,8 +205,8 @@ export class BundleReader {
220205

221206
private raiseError(message: string): void {
222207
// eslint-disable-next-line @typescript-eslint/no-floating-promises
223-
this.reader.cancel('Invalid bundle format.');
224-
throw new Error(message);
208+
this.reader.cancel();
209+
throw new Error(`Invalid bundle format: ${message}`);
225210
}
226211

227212
/**
@@ -231,11 +216,12 @@ export class BundleReader {
231216
private async pullMoreDataToBuffer(): Promise<boolean> {
232217
const result = await this.reader.read();
233218
if (!result.done) {
219+
debugAssert(!!result.value, 'Read undefined when "done" is false.');
234220
const newBuffer = new Uint8Array(
235-
this.buffer.length + result.value.length
221+
this.buffer.length + result.value!.length
236222
);
237223
newBuffer.set(this.buffer);
238-
newBuffer.set(result.value, this.buffer.length);
224+
newBuffer.set(result.value!, this.buffer.length);
239225
this.buffer = newBuffer;
240226
}
241227
return result.done;

packages/firestore/test/unit/platform/platform.test.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,37 @@
1616
*/
1717

1818
import { expect } from 'chai';
19-
import { PlatformSupport } from '../../../src/platform/platform';
19+
import {
20+
PlatformSupport,
21+
toByteStreamReader
22+
} from '../../../src/platform/platform';
2023

2124
describe('Platform', () => {
2225
it('can load the platform at runtime', () => {
2326
expect(PlatformSupport.getPlatform()).to.exist;
2427
});
28+
29+
it('toByteStreamReader() steps underlying data', async () => {
30+
const encoder = new TextEncoder();
31+
const r = toByteStreamReader(
32+
encoder.encode('0123456789'),
33+
/* bytesPerRead */ 4
34+
);
35+
36+
let result = await r.read();
37+
expect(result.value).to.deep.equal(encoder.encode('0123'));
38+
expect(result.done).to.be.false;
39+
40+
result = await r.read();
41+
expect(result.value).to.deep.equal(encoder.encode('4567'));
42+
expect(result.done).to.be.false;
43+
44+
result = await r.read();
45+
expect(result.value).to.deep.equal(encoder.encode('89'));
46+
expect(result.done).to.be.false;
47+
48+
result = await r.read();
49+
expect(result.value).to.be.undefined;
50+
expect(result.done).to.be.true;
51+
});
2552
});

0 commit comments

Comments
 (0)