@@ -191,9 +191,18 @@ class DelayedOperation<T extends unknown> implements CancelablePromise<T> {
191
191
}
192
192
}
193
193
194
+ interface PendingOp {
195
+ op : Function
196
+ signature : string
197
+ attempts : number
198
+ resolve : Function
199
+ reject : Function
200
+ promise : Promise < void >
201
+ } ;
202
+
194
203
export class AsyncQueue {
195
- // The last promise in the queue.
196
- private tail : Promise < unknown > = Promise . resolve ( ) ;
204
+ // all operations waiting to be processed
205
+ private pending : PendingOp [ ] = [ ] ;
197
206
198
207
// Is this AsyncQueue being shut down? Once it is set to true, it will not
199
208
// be changed again.
@@ -209,9 +218,7 @@ export class AsyncQueue {
209
218
// Flag set while there's an outstanding AsyncQueue operation, used for
210
219
// assertion sanity-checks.
211
220
private operationInProgress = false ;
212
-
213
- operationSignature = '<none>' ;
214
- operationStart = 0 ;
221
+ private operationStart = 0 ;
215
222
216
223
// List of TimerIds to fast-forward delays for.
217
224
private timerIdsToSkip : TimerId [ ] = [ ] ;
@@ -283,40 +290,85 @@ export class AsyncQueue {
283
290
}
284
291
285
292
private enqueueInternal < T extends unknown > ( op : ( ) => Promise < T > ) : Promise < T > {
286
- const newTail = this . tail . then ( ( ) => {
287
- this . operationStart = Date . now ( ) ;
288
- this . operationSignature = op . toString ( ) ;
289
- this . operationInProgress = true ;
290
- return op ( )
291
- . catch ( ( error : FirestoreError ) => {
292
- this . failure = error ;
293
- this . operationInProgress = false ;
294
- const message = error . stack || error . message || '' ;
295
- log . error ( 'INTERNAL UNHANDLED ERROR: ' , message ) ;
296
-
297
- // Escape the promise chain and throw the error globally so that
298
- // e.g. any global crash reporting library detects and reports it.
299
- // (but not for simulated errors in our tests since this breaks mocha)
300
- if ( message . indexOf ( 'Firestore Test Simulated Error' ) < 0 ) {
301
- setTimeout ( ( ) => {
302
- throw error ;
303
- } , 0 ) ;
304
- }
305
-
306
- // Re-throw the error so that this.tail becomes a rejected Promise and
307
- // all further attempts to chain (via .then) will just short-circuit
308
- // and return the rejected Promise.
309
- throw error ;
310
- } )
311
- . then ( result => {
312
- this . operationStart = 0 ;
313
- this . operationSignature = '<none>' ;
314
- this . operationInProgress = false ;
315
- return result ;
316
- } ) ;
293
+ let resolve : Function , reject : Function ;
294
+ const promise = new Promise ( ( res , rej ) => {
295
+ resolve = res ;
296
+ reject = rej ;
297
+ } ) ;
298
+
299
+ // @ts -ignore
300
+ this . pending . push ( { op, attempts : 0 , resolve, reject, promise} as PendingOp ) ;
301
+
302
+ if ( ! this . operationInProgress ) {
303
+ this . runNext ( ) ;
304
+ }
305
+
306
+ return promise as Promise < T > ;
307
+ }
308
+
309
+ private withTimeout < T extends unknown > ( promise : Promise < T > , ms : number ) : Promise < T > {
310
+ let hasFinished = false ;
311
+
312
+ const timeout = new Promise ( ( _ , reject ) => {
313
+ setTimeout ( ( ) => {
314
+ if ( ! hasFinished ) {
315
+ reject ( 'Timeout' ) ;
316
+ }
317
+ } , ms ) ;
318
+ } ) ;
319
+
320
+ return Promise . race ( [ promise . then ( ( ) => { hasFinished = true ; } ) , timeout ] ) as Promise < T > ;
321
+ }
322
+
323
+ private runNext ( ) : void {
324
+ if ( this . operationInProgress || this . pending . length === 0 ) { return ; }
325
+
326
+ const toRun = this . pending [ 0 ] ;
327
+ toRun . attempts ++ ;
328
+ this . operationStart = Date . now ( ) ;
329
+ this . operationInProgress = true ;
330
+
331
+ let runningOp = toRun . op ( ) ;
332
+
333
+
334
+ // @ts -ignore
335
+ if ( window . TRIGGER_FAILED_FIRESTORE_OP ) {
336
+ runningOp = new Promise ( ( ) => { } ) ;
337
+ // @ts -ignore
338
+ window . TRIGGER_FAILED_FIRESTORE_OP = false ;
339
+ }
340
+
341
+ this . withTimeout ( runningOp , 10 * 1000 ) . then ( ( res : unknown ) => {
342
+ this . operationInProgress = false ;
343
+ this . pending . shift ( ) ;
344
+ this . runNext ( ) ;
345
+ return res ;
346
+ } )
347
+ . catch ( ( e : FirestoreError ) => {
348
+ this . operationInProgress = false ;
349
+
350
+ // @ts -ignore
351
+ window . Profile ( 'log.firestoreError' , { message : e . message , signature : toRun . op . toString ( ) , attempt : toRun . attempts , duration : Date . now ( ) - this . operationStart } ) ;
352
+
353
+ // @ts -ignore
354
+ window . Bugsnag . notify ( e ) ;
355
+
356
+ // give up on the operation, throw an error, and continue on
357
+ if ( toRun . attempts >= 3 ) {
358
+ this . pending . shift ( ) ;
359
+
360
+ // Escape the promise chain and throw the error globally so that
361
+ // any global crash reporting library detects and reports it.
362
+ // (but not for simulated errors in our tests since this breaks mocha)
363
+ if ( ( e . message || '' ) . indexOf ( 'Firestore Test Simulated Error' ) < 0 ) {
364
+ setTimeout ( ( ) => { throw e ; } , 0 ) ;
365
+ }
366
+ }
367
+
368
+ // Add a slight delay before trying again. If it was an I/O problem,
369
+ // waiting often helps.
370
+ setTimeout ( ( ) => this . runNext ( ) , 5000 ) ;
317
371
} ) ;
318
- this . tail = newTail ;
319
- return newTail ;
320
372
}
321
373
322
374
/**
@@ -384,11 +436,9 @@ export class AsyncQueue {
384
436
// operations. Keep draining the queue until the tail is no longer advanced,
385
437
// which indicates that no more new operations were enqueued and that all
386
438
// operations were executed.
387
- let currentTail : Promise < unknown > ;
388
- do {
389
- currentTail = this . tail ;
390
- await currentTail ;
391
- } while ( currentTail !== this . tail ) ;
439
+ while ( this . pending . length > 0 ) {
440
+ this . runNext ( ) ;
441
+ }
392
442
}
393
443
394
444
/**
0 commit comments