Skip to content

Integrate Document Overlay with the SDK. #6054

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
Closed
338 changes: 257 additions & 81 deletions packages/firestore/src/local/local_documents_view.ts

Large diffs are not rendered by default.

116 changes: 96 additions & 20 deletions packages/firestore/src/local/local_store_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import { SortedMap } from '../util/sorted_map';
import { BATCHID_UNKNOWN } from '../util/types';

import { BundleCache } from './bundle_cache';
import { DocumentOverlayCache } from './document_overlay_cache';
import { IndexManager } from './index_manager';
import { IndexedDbMutationQueue } from './indexeddb_mutation_queue';
import { IndexedDbPersistence } from './indexeddb_persistence';
Expand Down Expand Up @@ -125,6 +126,12 @@ class LocalStoreImpl implements LocalStore {
*/
mutationQueue!: MutationQueue;

/**
* The overlays that can be used to short circuit applying all mutations from
* mutation queue.
*/
documentOverlayCache!: DocumentOverlayCache;

/** The set of all cached remote documents. */
remoteDocuments: RemoteDocumentCache;

Expand Down Expand Up @@ -186,6 +193,7 @@ class LocalStoreImpl implements LocalStore {
initializeUserComponents(user: User): void {
// TODO(indexing): Add spec tests that test these components change after a
// user change
this.documentOverlayCache = this.persistence.getDocumentOverlayCache(user);
this.indexManager = this.persistence.getIndexManager(user);
this.mutationQueue = this.persistence.getMutationQueue(
user,
Expand All @@ -194,6 +202,7 @@ class LocalStoreImpl implements LocalStore {
this.localDocuments = new LocalDocumentsView(
this.remoteDocuments,
this.mutationQueue,
this.documentOverlayCache,
this.indexManager
);
this.remoteDocuments.setIndexManager(this.indexManager);
Expand All @@ -209,6 +218,13 @@ class LocalStoreImpl implements LocalStore {
}
}

class DocumentChangeResult {
constructor(
readonly changedDocuments: MutableDocumentMap,
readonly existenceChangedKeys: DocumentKeySet
) {}
}

export function newLocalStore(
/** Manages our in-memory or durable persistence. */
persistence: Persistence,
Expand Down Expand Up @@ -296,6 +312,7 @@ export function localStoreWriteLocally(
const keys = mutations.reduce((keys, m) => keys.add(m.key), documentKeySet());

let existingDocs: DocumentMap;
let mutationBatch: MutationBatch;

return localStoreImpl.persistence
.runTransaction('Locally write mutations', 'readwrite', txn => {
Expand Down Expand Up @@ -340,11 +357,19 @@ export function localStoreWriteLocally(
baseMutations,
mutations
);
})
.next(batch => {
mutationBatch = batch;
const overlays = batch.applyToLocalDocumentSet(existingDocs);
return localStoreImpl.documentOverlayCache.saveOverlays(
txn,
batch.batchId,
overlays
);
});
})
.then(batch => {
batch.applyToLocalDocumentSet(existingDocs);
return { batchId: batch.batchId, changes: existingDocs };
.then(() => {
return { batchId: mutationBatch.batchId, changes: existingDocs };
});
}

Expand Down Expand Up @@ -383,11 +408,38 @@ export function localStoreAcknowledgeBatch(
)
.next(() => documentBuffer.apply(txn))
.next(() => localStoreImpl.mutationQueue.performConsistencyCheck(txn))
.next(() =>
localStoreImpl.documentOverlayCache.removeOverlaysForBatchId(
txn,
affected,
batchResult.batch.batchId
)
)
.next(() =>
localStoreImpl.localDocuments.recalculateAndSaveOverlaysForDocumentKeys(
txn,
getKeysWithTransformResults(batchResult)
)
)
.next(() => localStoreImpl.localDocuments.getDocuments(txn, affected));
}
);
}

function getKeysWithTransformResults(
batchResult: MutationBatchResult
): DocumentKeySet {
let result = documentKeySet();

for (let i = 0; i < batchResult.mutationResults.length; ++i) {
const mutationResult = batchResult.mutationResults[i];
if (mutationResult.transformResults.length > 0) {
result = result.add(batchResult.batch.mutations[i].key);
}
}
return result;
}

/**
* Removes mutations from the MutationQueue for the specified batch;
* LocalDocuments will be recalculated.
Expand All @@ -412,6 +464,19 @@ export function localStoreRejectBatch(
return localStoreImpl.mutationQueue.removeMutationBatch(txn, batch);
})
.next(() => localStoreImpl.mutationQueue.performConsistencyCheck(txn))
.next(() =>
localStoreImpl.documentOverlayCache.removeOverlaysForBatchId(
txn,
affectedKeys,
batchId
)
)
.next(() =>
localStoreImpl.localDocuments.recalculateAndSaveOverlaysForDocumentKeys(
txn,
affectedKeys
)
)
.next(() =>
localStoreImpl.localDocuments.getDocuments(txn, affectedKeys)
);
Expand Down Expand Up @@ -530,6 +595,7 @@ export function localStoreApplyRemoteEventToLocalCache(
});

let changedDocs = mutableDocumentMap();
let existenceChangedKeys = documentKeySet();
remoteEvent.documentUpdates.forEach(key => {
if (remoteEvent.resolvedLimboDocuments.has(key)) {
promises.push(
Expand All @@ -541,15 +607,16 @@ export function localStoreApplyRemoteEventToLocalCache(
}
});

// Each loop iteration only affects its "own" doc, so it's safe to get all the remote
// documents in advance in a single call.
// Each loop iteration only affects its "own" doc, so it's safe to get all
// the remote documents in advance in a single call.
promises.push(
populateDocumentChangeBuffer(
txn,
documentBuffer,
remoteEvent.documentUpdates
).next(result => {
changedDocs = result;
changedDocs = result.changedDocuments;
existenceChangedKeys = result.existenceChangedKeys;
})
);

Expand Down Expand Up @@ -580,9 +647,10 @@ export function localStoreApplyRemoteEventToLocalCache(
return PersistencePromise.waitFor(promises)
.next(() => documentBuffer.apply(txn))
.next(() =>
localStoreImpl.localDocuments.applyLocalViewToDocuments(
localStoreImpl.localDocuments.getLocalViewOfDocuments(
txn,
changedDocs
changedDocs,
existenceChangedKeys
)
)
.next(() => changedDocs);
Expand All @@ -595,7 +663,11 @@ export function localStoreApplyRemoteEventToLocalCache(

/**
* Populates document change buffer with documents from backend or a bundle.
* Returns the document changes resulting from applying those documents.
* Returns the document changes resulting from applying those documents, and
* also a set of documents whose existence state are changed as a result.
*
* Note: this function will use `documentVersions` if it is defined;
* when it is not defined, resorts to `globalVersion`.
*
* @param txn - Transaction to use to read existing documents from storage.
* @param documentBuffer - Document buffer to collect the resulted changes to be
Expand All @@ -605,22 +677,25 @@ export function localStoreApplyRemoteEventToLocalCache(
* documents have the same read time.
* @param documentVersions - A DocumentKey-to-SnapshotVersion map if documents
* have their own read time.
*
* Note: this function will use `documentVersions` if it is defined;
* when it is not defined, resorts to `globalVersion`.
*/
function populateDocumentChangeBuffer(
txn: PersistenceTransaction,
documentBuffer: RemoteDocumentChangeBuffer,
documents: MutableDocumentMap
): PersistencePromise<MutableDocumentMap> {
): PersistencePromise<DocumentChangeResult> {
let updatedKeys = documentKeySet();
let conditionChanged = documentKeySet();
documents.forEach(k => (updatedKeys = updatedKeys.add(k)));
return documentBuffer.getEntries(txn, updatedKeys).next(existingDocs => {
let changedDocs = mutableDocumentMap();
documents.forEach((key, doc) => {
const existingDoc = existingDocs.get(key)!;

// Check if see if there is a existence state change for this document.
if (doc.isFoundDocument() !== existingDoc.isFoundDocument()) {
conditionChanged = conditionChanged.add(key);
}

// Note: The order of the steps below is important, since we want
// to ensure that rejected limbo resolutions (which fabricate
// NoDocuments with SnapshotVersion.min()) never add documents to
Expand Down Expand Up @@ -655,7 +730,7 @@ function populateDocumentChangeBuffer(
);
}
});
return changedDocs;
return new DocumentChangeResult(changedDocs, conditionChanged);
});
}

Expand Down Expand Up @@ -1225,11 +1300,11 @@ export async function localStoreApplyBundledDocuments(
'readwrite',
txn => {
return populateDocumentChangeBuffer(txn, documentBuffer, documentMap)
.next(changedDocs => {
.next(documentChangeResult => {
documentBuffer.apply(txn);
return changedDocs;
return documentChangeResult;
})
.next(changedDocs => {
.next(documentChangeResult => {
return localStoreImpl.targetCache
.removeMatchingKeysForTargetId(txn, umbrellaTargetData.targetId)
.next(() =>
Expand All @@ -1240,12 +1315,13 @@ export async function localStoreApplyBundledDocuments(
)
)
.next(() =>
localStoreImpl.localDocuments.applyLocalViewToDocuments(
localStoreImpl.localDocuments.getLocalViewOfDocuments(
txn,
changedDocs
documentChangeResult.changedDocuments,
documentChangeResult.existenceChangedKeys
)
)
.next(() => changedDocs);
.next(() => documentChangeResult.changedDocuments);
});
}
);
Expand Down
8 changes: 6 additions & 2 deletions packages/firestore/src/local/query_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import {
import { SnapshotVersion } from '../core/snapshot_version';
import { DocumentKeySet, DocumentMap } from '../model/collections';
import { Document } from '../model/document';
import {
IndexOffset,
newIndexOffsetSuccessorFromReadTime
} from '../model/field_index';
import { debugAssert } from '../util/assert';
import { getLogLevel, LogLevel, logDebug } from '../util/log';
import { SortedSet } from '../util/sorted_set';
Expand Down Expand Up @@ -117,7 +121,7 @@ export class QueryEngine {
return this.localDocumentsView!.getDocumentsMatchingQuery(
transaction,
query,
lastLimboFreeSnapshotVersion
newIndexOffsetSuccessorFromReadTime(lastLimboFreeSnapshotVersion, -1)
).next(updatedResults => {
// We merge `previousResults` into `updateResults`, since
// `updateResults` is already a DocumentMap. If a document is
Expand Down Expand Up @@ -207,7 +211,7 @@ export class QueryEngine {
return this.localDocumentsView!.getDocumentsMatchingQuery(
transaction,
query,
SnapshotVersion.min()
IndexOffset.min()
);
}
}
8 changes: 8 additions & 0 deletions packages/firestore/src/model/collections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ export function newMutationMap(): MutationMap {
);
}

export type DocumentKeyMap<T> = ObjectMap<DocumentKey, T>;
export function newDocumentKeyMap<T>(): DocumentKeyMap<T> {
return new ObjectMap<DocumentKey, T>(
key => key.toString(),
(l, r) => l.isEqual(r)
);
}

export type DocumentVersionMap = SortedMap<DocumentKey, SnapshotVersion>;
const EMPTY_DOCUMENT_VERSION_MAP = new SortedMap<DocumentKey, SnapshotVersion>(
DocumentKey.comparator
Expand Down
5 changes: 1 addition & 4 deletions packages/firestore/src/model/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,8 @@ export class MutableDocument implements Document {
}

setHasLocalMutations(): MutableDocument {
debugAssert(
this.isFoundDocument(),
'Only found documents can have local mutations'
);
this.documentState = DocumentState.HAS_LOCAL_MUTATIONS;
this.version = SnapshotVersion.min();
return this;
}

Expand Down
4 changes: 4 additions & 0 deletions packages/firestore/src/model/field_mask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export class FieldMask {
);
}

static empty(): FieldMask {
return new FieldMask([]);
}

/**
* Verifies that `fieldPath` is included by at least one field in this field
* mask.
Expand Down
Loading