Skip to content

Implement limbo resolution throttling. #2790

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 30, 2020
57 changes: 49 additions & 8 deletions packages/firestore/src/core/sync_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,13 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
q.canonicalId()
);
private queriesByTarget: { [targetId: number]: Query[] } = {};
/** The keys of documents that are in limbo for which we haven't yet started a limbo resolution query. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

80 columns. Here and below. Unfortunately, the formatter doesn't enforce this for comments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops. I had assumed that yarn prettier would take care of this.

private limboListenQueue: DocumentKey[] = [];
/** Keeps track of the target ID for each document that is in limbo with an active target. */
private limboTargetsByKey = new SortedMap<DocumentKey, TargetId>(
DocumentKey.comparator
);
/** Keeps track of the information about an active limbo resolution for each active target ID that was started for the purpose of limbo resolution. */
private limboResolutionsByTarget: {
[targetId: number]: LimboResolution;
} = {};
Expand All @@ -174,7 +178,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
private remoteStore: RemoteStore,
// PORTING NOTE: Manages state synchronization in multi-tab environments.
private sharedClientState: SharedClientState,
private currentUser: User
private currentUser: User,
private maxConcurrentLimboResolutions: number = 100
) {}

// Only used for testing.
Expand Down Expand Up @@ -486,6 +491,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
// So go ahead and remove it from bookkeeping.
this.limboTargetsByKey = this.limboTargetsByKey.remove(limboKey);
delete this.limboResolutionsByTarget[targetId];
this.pumpLimboResolutionListenQueue();

// TODO(klimt): We really only should do the following on permission
// denied errors, but we don't have the cause code here.
Expand Down Expand Up @@ -740,6 +746,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
this.remoteStore.unlisten(limboTargetId);
this.limboTargetsByKey = this.limboTargetsByKey.remove(key);
delete this.limboResolutionsByTarget[limboTargetId];
this.pumpLimboResolutionListenQueue();
}

private updateTrackedLimbos(
Expand Down Expand Up @@ -770,29 +777,63 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
const key = limboChange.key;
if (!this.limboTargetsByKey.get(key)) {
log.debug(LOG_TAG, 'New document in limbo: ' + key);
this.limboListenQueue.push(key);
this.pumpLimboResolutionListenQueue();
}
}

/**
* Starts listens for documents in limbo that are enqueued for resolution.
*
* When a document goes into limbo it is enqueued for resolution. This method
* repeatedly removes entries from the limbo resolution queue and starts a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your description here uses "limbo resolution queue". I think this supports renaming "limbo listen" to "limbo resolution".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I will update this PR to use consistent terminology.

* listen for them until either (1) the queue is empty, meaning that all
* documents that were in limbo either have active listens or have been
* resolved, or (2) the maximum number of concurrent limbo resolution listens
* has been reached.
*
* This method is invoked every time an entry is added to the limbo
* resolution queue and every time that a limbo resolution listen completes
* (either successfully or unsuccessfully). This ensures that all documents in
* limbo are eventually resolved.
*
* A maximum number of concurrent limbo resolution listens was implemented to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment buries the lede: limiting concurrent limbo resolution is a top-line feature of this method. It also focuses on the change we made instead of focusing on documenting the intended behavior and what motivated that. In particular, it's not clear why "resource exhausted" errors are necessarily a problem. I suggest moving this into the first paragraph and emphasizing what this fixes. Something like this:

Starts listens for documents in limbo that are enqueued for resolution, subject to a maximum number of concurrent resolutions. Without bounding the number of concurrent resolutions, the server can fail with resource exhausted errors which can lead to pathological client behavior as seen in #2683.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I like your comment a lot better. I've removed some of the extra explanation from the comments since they are bound to go out-of-date anyways with future changes to the sync engine.

* prevent an unbounded number of active limbo resolution listens that can
* exhaust server resources and result in "resource exhausted" errors.
*/
private pumpLimboResolutionListenQueue(): void {
while (
this.limboListenQueue.length > 0 &&
this.limboTargetsByKey.size < this.maxConcurrentLimboResolutions
) {
const key = this.limboListenQueue.shift()!;
const limboTargetId = this.limboTargetIdGenerator.next();
const query = Query.atPath(key.path);
this.limboResolutionsByTarget[limboTargetId] = new LimboResolution(key);
this.limboTargetsByKey = this.limboTargetsByKey.insert(
key,
limboTargetId
);
this.remoteStore.listen(
new TargetData(
query.toTarget(),
Query.atPath(key.path).toTarget(),
limboTargetId,
TargetPurpose.LimboResolution,
ListenSequence.INVALID
)
);
this.limboTargetsByKey = this.limboTargetsByKey.insert(
key,
limboTargetId
);
}
}

// Visible for testing
currentLimboDocs(): SortedMap<DocumentKey, TargetId> {
activeLimboDocumentResolutions(): SortedMap<DocumentKey, TargetId> {
return this.limboTargetsByKey;
}

// Visible for testing
documentsEnqueuedForLimboResolution(): DocumentKey[] {
return this.limboListenQueue;
}

private async emitNewSnapsAndNotifyLocalStore(
changes: MaybeDocumentMap,
remoteEvent?: RemoteEvent
Expand Down
Loading