@@ -148,10 +148,23 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
148
148
q . canonicalId ( )
149
149
) ;
150
150
private queriesByTarget : { [ targetId : number ] : Query [ ] } = { } ;
151
- private limboTargetsByKey = new SortedMap < DocumentKey , TargetId > (
151
+ /**
152
+ * The keys of documents that are in limbo for which we haven't yet started a
153
+ * limbo resolution query.
154
+ */
155
+ private enqueuedLimboResolutions : DocumentKey [ ] = [ ] ;
156
+ /**
157
+ * Keeps track of the target ID for each document that is in limbo with an
158
+ * active target.
159
+ */
160
+ private activeLimboTargetsByKey = new SortedMap < DocumentKey , TargetId > (
152
161
DocumentKey . comparator
153
162
) ;
154
- private limboResolutionsByTarget : {
163
+ /**
164
+ * Keeps track of the information about an active limbo resolution for each
165
+ * active target ID that was started for the purpose of limbo resolution.
166
+ */
167
+ private activeLimboResolutionsByTarget : {
155
168
[ targetId : number ] : LimboResolution ;
156
169
} = { } ;
157
170
private limboDocumentRefs = new ReferenceSet ( ) ;
@@ -174,7 +187,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
174
187
private remoteStore : RemoteStore ,
175
188
// PORTING NOTE: Manages state synchronization in multi-tab environments.
176
189
private sharedClientState : SharedClientState ,
177
- private currentUser : User
190
+ private currentUser : User ,
191
+ private maxConcurrentLimboResolutions : number
178
192
) { }
179
193
180
194
// Only used for testing.
@@ -400,7 +414,9 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
400
414
const changes = await this . localStore . applyRemoteEvent ( remoteEvent ) ;
401
415
// Update `receivedDocument` as appropriate for any limbo targets.
402
416
objUtils . forEach ( remoteEvent . targetChanges , ( targetId , targetChange ) => {
403
- const limboResolution = this . limboResolutionsByTarget [ Number ( targetId ) ] ;
417
+ const limboResolution = this . activeLimboResolutionsByTarget [
418
+ Number ( targetId )
419
+ ] ;
404
420
if ( limboResolution ) {
405
421
// Since this is a limbo resolution lookup, it's for a single document
406
422
// and it could be added, modified, or removed, but not a combination.
@@ -479,13 +495,16 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
479
495
// PORTING NOTE: Multi-tab only.
480
496
this . sharedClientState . updateQueryState ( targetId , 'rejected' , err ) ;
481
497
482
- const limboResolution = this . limboResolutionsByTarget [ targetId ] ;
498
+ const limboResolution = this . activeLimboResolutionsByTarget [ targetId ] ;
483
499
const limboKey = limboResolution && limboResolution . key ;
484
500
if ( limboKey ) {
485
501
// Since this query failed, we won't want to manually unlisten to it.
486
502
// So go ahead and remove it from bookkeeping.
487
- this . limboTargetsByKey = this . limboTargetsByKey . remove ( limboKey ) ;
488
- delete this . limboResolutionsByTarget [ targetId ] ;
503
+ this . activeLimboTargetsByKey = this . activeLimboTargetsByKey . remove (
504
+ limboKey
505
+ ) ;
506
+ delete this . activeLimboResolutionsByTarget [ targetId ] ;
507
+ this . pumpEnqueuedLimboResolutions ( ) ;
489
508
490
509
// TODO(klimt): We really only should do the following on permission
491
510
// denied errors, but we don't have the cause code here.
@@ -731,15 +750,16 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
731
750
private removeLimboTarget ( key : DocumentKey ) : void {
732
751
// It's possible that the target already got removed because the query failed. In that case,
733
752
// the key won't exist in `limboTargetsByKey`. Only do the cleanup if we still have the target.
734
- const limboTargetId = this . limboTargetsByKey . get ( key ) ;
753
+ const limboTargetId = this . activeLimboTargetsByKey . get ( key ) ;
735
754
if ( limboTargetId === null ) {
736
755
// This target already got removed, because the query failed.
737
756
return ;
738
757
}
739
758
740
759
this . remoteStore . unlisten ( limboTargetId ) ;
741
- this . limboTargetsByKey = this . limboTargetsByKey . remove ( key ) ;
742
- delete this . limboResolutionsByTarget [ limboTargetId ] ;
760
+ this . activeLimboTargetsByKey = this . activeLimboTargetsByKey . remove ( key ) ;
761
+ delete this . activeLimboResolutionsByTarget [ limboTargetId ] ;
762
+ this . pumpEnqueuedLimboResolutions ( ) ;
743
763
}
744
764
745
765
private updateTrackedLimbos (
@@ -768,29 +788,54 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
768
788
769
789
private trackLimboChange ( limboChange : AddedLimboDocument ) : void {
770
790
const key = limboChange . key ;
771
- if ( ! this . limboTargetsByKey . get ( key ) ) {
791
+ if ( ! this . activeLimboTargetsByKey . get ( key ) ) {
772
792
logDebug ( LOG_TAG , 'New document in limbo: ' + key ) ;
793
+ this . enqueuedLimboResolutions . push ( key ) ;
794
+ this . pumpEnqueuedLimboResolutions ( ) ;
795
+ }
796
+ }
797
+
798
+ /**
799
+ * Starts listens for documents in limbo that are enqueued for resolution,
800
+ * subject to a maximum number of concurrent resolutions.
801
+ *
802
+ * Without bounding the number of concurrent resolutions, the server can fail
803
+ * with "resource exhausted" errors which can lead to pathological client
804
+ * behavior as seen in https://github.com/firebase/firebase-js-sdk/issues/2683.
805
+ */
806
+ private pumpEnqueuedLimboResolutions ( ) : void {
807
+ while (
808
+ this . enqueuedLimboResolutions . length > 0 &&
809
+ this . activeLimboTargetsByKey . size < this . maxConcurrentLimboResolutions
810
+ ) {
811
+ const key = this . enqueuedLimboResolutions . shift ( ) ! ;
773
812
const limboTargetId = this . limboTargetIdGenerator . next ( ) ;
774
- const query = Query . atPath ( key . path ) ;
775
- this . limboResolutionsByTarget [ limboTargetId ] = new LimboResolution ( key ) ;
813
+ this . activeLimboResolutionsByTarget [ limboTargetId ] = new LimboResolution (
814
+ key
815
+ ) ;
816
+ this . activeLimboTargetsByKey = this . activeLimboTargetsByKey . insert (
817
+ key ,
818
+ limboTargetId
819
+ ) ;
776
820
this . remoteStore . listen (
777
821
new TargetData (
778
- query . toTarget ( ) ,
822
+ Query . atPath ( key . path ) . toTarget ( ) ,
779
823
limboTargetId ,
780
824
TargetPurpose . LimboResolution ,
781
825
ListenSequence . INVALID
782
826
)
783
827
) ;
784
- this . limboTargetsByKey = this . limboTargetsByKey . insert (
785
- key ,
786
- limboTargetId
787
- ) ;
788
828
}
789
829
}
790
830
791
831
// Visible for testing
792
- currentLimboDocs ( ) : SortedMap < DocumentKey , TargetId > {
793
- return this . limboTargetsByKey ;
832
+ activeLimboDocumentResolutions ( ) : SortedMap < DocumentKey , TargetId > {
833
+ return this . activeLimboTargetsByKey ;
834
+ }
835
+
836
+ // Visible for testing
837
+ enqueuedLimboDocumentResolutions ( ) : DocumentKey [ ] {
838
+ return this . enqueuedLimboResolutions ;
794
839
}
795
840
796
841
private async emitNewSnapsAndNotifyLocalStore (
@@ -936,12 +981,12 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
936
981
937
982
// PORTING NOTE: Multi-tab only.
938
983
private resetLimboDocuments ( ) : void {
939
- objUtils . forEachNumber ( this . limboResolutionsByTarget , targetId => {
984
+ objUtils . forEachNumber ( this . activeLimboResolutionsByTarget , targetId => {
940
985
this . remoteStore . unlisten ( targetId ) ;
941
986
} ) ;
942
987
this . limboDocumentRefs . removeAllReferences ( ) ;
943
- this . limboResolutionsByTarget = [ ] ;
944
- this . limboTargetsByKey = new SortedMap < DocumentKey , TargetId > (
988
+ this . activeLimboResolutionsByTarget = [ ] ;
989
+ this . activeLimboTargetsByKey = new SortedMap < DocumentKey , TargetId > (
945
990
DocumentKey . comparator
946
991
) ;
947
992
}
@@ -1138,7 +1183,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
1138
1183
}
1139
1184
1140
1185
getRemoteKeysForTarget ( targetId : TargetId ) : DocumentKeySet {
1141
- const limboResolution = this . limboResolutionsByTarget [ targetId ] ;
1186
+ const limboResolution = this . activeLimboResolutionsByTarget [ targetId ] ;
1142
1187
if ( limboResolution && limboResolution . receivedDocument ) {
1143
1188
return documentKeySet ( ) . add ( limboResolution . key ) ;
1144
1189
} else {
0 commit comments