-
Notifications
You must be signed in to change notification settings - Fork 938
Implement a bundle reader for Web #3097
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
a99583f
to
5e7fb89
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comments!
I replied some of them without making any code changes, because I feel we should address those before I actually change anything.
// Update the internal buffer to drop the read length and json string. | ||
this.buffer = this.buffer.slice(lengthBuffer.length + length); | ||
|
||
if (!this.metadata) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Metadata being the first element of a stream is a feature, IMO. I cannot think of a use case where metadata being in the middle of a stream is useful, and I think it's better to reject bundles without metadata upfront, than try to cope with it.
I am willing to add it if someone actually find this useful, and request for that.
// If reached end of the stream, returns a null. | ||
private async readLength(): Promise<Uint8Array | null> { | ||
let position = this.indexOfOpenBracket(); | ||
while (position < 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean we always encode length in a fixed length string, like instead of 123
, we always encode it as 00000123
?
This might be easier to implement (and less error-prone), but note when dealing with ReadableStream
we cannot control how much bytes we read back. So we still need to pull more data from the stream if it just happens that our first read is not sufficient.
); | ||
newBuffer.set(this.buffer); | ||
newBuffer.set(result.value, this.buffer.length); | ||
this.buffer = newBuffer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note we cannot control how many bytes we read from ReadableStream
, so an internal buffer cannot be avoided completely.
Currently implementation only pulls more data when necessary, so the internal buffer is already kept as small as possible when there is a need to pull and concat.
Actually, I don't think it's O(n^2)
copies in reality, because most of the data read from underlying stream is copied only once (into existing internal buffer), the part that will be copied again is the part that is the start of an element, which will be copied again when the next pull is called.
The worst case is when one element requires multiple pulls, and each pulls will have to copy the old and new buffer. I feel this might not be very common though.
|
||
function genericBundleReadingTests(bytesPerRead: number): void { | ||
// Setting up test data. | ||
const meta = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It definitely makes it shorter, but might not be as more readable as the size reduction? Because readers now have to read the built bundle content to make sure the tests are doing the right thing?
* Create a `ReadableStream` from a underlying buffer. | ||
* | ||
* @param data: Underlying buffer. | ||
* @param bytesPerRead: How many bytes to read from the underlying buffer from each read through the stream. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Line wrap
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
*/ | ||
export class BundleReader { | ||
// Cached bundle metadata. | ||
private metadata?: BundleMetadata | null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All comments that describe methods, classes or members should be JSDoc. Comments about code logic are prefixed by //
.
The question you should ask yourself: Should this comment show up in code completion or API documentation? If so, it should be JSDoc /** */
. See here:
/** Our 'visibilitychange' listener if registered. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
await this.nextElement(); | ||
} | ||
|
||
return this.metadata!; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think this code is a bit surprising. It is not clear to me why nextElement()
sets this.metadata
.
I played around with your PR a bit. What do you think of these changes? https://paste.googleplex.com/5782882595373056
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks nice!
One thing I added there is the introduction of an async queue, it looks calls to nextElement()
might interleave with each other without it.
} | ||
} | ||
|
||
return element; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changes linked above simplify this and the related method above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above!
} | ||
} | ||
|
||
return this.buffer.slice(0, position); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this method advance the buffer as well and return the length as a number, rather than returning the encoded byte array (this is not part of my change)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The length buffer also counts in bytes processed, we need that information for SizedBundleElement, that is why i choose to do it this way.
Alternative is to return a number (length) and another one (bytes consumed in bundles), which is also not ideal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Applied buffer advancement here.
// number of bytes, pulling more data from the underlying stream if needed. | ||
// | ||
// Returns a string decoded from the read bytes. | ||
private async readJsonString(start: number, length: number): Promise<string> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JSDoc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
} | ||
} | ||
|
||
return this.textDecoder.decode(this.buffer.slice(start, start + length)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, could we advance the buffer here? It might make the code simpler as start
would always be zero now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
}; | ||
const metaString = lengthPrefixedString(meta); | ||
|
||
const doc1Meta = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One benefit of not using JSON is that you could encore types here. It might help if you assigned a strict type here:
const doc1Meta : BundleMetadata = {}
Your test is currently mostly untyped. If you change the schema, your test will still compile and may even pass depending on if you update the test expectations. By enforcing types manually, you can help us to diagnose problems in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Also fixed an error in api proto d.ts
.
'throws with ill-formatted bundle with bytesPerRead ' + bytesPerRead, | ||
async () => { | ||
await expect( | ||
parseThroughBundle('metadata: "no length prefix"', bytesPerRead) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the Web SDK, we generally assert on the actual error messages as this allows us to verify that the messages make sense. Do you mind doing this here? You can look at validation.test.ts
for examples.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
// Pulls more data from underlying stream to internal buffer. | ||
// Returns a boolean indicating whether the stream is finished. | ||
private async pullMoreDataToBuffer(): Promise<number> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JSDoc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
} | ||
|
||
// Pulls more data from underlying stream to internal buffer. | ||
// Returns a boolean indicating whether the stream is finished. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you keep this as is, please update the method comment to stay that it returns the length read. I did notice however that this information is not used, so we could keep the boolean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed back to boolean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is super close. Thanks for the updates!
const payload = (element || { payload: null }).payload; | ||
this.metadata.reject( | ||
new Error(`The first element of the bundle is not a metadata, it is | ||
${JSON.stringify(payload)}`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const payload = (element || { payload: null }).payload; | |
this.metadata.reject( | |
new Error(`The first element of the bundle is not a metadata, it is | |
${JSON.stringify(payload)}`) | |
this.metadata.reject( | |
new Error(`The first element of the bundle is not a metadata element, it is: | |
${JSON.stringify(element?.payload)}`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This leads to compiler error, actually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It works for me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, my intellij keeps complaining, but compile from CLI works. I guess there is some config i need to fix for my intellij.
error => { | ||
this.metadata.reject(error); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
error => { | |
this.metadata.reject(error); | |
} | |
error => this.metadata.reject(error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removes two lines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
async nextElement(): Promise<SizedBundleElement | null> { | ||
// Ensures `nextElementImpl` calls are executed sequentially before they | ||
// modifies internal buffer. | ||
return this.asyncQueue.enqueue(() => this.nextElementImpl()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not read user data on the async queue. All data that is processed on the async queue needs to be verified first. For bundles, that means that the parsing should happen outside.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an async queue specifically for this particular reader, it does not affect the queue that most of the sdk is using.
If we hit an error reading the bundle, we will abort the rest anyways, so maybe here it is fine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed offline, the queue has been removed. This improves the error handling for the user as we can distinguish between different error messages and only raise the AsyncQueue errors for internal failures.
let position = this.indexOfOpenBracket(); | ||
while (position < 0) { | ||
const done = await this.pullMoreDataToBuffer(); | ||
if (done) { | ||
if (this.buffer.length === 0) { | ||
return null; | ||
} | ||
position = this.indexOfOpenBracket(); | ||
// Underlying stream is closed, and we still cannot find a '{'. | ||
if (position < 0) { | ||
this.raiseError( | ||
'Reached the end of bundle when a length string is expected.' | ||
); | ||
} | ||
} else { | ||
position = this.indexOfOpenBracket(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we find a way to cut down on the size of this block? For example, we shouldn't invoke indexOfOpenBracket()
thrice.
Something like:
let position: number;
while ((position = this.indexOfOpenBracket()) == -1) {
const done = await this.pullMoreDataToBuffer();
if (done) {
break;
}
}
if (position < 0) {
this.raiseError('....');
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the hint!
}; | ||
const limitToLastQueryString = lengthPrefixedString(limitToLastQuery); | ||
|
||
async function getAllElement( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/getAllElement/getAllElements
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
); | ||
} | ||
|
||
async function parseThroughBundle( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This reads like "passthrough" but it's not. Maybe generateBundleAndParse
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
} | ||
this.reader = (this.bundleStream as ReadableStream).getReader(); | ||
|
||
this.nextElement().then( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add a small inline comment before this line:
// Read the metadata (which is the first element)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for all the updates!
return null; | ||
} | ||
|
||
position = this.indexOfOpenBracket(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can drop position
from the loop above and assign it here as a const.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
const payload = (element || { payload: null }).payload; | ||
this.metadata.reject( | ||
new Error(`The first element of the bundle is not a metadata, it is | ||
${JSON.stringify(payload)}`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It works for me.
The implementation is based around Web's ReadableStream. This does not work on NodeJS unfortunately. I need to fix that, but want to send this out before it gets too big.