Skip to content

Remove Held Write Acks #1135

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Sep 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 30 additions & 14 deletions packages/firestore/src/api/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -593,9 +593,21 @@ export class Transaction implements firestore.Transaction {
}
const doc = docs[0];
if (doc instanceof NoDocument) {
return new DocumentSnapshot(this._firestore, ref._key, null, false);
return new DocumentSnapshot(
this._firestore,
ref._key,
null,
/* fromCache= */ false,
/* hasPendingWrites= */ false
);
} else if (doc instanceof Document) {
return new DocumentSnapshot(this._firestore, ref._key, doc, false);
return new DocumentSnapshot(
this._firestore,
ref._key,
doc,
/* fromCache= */ false,
/* hasPendingWrites= */ false
);
} else {
throw fail(
`BatchGetDocumentsRequest returned unexpected document type: ${
Expand Down Expand Up @@ -1044,7 +1056,8 @@ export class DocumentReference implements firestore.DocumentReference {
this.firestore,
this._key,
doc,
snapshot.fromCache
snapshot.fromCache,
snapshot.hasPendingWrites
)
);
}
Expand Down Expand Up @@ -1086,7 +1099,8 @@ export class DocumentReference implements firestore.DocumentReference {
this.firestore,
this._key,
doc,
/*fromCache=*/ true
/*fromCache=*/ true,
doc instanceof Document ? doc.hasLocalMutations : false
)
);
}, reject);
Expand Down Expand Up @@ -1177,7 +1191,8 @@ export class DocumentSnapshot implements firestore.DocumentSnapshot {
private _firestore: Firestore,
private _key: DocumentKey,
public _document: Document | null,
private _fromCache: boolean
private _fromCache: boolean,
private _hasPendingWrites: boolean
) {}

data(
Expand Down Expand Up @@ -1232,10 +1247,7 @@ export class DocumentSnapshot implements firestore.DocumentSnapshot {
}

get metadata(): firestore.SnapshotMetadata {
return new SnapshotMetadata(
this._document !== null && this._document.hasLocalMutations,
this._fromCache
);
return new SnapshotMetadata(this._hasPendingWrites, this._fromCache);
}

isEqual(other: firestore.DocumentSnapshot): boolean {
Expand Down Expand Up @@ -1303,9 +1315,10 @@ export class QueryDocumentSnapshot extends DocumentSnapshot
firestore: Firestore,
key: DocumentKey,
document: Document,
fromCache: boolean
fromCache: boolean,
hasPendingWrites: boolean
) {
super(firestore, key, document, fromCache);
super(firestore, key, document, fromCache, hasPendingWrites);
}

data(options?: SnapshotOptions): firestore.DocumentData {
Expand Down Expand Up @@ -1949,7 +1962,8 @@ export class QuerySnapshot implements firestore.QuerySnapshot {
this._firestore,
doc.key,
doc,
this.metadata.fromCache
this.metadata.fromCache,
this._snapshot.mutatedKeys.has(doc.key)
);
}
}
Expand Down Expand Up @@ -2137,7 +2151,8 @@ export function changesFromSnapshot(
firestore,
change.doc.key,
change.doc,
snapshot.fromCache
snapshot.fromCache,
snapshot.mutatedKeys.has(change.doc.key)
);
assert(
change.type === ChangeType.Added,
Expand Down Expand Up @@ -2168,7 +2183,8 @@ export function changesFromSnapshot(
firestore,
change.doc.key,
change.doc,
snapshot.fromCache
snapshot.fromCache,
snapshot.mutatedKeys.has(change.doc.key)
);
let oldIndex = -1;
let newIndex = -1;
Expand Down
6 changes: 3 additions & 3 deletions packages/firestore/src/core/event_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ export class QueryListener {
snap.docs,
snap.oldDocs,
docChanges,
snap.mutatedKeys,
snap.fromCache,
snap.hasPendingWrites,
snap.syncStateChanged,
/* excludesMetadataChanges= */ true
);
Expand Down Expand Up @@ -288,8 +288,8 @@ export class QueryListener {
snap = ViewSnapshot.fromInitialDocuments(
snap.query,
snap.docs,
snap.fromCache,
snap.hasPendingWrites
snap.mutatedKeys,
snap.fromCache
);
this.raisedInitialEvent = true;
this.queryObserver.next(snap);
Expand Down
3 changes: 1 addition & 2 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,7 @@ export class FirestoreClient {
this.localStore = new LocalStore(
this.persistence,
user,
this.garbageCollector,
this.sharedClientState
this.garbageCollector
);
const serializer = this.platform.newSerializer(
this.databaseInfo.databaseId
Expand Down
3 changes: 2 additions & 1 deletion packages/firestore/src/core/sync_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
.remoteDocumentKeys(queryData.targetId)
.then(remoteKeys => {
const view = new View(query, remoteKeys);
const viewDocChanges = view.computeDocChanges(docs);
const viewDocChanges = view.computeInitialChanges(docs);
const synthesizedTargetChange = TargetChange.createSynthesizedTargetChangeForCurrentChange(
queryData.targetId,
current && this.onlineState !== OnlineState.Offline
Expand Down Expand Up @@ -606,6 +606,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
return this.localStore
.acknowledgeBatch(mutationBatchResult)
.then(changes => {
this.sharedClientState.updateMutationState(batchId, 'acknowledged');
return this.emitNewSnapsAndNotifyLocalStore(changes);
})
.catch(err => this.ignoreIfPrimaryLeaseLoss(err));
Expand Down
12 changes: 9 additions & 3 deletions packages/firestore/src/core/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,18 @@ export class Transaction {

constructor(private datastore: Datastore) {}

private recordVersion(doc: NoDocument | Document): void {
let docVersion = doc.version;
if (doc instanceof NoDocument) {
private recordVersion(doc: MaybeDocument): void {
let docVersion: SnapshotVersion;

if (doc instanceof Document) {
docVersion = doc.version;
} else if (doc instanceof NoDocument) {
// For deleted docs, we must use baseVersion 0 when we overwrite them.
docVersion = SnapshotVersion.forDeletedDoc();
} else {
throw fail('Document in a transaction was a ' + doc.constructor.name);
}

const existingVersion = this.readVersions.get(doc.key);
if (existingVersion !== null) {
if (!docVersion.isEqual(existingVersion)) {
Expand Down
151 changes: 119 additions & 32 deletions packages/firestore/src/core/view.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,56 @@ export class View {
return this._syncedDocuments;
}

/**
* Computes the initial set of document changes based on the provided
* documents.
*
* Unlike `computeDocChanges`, documents with committed mutations don't raise
* `hasPendingWrites`. This distinction allows us to only raise
* `hasPendingWrite` events for documents that changed during the lifetime of
* the View.
*
* @param docs The docs to apply to this view.
* @return A new set of docs, changes, and refill flag.
*/
computeInitialChanges(docs: MaybeDocumentMap): ViewDocumentChanges {
assert(
this.documentSet.size === 0,
'computeInitialChanges called when docs are aleady present'
);

const changeSet = new DocumentChangeSet();
let newMutatedKeys = this.mutatedKeys;
let newDocumentSet = this.documentSet;

docs.inorderTraversal((key: DocumentKey, maybeDoc: MaybeDocument) => {
if (maybeDoc instanceof Document) {
if (this.query.matches(maybeDoc)) {
changeSet.track({ type: ChangeType.Added, doc: maybeDoc });
newDocumentSet = newDocumentSet.add(maybeDoc);
if (maybeDoc.hasLocalMutations) {
newMutatedKeys = newMutatedKeys.add(key);
}
}
}
});

if (this.query.hasLimit()) {
while (newDocumentSet.size > this.query.limit!) {
const oldDoc = newDocumentSet.last();
newDocumentSet = newDocumentSet.delete(oldDoc!.key);
newMutatedKeys = newMutatedKeys.delete(oldDoc!.key);
changeSet.track({ type: ChangeType.Removed, doc: oldDoc! });
}
}
return {
documentSet: newDocumentSet,
changeSet,
needsRefill: false,
mutatedKeys: newMutatedKeys
};
}

/**
* Iterates over a set of doc changes, applies the query limit, and computes
* what the new results should be, what the changes were, and whether we may
Expand Down Expand Up @@ -153,53 +203,72 @@ export class View {
);
newDoc = this.query.matches(newDoc) ? newDoc : null;
}
if (newDoc) {
newDocumentSet = newDocumentSet.add(newDoc);
if (newDoc.hasLocalMutations) {
newMutatedKeys = newMutatedKeys.add(key);
} else {
newMutatedKeys = newMutatedKeys.delete(key);
}
} else {
newDocumentSet = newDocumentSet.delete(key);
newMutatedKeys = newMutatedKeys.delete(key);
}

const oldDocHadPendingMutations = oldDoc
? this.mutatedKeys.has(oldDoc.key)
: false;
const newDocHasPendingMutations = newDoc
? newDoc.hasLocalMutations ||
// We only consider committed mutations for documents that were
// mutated during the lifetime of the view.
(this.mutatedKeys.has(newDoc.key) && newDoc.hasCommittedMutations)
: false;

let changeApplied = false;

// Calculate change
if (oldDoc && newDoc) {
const docsEqual = oldDoc.data.isEqual(newDoc.data);
if (
!docsEqual ||
oldDoc.hasLocalMutations !== newDoc.hasLocalMutations
) {
// only report a change if document actually changed
if (docsEqual) {
changeSet.track({ type: ChangeType.Metadata, doc: newDoc });
} else {
changeSet.track({ type: ChangeType.Modified, doc: newDoc });
}
if (!docsEqual) {
if (!this.shouldWaitForSyncedDocument(oldDoc, newDoc)) {
changeSet.track({
type: ChangeType.Modified,
doc: newDoc
});
changeApplied = true;

if (
lastDocInLimit &&
this.query.docComparator(newDoc, lastDocInLimit) > 0
) {
// This doc moved from inside the limit to after the limit.
// That means there may be some doc in the local cache that's
// actually less than this one.
needsRefill = true;
if (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this functional change intended? Previously we'd check the limit case in both the full and metadata-only cases but now only in the former case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's simpler now to do this check only in this branch (since we otherwise also have to include changeApplied in the if-check). I can't think of a case where a document would fall outside the limit based on a metadata-only change.

lastDocInLimit &&
this.query.docComparator(newDoc, lastDocInLimit) > 0
) {
// This doc moved from inside the limit to after the limit.
// That means there may be some doc in the local cache that's
// actually less than this one.
needsRefill = true;
}
}
} else if (oldDocHadPendingMutations !== newDocHasPendingMutations) {
changeSet.track({ type: ChangeType.Metadata, doc: newDoc });
changeApplied = true;
}
} else if (!oldDoc && newDoc) {
changeSet.track({ type: ChangeType.Added, doc: newDoc });
changeApplied = true;
} else if (oldDoc && !newDoc) {
changeSet.track({ type: ChangeType.Removed, doc: oldDoc });
changeApplied = true;

if (lastDocInLimit) {
// A doc was removed from a full limit query. We'll need to
// requery from the local cache to see if we know about some other
// doc that should be in the results.
needsRefill = true;
}
}

if (changeApplied) {
if (newDoc) {
newDocumentSet = newDocumentSet.add(newDoc);
if (newDocHasPendingMutations) {
newMutatedKeys = newMutatedKeys.add(key);
} else {
newMutatedKeys = newMutatedKeys.delete(key);
}
} else {
newDocumentSet = newDocumentSet.delete(key);
newMutatedKeys = newMutatedKeys.delete(key);
}
}
}
);
if (this.query.hasLimit()) {
Expand All @@ -223,6 +292,24 @@ export class View {
};
}

private shouldWaitForSyncedDocument(
oldDoc: Document,
newDoc: Document
): boolean {
// We suppress the initial change event for documents that were modified as
// part of a write acknowledgment (e.g. when the value of a server transform
// is applied) as Watch will send us the same document again.
// By suppressing the event, we only raise two user visible events (one with
// `hasPendingWrites` and the final state of the document) instead of three
// (one with `hasPendingWrites`, the modified document with
// `hasPendingWrites` and the final state of the document).
return (
oldDoc.hasLocalMutations &&
newDoc.hasCommittedMutations &&
!newDoc.hasLocalMutations
);
}

/**
* Updates the view with the given ViewDocumentChanges and optionally updates
* limbo docs and sync state from the provided target change.
Expand Down Expand Up @@ -270,8 +357,8 @@ export class View {
docChanges.documentSet,
oldDocs,
changes,
docChanges.mutatedKeys,
newSyncState === SyncState.Local,
!docChanges.mutatedKeys.isEmpty(),
syncStateChanged,
/* excludesMetadataChanges= */ false
);
Expand Down Expand Up @@ -424,8 +511,8 @@ export class View {
return ViewSnapshot.fromInitialDocuments(
this.query,
this.documentSet,
this.syncState === SyncState.Local,
!this.mutatedKeys.isEmpty()
this.mutatedKeys,
this.syncState === SyncState.Local
);
}
}
Expand Down
Loading