15
15
* limitations under the License.
16
16
*/
17
17
18
- import * as api from '../protos/firestore_proto_api' ;
19
18
import {
20
- BundledDocumentMetadata ,
21
- BundledQuery ,
22
19
BundleElement ,
23
20
BundleMetadata
24
21
} from '../protos/firestore_bundle_proto' ;
@@ -29,9 +26,14 @@ import {
29
26
*/
30
27
export class SizedBundleElement {
31
28
constructor (
32
- public payload : BundledQuery | api . Document | BundledDocumentMetadata ,
33
- public byteLength : number
29
+ public readonly payload : BundleElement ,
30
+ // How many bytes this element takes to store in the bundle.
31
+ public readonly byteLength : number
34
32
) { }
33
+
34
+ isBundleMetadata ( ) : boolean {
35
+ return 'metadata' in this . payload ;
36
+ }
35
37
}
36
38
37
39
/**
@@ -63,7 +65,7 @@ export function toReadableStream(
63
65
* Takes a bundle stream or buffer, and presents abstractions to read bundled
64
66
* elements out of the underlying content.
65
67
*/
66
- export class Bundle {
68
+ export class BundleReader {
67
69
// Cached bundle metadata.
68
70
private metadata ?: BundleMetadata | null ;
69
71
// The reader instance of the given ReadableStream.
@@ -92,87 +94,91 @@ export class Bundle {
92
94
*/
93
95
async getMetadata ( ) : Promise < BundleMetadata > {
94
96
if ( ! this . metadata ) {
95
- const result = await this . nextElement ( ) ;
96
- if ( result === null || result instanceof SizedBundleElement ) {
97
- throw new Error ( `The first element is not metadata, it is ${ result } ` ) ;
98
- }
99
- this . metadata = ( result as BundleElement ) . metadata ;
97
+ await this . nextElement ( ) ;
100
98
}
101
99
102
100
return this . metadata ! ;
103
101
}
104
102
105
103
/**
106
- * Asynchronously iterate through all bundle elements (except bundle metadata).
104
+ * Returns the next BundleElement (together with its byte size in the bundle)
105
+ * that has not been read from underlying ReadableStream. Returns null if we
106
+ * have reached the end of the stream.
107
+ *
108
+ * Throws an error if the first element is not a BundleMetadata.
107
109
*/
108
- async * elements ( ) : AsyncIterableIterator < SizedBundleElement > {
109
- let element = await this . nextElement ( ) ;
110
- while ( element !== null ) {
111
- if ( element instanceof SizedBundleElement ) {
112
- yield element ;
110
+ async nextElement ( ) : Promise < SizedBundleElement | null > {
111
+ const element = await this . readNextElement ( ) ;
112
+ if ( ! element ) {
113
+ return element ;
114
+ }
115
+
116
+ if ( ! this . metadata ) {
117
+ if ( element . isBundleMetadata ( ) ) {
118
+ this . metadata = element . payload . metadata ;
113
119
} else {
114
- this . metadata = element . metadata ;
120
+ this . raiseError (
121
+ `The first element of the bundle is not a metadata, it is ${ JSON . stringify (
122
+ element . payload
123
+ ) } `
124
+ ) ;
115
125
}
116
- element = await this . nextElement ( ) ;
117
126
}
127
+
128
+ return element ;
118
129
}
119
130
120
- // Reads from the head of internal buffer, and pulling more data from underlying stream if a complete element
121
- // cannot be found, until an element(including the prefixed length and the JSON string) is found.
122
- //
123
- // Once a complete element is read, it is dropped from internal buffer.
124
- //
125
- // Returns either the bundled element, or null if we have reached the end of the stream.
126
- private async nextElement ( ) : Promise <
127
- BundleElement | SizedBundleElement | null
128
- > {
131
+ /**
132
+ * Reads from the head of internal buffer, and pulling more data from underlying stream if a complete element
133
+ * cannot be found, until an element(including the prefixed length and the JSON string) is found.
134
+ *
135
+ * Once a complete element is read, it is dropped from internal buffer.
136
+ *
137
+ * Returns either the bundled element, or null if we have reached the end of the stream.
138
+ */
139
+ private async readNextElement ( ) : Promise < SizedBundleElement | null > {
129
140
const lengthBuffer = await this . readLength ( ) ;
130
141
if ( lengthBuffer === null ) {
131
142
return null ;
132
143
}
133
144
134
145
const lengthString = this . textDecoder . decode ( lengthBuffer ) ;
135
- const length = parseInt ( lengthString , 10 ) ;
146
+ const length = Number ( lengthString ) ;
136
147
if ( isNaN ( length ) ) {
137
- throw new Error ( `length string (${ lengthString } ) is not valid number` ) ;
148
+ this . raiseError ( `length string (${ lengthString } ) is not valid number` ) ;
138
149
}
139
150
140
151
const jsonString = await this . readJsonString ( lengthBuffer . length , length ) ;
141
152
// Update the internal buffer to drop the read length and json string.
142
153
this . buffer = this . buffer . slice ( lengthBuffer . length + length ) ;
143
154
144
- if ( ! this . metadata ) {
145
- const element = JSON . parse ( jsonString ) as BundleElement ;
146
- return element ;
147
- } else {
148
- return new SizedBundleElement (
149
- JSON . parse ( jsonString ) ,
150
- lengthBuffer . length + length
151
- ) ;
152
- }
155
+ return new SizedBundleElement (
156
+ JSON . parse ( jsonString ) ,
157
+ lengthBuffer . length + length
158
+ ) ;
153
159
}
154
160
155
161
// First index of '{' from the underlying buffer.
156
162
private indexOfOpenBracket ( ) : number {
157
- return this . buffer . findIndex ( v => v === 123 ) ;
163
+ return this . buffer . findIndex ( v => v === '{' . charCodeAt ( 0 ) ) ;
158
164
}
159
165
160
- // Reads from the beginning of the inernal buffer, until the first '{', and return
166
+ // Reads from the beginning of the internal buffer, until the first '{', and return
161
167
// the content.
162
168
// If reached end of the stream, returns a null.
163
169
private async readLength ( ) : Promise < Uint8Array | null > {
164
170
let position = this . indexOfOpenBracket ( ) ;
165
171
while ( position < 0 ) {
166
- const done = await this . pullMoreDataToBuffer ( ) ;
167
- if ( done ) {
172
+ const bytesRead = await this . pullMoreDataToBuffer ( ) ;
173
+ if ( bytesRead < 0 ) {
168
174
if ( this . buffer . length === 0 ) {
169
175
return null ;
170
176
}
171
177
position = this . indexOfOpenBracket ( ) ;
172
178
// Underlying stream is closed, and we still cannot find a '{'.
173
179
if ( position < 0 ) {
174
- throw new Error (
175
- 'Reach to the end of bundle when a length string is expected.'
180
+ this . raiseError (
181
+ 'Reached the end of bundle when a length string is expected.'
176
182
) ;
177
183
}
178
184
} else {
@@ -189,27 +195,35 @@ export class Bundle {
189
195
// Returns a string decoded from the read bytes.
190
196
private async readJsonString ( start : number , length : number ) : Promise < string > {
191
197
while ( this . buffer . length < start + length ) {
192
- const done = await this . pullMoreDataToBuffer ( ) ;
193
- if ( done ) {
194
- throw new Error ( 'Reach to the end of bundle when more is expected.') ;
198
+ const bytesRead = await this . pullMoreDataToBuffer ( ) ;
199
+ if ( bytesRead < 0 ) {
200
+ this . raiseError ( 'Reached the end of bundle when more is expected.') ;
195
201
}
196
202
}
197
203
198
204
return this . textDecoder . decode ( this . buffer . slice ( start , start + length ) ) ;
199
205
}
200
206
207
+ private raiseError ( message : string ) : void {
208
+ // eslint-disable-next-line @typescript-eslint/no-floating-promises
209
+ this . reader . cancel ( 'Invalid bundle format.' ) ;
210
+ throw new Error ( message ) ;
211
+ }
212
+
201
213
// Pulls more data from underlying stream to internal buffer.
202
214
// Returns a boolean indicating whether the stream is finished.
203
- private async pullMoreDataToBuffer ( ) : Promise < boolean > {
215
+ private async pullMoreDataToBuffer ( ) : Promise < number > {
204
216
const result = await this . reader . read ( ) ;
217
+ let bytesRead = - 1 ;
205
218
if ( ! result . done ) {
219
+ bytesRead = result . value . length ;
206
220
const newBuffer = new Uint8Array (
207
221
this . buffer . length + result . value . length
208
222
) ;
209
223
newBuffer . set ( this . buffer ) ;
210
224
newBuffer . set ( result . value , this . buffer . length ) ;
211
225
this . buffer = newBuffer ;
212
226
}
213
- return result . done ;
227
+ return bytesRead ;
214
228
}
215
229
}
0 commit comments