-
Notifications
You must be signed in to change notification settings - Fork 928
Implement global resume token #1052
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
338ac12
2051b1a
6bedde4
b4397fc
2cd9953
a4a7983
20728a5
6033958
f303c13
7d9d21d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -111,6 +111,12 @@ export interface LocalWriteResult { | |
* unrecoverable error (should be caught / reported by the async_queue). | ||
*/ | ||
export class LocalStore { | ||
/** | ||
* The maximum time to leave a resume token buffered without writing it | ||
* out. | ||
*/ | ||
private static readonly MAX_RESUME_TOKEN_BUFFERING_MICROS = 5 * 60 * 1e6; | ||
|
||
/** | ||
* The set of all mutations that have been sent but not yet been applied to | ||
* the backend. | ||
|
@@ -469,12 +475,22 @@ export class LocalStore { | |
// any preexisting value. | ||
const resumeToken = change.resumeToken; | ||
if (resumeToken.length > 0) { | ||
const oldQueryData = queryData; | ||
queryData = queryData.copy({ | ||
resumeToken, | ||
snapshotVersion: remoteEvent.snapshotVersion | ||
}); | ||
this.targetIds[targetId] = queryData; | ||
promises.push(this.queryCache.updateQueryData(txn, queryData)); | ||
|
||
if ( | ||
LocalStore.shouldPersistResumeToken( | ||
oldQueryData, | ||
queryData, | ||
change | ||
) | ||
) { | ||
promises.push(this.queryCache.updateQueryData(txn, queryData)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a comment to describe the handling of snapshot version? It's seems like it would be safe to never update the snapshot version, but a comment about this would clear my doubts. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added comments to the method declaration. If it's safe to omit the resume token it's safe to omit the snapshot version. Note that it's the global snapshot version that acts as the high water mark to defend against repeated changes from Watch. |
||
} | ||
} | ||
} | ||
); | ||
|
@@ -550,6 +566,42 @@ export class LocalStore { | |
}); | ||
} | ||
|
||
/** | ||
* Returns true if the the resume token in newQueryData should be persisted. | ||
*/ | ||
private static shouldPersistResumeToken( | ||
oldQueryData: QueryData, | ||
newQueryData: QueryData, | ||
change: TargetChange | ||
): boolean { | ||
// Avoid clearing any existing value | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This functions seems more like a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
if (newQueryData.resumeToken.length === 0) return false; | ||
|
||
// Any resume token is interesting if there isn't one already. | ||
if (oldQueryData.resumeToken.length === 0) return true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just learned something about our style guide: "Control flow statements spanning multiple lines always use blocks for the containing code. Good to know :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for looking this up so I wouldn't have to. :-) |
||
|
||
// Don't allow resume token changes to be buffered indefinitely. This | ||
// allows us to be reasonably up-to-date after a crash and avoids needing | ||
// to loop over all active queries on shutdown. Especially in the browser | ||
// we may not get time to do anything interesting while the current tab is | ||
// closing. | ||
const timeDelta = | ||
newQueryData.snapshotVersion.toMicroseconds() - | ||
oldQueryData.snapshotVersion.toMicroseconds(); | ||
if (timeDelta >= this.MAX_RESUME_TOKEN_BUFFERING_MICROS) return true; | ||
|
||
// Otherwise if the only thing that has changed about a target is its resume | ||
// token it's not worth persisting. Note that the RemoteStore keeps an | ||
// in-memory view of the currently active targets which includes the current | ||
// resume token, so stream failure or user changes will still use an | ||
// up-to-date resume token regardless of what we do here. | ||
const changes = | ||
change.addedDocuments.size + | ||
change.modifiedDocuments.size + | ||
change.removedDocuments.size; | ||
return changes > 0; | ||
} | ||
|
||
/** | ||
* Notify local store of the changed views to locally pin documents. | ||
*/ | ||
|
@@ -650,10 +702,22 @@ export class LocalStore { | |
queryData != null, | ||
'Tried to release nonexistent query: ' + query | ||
); | ||
this.localViewReferences.removeReferencesForId(queryData!.targetId); | ||
delete this.targetIds[queryData!.targetId]; | ||
|
||
const targetId = queryData.targetId; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW, these should probably still be But trying to stay compliant without actually enforcing it is a losing game, so... ¯_(ツ)_/¯ |
||
const memoryQueryData = this.targetIds[targetId]; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/memoryQueryData/cachedQueryData/ ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
this.localViewReferences.removeReferencesForId(targetId); | ||
delete this.targetIds[targetId]; | ||
if (this.garbageCollector.isEager) { | ||
return this.queryCache.removeQueryData(txn, queryData!); | ||
return this.queryCache.removeQueryData(txn, queryData); | ||
} else if ( | ||
memoryQueryData.snapshotVersion > queryData.snapshotVersion | ||
) { | ||
// If we've been avoiding persisting the resumeToken (see | ||
// shouldPersistResumeToken for conditions and rationale) we need to | ||
// persist the token now because there will no longer be an | ||
// in-memory version to fall back on. | ||
return this.queryCache.updateQueryData(txn, memoryQueryData); | ||
} else { | ||
return PersistencePromise.resolve(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -297,7 +297,7 @@ export class WatchChangeAggregator { | |
|
||
/** Processes and adds the WatchTargetChange to the current set of changes. */ | ||
handleTargetChange(targetChange: WatchTargetChange): void { | ||
targetChange.targetIds.forEach(targetId => { | ||
this.forEachTargetId(targetChange, targetId => { | ||
const targetState = this.ensureTargetState(targetId); | ||
switch (targetChange.state) { | ||
case WatchTargetChangeState.NoChange: | ||
|
@@ -352,6 +352,24 @@ export class WatchChangeAggregator { | |
}); | ||
} | ||
|
||
/** | ||
* Iterates over all targetIds that the watch change applies to: either the | ||
* targetIds explicitly listed in the change or the targetIds of all currently | ||
* active targets. | ||
*/ | ||
forEachTargetId( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Slight preference to name this just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
targetChange: WatchTargetChange, | ||
fn: (targetId: TargetId) => void | ||
): void { | ||
if (targetChange.targetIds.length > 0) { | ||
targetChange.targetIds.forEach(fn); | ||
} else { | ||
objUtils.forEachNumber(this.targetStates, (targetId, _) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be simplified to:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be simplified to:
😛 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That loses the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While I think I know where you're coming from (doing That said, I don't care strongly in the least which option we do. I just wanted to nit your nit. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
fn(targetId); | ||
}); | ||
} | ||
} | ||
|
||
/** | ||
* Handles existence filters and synthesizes deletes for filter mismatches. | ||
* Targets that are invalidated by filter mismatches are added to | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -509,4 +509,30 @@ describeSpec('Listens:', [], () => { | |
.watchAcksFull(query, 3000) | ||
.expectEvents(query, {}); | ||
}); | ||
|
||
specTest('Persists global resume tokens', [], () => { | ||
const query = Query.atPath(path('collection')); | ||
const docA = doc('collection/a', 1000, { key: 'a' }); | ||
|
||
return ( | ||
spec() | ||
.withGCEnabled(false) | ||
.userListens(query) | ||
.watchAcksFull(query, 1000, docA) | ||
.expectEvents(query, { added: [docA] }) | ||
|
||
// Some time later, watch sends an updated resume token and the user stops | ||
// listening. | ||
.watchSnapshots(2000, [], 'resume-token-2000') | ||
.userUnlistens(query) | ||
.watchRemoves(query) | ||
|
||
.userListens(query, 'resume-token-2000') | ||
.expectEvents(query, { added: [docA], fromCache: true }) | ||
.watchAcks(query) | ||
.watchCurrents(query, 'resume-token-3000') | ||
.watchSnapshots(3000) | ||
.expectEvents(query, { fromCache: false }) | ||
); | ||
}); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems pretty straightforward to test the MAX_RESUME_TOKEN_BUFFERING_MICROS logic too if we wanted to (might need to use .restart() to test re-listening without unlistening). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay fine, you caught my lazy bones out. Done. |
||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can leave the name as is, but it doesn't quite match other names we use do deal with similar logic:
CLIENT_METADATA_REFRESH_INTERVAL_MS
is used in multi-tab and replacesOWNER_LEASE_REFRESH_INTERVAL_MS
. There is alsoOWNER_LEASE_MAX_AGE_MS
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RESUME_TOKEN_MAX_AGE_MICROS? (don't care strongly though)
Also, is there any meaningful context on how 5 minutes was chosen (based on discussion with Jonny, etc.) that we should capture here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done:
RESUME_TOKEN_MAX_AGE_MICROS
. Also added a bit of verbiage describing the hat from which this number was pulled.