Implement global resume token #1052

Merged
merged 10 commits into from
Jul 31, 2018
15 changes: 2 additions & 13 deletions packages/firestore/src/core/sync_engine.ts
@@ -46,7 +46,7 @@ import { Query } from './query';
import { SnapshotVersion } from './snapshot_version';
import { TargetIdGenerator } from './target_id_generator';
import { Transaction } from './transaction';
import { BatchId, OnlineState, ProtoByteString, TargetId } from './types';
import { BatchId, OnlineState, TargetId } from './types';
import {
AddedLimboDocument,
LimboDocumentChange,
@@ -77,12 +77,6 @@ class QueryView {
* stream to identify this query.
*/
public targetId: TargetId,
/**
* An identifier from the datastore backend that indicates the last state
* of the results that was received. This can be used to indicate where
* to continue receiving new doc changes for the query.
*/
public resumeToken: ProtoByteString,
/**
* The view is responsible for computing the final merged truth of what
* docs are in the query. It gets notified of local and remote changes,
@@ -195,12 +189,7 @@ export class SyncEngine implements RemoteSyncer {
'applyChanges for new view should always return a snapshot'
);

const data = new QueryView(
query,
queryData.targetId,
queryData.resumeToken,
view
);
const data = new QueryView(query, queryData.targetId, view);
this.queryViewsByQuery.set(query, data);
this.queryViewsByTarget[queryData.targetId] = data;
this.viewHandler!([viewChange.snapshot!]);
6 changes: 3 additions & 3 deletions packages/firestore/src/local/indexeddb_query_cache.ts
@@ -44,7 +44,7 @@ export class IndexedDbQueryCache implements QueryCache {
constructor(private serializer: LocalSerializer) {}

/**
* The last received snapshot version. We store this seperately from the
* The last received snapshot version. We store this separately from the
* metadata to avoid the extra conversion to/from DbTimestamp.
*/
private lastRemoteSnapshotVersion = SnapshotVersion.MIN;
@@ -173,7 +173,7 @@ export class IndexedDbQueryCache implements QueryCache {
): PersistencePromise<QueryData | null> {
// Iterating by the canonicalId may yield more than one result because
// canonicalId values are not required to be unique per target. This query
// depends on the queryTargets index to be efficent.
// depends on the queryTargets index to be efficient.
const canonicalId = query.canonicalId();
const range = IDBKeyRange.bound(
[canonicalId, Number.NEGATIVE_INFINITY],
@@ -202,7 +202,7 @@ export class IndexedDbQueryCache implements QueryCache {
targetId: TargetId
): PersistencePromise<void> {
// PORTING NOTE: The reverse index (documentsTargets) is maintained by
// Indexeddb.
// IndexedDb.
const promises: Array<PersistencePromise<void>> = [];
const store = documentTargetStore(txn);
keys.forEach(key => {
72 changes: 68 additions & 4 deletions packages/firestore/src/local/local_store.ts
@@ -111,6 +111,12 @@ export interface LocalWriteResult {
* unrecoverable error (should be caught / reported by the async_queue).
*/
export class LocalStore {
/**
* The maximum time to leave a resume token buffered without writing it
* out.
*/
private static readonly MAX_RESUME_TOKEN_BUFFERING_MICROS = 5 * 60 * 1e6;
Contributor

You can leave the name as is, but it doesn't quite match other names we use to deal with similar logic:

CLIENT_METADATA_REFRESH_INTERVAL_MS is used in multi-tab and replaces OWNER_LEASE_REFRESH_INTERVAL_MS. There is also OWNER_LEASE_MAX_AGE_MS.

Contributor

RESUME_TOKEN_MAX_AGE_MICROS? (don't care strongly though)

Also, is there any meaningful context on how 5 minutes was chosen (based on discussion with Jonny, etc.) that we should capture here?

Contributor Author

Done: RESUME_TOKEN_MAX_AGE_MICROS. Also added a bit of verbiage describing the hat from which this number was pulled.


/**
* The set of all mutations that have been sent but not yet been applied to
* the backend.
@@ -469,12 +475,22 @@ export class LocalStore {
// any preexisting value.
const resumeToken = change.resumeToken;
if (resumeToken.length > 0) {
const oldQueryData = queryData;
queryData = queryData.copy({
resumeToken,
snapshotVersion: remoteEvent.snapshotVersion
});
this.targetIds[targetId] = queryData;
promises.push(this.queryCache.updateQueryData(txn, queryData));

if (
LocalStore.shouldPersistResumeToken(
oldQueryData,
queryData,
change
)
) {
promises.push(this.queryCache.updateQueryData(txn, queryData));
Contributor

Can you add a comment to describe the handling of snapshot version? It seems like it would be safe to never update the snapshot version, but a comment about this would clear my doubts.

Contributor Author

I've added comments to the method declaration. If it's safe to omit the resume token it's safe to omit the snapshot version. Note that it's the global snapshot version that acts as the high water mark to defend against repeated changes from Watch.

}
}
}
);
@@ -550,6 +566,42 @@ export class LocalStore {
});
}

/**
* Returns true if the resume token in newQueryData should be persisted.
*/
private static shouldPersistResumeToken(
oldQueryData: QueryData,
newQueryData: QueryData,
change: TargetChange
): boolean {
// Avoid clearing any existing value
Contributor

This function seems more like a shouldPersistQueryData to me. That name would describe the way it is used and its internal logic better than the current one.

Contributor Author

Done.

if (newQueryData.resumeToken.length === 0) return false;

// Any resume token is interesting if there isn't one already.
if (oldQueryData.resumeToken.length === 0) return true;
Contributor

I just learned something about our style guide:

"Control flow statements spanning multiple lines always use blocks for the containing code.
The exception is that if statements fitting on one line may elide the block."

Good to know :)

Contributor

Thanks for looking this up so I wouldn't have to. :-)


// Don't allow resume token changes to be buffered indefinitely. This
// allows us to be reasonably up-to-date after a crash and avoids needing
// to loop over all active queries on shutdown. Especially in the browser
// we may not get time to do anything interesting while the current tab is
// closing.
const timeDelta =
newQueryData.snapshotVersion.toMicroseconds() -
oldQueryData.snapshotVersion.toMicroseconds();
if (timeDelta >= this.MAX_RESUME_TOKEN_BUFFERING_MICROS) return true;

// Otherwise if the only thing that has changed about a target is its resume
// token it's not worth persisting. Note that the RemoteStore keeps an
// in-memory view of the currently active targets which includes the current
// resume token, so stream failure or user changes will still use an
// up-to-date resume token regardless of what we do here.
const changes =
change.addedDocuments.size +
change.modifiedDocuments.size +
change.removedDocuments.size;
return changes > 0;
}
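
The decision order in shouldPersistResumeToken can be tried out in isolation. The following is a minimal sketch, not the SDK's code: `TokenState`, `ChangeCounts`, and `shouldPersist` are hypothetical stand-ins for QueryData, TargetChange, and the method in the diff, and snapshot versions are assumed to be plain microsecond counts. The constant uses the name agreed on in the review.

```typescript
// Hypothetical, simplified stand-ins for the SDK's QueryData and TargetChange.
interface TokenState {
  resumeToken: string; // empty string means "no token"
  snapshotVersionMicros: number; // snapshot version as a microsecond count
}

interface ChangeCounts {
  added: number;
  modified: number;
  removed: number;
}

// Five minutes in microseconds, mirroring 5 * 60 * 1e6 from the diff.
const RESUME_TOKEN_MAX_AGE_MICROS = 5 * 60 * 1e6;

function shouldPersist(
  previous: TokenState,
  next: TokenState,
  change: ChangeCounts
): boolean {
  // Never replace an existing token with an empty one.
  if (next.resumeToken.length === 0) return false;

  // Any token is worth storing if there was none before.
  if (previous.resumeToken.length === 0) return true;

  // Do not let token updates stay buffered for longer than the max age.
  const delta = next.snapshotVersionMicros - previous.snapshotVersionMicros;
  if (delta >= RESUME_TOKEN_MAX_AGE_MICROS) return true;

  // Otherwise persist only when documents actually changed.
  return change.added + change.modified + change.removed > 0;
}

const noDocs: ChangeCounts = { added: 0, modified: 0, removed: 0 };
const previous: TokenState = { resumeToken: 'token-1', snapshotVersionMicros: 1000 };
const fresh: TokenState = { resumeToken: 'token-2', snapshotVersionMicros: 2000 };
const aged: TokenState = {
  resumeToken: 'token-3',
  snapshotVersionMicros: 1000 + RESUME_TOKEN_MAX_AGE_MICROS
};

console.log(shouldPersist(previous, fresh, noDocs)); // false: only the token changed
console.log(shouldPersist(previous, aged, noDocs)); // true: max age reached
console.log(shouldPersist(previous, fresh, { added: 1, modified: 0, removed: 0 })); // true
```

The first call is the case the change optimizes for: a token-only update inside the five-minute window causes no write.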

/**
* Notify local store of the changed views to locally pin documents.
*/
@@ -650,10 +702,22 @@ export class LocalStore {
queryData != null,
'Tried to release nonexistent query: ' + query
);
this.localViewReferences.removeReferencesForId(queryData!.targetId);
delete this.targetIds[queryData!.targetId];

const targetId = queryData.targetId;
Contributor

FWIW, these should probably still be queryData! but we currently do not compile with strictNullChecks. :-( [see b/73018483]

But trying to stay compliant without actually enforcing it is a losing game, so... ¯_(ツ)_/¯

const memoryQueryData = this.targetIds[targetId];
Contributor

s/memoryQueryData/cachedQueryData/ ?

Contributor Author

Done.


this.localViewReferences.removeReferencesForId(targetId);
delete this.targetIds[targetId];
if (this.garbageCollector.isEager) {
return this.queryCache.removeQueryData(txn, queryData!);
return this.queryCache.removeQueryData(txn, queryData);
} else if (
memoryQueryData.snapshotVersion > queryData.snapshotVersion
) {
// If we've been avoiding persisting the resumeToken (see
// shouldPersistResumeToken for conditions and rationale) we need to
// persist the token now because there will no longer be an
// in-memory version to fall back on.
return this.queryCache.updateQueryData(txn, memoryQueryData);
} else {
return PersistencePromise.resolve();
}
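
The three-way branch above (remove under eager GC, flush a newer in-memory copy, or do nothing) can be sketched as a pure function. This is an illustration only: `TargetData`, `ReleaseAction`, and `releaseAction` are hypothetical names, and snapshot versions are assumed to be plain numbers so that `>` is an ordinary numeric comparison.

```typescript
// Hypothetical simplified model of a target's metadata.
interface TargetData {
  targetId: number;
  resumeToken: string;
  snapshotVersion: number; // assumed numeric for this sketch
}

type ReleaseAction = 'remove' | 'update' | 'none';

// Decides what to do with the persisted copy when a query is released.
function releaseAction(
  persisted: TargetData,
  inMemory: TargetData,
  eagerGc: boolean
): ReleaseAction {
  // Eager GC drops the target outright.
  if (eagerGc) return 'remove';

  // The in-memory copy is about to disappear; flush it if it is newer.
  if (inMemory.snapshotVersion > persisted.snapshotVersion) return 'update';

  // The persisted copy is already current.
  return 'none';
}

const onDisk: TargetData = { targetId: 2, resumeToken: 'resume-token-1000', snapshotVersion: 1000 };
const buffered: TargetData = { targetId: 2, resumeToken: 'resume-token-2000', snapshotVersion: 2000 };

console.log(releaseAction(onDisk, buffered, true)); // "remove"
console.log(releaseAction(onDisk, buffered, false)); // "update"
console.log(releaseAction(onDisk, onDisk, false)); // "none"
```

The "update" case corresponds to the scenario in the 'Persists global resume tokens' spec test: a token that was buffered without a write has to be stored when the user stops listening.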
20 changes: 19 additions & 1 deletion packages/firestore/src/remote/watch_change.ts
@@ -297,7 +297,7 @@ export class WatchChangeAggregator {

/** Processes and adds the WatchTargetChange to the current set of changes. */
handleTargetChange(targetChange: WatchTargetChange): void {
targetChange.targetIds.forEach(targetId => {
this.forEachTargetId(targetChange, targetId => {
const targetState = this.ensureTargetState(targetId);
switch (targetChange.state) {
case WatchTargetChangeState.NoChange:
@@ -352,6 +352,24 @@ export class WatchChangeAggregator {
});
}

/**
* Iterates over all targetIds that the watch change applies to: either the
* targetIds explicitly listed in the change or the targetIds of all currently
* active targets.
*/
forEachTargetId(
Contributor

Nit: Slight preference to name this just forEachTarget, but please feel free to ignore.

Contributor Author

Done.

targetChange: WatchTargetChange,
fn: (targetId: TargetId) => void
): void {
if (targetChange.targetIds.length > 0) {
targetChange.targetIds.forEach(fn);
} else {
objUtils.forEachNumber(this.targetStates, (targetId, _) => {
Contributor

This can be simplified to:

objUtils.forEachNumber(this.targetStates, targetId => fn(targetId));

Contributor

This can be simplified to:

      objUtils.forEachNumber(this.targetStates, fn);

😛

Contributor

That loses the this context and always creates really hard-to-debug issues in the long run.

Contributor

While I think I know where you're coming from (doing objUtils.forEachNumber(..., this.someFunction) would be dangerous), I don't think it applies here since fn is just a free-standing function, not a member function... plus we're already doing the exact same construct 2 lines up.

That said, I don't care strongly in the least which option we do. I just wanted to nit your nit.

Contributor Author

Done.

fn(targetId);
});
}
}
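
The `this` concern raised in the thread above can be illustrated with a small sketch. `forEachNumber` here is a hypothetical stand-in for objUtils.forEachNumber, assumed to call `fn(key, value)` for every numeric key; `Collector` is an invented class used only to show the failure mode.

```typescript
// Hypothetical stand-in for objUtils.forEachNumber.
function forEachNumber<V>(
  obj: { [key: number]: V },
  fn: (key: number, value: V) => void
): void {
  for (const key of Object.keys(obj)) {
    const num = Number(key);
    if (!isNaN(num)) fn(num, obj[num]);
  }
}

class Collector {
  seen: number[] = [];
  record(targetId: number): void {
    this.seen.push(targetId); // depends on `this`
  }
}

const targetStates: { [key: number]: string } = { 2: 'a', 4: 'b' };

// A free-standing function does not use `this`, so passing it directly is safe.
const ids: number[] = [];
const fn = (targetId: number): void => {
  ids.push(targetId);
};
forEachNumber(targetStates, fn);

// An unbound method loses `this` when passed as a callback and fails at runtime.
const collector = new Collector();
let threw = false;
try {
  forEachNumber(targetStates, collector.record);
} catch (e) {
  threw = true;
}

// Wrapping the call in an arrow function keeps `this` intact.
forEachNumber(targetStates, id => collector.record(id));

console.log(ids); // [ 2, 4 ]
console.log(threw); // true
console.log(collector.seen); // [ 2, 4 ]
```

Since `fn` in forEachTargetId is a parameter rather than a member function, passing it straight through behaves like the first call here.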

/**
* Handles existence filters and synthesizes deletes for filter mismatches.
* Targets that are invalidated by filter mismatches are added to
26 changes: 26 additions & 0 deletions packages/firestore/test/unit/specs/listen_spec.test.ts
@@ -509,4 +509,30 @@ describeSpec('Listens:', [], () => {
.watchAcksFull(query, 3000)
.expectEvents(query, {});
});

specTest('Persists global resume tokens', [], () => {
const query = Query.atPath(path('collection'));
const docA = doc('collection/a', 1000, { key: 'a' });

return (
spec()
.withGCEnabled(false)
.userListens(query)
.watchAcksFull(query, 1000, docA)
.expectEvents(query, { added: [docA] })

// Some time later, watch sends an updated resume token and the user stops
// listening.
.watchSnapshots(2000, [], 'resume-token-2000')
.userUnlistens(query)
.watchRemoves(query)

.userListens(query, 'resume-token-2000')
.expectEvents(query, { added: [docA], fromCache: true })
.watchAcks(query)
.watchCurrents(query, 'resume-token-3000')
.watchSnapshots(3000)
.expectEvents(query, { fromCache: false })
);
});
Contributor

It seems pretty straightforward to test the MAX_RESUME_TOKEN_BUFFERING_MICROS logic too if we wanted to (might need to use .restart() to test re-listening without unlistening).

Contributor Author

Okay fine, you caught my lazy bones out. Done.

});