@@ -19,6 +19,8 @@ import {
19
19
BundleElement ,
20
20
BundleMetadata
21
21
} from '../protos/firestore_bundle_proto' ;
22
+ import { Deferred } from './promise' ;
23
+ import { AsyncQueue } from './async_queue' ;
22
24
23
25
/**
24
26
* A complete element in the bundle stream, together with the byte length it
@@ -40,7 +42,8 @@ export class SizedBundleElement {
40
42
* Create a `ReadableStream` from a underlying buffer.
41
43
*
42
44
* @param data: Underlying buffer.
43
- * @param bytesPerRead: How many bytes to read from the underlying buffer from each read through the stream.
45
+ * @param bytesPerRead: How many bytes to read from the underlying buffer from
46
+ * each read through the stream.
44
47
*/
45
48
export function toReadableStream (
46
49
data : Uint8Array | ArrayBuffer ,
@@ -66,19 +69,25 @@ export function toReadableStream(
66
69
* elements out of the underlying content.
67
70
*/
68
71
export class BundleReader {
69
- // Cached bundle metadata.
70
- private metadata ?: BundleMetadata | null ;
71
- // The reader instance of the given ReadableStream.
72
+ /** Cached bundle metadata. */
73
+ private metadata : Deferred < BundleMetadata > = new Deferred < BundleMetadata > ( ) ;
74
+ /** The reader instance of the given ReadableStream. */
72
75
private reader : ReadableStreamDefaultReader ;
73
- // Internal buffer to hold bundle content, accumulating incomplete element content.
76
+ /**
77
+ * Internal buffer to hold bundle content, accumulating incomplete element
78
+ * content.
79
+ */
74
80
private buffer : Uint8Array = new Uint8Array ( ) ;
81
+ /** The decoder used to parse binary data into strings. */
75
82
private textDecoder = new TextDecoder ( 'utf-8' ) ;
76
83
77
84
constructor (
78
85
private bundleStream :
79
86
| ReadableStream < Uint8Array | ArrayBuffer >
80
87
| Uint8Array
81
- | ArrayBuffer
88
+ | ArrayBuffer ,
89
+ /** Async queue used to perform bundle reading. */
90
+ private asyncQueue : AsyncQueue = new AsyncQueue ( )
82
91
) {
83
92
if (
84
93
bundleStream instanceof Uint8Array ||
@@ -87,56 +96,54 @@ export class BundleReader {
87
96
this . bundleStream = toReadableStream ( bundleStream ) ;
88
97
}
89
98
this . reader = ( this . bundleStream as ReadableStream ) . getReader ( ) ;
99
+
100
+ this . nextElement ( ) . then (
101
+ element => {
102
+ if ( element && element . isBundleMetadata ( ) ) {
103
+ this . metadata . resolve ( element . payload . metadata ! ) ;
104
+ } else {
105
+ const payload = ( element || { payload : null } ) . payload ;
106
+ this . metadata . reject (
107
+ new Error ( `The first element of the bundle is not a metadata, it is
108
+ ${ JSON . stringify ( payload ) } ` )
109
+ ) ;
110
+ }
111
+ } ,
112
+ error => {
113
+ this . metadata . reject ( error ) ;
114
+ }
115
+ ) ;
90
116
}
91
117
92
118
/**
93
119
* Returns the metadata of the bundle.
94
120
*/
95
121
async getMetadata ( ) : Promise < BundleMetadata > {
96
- if ( ! this . metadata ) {
97
- await this . nextElement ( ) ;
98
- }
99
-
100
- return this . metadata ! ;
122
+ return this . metadata . promise ;
101
123
}
102
124
103
125
/**
104
126
* Returns the next BundleElement (together with its byte size in the bundle)
105
127
* that has not been read from underlying ReadableStream. Returns null if we
106
128
* have reached the end of the stream.
107
- *
108
- * Throws an error if the first element is not a BundleMetadata.
109
129
*/
110
130
async nextElement ( ) : Promise < SizedBundleElement | null > {
111
- const element = await this . readNextElement ( ) ;
112
- if ( ! element ) {
113
- return element ;
114
- }
115
-
116
- if ( ! this . metadata ) {
117
- if ( element . isBundleMetadata ( ) ) {
118
- this . metadata = element . payload . metadata ;
119
- } else {
120
- this . raiseError (
121
- `The first element of the bundle is not a metadata, it is ${ JSON . stringify (
122
- element . payload
123
- ) } `
124
- ) ;
125
- }
126
- }
127
-
128
- return element ;
131
+ // Ensures `nextElementImpl` calls are executed sequentially before they
132
+ // modifies internal buffer.
133
+ return this . asyncQueue . enqueue ( ( ) => this . nextElementImpl ( ) ) ;
129
134
}
130
135
131
136
/**
132
- * Reads from the head of internal buffer, and pulling more data from underlying stream if a complete element
133
- * cannot be found, until an element(including the prefixed length and the JSON string) is found.
137
+ * Reads from the head of internal buffer, and pulling more data from
138
+ * underlying stream if a complete element cannot be found, until an
139
+ * element(including the prefixed length and the JSON string) is found.
134
140
*
135
141
* Once a complete element is read, it is dropped from internal buffer.
136
142
*
137
- * Returns either the bundled element, or null if we have reached the end of the stream.
143
+ * Returns either the bundled element, or null if we have reached the end of
144
+ * the stream.
138
145
*/
139
- private async readNextElement ( ) : Promise < SizedBundleElement | null > {
146
+ private async nextElementImpl ( ) : Promise < SizedBundleElement | null > {
140
147
const lengthBuffer = await this . readLength ( ) ;
141
148
if ( lengthBuffer === null ) {
142
149
return null ;
@@ -148,29 +155,30 @@ export class BundleReader {
148
155
this . raiseError ( `length string (${ lengthString } ) is not valid number` ) ;
149
156
}
150
157
151
- const jsonString = await this . readJsonString ( lengthBuffer . length , length ) ;
152
- // Update the internal buffer to drop the read length and json string.
153
- this . buffer = this . buffer . slice ( lengthBuffer . length + length ) ;
158
+ const jsonString = await this . readJsonString ( length ) ;
154
159
155
160
return new SizedBundleElement (
156
161
JSON . parse ( jsonString ) ,
157
162
lengthBuffer . length + length
158
163
) ;
159
164
}
160
165
161
- // First index of '{' from the underlying buffer.
166
+ /** First index of '{' from the underlying buffer. */
162
167
private indexOfOpenBracket ( ) : number {
163
168
return this . buffer . findIndex ( v => v === '{' . charCodeAt ( 0 ) ) ;
164
169
}
165
170
166
- // Reads from the beginning of the internal buffer, until the first '{', and return
167
- // the content.
168
- // If reached end of the stream, returns a null.
171
+ /**
172
+ * Reads from the beginning of the internal buffer, until the first '{', and
173
+ * return the content.
174
+ *
175
+ * If reached end of the stream, returns a null.
176
+ */
169
177
private async readLength ( ) : Promise < Uint8Array | null > {
170
178
let position = this . indexOfOpenBracket ( ) ;
171
179
while ( position < 0 ) {
172
- const bytesRead = await this . pullMoreDataToBuffer ( ) ;
173
- if ( bytesRead < 0 ) {
180
+ const done = await this . pullMoreDataToBuffer ( ) ;
181
+ if ( done ) {
174
182
if ( this . buffer . length === 0 ) {
175
183
return null ;
176
184
}
@@ -186,22 +194,30 @@ export class BundleReader {
186
194
}
187
195
}
188
196
189
- return this . buffer . slice ( 0 , position ) ;
197
+ const result = this . buffer . slice ( 0 , position ) ;
198
+ // Update the internal buffer to drop the read length.
199
+ this . buffer = this . buffer . slice ( position ) ;
200
+ return result ;
190
201
}
191
202
192
- // Reads from a specified position from the internal buffer, for a specified
193
- // number of bytes, pulling more data from the underlying stream if needed.
194
- //
195
- // Returns a string decoded from the read bytes.
196
- private async readJsonString ( start : number , length : number ) : Promise < string > {
197
- while ( this . buffer . length < start + length ) {
198
- const bytesRead = await this . pullMoreDataToBuffer ( ) ;
199
- if ( bytesRead < 0 ) {
203
+ /**
204
+ * Reads from a specified position from the internal buffer, for a specified
205
+ * number of bytes, pulling more data from the underlying stream if needed.
206
+ *
207
+ * Returns a string decoded from the read bytes.
208
+ */
209
+ private async readJsonString ( length : number ) : Promise < string > {
210
+ while ( this . buffer . length < length ) {
211
+ const done = await this . pullMoreDataToBuffer ( ) ;
212
+ if ( done ) {
200
213
this . raiseError ( 'Reached the end of bundle when more is expected.' ) ;
201
214
}
202
215
}
203
216
204
- return this . textDecoder . decode ( this . buffer . slice ( start , start + length ) ) ;
217
+ const result = this . textDecoder . decode ( this . buffer . slice ( 0 , length ) ) ;
218
+ // Update the internal buffer to drop the read json string.
219
+ this . buffer = this . buffer . slice ( length ) ;
220
+ return result ;
205
221
}
206
222
207
223
private raiseError ( message : string ) : void {
@@ -210,20 +226,20 @@ export class BundleReader {
210
226
throw new Error ( message ) ;
211
227
}
212
228
213
- // Pulls more data from underlying stream to internal buffer.
214
- // Returns a boolean indicating whether the stream is finished.
215
- private async pullMoreDataToBuffer ( ) : Promise < number > {
229
+ /**
230
+ * Pulls more data from underlying stream to internal buffer.
231
+ * Returns a boolean indicating whether the stream is finished.
232
+ */
233
+ private async pullMoreDataToBuffer ( ) : Promise < boolean > {
216
234
const result = await this . reader . read ( ) ;
217
- let bytesRead = - 1 ;
218
235
if ( ! result . done ) {
219
- bytesRead = result . value . length ;
220
236
const newBuffer = new Uint8Array (
221
237
this . buffer . length + result . value . length
222
238
) ;
223
239
newBuffer . set ( this . buffer ) ;
224
240
newBuffer . set ( result . value , this . buffer . length ) ;
225
241
this . buffer = newBuffer ;
226
242
}
227
- return bytesRead ;
243
+ return result . done ;
228
244
}
229
245
}
0 commit comments