Skip to content

Commit 4d0eb03

Browse files
committed
Initial commit of bundle reading - for web only.
1 parent 32f93ff commit 4d0eb03

File tree

2 files changed

+568
-0
lines changed

2 files changed

+568
-0
lines changed

packages/firestore/src/util/bundle.ts

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
/**
2+
* @license
3+
* Copyright 2020 Google LLC
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
import * as api from '../protos/firestore_proto_api';
19+
import {
20+
BundledDocumentMetadata,
21+
BundledQuery,
22+
BundleElement,
23+
BundleMetadata
24+
} from '../protos/firestore_bundle_proto';
25+
26+
/**
27+
* A complete element in the bundle stream, together with the byte length it
28+
* occupies in the stream.
29+
*/
30+
export class SizedBundleElement {
31+
constructor(
32+
public payload: BundledQuery | api.Document | BundledDocumentMetadata,
33+
public byteLength: number
34+
) {}
35+
}
36+
37+
/**
38+
* Create a `ReadableStream` from a underlying buffer.
39+
*
40+
* @param data: Underlying buffer.
41+
* @param bytesPerRead: How many bytes to read from the underlying buffer from each read through the stream.
42+
*/
43+
export function toReadableStream(
44+
data: Uint8Array | ArrayBuffer,
45+
bytesPerRead = 10240
46+
): ReadableStream<Uint8Array | ArrayBuffer> {
47+
let readFrom = 0;
48+
return new ReadableStream({
49+
start(controller) {},
50+
async pull(controller): Promise<void> {
51+
controller.enqueue(data.slice(readFrom, readFrom + bytesPerRead));
52+
readFrom += bytesPerRead;
53+
if (readFrom >= data.byteLength) {
54+
controller.close();
55+
}
56+
}
57+
});
58+
}
59+
60+
/**
61+
* A class representing a bundle.
62+
*
63+
* Takes a bundle stream or buffer, and presents abstractions to read bundled
64+
* elements out of the underlying content.
65+
*/
66+
export class Bundle {
67+
// Cached bundle metadata.
68+
private metadata?: BundleMetadata | null;
69+
// The reader instance of the given ReadableStream.
70+
private reader: ReadableStreamDefaultReader;
71+
// Internal buffer to hold bundle content, accumulating incomplete element content.
72+
private buffer: Uint8Array = new Uint8Array();
73+
private textDecoder = new TextDecoder('utf-8');
74+
75+
constructor(
76+
private bundleStream:
77+
| ReadableStream<Uint8Array | ArrayBuffer>
78+
| Uint8Array
79+
| ArrayBuffer
80+
) {
81+
if (
82+
bundleStream instanceof Uint8Array ||
83+
bundleStream instanceof ArrayBuffer
84+
) {
85+
this.bundleStream = toReadableStream(bundleStream);
86+
}
87+
this.reader = (this.bundleStream as ReadableStream).getReader();
88+
}
89+
90+
/**
91+
* Returns the metadata of the bundle.
92+
*/
93+
async getMetadata(): Promise<BundleMetadata> {
94+
if (!this.metadata) {
95+
const result = await this.nextElement();
96+
if (result === null || result instanceof SizedBundleElement) {
97+
throw new Error(`The first element is not metadata, it is ${result}`);
98+
}
99+
this.metadata = (result as BundleElement).metadata;
100+
}
101+
102+
return this.metadata!;
103+
}
104+
105+
/**
106+
* Asynchronously iterate through all bundle elements (except bundle metadata).
107+
*/
108+
async *elements(): AsyncIterableIterator<SizedBundleElement> {
109+
let element = await this.nextElement();
110+
while (element !== null) {
111+
if (element instanceof SizedBundleElement) {
112+
yield element;
113+
} else {
114+
this.metadata = element.metadata;
115+
}
116+
element = await this.nextElement();
117+
}
118+
}
119+
120+
// Reads from the head of internal buffer, and pulling more data from underlying stream if a complete element
121+
// cannot be found, until an element(including the prefixed length and the JSON string) is found.
122+
//
123+
// Once a complete element is read, it is dropped from internal buffer.
124+
//
125+
// Returns either the bundled element, or null if we have reached the end of the stream.
126+
private async nextElement(): Promise<
127+
BundleElement | SizedBundleElement | null
128+
> {
129+
const lengthBuffer = await this.readLength();
130+
if (lengthBuffer === null) {
131+
return null;
132+
}
133+
134+
const lengthString = this.textDecoder.decode(lengthBuffer);
135+
const length = parseInt(lengthString, 10);
136+
if (isNaN(length)) {
137+
throw new Error(`length string (${lengthString}) is not valid number`);
138+
}
139+
140+
const jsonString = await this.readJsonString(lengthBuffer.length, length);
141+
// Update the internal buffer to drop the read length and json string.
142+
this.buffer = this.buffer.slice(lengthBuffer.length + length);
143+
144+
if (!this.metadata) {
145+
const element = JSON.parse(jsonString) as BundleElement;
146+
return element;
147+
} else {
148+
return new SizedBundleElement(
149+
JSON.parse(jsonString),
150+
lengthBuffer.length + length
151+
);
152+
}
153+
}
154+
155+
// First index of '{' from the underlying buffer.
156+
private indexOfOpenBracket(): number {
157+
return this.buffer.findIndex(v => v === 123);
158+
}
159+
160+
// Reads from the beginning of the inernal buffer, until the first '{', and return
161+
// the content.
162+
// If reached end of the stream, returns a null.
163+
private async readLength(): Promise<Uint8Array | null> {
164+
let position = this.indexOfOpenBracket();
165+
while (position < 0) {
166+
const done = await this.pullMoreDataToBuffer();
167+
if (done) {
168+
if (this.buffer.length === 0) {
169+
return null;
170+
}
171+
position = this.indexOfOpenBracket();
172+
// Underlying stream is closed, and we still cannot find a '{'.
173+
if (position < 0) {
174+
throw new Error(
175+
'Reach to the end of bundle when a length string is expected.'
176+
);
177+
}
178+
} else {
179+
position = this.indexOfOpenBracket();
180+
}
181+
}
182+
183+
return this.buffer.slice(0, position);
184+
}
185+
186+
// Reads from a specified position from the internal buffer, for a specified
187+
// number of bytes, pulling more data from the underlying stream if needed.
188+
//
189+
// Returns a string decoded from the read bytes.
190+
private async readJsonString(start: number, length: number): Promise<string> {
191+
while (this.buffer.length < start + length) {
192+
const done = await this.pullMoreDataToBuffer();
193+
if (done) {
194+
throw new Error('Reach to the end of bundle when more is expected.');
195+
}
196+
}
197+
198+
return this.textDecoder.decode(this.buffer.slice(start, start + length));
199+
}
200+
201+
// Pulls more data from underlying stream to internal buffer.
202+
// Returns a boolean indicating whether the stream is finished.
203+
private async pullMoreDataToBuffer(): Promise<boolean> {
204+
const result = await this.reader.read();
205+
if (!result.done) {
206+
const newBuffer = new Uint8Array(
207+
this.buffer.length + result.value.length
208+
);
209+
newBuffer.set(this.buffer);
210+
newBuffer.set(result.value, this.buffer.length);
211+
this.buffer = newBuffer;
212+
}
213+
return result.done;
214+
}
215+
}

0 commit comments

Comments
 (0)