-
Notifications
You must be signed in to change notification settings - Fork 940
Enable bundle reader for Node. #3167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
9602712
c5e783e
5e7fb89
1ee1615
18f0be1
aa455bf
78248cd
83160a1
24e10cb
9d6edc5
4cbe608
4313e51
de1d162
1775298
6eafb6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,8 @@ import { Connection } from '../remote/connection'; | |
import { JsonProtoSerializer } from '../remote/serializer'; | ||
import { fail } from '../util/assert'; | ||
import { ConnectivityMonitor } from './../remote/connectivity_monitor'; | ||
import { BundleSource } from '../util/bundle_reader'; | ||
import { validatePositiveNumber } from '../util/input_validation'; | ||
|
||
/** | ||
* Provides a common interface to load anything platform dependent, e.g. | ||
|
@@ -50,6 +52,18 @@ export interface Platform { | |
*/ | ||
randomBytes(nBytes: number): Uint8Array; | ||
|
||
/** | ||
* Builds a `ByteStreamReader` from a data source. | ||
* @param source The data source to use. | ||
* @param bytesPerRead How many bytes each `read()` from the returned reader | ||
* will read. It is ignored if the passed in source does not provide | ||
* such control(example: ReadableStream). | ||
*/ | ||
toByteStreamReader( | ||
source: BundleSource, | ||
bytesPerRead: number | ||
): ByteStreamReader; | ||
|
||
/** The Platform's 'window' implementation or null if not available. */ | ||
readonly window: Window | null; | ||
|
||
|
@@ -60,6 +74,57 @@ export interface Platform { | |
readonly base64Available: boolean; | ||
} | ||
|
||
/** | ||
* An interface compatible with Web's ReadableStream.getReader() return type. | ||
* | ||
* This can be used as an abstraction to mimic `ReadableStream` where it is not | ||
* available. | ||
*/ | ||
export interface ByteStreamReader { | ||
read(): Promise<ByteStreamReadResult>; | ||
cancel(reason?: string): Promise<void>; | ||
} | ||
|
||
/** | ||
* An interface compatible with ReadableStreamReadResult<UInt8Array>. | ||
*/ | ||
export interface ByteStreamReadResult { | ||
done: boolean; | ||
value?: Uint8Array; | ||
} | ||
|
||
/** | ||
* Builds a `ByteStreamReader` from a UInt8Array. | ||
* @param source The data source to use. | ||
* @param bytesPerRead How many bytes each `read()` from the returned reader | ||
* will read. | ||
*/ | ||
export function toByteStreamReader( | ||
source: Uint8Array, | ||
bytesPerRead: number | ||
): ByteStreamReader { | ||
validatePositiveNumber('toByteStreamReader', 2, bytesPerRead); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is not a user facing method, this should be a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
let readFrom = 0; | ||
const reader: ByteStreamReader = { | ||
async read(): Promise<ByteStreamReadResult> { | ||
if (readFrom < source.byteLength) { | ||
const result = { | ||
value: source.slice(readFrom, readFrom + bytesPerRead), | ||
done: false | ||
}; | ||
readFrom += bytesPerRead; | ||
return result; | ||
} | ||
|
||
return { value: undefined, done: true }; | ||
}, | ||
|
||
async cancel(reason?: string): Promise<void> {} | ||
}; | ||
|
||
return reader; | ||
} | ||
|
||
/** | ||
* Provides singleton helpers where setup code can inject a platform at runtime. | ||
* setPlatform needs to be set before Firestore is used and must be set exactly | ||
|
@@ -74,6 +139,13 @@ export class PlatformSupport { | |
PlatformSupport.platform = platform; | ||
} | ||
|
||
/** | ||
* Forcing to set the platform instance, testing only! | ||
*/ | ||
private static _forceSetPlatform(platform: Platform): void { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can drop this now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
PlatformSupport.platform = platform; | ||
} | ||
|
||
static getPlatform(): Platform { | ||
if (!PlatformSupport.platform) { | ||
fail('Platform not set'); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,8 @@ import { | |
BundleMetadata | ||
} from '../protos/firestore_bundle_proto'; | ||
import { Deferred } from './promise'; | ||
import { ByteStreamReader, PlatformSupport } from '../platform/platform'; | ||
import { debugAssert } from './assert'; | ||
|
||
/** | ||
* A complete element in the bundle stream, together with the byte length it | ||
|
@@ -37,29 +39,19 @@ export class SizedBundleElement { | |
} | ||
} | ||
|
||
export type BundleSource = | ||
| ReadableStream<Uint8Array> | ||
| ArrayBuffer | ||
| Uint8Array; | ||
|
||
/** | ||
* Create a `ReadableStream` from a underlying buffer. | ||
* When applicable, how many bytets to read from the underlying data source | ||
* each time. | ||
* | ||
* @param data: Underlying buffer. | ||
* @param bytesPerRead: How many bytes to read from the underlying buffer from | ||
* each read through the stream. | ||
* It is not application when we don't really have control, for example, when | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/application/applicable But it could just be: "Not applicable for ReadableStreams." There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
* source is a ReadableStream. | ||
*/ | ||
export function toReadableStream( | ||
data: Uint8Array | ArrayBuffer, | ||
bytesPerRead = 10240 | ||
): ReadableStream<Uint8Array | ArrayBuffer> { | ||
let readFrom = 0; | ||
return new ReadableStream({ | ||
start(controller) {}, | ||
async pull(controller): Promise<void> { | ||
controller.enqueue(data.slice(readFrom, readFrom + bytesPerRead)); | ||
readFrom += bytesPerRead; | ||
if (readFrom >= data.byteLength) { | ||
controller.close(); | ||
} | ||
} | ||
}); | ||
} | ||
const BYTES_PER_READ = 10240; | ||
|
||
/** | ||
* A class representing a bundle. | ||
|
@@ -70,8 +62,6 @@ export function toReadableStream( | |
export class BundleReader { | ||
/** Cached bundle metadata. */ | ||
private metadata: Deferred<BundleMetadata> = new Deferred<BundleMetadata>(); | ||
/** The reader instance of the given ReadableStream. */ | ||
private reader: ReadableStreamDefaultReader; | ||
/** | ||
* Internal buffer to hold bundle content, accumulating incomplete element | ||
* content. | ||
|
@@ -80,20 +70,16 @@ export class BundleReader { | |
/** The decoder used to parse binary data into strings. */ | ||
private textDecoder = new TextDecoder('utf-8'); | ||
|
||
static fromBundleSource(source: BundleSource): BundleReader { | ||
return new BundleReader( | ||
PlatformSupport.getPlatform().toByteStreamReader(source, BYTES_PER_READ) | ||
); | ||
} | ||
|
||
constructor( | ||
private bundleStream: | ||
| ReadableStream<Uint8Array | ArrayBuffer> | ||
| Uint8Array | ||
| ArrayBuffer | ||
/** The reader to read from underlying binary bundle data source. */ | ||
private reader: ByteStreamReader | ||
) { | ||
if ( | ||
bundleStream instanceof Uint8Array || | ||
bundleStream instanceof ArrayBuffer | ||
) { | ||
this.bundleStream = toReadableStream(bundleStream); | ||
} | ||
this.reader = (this.bundleStream as ReadableStream).getReader(); | ||
|
||
// Read the metadata (which is the first element). | ||
this.nextElementImpl().then( | ||
element => { | ||
|
@@ -231,11 +217,12 @@ export class BundleReader { | |
private async pullMoreDataToBuffer(): Promise<boolean> { | ||
const result = await this.reader.read(); | ||
if (!result.done) { | ||
debugAssert(!!result.value, 'Read undefined when "done" is false.'); | ||
const newBuffer = new Uint8Array( | ||
this.buffer.length + result.value.length | ||
this.buffer.length + result.value!.length | ||
); | ||
newBuffer.set(this.buffer); | ||
newBuffer.set(result.value, this.buffer.length); | ||
newBuffer.set(result.value!, this.buffer.length); | ||
this.buffer = newBuffer; | ||
} | ||
return result.done; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you take a look at using the type
ReadableStreamReader
directly?(from lib.dom.d.ts)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at the document, it seems
ReadableStreamDefaultReader
is the interface? There are several differences.releaseLock()
is not added there because it is not used. I dont have a strong preference though.ArrayBuffer | ReadableStream<UInt8Array>
I think it is fine?cancel
takes a string becauseReadableStreamDefaultReader
takes a reason for its cancel (https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader/cancel).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we would be better off if we re-used existing types. It makes our code smaller, easier to understand for outsiders and is more future proof. You are already using most of the idioms of the existing API, and re-using the type would be a good explanation of why you are doing this ("citing the source").
The change here is actually pretty simple: https://gist.github.com/schmidt-sebastian/62cabbbcab7f78f3a6e1aee7809f1b73
If we need a more complicated type, we could keep ByteStreamReader but make it:
As for 3: Right now, we ignore the reason for cancelling. Could we just make it part of the error message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
I had the assumption that ReadableStreamReader will not be available for Node, turns out the interface does exist with
d.ts
. Thanks for the patch!