-
Notifications
You must be signed in to change notification settings - Fork 930
Implement limbo resolution throttling. #2790
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
d8d7293
5900aa8
f0a2077
45802c3
545fa89
abfa6fb
4c1a20c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -148,9 +148,13 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { | |
q.canonicalId() | ||
); | ||
private queriesByTarget: { [targetId: number]: Query[] } = {}; | ||
/** The keys of documents that are in limbo for which we haven't yet started a limbo resolution query. */ | ||
private limboListenQueue: DocumentKey[] = []; | ||
/** Keeps track of the target ID for each document that is in limbo with an active target. */ | ||
private limboTargetsByKey = new SortedMap<DocumentKey, TargetId>( | ||
DocumentKey.comparator | ||
); | ||
/** Keeps track of the information about an active limbo resolution for each active target ID that was started for the purpose of limbo resolution. */ | ||
private limboResolutionsByTarget: { | ||
[targetId: number]: LimboResolution; | ||
} = {}; | ||
|
@@ -174,7 +178,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { | |
private remoteStore: RemoteStore, | ||
// PORTING NOTE: Manages state synchronization in multi-tab environments. | ||
private sharedClientState: SharedClientState, | ||
private currentUser: User | ||
private currentUser: User, | ||
private maxConcurrentLimboResolutions: number = 100 | ||
) {} | ||
|
||
// Only used for testing. | ||
|
@@ -486,6 +491,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { | |
// So go ahead and remove it from bookkeeping. | ||
this.limboTargetsByKey = this.limboTargetsByKey.remove(limboKey); | ||
delete this.limboResolutionsByTarget[targetId]; | ||
this.pumpLimboResolutionListenQueue(); | ||
|
||
// TODO(klimt): We really only should do the following on permission | ||
// denied errors, but we don't have the cause code here. | ||
|
@@ -740,6 +746,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { | |
this.remoteStore.unlisten(limboTargetId); | ||
this.limboTargetsByKey = this.limboTargetsByKey.remove(key); | ||
delete this.limboResolutionsByTarget[limboTargetId]; | ||
this.pumpLimboResolutionListenQueue(); | ||
} | ||
|
||
private updateTrackedLimbos( | ||
|
@@ -770,29 +777,63 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { | |
const key = limboChange.key; | ||
if (!this.limboTargetsByKey.get(key)) { | ||
log.debug(LOG_TAG, 'New document in limbo: ' + key); | ||
this.limboListenQueue.push(key); | ||
this.pumpLimboResolutionListenQueue(); | ||
} | ||
} | ||
|
||
/** | ||
* Starts listens for documents in limbo that are enqueued for resolution. | ||
* | ||
* When a document goes into limbo it is enqueued for resolution. This method | ||
* repeatedly removes entries from the limbo resolution queue and starts a | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Your description here uses "limbo resolution queue". I think this supports renaming "limbo listen" to "limbo resolution". There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. I will update this PR to use consistent terminology. |
||
* listen for them until either (1) the queue is empty, meaning that all | ||
* documents that were in limbo either have active listens or have been | ||
* resolved, or (2) the maximum number of concurrent limbo resolution listens | ||
* has been reached. | ||
* | ||
* This method is invoked every time an entry is added to the limbo | ||
* resolution queue and every time that a limbo resolution listen completes | ||
* (either successfully or unsuccessfully). This ensures that all documents in | ||
* limbo are eventually resolved. | ||
* | ||
* A maximum number of concurrent limbo resolution listens was implemented to | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment buries the lede: limiting concurrent limbo resolution is a top-line feature of this method. It also focuses on the change we made instead of focusing on documenting the intended behavior and what motivated that. In particular, it's not clear why "resource exhausted" errors are necessarily a problem. I suggest moving this into the first paragraph and emphasizing what this fixes. Something like this:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. I like your comment a lot better. I've removed some of the extra explanation from the comments since they are bound to go out-of-date anyways with future changes to the sync engine. |
||
* prevent an unbounded number of active limbo resolution listens that can | ||
* exhaust server resources and result in "resource exhausted" errors. | ||
*/ | ||
private pumpLimboResolutionListenQueue(): void { | ||
while ( | ||
this.limboListenQueue.length > 0 && | ||
this.limboTargetsByKey.size < this.maxConcurrentLimboResolutions | ||
) { | ||
const key = this.limboListenQueue.shift()!; | ||
const limboTargetId = this.limboTargetIdGenerator.next(); | ||
const query = Query.atPath(key.path); | ||
this.limboResolutionsByTarget[limboTargetId] = new LimboResolution(key); | ||
this.limboTargetsByKey = this.limboTargetsByKey.insert( | ||
key, | ||
limboTargetId | ||
); | ||
this.remoteStore.listen( | ||
new TargetData( | ||
query.toTarget(), | ||
Query.atPath(key.path).toTarget(), | ||
limboTargetId, | ||
TargetPurpose.LimboResolution, | ||
ListenSequence.INVALID | ||
) | ||
); | ||
this.limboTargetsByKey = this.limboTargetsByKey.insert( | ||
key, | ||
limboTargetId | ||
); | ||
} | ||
} | ||
|
||
// Visible for testing | ||
currentLimboDocs(): SortedMap<DocumentKey, TargetId> { | ||
activeLimboDocumentResolutions(): SortedMap<DocumentKey, TargetId> { | ||
return this.limboTargetsByKey; | ||
} | ||
|
||
// Visible for testing | ||
documentsEnqueuedForLimboResolution(): DocumentKey[] { | ||
return this.limboListenQueue; | ||
} | ||
|
||
private async emitNewSnapsAndNotifyLocalStore( | ||
changes: MaybeDocumentMap, | ||
remoteEvent?: RemoteEvent | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
80 columns. Here and below. Unfortunately, the formatter doesn't enforce this for comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whoops. I had assumed that
yarn prettier
would take care of this.