Skip to content

Remove Held Write Acks #1135

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Sep 5, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/firestore/src/api/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ import {
fieldPathFromArgument,
UserDataConverter
} from './user_data_converter';
import { SnapshotVersion } from '../core/snapshot_version';

// The objects that are a part of this API are exposed to third-parties as
// compiled javascript so we want to flag our private members with a leading
Expand Down Expand Up @@ -1226,7 +1227,8 @@ export class DocumentSnapshot implements firestore.DocumentSnapshot {

get metadata(): firestore.SnapshotMetadata {
return new SnapshotMetadata(
this._document !== null && this._document.hasLocalMutations,
this._document !== null &&
this._document.hasPendingWrites(SnapshotVersion.MIN),
this._fromCache
);
}
Expand Down
16 changes: 9 additions & 7 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import {
import { AutoId } from '../util/misc';
import { PersistenceSettings } from '../api/database';
import { assert } from '../util/assert';
import { SnapshotVersion } from './snapshot_version';

const LOG_TAG = 'FirestoreClient';

Expand Down Expand Up @@ -293,17 +294,15 @@ export class FirestoreClient {
// TODO(http://b/33384523): For now we just disable garbage collection
// when persistence is enabled.
this.garbageCollector = new NoOpGarbageCollector();
const storagePrefix = IndexedDbPersistence.buildStoragePrefix(
this.databaseInfo
);

// Opt to use proto3 JSON in case the platform doesn't support Uint8Array.
const serializer = new JsonProtoSerializer(this.databaseInfo.databaseId, {
useProto3Json: true
});

return Promise.resolve().then(() => {
const persistence: IndexedDbPersistence = new IndexedDbPersistence(
storagePrefix,
this.databaseInfo,
this.clientId,
this.platform,
this.asyncQueue,
Expand All @@ -322,6 +321,9 @@ export class FirestoreClient {
);
}

const storagePrefix = IndexedDbPersistence.buildStoragePrefix(
this.databaseInfo
);
this.sharedClientState = settings.experimentalTabSynchronization
? new WebStorageSharedClientState(
this.asyncQueue,
Expand Down Expand Up @@ -360,8 +362,7 @@ export class FirestoreClient {
this.localStore = new LocalStore(
this.persistence,
user,
this.garbageCollector,
this.sharedClientState
this.garbageCollector
);
const serializer = this.platform.newSerializer(
this.databaseInfo.databaseId
Expand Down Expand Up @@ -503,7 +504,8 @@ export class FirestoreClient {

const view = new View(query, remoteKeys);
const viewDocChanges: ViewDocumentChanges = view.computeDocChanges(
docs
docs,
SnapshotVersion.MIN
);
return view.applyChanges(
viewDocChanges,
Expand Down
78 changes: 64 additions & 14 deletions packages/firestore/src/core/sync_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
.remoteDocumentKeys(queryData.targetId)
.then(remoteKeys => {
const view = new View(query, remoteKeys);
const viewDocChanges = view.computeDocChanges(docs);
const viewDocChanges = view.computeDocChanges(
docs,
SnapshotVersion.MIN
);
const synthesizedTargetChange = TargetChange.createSynthesizedTargetChangeForCurrentChange(
queryData.targetId,
current && this.onlineState !== OnlineState.Offline
Expand Down Expand Up @@ -349,7 +352,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
.then(result => {
this.sharedClientState.addPendingMutation(result.batchId);
this.addMutationCallback(result.batchId, userCallback);
return this.emitNewSnapsAndNotifyLocalStore(result.changes);
return this.emitNewSnapsAndNotifyLocalStore(
result.changes,
SnapshotVersion.MIN
);
})
.then(() => {
return this.remoteStore.fillWritePipeline();
Expand Down Expand Up @@ -459,7 +465,11 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
}
}
);
return this.emitNewSnapsAndNotifyLocalStore(changes, remoteEvent);
return this.emitNewSnapsAndNotifyLocalStore(
changes,
remoteEvent.snapshotVersion,
remoteEvent
);
})
.catch(err => this.ignoreIfPrimaryLeaseLoss(err));
}
Expand Down Expand Up @@ -505,7 +515,12 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
this.assertSubscribed('rejectListens()');

// PORTING NOTE: Multi-tab only.
this.sharedClientState.updateQueryState(targetId, 'rejected', err);
this.sharedClientState.updateQueryState(
targetId,
SnapshotVersion.MIN,
'rejected',
err
);

const limboResolution = this.limboResolutionsByTarget[targetId];
const limboKey = limboResolution && limboResolution.key;
Expand All @@ -527,7 +542,11 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
);
documentUpdates = documentUpdates.insert(
limboKey,
new NoDocument(limboKey, SnapshotVersion.forDeletedDoc())
new NoDocument(
limboKey,
SnapshotVersion.forDeletedDoc(),
SnapshotVersion.MIN
)
);
const resolvedLimboDocuments = documentKeySet().add(limboKey);
const event = new RemoteEvent(
Expand All @@ -552,6 +571,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
// PORTING NOTE: Multi-tab only
async applyBatchState(
batchId: BatchId,
snapshotVersion: SnapshotVersion,
batchState: MutationBatchState,
error?: FirestoreError
): Promise<void> {
Expand Down Expand Up @@ -585,7 +605,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
fail(`Unknown batchState: ${batchState}`);
}

await this.emitNewSnapsAndNotifyLocalStore(documents);
await this.emitNewSnapsAndNotifyLocalStore(documents, snapshotVersion);
}

applySuccessfulWrite(
Expand All @@ -604,7 +624,15 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
return this.localStore
.acknowledgeBatch(mutationBatchResult)
.then(changes => {
return this.emitNewSnapsAndNotifyLocalStore(changes);
this.sharedClientState.updateMutationState(
batchId,
mutationBatchResult.commitVersion,
'acknowledged'
);
return this.emitNewSnapsAndNotifyLocalStore(
changes,
mutationBatchResult.commitVersion
);
})
.catch(err => this.ignoreIfPrimaryLeaseLoss(err));
}
Expand All @@ -621,8 +649,16 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
return this.localStore
.rejectBatch(batchId)
.then(changes => {
this.sharedClientState.updateMutationState(batchId, 'rejected', error);
return this.emitNewSnapsAndNotifyLocalStore(changes);
this.sharedClientState.updateMutationState(
batchId,
SnapshotVersion.MIN,
'rejected',
error
);
return this.emitNewSnapsAndNotifyLocalStore(
changes,
SnapshotVersion.MIN
);
})
.catch(err => this.ignoreIfPrimaryLeaseLoss(err));
}
Expand Down Expand Up @@ -759,30 +795,38 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {

private async emitNewSnapsAndNotifyLocalStore(
changes: MaybeDocumentMap,
snapshotVersion: SnapshotVersion,
remoteEvent?: RemoteEvent
): Promise<void> {
const newSnaps: ViewSnapshot[] = [];
const docChangesInAllViews: LocalViewChanges[] = [];
const queriesProcessed: Array<Promise<void>> = [];

this.queryViewsByQuery.forEach((_, queryView) => {
const targetChange =
remoteEvent && remoteEvent.targetChanges[queryView.targetId];
queriesProcessed.push(
Promise.resolve()
.then(() => {
const viewDocChanges = queryView.view.computeDocChanges(changes);
const viewDocChanges = queryView.view.computeDocChanges(
changes,
snapshotVersion
);
if (!viewDocChanges.needsRefill) {
return viewDocChanges;
}
// The query has a limit and some docs were removed, so we need
// to re-run the query against the local store to make sure we
// didn't lose any good docs that had been past the limit.
return this.localStore.executeQuery(queryView.query).then(docs => {
return queryView.view.computeDocChanges(docs, viewDocChanges);
return queryView.view.computeDocChanges(
docs,
snapshotVersion,
viewDocChanges
);
});
})
.then((viewDocChanges: ViewDocumentChanges) => {
const targetChange =
remoteEvent && remoteEvent.targetChanges[queryView.targetId];
const viewChange = queryView.view.applyChanges(
viewDocChanges,
/* updateLimboDocuments= */ this.isPrimary,
Expand All @@ -796,6 +840,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
if (this.isPrimary) {
this.sharedClientState.updateQueryState(
queryView.targetId,
snapshotVersion,
viewChange.snapshot.fromCache ? 'not-current' : 'current'
);
}
Expand Down Expand Up @@ -860,7 +905,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
result.removedBatchIds,
result.addedBatchIds
);
await this.emitNewSnapsAndNotifyLocalStore(result.affectedDocuments);
await this.emitNewSnapsAndNotifyLocalStore(
result.affectedDocuments,
SnapshotVersion.MIN
);
}

await this.remoteStore.handleCredentialChange();
Expand Down Expand Up @@ -973,6 +1021,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
// PORTING NOTE: Multi-tab only
async applyTargetState(
targetId: TargetId,
snapshotVersion: SnapshotVersion,
state: QueryTargetState,
error?: FirestoreError
): Promise<void> {
Expand All @@ -994,6 +1043,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
);
return this.emitNewSnapsAndNotifyLocalStore(
changes,
snapshotVersion,
synthesizedRemoteEvent
);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/firestore/src/core/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class Transaction {
constructor(private datastore: Datastore) {}

private recordVersion(doc: MaybeDocument): void {
let docVersion = doc.version;
let docVersion = doc.remoteVersion;
if (doc instanceof NoDocument) {
// For deleted docs, we must use baseVersion 0 when we overwrite them.
docVersion = SnapshotVersion.forDeletedDoc();
Expand Down
Loading