Skip to content

Adding SnapshotVersion to SharedClientState #1151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 39 additions & 8 deletions packages/firestore/src/core/sync_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
.then(result => {
this.sharedClientState.addPendingMutation(result.batchId);
this.addMutationCallback(result.batchId, userCallback);
return this.emitNewSnapsAndNotifyLocalStore(result.changes);
return this.emitNewSnapsAndNotifyLocalStore(
result.changes,
SnapshotVersion.MIN
);
})
.then(() => {
return this.remoteStore.fillWritePipeline();
Expand Down Expand Up @@ -459,7 +462,11 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
}
}
);
return this.emitNewSnapsAndNotifyLocalStore(changes, remoteEvent);
return this.emitNewSnapsAndNotifyLocalStore(
changes,
remoteEvent.snapshotVersion,
remoteEvent
);
})
.catch(err => this.ignoreIfPrimaryLeaseLoss(err));
}
Expand Down Expand Up @@ -505,7 +512,12 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
this.assertSubscribed('rejectListens()');

// PORTING NOTE: Multi-tab only.
this.sharedClientState.updateQueryState(targetId, 'rejected', err);
this.sharedClientState.updateQueryState(
targetId,
SnapshotVersion.MIN,
'rejected',
err
);

const limboResolution = this.limboResolutionsByTarget[targetId];
const limboKey = limboResolution && limboResolution.key;
Expand Down Expand Up @@ -552,6 +564,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
// PORTING NOTE: Multi-tab only
async applyBatchState(
batchId: BatchId,
snapshotVersion: SnapshotVersion,
batchState: MutationBatchState,
error?: FirestoreError
): Promise<void> {
Expand Down Expand Up @@ -585,7 +598,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
fail(`Unknown batchState: ${batchState}`);
}

await this.emitNewSnapsAndNotifyLocalStore(documents);
await this.emitNewSnapsAndNotifyLocalStore(documents, snapshotVersion);
}

applySuccessfulWrite(
Expand All @@ -604,7 +617,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
return this.localStore
.acknowledgeBatch(mutationBatchResult)
.then(changes => {
return this.emitNewSnapsAndNotifyLocalStore(changes);
return this.emitNewSnapsAndNotifyLocalStore(
changes,
mutationBatchResult.commitVersion
);
})
.catch(err => this.ignoreIfPrimaryLeaseLoss(err));
}
Expand All @@ -621,8 +637,16 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
return this.localStore
.rejectBatch(batchId)
.then(changes => {
this.sharedClientState.updateMutationState(batchId, 'rejected', error);
return this.emitNewSnapsAndNotifyLocalStore(changes);
this.sharedClientState.updateMutationState(
batchId,
SnapshotVersion.MIN,
'rejected',
error
);
return this.emitNewSnapsAndNotifyLocalStore(
changes,
SnapshotVersion.MIN
);
})
.catch(err => this.ignoreIfPrimaryLeaseLoss(err));
}
Expand Down Expand Up @@ -759,6 +783,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {

private async emitNewSnapsAndNotifyLocalStore(
changes: MaybeDocumentMap,
snapshotVersion: SnapshotVersion,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this snapshot version? A read version? A commit version? The highest of either? What happens if a caller passes SnapshotVersion.MIN?

Looking at callers I can't tell if they're passing the right values.

Assuming that passing the commit version and the read version is legit here, should we name this highestVersion? highWatermarkVersion?

Basically, I think naming these new variables and parameters "snapshotVersion" isn't helping: these should always be named after the concrete kind of version they are.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the version that the backend associates with the given operation - either the global snapshot version or the commit version of the write. I am not sure if the name can be much more specific :/

I did however go back and changed how this method gets its argument. I am using something similar to what I do in multi-tab (applying a "fake" current change via a synthesized RemoteEvent) and have added a helper that creates a "remote event" for an acknowledged write. This remote event is then used to propagate the commit version.
This gets rid of the argument and of some of the ambiguity. What do you think?

remoteEvent?: RemoteEvent
): Promise<void> {
const newSnaps: ViewSnapshot[] = [];
Expand Down Expand Up @@ -796,6 +821,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
if (this.isPrimary) {
this.sharedClientState.updateQueryState(
queryView.targetId,
snapshotVersion,
viewChange.snapshot.fromCache ? 'not-current' : 'current'
);
}
Expand Down Expand Up @@ -860,7 +886,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
result.removedBatchIds,
result.addedBatchIds
);
await this.emitNewSnapsAndNotifyLocalStore(result.affectedDocuments);
await this.emitNewSnapsAndNotifyLocalStore(
result.affectedDocuments,
SnapshotVersion.MIN
);
}

await this.remoteStore.handleCredentialChange();
Expand Down Expand Up @@ -973,6 +1002,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
// PORTING NOTE: Multi-tab only
async applyTargetState(
targetId: TargetId,
snapshotVersion: SnapshotVersion,
state: QueryTargetState,
error?: FirestoreError
): Promise<void> {
Expand All @@ -994,6 +1024,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
);
return this.emitNewSnapsAndNotifyLocalStore(
changes,
snapshotVersion,
synthesizedRemoteEvent
);
}
Expand Down
1 change: 1 addition & 0 deletions packages/firestore/src/local/local_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,7 @@ export class LocalStore {
// is committed. b/33446471 will remove this reliance.
this.sharedClientState.updateMutationState(
batchResult.batch.batchId,
SnapshotVersion.MIN, // TODO: Replace with batch version from commit
'acknowledged'
);
});
Expand Down
65 changes: 61 additions & 4 deletions packages/firestore/src/local/shared_client_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import {
import { AsyncQueue } from '../util/async_queue';
import { Platform } from '../platform/platform';
import { TargetIdSet, targetIdSet } from '../model/collections';
import { SnapshotVersion } from '../core/snapshot_version';
import { Timestamp } from '../api/timestamp';

const LOG_TAG = 'SharedClientState';

Expand Down Expand Up @@ -91,6 +93,7 @@ export interface SharedClientState {
*/
updateMutationState(
batchId: BatchId,
snapshotVersion: SnapshotVersion,
state: 'acknowledged' | 'rejected',
error?: FirestoreError
): void;
Expand All @@ -113,6 +116,7 @@ export interface SharedClientState {
*/
updateQueryState(
targetId: TargetId,
snapshotVersion: SnapshotVersion,
state: QueryTargetState,
error?: FirestoreError
): void;
Expand Down Expand Up @@ -173,6 +177,7 @@ export interface SharedClientState {
* encoded as part of the key.
*/
interface MutationMetadataSchema {
snapshotVersion: { seconds: number; nanos: number };
state: MutationBatchState;
error?: { code: string; message: string }; // Only set when state === 'rejected'
}
Expand All @@ -186,6 +191,7 @@ export class MutationMetadata {
constructor(
readonly user: User,
readonly batchId: BatchId,
readonly snapshotVersion: SnapshotVersion,
readonly state: MutationBatchState,
readonly error?: FirestoreError
) {
Expand All @@ -210,6 +216,7 @@ export class MutationMetadata {
typeof mutationBatch === 'object' &&
['pending', 'acknowledged', 'rejected'].indexOf(mutationBatch.state) !==
-1 &&
isSnapshotVersion(mutationBatch.snapshotVersion) &&
(mutationBatch.error === undefined ||
typeof mutationBatch.error === 'object');

Expand All @@ -231,6 +238,12 @@ export class MutationMetadata {
return new MutationMetadata(
user,
batchId,
SnapshotVersion.fromTimestamp(
new Timestamp(
mutationBatch.snapshotVersion.seconds,
mutationBatch.snapshotVersion.nanos
)
),
mutationBatch.state,
firestoreError
);
Expand All @@ -244,7 +257,12 @@ export class MutationMetadata {
}

toLocalStorageJSON(): string {
const timestamp = this.snapshotVersion.toTimestamp();
const batchMetadata: MutationMetadataSchema = {
snapshotVersion: {
seconds: timestamp.seconds,
nanos: timestamp.nanoseconds
},
state: this.state
};

Expand All @@ -264,6 +282,7 @@ export class MutationMetadata {
* serialization. The TargetId is omitted as it is encoded as part of the key.
*/
interface QueryTargetStateSchema {
snapshotVersion: { seconds: number; nanos: number };
state: QueryTargetState;
error?: { code: string; message: string }; // Only set when state === 'rejected'
}
Expand All @@ -276,6 +295,7 @@ interface QueryTargetStateSchema {
export class QueryTargetMetadata {
constructor(
readonly targetId: TargetId,
readonly snapshotVersion: SnapshotVersion,
readonly state: QueryTargetState,
readonly error?: FirestoreError
) {
Expand All @@ -297,6 +317,7 @@ export class QueryTargetMetadata {

let validData =
typeof targetState === 'object' &&
isSnapshotVersion(targetState.snapshotVersion) &&
['not-current', 'current', 'rejected'].indexOf(targetState.state) !==
-1 &&
(targetState.error === undefined ||
Expand All @@ -319,6 +340,12 @@ export class QueryTargetMetadata {
if (validData) {
return new QueryTargetMetadata(
targetId,
SnapshotVersion.fromTimestamp(
new Timestamp(
targetState.snapshotVersion.seconds,
targetState.snapshotVersion.nanos
)
),
targetState.state,
firestoreError
);
Expand All @@ -332,7 +359,12 @@ export class QueryTargetMetadata {
}

toLocalStorageJSON(): string {
const timestamp = this.snapshotVersion.toTimestamp();
const targetState: QueryTargetStateSchema = {
snapshotVersion: {
seconds: timestamp.seconds,
nanos: timestamp.nanoseconds
},
state: this.state
};

Expand Down Expand Up @@ -654,15 +686,16 @@ export class WebStorageSharedClientState implements SharedClientState {
}

addPendingMutation(batchId: BatchId): void {
this.persistMutationState(batchId, 'pending');
this.persistMutationState(batchId, SnapshotVersion.MIN, 'pending');
}

updateMutationState(
batchId: BatchId,
snapshotVersion: SnapshotVersion,
state: 'acknowledged' | 'rejected',
error?: FirestoreError
): void {
this.persistMutationState(batchId, state, error);
this.persistMutationState(batchId, snapshotVersion, state, error);

// Once a final mutation result is observed by other clients, they no longer
// access the mutation's metadata entry. Since LocalStorage replays events
Expand Down Expand Up @@ -708,10 +741,11 @@ export class WebStorageSharedClientState implements SharedClientState {

updateQueryState(
targetId: TargetId,
snapshotVersion: SnapshotVersion,
state: QueryTargetState,
error?: FirestoreError
): void {
this.persistQueryTargetState(targetId, state, error);
this.persistQueryTargetState(targetId, snapshotVersion, state, error);
}

handleUserChange(
Expand Down Expand Up @@ -836,12 +870,14 @@ export class WebStorageSharedClientState implements SharedClientState {

private persistMutationState(
batchId: BatchId,
snapshotVersion: SnapshotVersion,
state: MutationBatchState,
error?: FirestoreError
): void {
const mutationState = new MutationMetadata(
this.currentUser,
batchId,
snapshotVersion,
state,
error
);
Expand All @@ -864,11 +900,17 @@ export class WebStorageSharedClientState implements SharedClientState {

private persistQueryTargetState(
targetId: TargetId,
snapshotVersion: SnapshotVersion,
state: QueryTargetState,
error?: FirestoreError
): void {
const targetKey = this.toLocalStorageQueryTargetMetadataKey(targetId);
const targetMetadata = new QueryTargetMetadata(targetId, state, error);
const targetMetadata = new QueryTargetMetadata(
targetId,
snapshotVersion,
state,
error
);
this.setItem(targetKey, targetMetadata.toLocalStorageJSON());
}

Expand Down Expand Up @@ -978,6 +1020,7 @@ export class WebStorageSharedClientState implements SharedClientState {

return this.syncEngine.applyBatchState(
mutationBatch.batchId,
mutationBatch.snapshotVersion,
mutationBatch.state,
mutationBatch.error
);
Expand All @@ -988,6 +1031,7 @@ export class WebStorageSharedClientState implements SharedClientState {
): Promise<void> {
return this.syncEngine.applyTargetState(
targetMetadata.targetId,
targetMetadata.snapshotVersion,
targetMetadata.state,
targetMetadata.error
);
Expand Down Expand Up @@ -1058,6 +1102,7 @@ export class MemorySharedClientState implements SharedClientState {

updateMutationState(
batchId: BatchId,
snapshotVersion: SnapshotVersion,
state: 'acknowledged' | 'rejected',
error?: FirestoreError
): void {
Expand All @@ -1071,6 +1116,7 @@ export class MemorySharedClientState implements SharedClientState {

updateQueryState(
targetId: TargetId,
snapshotVersion: SnapshotVersion,
state: QueryTargetState,
error?: FirestoreError
): void {
Expand Down Expand Up @@ -1112,3 +1158,14 @@ export class MemorySharedClientState implements SharedClientState {

shutdown(): void {}
}

function isSnapshotVersion(snapshotVersion: {
seconds: number;
nanos: number;
}): boolean {
return (
typeof snapshotVersion === 'object' &&
isSafeInteger(snapshotVersion.seconds) &&
isSafeInteger(snapshotVersion.nanos)
);
}
Loading