Skip to content

Adding SnapshotVersion to SharedClientState #1151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions packages/firestore/src/core/sync_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,12 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
this.assertSubscribed('rejectListens()');

// PORTING NOTE: Multi-tab only.
this.sharedClientState.updateQueryState(targetId, 'rejected', err);
this.sharedClientState.updateQueryState(
targetId,
SnapshotVersion.MIN,
'rejected',
err
);

const limboResolution = this.limboResolutionsByTarget[targetId];
const limboKey = limboResolution && limboResolution.key;
Expand Down Expand Up @@ -552,6 +557,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
// PORTING NOTE: Multi-tab only
async applyBatchState(
batchId: BatchId,
snapshotVersion: SnapshotVersion,
batchState: MutationBatchState,
error?: FirestoreError
): Promise<void> {
Expand All @@ -570,22 +576,30 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
return;
}

let remoteEvent;

if (batchState === 'pending') {
// If we are the primary client, we need to send this write to the
// backend. Secondary clients will ignore these writes since their remote
// connection is disabled.
await this.remoteStore.fillWritePipeline();
} else if (batchState === 'acknowledged' || batchState === 'rejected') {
} else if (batchState === 'acknowledged') {
remoteEvent = RemoteEvent.createSynthesizedRemoteEventForSuccessfulWrite(
snapshotVersion
);
// NOTE: Both these methods are no-ops for batches that originated from
// other clients.
this.processUserCallback(batchId, error ? error : null);

this.processUserCallback(batchId, null);
this.localStore.removeCachedMutationBatchMetadata(batchId);
} else if (batchState === 'rejected') {
assert(error !== null, 'Error not set for rejected mutation');
this.processUserCallback(batchId, error!);
this.localStore.removeCachedMutationBatchMetadata(batchId);
} else {
fail(`Unknown batchState: ${batchState}`);
}

await this.emitNewSnapsAndNotifyLocalStore(documents);
await this.emitNewSnapsAndNotifyLocalStore(documents, remoteEvent);
}

applySuccessfulWrite(
Expand All @@ -604,7 +618,13 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
return this.localStore
.acknowledgeBatch(mutationBatchResult)
.then(changes => {
return this.emitNewSnapsAndNotifyLocalStore(changes);
const synthesizedRemoteEvent = RemoteEvent.createSynthesizedRemoteEventForSuccessfulWrite(
mutationBatchResult.commitVersion
);
return this.emitNewSnapsAndNotifyLocalStore(
changes,
synthesizedRemoteEvent
);
})
.catch(err => this.ignoreIfPrimaryLeaseLoss(err));
}
Expand All @@ -621,7 +641,12 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
return this.localStore
.rejectBatch(batchId)
.then(changes => {
this.sharedClientState.updateMutationState(batchId, 'rejected', error);
this.sharedClientState.updateMutationState(
batchId,
SnapshotVersion.MIN,
'rejected',
error
);
return this.emitNewSnapsAndNotifyLocalStore(changes);
})
.catch(err => this.ignoreIfPrimaryLeaseLoss(err));
Expand Down Expand Up @@ -761,6 +786,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
const docChangesInAllViews: LocalViewChanges[] = [];
const queriesProcessed: Array<Promise<void>> = [];

const snapshotVersion = remoteEvent
? remoteEvent.snapshotVersion
: SnapshotVersion.MIN;

this.queryViewsByQuery.forEach((_, queryView) => {
queriesProcessed.push(
Promise.resolve()
Expand Down Expand Up @@ -792,6 +821,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
if (this.isPrimary) {
this.sharedClientState.updateQueryState(
queryView.targetId,
snapshotVersion,
viewChange.snapshot.fromCache ? 'not-current' : 'current'
);
}
Expand Down Expand Up @@ -969,6 +999,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
// PORTING NOTE: Multi-tab only
async applyTargetState(
targetId: TargetId,
snapshotVersion: SnapshotVersion,
state: QueryTargetState,
error?: FirestoreError
): Promise<void> {
Expand All @@ -985,6 +1016,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
case 'not-current': {
const changes = await this.localStore.getNewDocumentChanges();
const synthesizedRemoteEvent = RemoteEvent.createSynthesizedRemoteEventForCurrentChange(
snapshotVersion,
targetId,
state === 'current'
);
Expand Down
1 change: 1 addition & 0 deletions packages/firestore/src/local/local_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,7 @@ export class LocalStore {
// is committed. b/33446471 will remove this reliance.
this.sharedClientState.updateMutationState(
batchResult.batch.batchId,
SnapshotVersion.MIN, // TODO: Replace with batch version from commit
'acknowledged'
);
});
Expand Down
65 changes: 61 additions & 4 deletions packages/firestore/src/local/shared_client_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import {
import { AsyncQueue } from '../util/async_queue';
import { Platform } from '../platform/platform';
import { TargetIdSet, targetIdSet } from '../model/collections';
import { SnapshotVersion } from '../core/snapshot_version';
import { Timestamp } from '../api/timestamp';

const LOG_TAG = 'SharedClientState';

Expand Down Expand Up @@ -91,6 +93,7 @@ export interface SharedClientState {
*/
updateMutationState(
batchId: BatchId,
snapshotVersion: SnapshotVersion,
state: 'acknowledged' | 'rejected',
error?: FirestoreError
): void;
Expand All @@ -113,6 +116,7 @@ export interface SharedClientState {
*/
updateQueryState(
targetId: TargetId,
snapshotVersion: SnapshotVersion,
state: QueryTargetState,
error?: FirestoreError
): void;
Expand Down Expand Up @@ -173,6 +177,7 @@ export interface SharedClientState {
* encoded as part of the key.
*/
interface MutationMetadataSchema {
snapshotVersion: { seconds: number; nanos: number };
state: MutationBatchState;
error?: { code: string; message: string }; // Only set when state === 'rejected'
}
Expand All @@ -186,6 +191,7 @@ export class MutationMetadata {
constructor(
readonly user: User,
readonly batchId: BatchId,
readonly snapshotVersion: SnapshotVersion,
readonly state: MutationBatchState,
readonly error?: FirestoreError
) {
Expand All @@ -210,6 +216,7 @@ export class MutationMetadata {
typeof mutationBatch === 'object' &&
['pending', 'acknowledged', 'rejected'].indexOf(mutationBatch.state) !==
-1 &&
isSnapshotVersion(mutationBatch.snapshotVersion) &&
(mutationBatch.error === undefined ||
typeof mutationBatch.error === 'object');

Expand All @@ -231,6 +238,12 @@ export class MutationMetadata {
return new MutationMetadata(
user,
batchId,
SnapshotVersion.fromTimestamp(
new Timestamp(
mutationBatch.snapshotVersion.seconds,
mutationBatch.snapshotVersion.nanos
)
),
mutationBatch.state,
firestoreError
);
Expand All @@ -244,7 +257,12 @@ export class MutationMetadata {
}

toLocalStorageJSON(): string {
const timestamp = this.snapshotVersion.toTimestamp();
const batchMetadata: MutationMetadataSchema = {
snapshotVersion: {
seconds: timestamp.seconds,
nanos: timestamp.nanoseconds
},
state: this.state
};

Expand All @@ -264,6 +282,7 @@ export class MutationMetadata {
* serialization. The TargetId is omitted as it is encoded as part of the key.
*/
interface QueryTargetStateSchema {
snapshotVersion: { seconds: number; nanos: number };
state: QueryTargetState;
error?: { code: string; message: string }; // Only set when state === 'rejected'
}
Expand All @@ -276,6 +295,7 @@ interface QueryTargetStateSchema {
export class QueryTargetMetadata {
constructor(
readonly targetId: TargetId,
readonly snapshotVersion: SnapshotVersion,
readonly state: QueryTargetState,
readonly error?: FirestoreError
) {
Expand All @@ -297,6 +317,7 @@ export class QueryTargetMetadata {

let validData =
typeof targetState === 'object' &&
isSnapshotVersion(targetState.snapshotVersion) &&
['not-current', 'current', 'rejected'].indexOf(targetState.state) !==
-1 &&
(targetState.error === undefined ||
Expand All @@ -319,6 +340,12 @@ export class QueryTargetMetadata {
if (validData) {
return new QueryTargetMetadata(
targetId,
SnapshotVersion.fromTimestamp(
new Timestamp(
targetState.snapshotVersion.seconds,
targetState.snapshotVersion.nanos
)
),
targetState.state,
firestoreError
);
Expand All @@ -332,7 +359,12 @@ export class QueryTargetMetadata {
}

toLocalStorageJSON(): string {
const timestamp = this.snapshotVersion.toTimestamp();
const targetState: QueryTargetStateSchema = {
snapshotVersion: {
seconds: timestamp.seconds,
nanos: timestamp.nanoseconds
},
state: this.state
};

Expand Down Expand Up @@ -654,15 +686,16 @@ export class WebStorageSharedClientState implements SharedClientState {
}

addPendingMutation(batchId: BatchId): void {
this.persistMutationState(batchId, 'pending');
this.persistMutationState(batchId, SnapshotVersion.MIN, 'pending');
}

updateMutationState(
batchId: BatchId,
snapshotVersion: SnapshotVersion,
state: 'acknowledged' | 'rejected',
error?: FirestoreError
): void {
this.persistMutationState(batchId, state, error);
this.persistMutationState(batchId, snapshotVersion, state, error);

// Once a final mutation result is observed by other clients, they no longer
// access the mutation's metadata entry. Since LocalStorage replays events
Expand Down Expand Up @@ -708,10 +741,11 @@ export class WebStorageSharedClientState implements SharedClientState {

updateQueryState(
targetId: TargetId,
snapshotVersion: SnapshotVersion,
state: QueryTargetState,
error?: FirestoreError
): void {
this.persistQueryTargetState(targetId, state, error);
this.persistQueryTargetState(targetId, snapshotVersion, state, error);
}

handleUserChange(
Expand Down Expand Up @@ -836,12 +870,14 @@ export class WebStorageSharedClientState implements SharedClientState {

private persistMutationState(
batchId: BatchId,
snapshotVersion: SnapshotVersion,
state: MutationBatchState,
error?: FirestoreError
): void {
const mutationState = new MutationMetadata(
this.currentUser,
batchId,
snapshotVersion,
state,
error
);
Expand All @@ -864,11 +900,17 @@ export class WebStorageSharedClientState implements SharedClientState {

private persistQueryTargetState(
targetId: TargetId,
snapshotVersion: SnapshotVersion,
state: QueryTargetState,
error?: FirestoreError
): void {
const targetKey = this.toLocalStorageQueryTargetMetadataKey(targetId);
const targetMetadata = new QueryTargetMetadata(targetId, state, error);
const targetMetadata = new QueryTargetMetadata(
targetId,
snapshotVersion,
state,
error
);
this.setItem(targetKey, targetMetadata.toLocalStorageJSON());
}

Expand Down Expand Up @@ -978,6 +1020,7 @@ export class WebStorageSharedClientState implements SharedClientState {

return this.syncEngine.applyBatchState(
mutationBatch.batchId,
mutationBatch.snapshotVersion,
mutationBatch.state,
mutationBatch.error
);
Expand All @@ -988,6 +1031,7 @@ export class WebStorageSharedClientState implements SharedClientState {
): Promise<void> {
return this.syncEngine.applyTargetState(
targetMetadata.targetId,
targetMetadata.snapshotVersion,
targetMetadata.state,
targetMetadata.error
);
Expand Down Expand Up @@ -1058,6 +1102,7 @@ export class MemorySharedClientState implements SharedClientState {

updateMutationState(
batchId: BatchId,
snapshotVersion: SnapshotVersion,
state: 'acknowledged' | 'rejected',
error?: FirestoreError
): void {
Expand All @@ -1071,6 +1116,7 @@ export class MemorySharedClientState implements SharedClientState {

updateQueryState(
targetId: TargetId,
snapshotVersion: SnapshotVersion,
state: QueryTargetState,
error?: FirestoreError
): void {
Expand Down Expand Up @@ -1112,3 +1158,14 @@ export class MemorySharedClientState implements SharedClientState {

shutdown(): void {}
}

function isSnapshotVersion(snapshotVersion: {
seconds: number;
nanos: number;
}): boolean {
return (
typeof snapshotVersion === 'object' &&
isSafeInteger(snapshotVersion.seconds) &&
isSafeInteger(snapshotVersion.nanos)
);
}
3 changes: 3 additions & 0 deletions packages/firestore/src/local/shared_client_state_syncer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import { BatchId, MutationBatchState, TargetId } from '../core/types';
import { FirestoreError } from '../util/error';
import { ClientId } from './shared_client_state';
import { SnapshotVersion } from '../core/snapshot_version';

/** The different states of a watch target. */
export type QueryTargetState = 'not-current' | 'current' | 'rejected';
Expand All @@ -34,13 +35,15 @@ export interface SharedClientStateSyncer {
/** Applies a mutation state to an existing batch. */
applyBatchState(
batchId: BatchId,
snapshotVersion: SnapshotVersion,
state: MutationBatchState,
error?: FirestoreError
): Promise<void>;

/** Applies a query target change from a different tab. */
applyTargetState(
targetId: TargetId,
snapshotVersion: SnapshotVersion,
state: QueryTargetState,
error?: FirestoreError
): Promise<void>;
Expand Down
Loading