Skip to content

Commit ab6d188

Browse files
authored
Merge 4313e51 into b0c8299
2 parents b0c8299 + 4313e51 commit ab6d188

File tree

7 files changed

+209
-71
lines changed

7 files changed

+209
-71
lines changed

packages/firestore/src/platform/platform.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ export interface Platform {
5050
*/
5151
randomBytes(nBytes: number): Uint8Array;
5252

53+
/**
54+
* Builds a `ByteStreamReader` from a data source.
55+
*/
56+
toByteStreamReader(source: unknown): ByteStreamReader;
57+
5358
/** The Platform's 'window' implementation or null if not available. */
5459
readonly window: Window | null;
5560

@@ -60,6 +65,54 @@ export interface Platform {
6065
readonly base64Available: boolean;
6166
}
6267

68+
/**
69+
* An interface compatible with Web's ReadableStream.getReader() return type.
70+
*
71+
* This can be used as an abstraction to mimic `ReadableStream` where it is not
72+
* available.
73+
*/
74+
export interface ByteStreamReader {
75+
read(): Promise<ByteStreamReadResult>;
76+
cancel(reason?: string): Promise<void>;
77+
}
78+
79+
/**
80+
* An interface compatible with ReadableStreamReadResult<UInt8Array>.
81+
*/
82+
export interface ByteStreamReadResult {
83+
done: boolean;
84+
value?: Uint8Array;
85+
}
86+
87+
/**
88+
* Builds a `ByteStreamReader` from a UInt8Array.
89+
* @param source The data source to use.
90+
* @param bytesPerRead How many bytes each `read()` from the returned reader
91+
* will read.
92+
*/
93+
export function toByteStreamReader(
94+
source: Uint8Array,
95+
bytesPerRead = 10240
96+
): ByteStreamReader {
97+
let readFrom = 0;
98+
return new (class implements ByteStreamReader {
99+
async read(): Promise<ByteStreamReadResult> {
100+
if (readFrom < source.byteLength) {
101+
const result = {
102+
value: source.slice(readFrom, readFrom + bytesPerRead),
103+
done: false
104+
};
105+
readFrom += bytesPerRead;
106+
return result;
107+
}
108+
109+
return { value: undefined, done: true };
110+
}
111+
112+
async cancel(reason?: string): Promise<void> {}
113+
})();
114+
}
115+
63116
/**
64117
* Provides singleton helpers where setup code can inject a platform at runtime.
65118
* setPlatform needs to be set before Firestore is used and must be set exactly
@@ -74,6 +127,13 @@ export class PlatformSupport {
74127
PlatformSupport.platform = platform;
75128
}
76129

130+
/**
131+
* Forcing to set the platform instance, testing only!
132+
*/
133+
private static _forceSetPlatform(platform: Platform): void {
134+
PlatformSupport.platform = platform;
135+
}
136+
77137
static getPlatform(): Platform {
78138
if (!PlatformSupport.platform) {
79139
fail('Platform not set');

packages/firestore/src/platform_browser/browser_platform.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616
*/
1717

1818
import { DatabaseId, DatabaseInfo } from '../core/database_info';
19-
import { Platform } from '../platform/platform';
19+
import {
20+
ByteStreamReader,
21+
ByteStreamReadResult,
22+
Platform,
23+
toByteStreamReader
24+
} from '../platform/platform';
2025
import { Connection } from '../remote/connection';
2126
import { JsonProtoSerializer } from '../remote/serializer';
2227
import { ConnectivityMonitor } from './../remote/connectivity_monitor';
@@ -93,4 +98,33 @@ export class BrowserPlatform implements Platform {
9398
}
9499
return bytes;
95100
}
101+
102+
/**
103+
* On web, a `ReadableStream` is wrapped around by a `ByteStreamReader`.
104+
*/
105+
toByteStreamReader(
106+
source: Uint8Array | ArrayBuffer | ReadableStream<Uint8Array>
107+
): ByteStreamReader {
108+
if (source instanceof Uint8Array) {
109+
return toByteStreamReader(source);
110+
}
111+
if (source instanceof ArrayBuffer) {
112+
return toByteStreamReader(new Uint8Array(source));
113+
}
114+
if (source instanceof ReadableStream) {
115+
const reader = source.getReader();
116+
return new (class implements ByteStreamReader {
117+
read(): Promise<ByteStreamReadResult> {
118+
return reader.read();
119+
}
120+
121+
cancel(reason?: string): Promise<void> {
122+
return reader.cancel(reason);
123+
}
124+
})();
125+
}
126+
throw new Error(
127+
'Source of `toByteStreamReader` has to be Uint8Array, ArrayBuffer or ReadableStream'
128+
);
129+
}
96130
}

packages/firestore/src/platform_node/node_platform.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ import { randomBytes } from 'crypto';
1919
import { inspect } from 'util';
2020

2121
import { DatabaseId, DatabaseInfo } from '../core/database_info';
22-
import { Platform } from '../platform/platform';
22+
import {
23+
ByteStreamReader,
24+
Platform,
25+
toByteStreamReader
26+
} from '../platform/platform';
2327
import { Connection } from '../remote/connection';
2428
import { JsonProtoSerializer } from '../remote/serializer';
2529
import { Code, FirestoreError } from '../util/error';
@@ -83,4 +87,14 @@ export class NodePlatform implements Platform {
8387

8488
return randomBytes(nBytes);
8589
}
90+
91+
/**
92+
* On Node, only supported data source is a `Uint8Array` for now.
93+
*/
94+
toByteStreamReader(source: unknown): ByteStreamReader {
95+
if (source instanceof Uint8Array) {
96+
return toByteStreamReader(source);
97+
}
98+
throw new Error('Source of `toByteStreamReader` has to be Uint8Array');
99+
}
86100
}

packages/firestore/src/util/bundle_reader.ts

Lines changed: 10 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import {
2020
BundleMetadata
2121
} from '../protos/firestore_bundle_proto';
2222
import { Deferred } from './promise';
23+
import { ByteStreamReader, PlatformSupport } from '../platform/platform';
24+
import { debugAssert } from './assert';
2325

2426
/**
2527
* A complete element in the bundle stream, together with the byte length it
@@ -37,30 +39,6 @@ export class SizedBundleElement {
3739
}
3840
}
3941

40-
/**
41-
* Create a `ReadableStream` from a underlying buffer.
42-
*
43-
* @param data: Underlying buffer.
44-
* @param bytesPerRead: How many bytes to read from the underlying buffer from
45-
* each read through the stream.
46-
*/
47-
export function toReadableStream(
48-
data: Uint8Array | ArrayBuffer,
49-
bytesPerRead = 10240
50-
): ReadableStream<Uint8Array | ArrayBuffer> {
51-
let readFrom = 0;
52-
return new ReadableStream({
53-
start(controller) {},
54-
async pull(controller): Promise<void> {
55-
controller.enqueue(data.slice(readFrom, readFrom + bytesPerRead));
56-
readFrom += bytesPerRead;
57-
if (readFrom >= data.byteLength) {
58-
controller.close();
59-
}
60-
}
61-
});
62-
}
63-
6442
/**
6543
* A class representing a bundle.
6644
*
@@ -70,8 +48,8 @@ export function toReadableStream(
7048
export class BundleReader {
7149
/** Cached bundle metadata. */
7250
private metadata: Deferred<BundleMetadata> = new Deferred<BundleMetadata>();
73-
/** The reader instance of the given ReadableStream. */
74-
private reader: ReadableStreamDefaultReader;
51+
/** The reader to read from underlying binary bundle data source. */
52+
private reader: ByteStreamReader;
7553
/**
7654
* Internal buffer to hold bundle content, accumulating incomplete element
7755
* content.
@@ -86,13 +64,9 @@ export class BundleReader {
8664
| Uint8Array
8765
| ArrayBuffer
8866
) {
89-
if (
90-
bundleStream instanceof Uint8Array ||
91-
bundleStream instanceof ArrayBuffer
92-
) {
93-
this.bundleStream = toReadableStream(bundleStream);
94-
}
95-
this.reader = (this.bundleStream as ReadableStream).getReader();
67+
this.reader = PlatformSupport.getPlatform().toByteStreamReader(
68+
bundleStream
69+
);
9670

9771
// Read the metadata (which is the first element).
9872
this.nextElementImpl().then(
@@ -231,11 +205,12 @@ export class BundleReader {
231205
private async pullMoreDataToBuffer(): Promise<boolean> {
232206
const result = await this.reader.read();
233207
if (!result.done) {
208+
debugAssert(!!result.value, 'Read undefined when "done" is false.');
234209
const newBuffer = new Uint8Array(
235-
this.buffer.length + result.value.length
210+
this.buffer.length + result.value!.length
236211
);
237212
newBuffer.set(this.buffer);
238-
newBuffer.set(result.value, this.buffer.length);
213+
newBuffer.set(result.value!, this.buffer.length);
239214
this.buffer = newBuffer;
240215
}
241216
return result.done;

packages/firestore/test/unit/platform/platform.test.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,34 @@
1616
*/
1717

1818
import { expect } from 'chai';
19-
import { PlatformSupport } from '../../../src/platform/platform';
19+
import {
20+
PlatformSupport,
21+
toByteStreamReader
22+
} from '../../../src/platform/platform';
2023

2124
describe('Platform', () => {
2225
it('can load the platform at runtime', () => {
2326
expect(PlatformSupport.getPlatform()).to.exist;
2427
});
28+
29+
it('toByteStreamReader() steps underlying data', async () => {
30+
const encoder = new TextEncoder();
31+
const r = toByteStreamReader(encoder.encode('0123456789'), 4);
32+
33+
let result = await r.read();
34+
expect(result.value).to.deep.equal(encoder.encode('0123'));
35+
expect(result.done).to.be.false;
36+
37+
result = await r.read();
38+
expect(result.value).to.deep.equal(encoder.encode('4567'));
39+
expect(result.done).to.be.false;
40+
41+
result = await r.read();
42+
expect(result.value).to.deep.equal(encoder.encode('89'));
43+
expect(result.done).to.be.false;
44+
45+
result = await r.read();
46+
expect(result.value).to.be.undefined;
47+
expect(result.done).to.be.true;
48+
});
2549
});

0 commit comments

Comments
 (0)