Skip to content

Commit 2715647

Browse files
Backfill Mutations (#3323)
1 parent 28c950c commit 2715647

21 files changed

+508
-189
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@
3232
import com.google.firebase.firestore.bundle.BundleMetadata;
3333
import com.google.firebase.firestore.bundle.BundleReader;
3434
import com.google.firebase.firestore.core.ViewSnapshot.SyncState;
35+
import com.google.firebase.firestore.local.LocalDocumentsResult;
3536
import com.google.firebase.firestore.local.LocalStore;
3637
import com.google.firebase.firestore.local.LocalViewChanges;
37-
import com.google.firebase.firestore.local.LocalWriteResult;
3838
import com.google.firebase.firestore.local.QueryPurpose;
3939
import com.google.firebase.firestore.local.QueryResult;
4040
import com.google.firebase.firestore.local.ReferenceSet;
@@ -274,10 +274,10 @@ void stopListening(Query query) {
274274
public void writeMutations(List<Mutation> mutations, TaskCompletionSource<Void> userTask) {
275275
assertCallback("writeMutations");
276276

277-
LocalWriteResult result = localStore.writeLocally(mutations);
277+
LocalDocumentsResult result = localStore.writeLocally(mutations);
278278
addUserCallback(result.getBatchId(), userTask);
279279

280-
emitNewSnapsAndNotifyLocalStore(result.getChanges(), /*remoteEvent=*/ null);
280+
emitNewSnapsAndNotifyLocalStore(result.getDocuments(), /*remoteEvent=*/ null);
281281
remoteStore.fillWritePipeline();
282282
}
283283

firebase-firestore/src/main/java/com/google/firebase/firestore/local/IndexBackfiller.java

Lines changed: 41 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import androidx.annotation.Nullable;
2020
import androidx.annotation.VisibleForTesting;
21-
import com.google.firebase.database.collection.ImmutableSortedMap;
2221
import com.google.firebase.firestore.model.Document;
2322
import com.google.firebase.firestore.model.DocumentKey;
2423
import com.google.firebase.firestore.model.FieldIndex;
@@ -45,15 +44,13 @@ public class IndexBackfiller {
4544

4645
private final Scheduler scheduler;
4746
private final Persistence persistence;
48-
private final RemoteDocumentCache remoteDocumentCache;
4947
private LocalDocumentsView localDocumentsView;
5048
private IndexManager indexManager;
5149
private int maxDocumentsToProcess = MAX_DOCUMENTS_TO_PROCESS;
5250

5351
public IndexBackfiller(Persistence persistence, AsyncQueue asyncQueue) {
5452
this.persistence = persistence;
5553
this.scheduler = new Scheduler(asyncQueue);
56-
this.remoteDocumentCache = persistence.getRemoteDocumentCache();
5754
}
5855

5956
public void setLocalDocumentsView(LocalDocumentsView localDocumentsView) {
@@ -110,12 +107,11 @@ public Scheduler getScheduler() {
110107
public int backfill() {
111108
hardAssert(localDocumentsView != null, "setLocalDocumentsView() not called");
112109
hardAssert(indexManager != null, "setIndexManager() not called");
113-
return persistence.runTransaction(
114-
"Backfill Indexes", () -> writeIndexEntries(localDocumentsView));
110+
return persistence.runTransaction("Backfill Indexes", () -> this.writeIndexEntries());
115111
}
116112

117113
/** Writes index entries until the cap is reached. Returns the number of documents processed. */
118-
private int writeIndexEntries(LocalDocumentsView localDocumentsView) {
114+
private int writeIndexEntries() {
119115
Set<String> processedCollectionGroups = new HashSet<>();
120116
int documentsRemaining = maxDocumentsToProcess;
121117
while (documentsRemaining > 0) {
@@ -124,58 +120,63 @@ private int writeIndexEntries(LocalDocumentsView localDocumentsView) {
124120
break;
125121
}
126122
Logger.debug(LOG_TAG, "Processing collection: %s", collectionGroup);
127-
documentsRemaining -=
128-
writeEntriesForCollectionGroup(localDocumentsView, collectionGroup, documentsRemaining);
123+
documentsRemaining -= writeEntriesForCollectionGroup(collectionGroup, documentsRemaining);
129124
processedCollectionGroups.add(collectionGroup);
130125
}
131126
return maxDocumentsToProcess - documentsRemaining;
132127
}
133128

134-
/** Writes entries for the fetched field indexes. */
129+
/**
130+
* Writes entries for the provided collection group. Returns the number of documents processed.
131+
*/
135132
private int writeEntriesForCollectionGroup(
136-
LocalDocumentsView localDocumentsView, String collectionGroup, int entriesRemainingUnderCap) {
137-
// TODO(indexing): Support mutation batch Ids when sorting and writing indexes.
138-
133+
String collectionGroup, int documentsRemainingUnderCap) {
139134
// Use the earliest offset of all field indexes to query the local cache.
140-
IndexOffset existingOffset = getExistingOffset(indexManager.getFieldIndexes(collectionGroup));
141-
ImmutableSortedMap<DocumentKey, Document> documents =
142-
localDocumentsView.getDocuments(collectionGroup, existingOffset, entriesRemainingUnderCap);
143-
indexManager.updateIndexEntries(documents);
135+
Collection<FieldIndex> fieldIndexes = indexManager.getFieldIndexes(collectionGroup);
136+
IndexOffset existingOffset = getExistingOffset(fieldIndexes);
137+
138+
LocalDocumentsResult nextBatch =
139+
localDocumentsView.getNextDocuments(
140+
collectionGroup, existingOffset, documentsRemainingUnderCap);
141+
indexManager.updateIndexEntries(nextBatch.getDocuments());
144142

145-
IndexOffset newOffset = getNewOffset(documents, existingOffset);
143+
IndexOffset newOffset = getNewOffset(existingOffset, nextBatch);
146144
indexManager.updateCollectionGroup(collectionGroup, newOffset);
147145

148-
return documents.size();
146+
return nextBatch.getDocuments().size();
149147
}
150148

151-
/** Returns the lowest offset for the provided index group. */
152-
private IndexOffset getExistingOffset(Collection<FieldIndex> fieldIndexes) {
153-
IndexOffset lowestOffset = null;
154-
for (FieldIndex fieldIndex : fieldIndexes) {
155-
if (lowestOffset == null
156-
|| fieldIndex.getIndexState().getOffset().compareTo(lowestOffset) < 0) {
157-
lowestOffset = fieldIndex.getIndexState().getOffset();
149+
/** Returns the next offset based on the provided documents. */
150+
private IndexOffset getNewOffset(IndexOffset existingOffset, LocalDocumentsResult lookupResult) {
151+
IndexOffset maxOffset = existingOffset;
152+
for (Map.Entry<DocumentKey, Document> entry : lookupResult.getDocuments()) {
153+
IndexOffset newOffset = IndexOffset.fromDocument(entry.getValue());
154+
if (newOffset.compareTo(maxOffset) > 0) {
155+
maxOffset = newOffset;
158156
}
159157
}
160-
return lowestOffset == null ? IndexOffset.NONE : lowestOffset;
158+
return IndexOffset.create(
159+
maxOffset.getReadTime(),
160+
maxOffset.getDocumentKey(),
161+
Math.max(lookupResult.getBatchId(), existingOffset.getLargestBatchId()));
161162
}
162163

163-
/** Returns the offset for the index based on the newly indexed documents. */
164-
private IndexOffset getNewOffset(
165-
ImmutableSortedMap<DocumentKey, Document> documents, IndexOffset currentOffset) {
166-
if (documents.isEmpty()) {
167-
return IndexOffset.create(remoteDocumentCache.getLatestReadTime());
168-
} else {
169-
IndexOffset latestOffset = currentOffset;
170-
Iterator<Map.Entry<DocumentKey, Document>> it = documents.iterator();
171-
while (it.hasNext()) {
172-
IndexOffset newOffset = IndexOffset.fromDocument(it.next().getValue());
173-
if (newOffset.compareTo(latestOffset) > 0) {
174-
latestOffset = newOffset;
175-
}
164+
/** Returns the lowest offset for the provided index group. */
165+
private IndexOffset getExistingOffset(Collection<FieldIndex> fieldIndexes) {
166+
hardAssert(!fieldIndexes.isEmpty(), "Updating collection without indexes");
167+
168+
Iterator<FieldIndex> it = fieldIndexes.iterator();
169+
IndexOffset minOffset = it.next().getIndexState().getOffset();
170+
int minBatchId = minOffset.getLargestBatchId();
171+
while (it.hasNext()) {
172+
IndexOffset newOffset = it.next().getIndexState().getOffset();
173+
if (newOffset.compareTo(minOffset) < 0) {
174+
minOffset = newOffset;
176175
}
177-
return latestOffset;
176+
minBatchId = Math.max(newOffset.getLargestBatchId(), minBatchId);
178177
}
178+
179+
return IndexOffset.create(minOffset.getReadTime(), minOffset.getDocumentKey(), minBatchId);
179180
}
180181

181182
@VisibleForTesting

firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalWriteResult.java renamed to firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalDocumentsResult.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,26 @@
1818
import com.google.firebase.firestore.model.Document;
1919
import com.google.firebase.firestore.model.DocumentKey;
2020

21-
/** The result of a write to the local store. */
22-
public final class LocalWriteResult {
21+
/**
22+
* Represents a set of document along with their mutation batch ID.
23+
*
24+
* <p>This class is used when applying mutations to the local store and to propagate document
25+
* updates to the indexing table.
26+
*/
27+
public final class LocalDocumentsResult {
2328
private final int batchId;
29+
private final ImmutableSortedMap<DocumentKey, Document> documents;
2430

25-
private final ImmutableSortedMap<DocumentKey, Document> changes;
26-
27-
LocalWriteResult(int batchId, ImmutableSortedMap<DocumentKey, Document> changes) {
31+
LocalDocumentsResult(int batchId, ImmutableSortedMap<DocumentKey, Document> documents) {
2832
this.batchId = batchId;
29-
this.changes = changes;
33+
this.documents = documents;
3034
}
3135

3236
public int getBatchId() {
3337
return batchId;
3438
}
3539

36-
public ImmutableSortedMap<DocumentKey, Document> getChanges() {
37-
return changes;
40+
public ImmutableSortedMap<DocumentKey, Document> getDocuments() {
41+
return documents;
3842
}
3943
}

firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalDocumentsView.java

Lines changed: 71 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
import static com.google.firebase.firestore.model.DocumentCollections.emptyDocumentMap;
1818
import static com.google.firebase.firestore.util.Assert.hardAssert;
1919

20+
import androidx.annotation.Nullable;
2021
import androidx.annotation.VisibleForTesting;
2122
import com.google.firebase.Timestamp;
2223
import com.google.firebase.database.collection.ImmutableSortedMap;
2324
import com.google.firebase.firestore.core.Query;
2425
import com.google.firebase.firestore.model.Document;
2526
import com.google.firebase.firestore.model.DocumentKey;
27+
import com.google.firebase.firestore.model.FieldIndex;
2628
import com.google.firebase.firestore.model.FieldIndex.IndexOffset;
2729
import com.google.firebase.firestore.model.MutableDocument;
2830
import com.google.firebase.firestore.model.ResourcePath;
@@ -31,6 +33,7 @@
3133
import com.google.firebase.firestore.model.mutation.MutationBatch;
3234
import com.google.firebase.firestore.model.mutation.Overlay;
3335
import com.google.firebase.firestore.model.mutation.PatchMutation;
36+
import java.util.Collections;
3437
import java.util.HashMap;
3538
import java.util.HashSet;
3639
import java.util.List;
@@ -84,15 +87,10 @@ DocumentOverlayCache getDocumentOverlayCache() {
8487
*/
8588
Document getDocument(DocumentKey key) {
8689
Overlay overlay = documentOverlayCache.getOverlay(key);
87-
// Only read from remote document cache if overlay is a patch.
88-
MutableDocument document =
89-
(overlay == null || overlay.getMutation() instanceof PatchMutation)
90-
? remoteDocumentCache.get(key)
91-
: MutableDocument.newInvalidDocument(key);
90+
MutableDocument document = getBaseDocument(key, overlay);
9291
if (overlay != null) {
9392
overlay.getMutation().applyToLocalView(document, null, Timestamp.now());
9493
}
95-
9694
return document;
9795
}
9896

@@ -117,21 +115,35 @@ ImmutableSortedMap<DocumentKey, Document> getDocuments(Iterable<DocumentKey> key
117115
*/
118116
ImmutableSortedMap<DocumentKey, Document> getLocalViewOfDocuments(
119117
Map<DocumentKey, MutableDocument> docs, Set<DocumentKey> existenceStateChanged) {
118+
return computeViews(docs, Collections.emptyMap(), existenceStateChanged);
119+
}
120+
121+
/**
122+
* Computes the local view for doc, applying overlays from both {@code memoizedOverlays} and the
123+
* overlay cache.
124+
*/
125+
private ImmutableSortedMap<DocumentKey, Document> computeViews(
126+
Map<DocumentKey, MutableDocument> docs,
127+
Map<DocumentKey, Overlay> memoizedOverlays,
128+
Set<DocumentKey> existenceStateChanged) {
120129
ImmutableSortedMap<DocumentKey, Document> results = emptyDocumentMap();
121130
Map<DocumentKey, MutableDocument> recalculateDocuments = new HashMap<>();
122-
for (Map.Entry<DocumentKey, MutableDocument> entry : docs.entrySet()) {
123-
Overlay overlay = documentOverlayCache.getOverlay(entry.getKey());
131+
for (MutableDocument doc : docs.values()) {
132+
Overlay overlay =
133+
memoizedOverlays.containsKey(doc.getKey())
134+
? memoizedOverlays.get(doc.getKey())
135+
: documentOverlayCache.getOverlay(doc.getKey());
124136
// Recalculate an overlay if the document's existence state is changed due to a remote
125137
// event *and* the overlay is a PatchMutation. This is because document existence state
126138
// can change if some patch mutation's preconditions are met.
127139
// NOTE: we recalculate when `overlay` is null as well, because there might be a patch
128140
// mutation whose precondition does not match before the change (hence overlay==null),
129141
// but would now match.
130-
if (existenceStateChanged.contains(entry.getKey())
142+
if (existenceStateChanged.contains(doc.getKey())
131143
&& (overlay == null || overlay.getMutation() instanceof PatchMutation)) {
132-
recalculateDocuments.put(entry.getKey(), docs.get(entry.getKey()));
144+
recalculateDocuments.put(doc.getKey(), doc);
133145
} else if (overlay != null) {
134-
overlay.getMutation().applyToLocalView(entry.getValue(), null, Timestamp.now());
146+
overlay.getMutation().applyToLocalView(doc, null, Timestamp.now());
135147
}
136148
}
137149

@@ -190,14 +202,6 @@ void recalculateAndSaveOverlays(Set<DocumentKey> documentKeys) {
190202
recalculateAndSaveOverlays(docs);
191203
}
192204

193-
/** Gets the local view of the next {@code count} documents based on their read time. */
194-
ImmutableSortedMap<DocumentKey, Document> getDocuments(
195-
String collectionGroup, IndexOffset offset, int count) {
196-
Map<DocumentKey, MutableDocument> docs =
197-
remoteDocumentCache.getAll(collectionGroup, offset, count);
198-
return getLocalViewOfDocuments(docs, new HashSet<>());
199-
}
200-
201205
/**
202206
* Performs a query against the local view of all documents.
203207
*
@@ -250,7 +254,47 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection
250254
return results;
251255
}
252256

253-
/** Queries the remote documents and overlays by doing a full collection scan. */
257+
/**
258+
* Given a collection group, returns the next documents that follow the provided offset, along
259+
* with an updated batch ID.
260+
*
261+
* <p>The documents returned by this method are ordered by remote version from the provided
262+
* offset. If there are no more remote documents after the provided offset, documents with
263+
* mutations in order of batch id from the offset are returned. Since all documents in a batch are
264+
* returned together, the total number of documents returned can exceed {@code count}.
265+
*
266+
* @param collectionGroup The collection group for the documents.
267+
* @param offset The offset to index into.
268+
* @param count The number of documents to return
269+
* @return A LocalDocumentsResult with the documents that follow the provided offset and the last
270+
* processed batch id.
271+
*/
272+
LocalDocumentsResult getNextDocuments(String collectionGroup, IndexOffset offset, int count) {
273+
Map<DocumentKey, MutableDocument> docs =
274+
remoteDocumentCache.getAll(collectionGroup, offset, count);
275+
Map<DocumentKey, Overlay> overlays =
276+
count - docs.size() > 0
277+
? documentOverlayCache.getOverlays(
278+
collectionGroup, offset.getLargestBatchId(), count - docs.size())
279+
: Collections.emptyMap();
280+
281+
int largestBatchId = FieldIndex.INITIAL_LARGEST_BATCH_ID;
282+
for (Overlay overlay : overlays.values()) {
283+
if (!docs.containsKey(overlay.getKey())) {
284+
docs.put(overlay.getKey(), getBaseDocument(overlay.getKey(), overlay));
285+
}
286+
// The callsite will use the largest batch ID together with the latest read time to create
287+
// a new index offset. Since we only process batch IDs if all remote documents have been read,
288+
// no overlay will increase the overall read time. This is why we only need to special case
289+
// the batch id.
290+
largestBatchId = Math.max(largestBatchId, overlay.getLargestBatchId());
291+
}
292+
293+
ImmutableSortedMap<DocumentKey, Document> localDocs =
294+
computeViews(docs, overlays, Collections.emptySet());
295+
return new LocalDocumentsResult(largestBatchId, localDocs);
296+
}
297+
254298
private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollectionQuery(
255299
Query query, IndexOffset offset) {
256300
Map<DocumentKey, MutableDocument> remoteDocuments =
@@ -281,4 +325,11 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection
281325

282326
return results;
283327
}
328+
329+
/** Returns a base document that can be used to apply `overlay`. */
330+
private MutableDocument getBaseDocument(DocumentKey key, @Nullable Overlay overlay) {
331+
return (overlay == null || overlay.getMutation() instanceof PatchMutation)
332+
? remoteDocumentCache.get(key)
333+
: MutableDocument.newInvalidDocument(key);
334+
}
284335
}

firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public ImmutableSortedMap<DocumentKey, Document> handleUserChange(User user) {
236236
}
237237

238238
/** Accepts locally generated Mutations and commits them to storage. */
239-
public LocalWriteResult writeLocally(List<Mutation> mutations) {
239+
public LocalDocumentsResult writeLocally(List<Mutation> mutations) {
240240
Timestamp localWriteTime = Timestamp.now();
241241

242242
// TODO: Call queryEngine.handleDocumentChange() appropriately.
@@ -277,7 +277,7 @@ public LocalWriteResult writeLocally(List<Mutation> mutations) {
277277
mutationQueue.addMutationBatch(localWriteTime, baseMutations, mutations);
278278
Map<DocumentKey, Mutation> overlays = batch.applyToLocalDocumentSet(documents);
279279
documentOverlayCache.saveOverlays(batch.getBatchId(), overlays);
280-
return new LocalWriteResult(batch.getBatchId(), documents);
280+
return new LocalDocumentsResult(batch.getBatchId(), documents);
281281
});
282282
}
283283

0 commit comments

Comments
 (0)