Skip to content

Commit 85eba03

Browse files
Backfill Mutations
1 parent 58c706b commit 85eba03

20 files changed

+419
-184
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@
3232
import com.google.firebase.firestore.bundle.BundleMetadata;
3333
import com.google.firebase.firestore.bundle.BundleReader;
3434
import com.google.firebase.firestore.core.ViewSnapshot.SyncState;
35+
import com.google.firebase.firestore.local.LocalDocumentsResult;
3536
import com.google.firebase.firestore.local.LocalStore;
3637
import com.google.firebase.firestore.local.LocalViewChanges;
37-
import com.google.firebase.firestore.local.LocalWriteResult;
3838
import com.google.firebase.firestore.local.QueryPurpose;
3939
import com.google.firebase.firestore.local.QueryResult;
4040
import com.google.firebase.firestore.local.ReferenceSet;
@@ -274,10 +274,10 @@ void stopListening(Query query) {
274274
public void writeMutations(List<Mutation> mutations, TaskCompletionSource<Void> userTask) {
275275
assertCallback("writeMutations");
276276

277-
LocalWriteResult result = localStore.writeLocally(mutations);
278-
addUserCallback(result.getBatchId(), userTask);
277+
LocalDocumentsResult result = localStore.writeLocally(mutations);
278+
addUserCallback(result.getLargestBatchId(), userTask);
279279

280-
emitNewSnapsAndNotifyLocalStore(result.getChanges(), /*remoteEvent=*/ null);
280+
emitNewSnapsAndNotifyLocalStore(result.getDocuments(), /*remoteEvent=*/ null);
281281
remoteStore.fillWritePipeline();
282282
}
283283

firebase-firestore/src/main/java/com/google/firebase/firestore/local/IndexBackfiller.java

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import androidx.annotation.Nullable;
2020
import androidx.annotation.VisibleForTesting;
21-
import com.google.firebase.database.collection.ImmutableSortedMap;
2221
import com.google.firebase.firestore.model.Document;
2322
import com.google.firebase.firestore.model.DocumentKey;
2423
import com.google.firebase.firestore.model.FieldIndex;
@@ -110,12 +109,11 @@ public Scheduler getScheduler() {
110109
public int backfill() {
111110
hardAssert(localDocumentsView != null, "setLocalDocumentsView() not called");
112111
hardAssert(indexManager != null, "setIndexManager() not called");
113-
return persistence.runTransaction(
114-
"Backfill Indexes", () -> writeIndexEntries(localDocumentsView));
112+
return persistence.runTransaction("Backfill Indexes", this::writeIndexEntries);
115113
}
116114

117115
/** Writes index entries until the cap is reached. Returns the number of documents processed. */
118-
private int writeIndexEntries(LocalDocumentsView localDocumentsView) {
116+
private int writeIndexEntries() {
119117
Set<String> processedCollectionGroups = new HashSet<>();
120118
int documentsRemaining = maxDocumentsToProcess;
121119
while (documentsRemaining > 0) {
@@ -124,58 +122,65 @@ private int writeIndexEntries(LocalDocumentsView localDocumentsView) {
124122
break;
125123
}
126124
Logger.debug(LOG_TAG, "Processing collection: %s", collectionGroup);
127-
documentsRemaining -=
128-
writeEntriesForCollectionGroup(localDocumentsView, collectionGroup, documentsRemaining);
125+
documentsRemaining -= writeEntriesForCollectionGroup(collectionGroup, documentsRemaining);
129126
processedCollectionGroups.add(collectionGroup);
130127
}
131128
return maxDocumentsToProcess - documentsRemaining;
132129
}
133130

134-
/** Writes entries for the fetched field indexes. */
131+
/**
132+
* Writes entries for the provided collection group. Returns the number of documents processed.
133+
*/
135134
private int writeEntriesForCollectionGroup(
136-
LocalDocumentsView localDocumentsView, String collectionGroup, int entriesRemainingUnderCap) {
137-
// TODO(indexing): Support mutation batch Ids when sorting and writing indexes.
138-
135+
String collectionGroup, int documentsRemainingUnderCap) {
139136
// Use the earliest offset of all field indexes to query the local cache.
140-
IndexOffset existingOffset = getExistingOffset(indexManager.getFieldIndexes(collectionGroup));
141-
ImmutableSortedMap<DocumentKey, Document> documents =
142-
localDocumentsView.getDocuments(collectionGroup, existingOffset, entriesRemainingUnderCap);
143-
indexManager.updateIndexEntries(documents);
137+
Collection<FieldIndex> fieldIndexes = indexManager.getFieldIndexes(collectionGroup);
138+
IndexOffset existingOffset = getExistingOffset(fieldIndexes);
139+
140+
LocalDocumentsResult nextBatch =
141+
localDocumentsView.getNextDocuments(
142+
collectionGroup, existingOffset, documentsRemainingUnderCap);
143+
indexManager.updateIndexEntries(nextBatch.getDocuments());
144144

145-
IndexOffset newOffset = getNewOffset(documents, existingOffset);
145+
IndexOffset newOffset = getNewOffset(existingOffset, nextBatch);
146146
indexManager.updateCollectionGroup(collectionGroup, newOffset);
147147

148-
return documents.size();
148+
return nextBatch.getDocuments().size();
149149
}
150150

151-
/** Returns the lowest offset for the provided index group. */
152-
private IndexOffset getExistingOffset(Collection<FieldIndex> fieldIndexes) {
153-
IndexOffset lowestOffset = null;
154-
for (FieldIndex fieldIndex : fieldIndexes) {
155-
if (lowestOffset == null
156-
|| fieldIndex.getIndexState().getOffset().compareTo(lowestOffset) < 0) {
157-
lowestOffset = fieldIndex.getIndexState().getOffset();
151+
/** Returns the next offset based on the provided documents. */
152+
private IndexOffset getNewOffset(IndexOffset existingOffset, LocalDocumentsResult lookupResult) {
153+
IndexOffset maxOffset = existingOffset;
154+
for (Map.Entry<DocumentKey, Document> entry : lookupResult.getDocuments()) {
155+
IndexOffset newOffset = IndexOffset.fromDocument(entry.getValue());
156+
if (newOffset.compareTo(maxOffset) > 0) {
157+
maxOffset = newOffset;
158158
}
159159
}
160-
return lowestOffset == null ? IndexOffset.NONE : lowestOffset;
160+
return IndexOffset.create(
161+
maxOffset.getReadTime(),
162+
maxOffset.getDocumentKey(),
163+
Math.max(lookupResult.getLargestBatchId(), existingOffset.getLargestBatchId()));
161164
}
162165

163-
/** Returns the offset for the index based on the newly indexed documents. */
164-
private IndexOffset getNewOffset(
165-
ImmutableSortedMap<DocumentKey, Document> documents, IndexOffset currentOffset) {
166-
if (documents.isEmpty()) {
167-
return IndexOffset.create(remoteDocumentCache.getLatestReadTime());
168-
} else {
169-
IndexOffset latestOffset = currentOffset;
170-
Iterator<Map.Entry<DocumentKey, Document>> it = documents.iterator();
171-
while (it.hasNext()) {
172-
IndexOffset newOffset = IndexOffset.fromDocument(it.next().getValue());
173-
if (newOffset.compareTo(latestOffset) > 0) {
174-
latestOffset = newOffset;
175-
}
166+
/** Returns the lowest offset for the provided index group. */
167+
private IndexOffset getExistingOffset(Collection<FieldIndex> fieldIndexes) {
168+
if (fieldIndexes.isEmpty()) {
169+
return IndexOffset.NONE;
170+
}
171+
172+
Iterator<FieldIndex> it = fieldIndexes.iterator();
173+
IndexOffset minOffset = it.next().getIndexState().getOffset();
174+
int minBatchId = minOffset.getLargestBatchId();
175+
while (it.hasNext()) {
176+
IndexOffset newOffset = it.next().getIndexState().getOffset();
177+
if (newOffset.compareTo(minOffset) < 0) {
178+
minOffset = newOffset;
176179
}
177-
return latestOffset;
180+
minBatchId = Math.max(newOffset.getLargestBatchId(), minBatchId);
178181
}
182+
183+
return IndexOffset.create(minOffset.getReadTime(), minOffset.getDocumentKey(), minBatchId);
179184
}
180185

181186
@VisibleForTesting

firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalWriteResult.java renamed to firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalDocumentsResult.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,21 @@
1818
import com.google.firebase.firestore.model.Document;
1919
import com.google.firebase.firestore.model.DocumentKey;
2020

21-
/** The result of a write to the local store. */
22-
public final class LocalWriteResult {
23-
private final int batchId;
21+
/** The result of a applying local mutations. */
22+
public final class LocalDocumentsResult {
23+
private final int largestBatchId;
24+
private final ImmutableSortedMap<DocumentKey, Document> documents;
2425

25-
private final ImmutableSortedMap<DocumentKey, Document> changes;
26-
27-
LocalWriteResult(int batchId, ImmutableSortedMap<DocumentKey, Document> changes) {
28-
this.batchId = batchId;
29-
this.changes = changes;
26+
LocalDocumentsResult(int largestBatchId, ImmutableSortedMap<DocumentKey, Document> documents) {
27+
this.largestBatchId = largestBatchId;
28+
this.documents = documents;
3029
}
3130

32-
public int getBatchId() {
33-
return batchId;
31+
public int getLargestBatchId() {
32+
return largestBatchId;
3433
}
3534

36-
public ImmutableSortedMap<DocumentKey, Document> getChanges() {
37-
return changes;
35+
public ImmutableSortedMap<DocumentKey, Document> getDocuments() {
36+
return documents;
3837
}
3938
}

firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalDocumentsView.java

Lines changed: 66 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import static com.google.firebase.firestore.model.DocumentCollections.emptyDocumentMap;
1818
import static com.google.firebase.firestore.util.Assert.hardAssert;
1919

20+
import androidx.annotation.Nullable;
2021
import androidx.annotation.VisibleForTesting;
2122
import com.google.firebase.Timestamp;
2223
import com.google.firebase.database.collection.ImmutableSortedMap;
@@ -31,6 +32,7 @@
3132
import com.google.firebase.firestore.model.mutation.MutationBatch;
3233
import com.google.firebase.firestore.model.mutation.Overlay;
3334
import com.google.firebase.firestore.model.mutation.PatchMutation;
35+
import java.util.Collections;
3436
import java.util.HashMap;
3537
import java.util.HashSet;
3638
import java.util.List;
@@ -84,15 +86,10 @@ DocumentOverlayCache getDocumentOverlayCache() {
8486
*/
8587
Document getDocument(DocumentKey key) {
8688
Overlay overlay = documentOverlayCache.getOverlay(key);
87-
// Only read from remote document cache if overlay is a patch.
88-
MutableDocument document =
89-
(overlay == null || overlay.getMutation() instanceof PatchMutation)
90-
? remoteDocumentCache.get(key)
91-
: MutableDocument.newInvalidDocument(key);
89+
MutableDocument document = createBaseDocument(key, overlay);
9290
if (overlay != null) {
9391
overlay.getMutation().applyToLocalView(document, null, Timestamp.now());
9492
}
95-
9693
return document;
9794
}
9895

@@ -117,21 +114,35 @@ ImmutableSortedMap<DocumentKey, Document> getDocuments(Iterable<DocumentKey> key
117114
*/
118115
ImmutableSortedMap<DocumentKey, Document> getLocalViewOfDocuments(
119116
Map<DocumentKey, MutableDocument> docs, Set<DocumentKey> existenceStateChanged) {
117+
return computeView(docs, Collections.emptyMap(), existenceStateChanged);
118+
}
119+
120+
/**
121+
* Computes the local view for doc, applying overlays from both {@code memoizedOverlays} and the
122+
* overlay cache.
123+
*/
124+
private ImmutableSortedMap<DocumentKey, Document> computeView(
125+
Map<DocumentKey, MutableDocument> docs,
126+
Map<DocumentKey, Overlay> memoizedOverlays,
127+
Set<DocumentKey> existenceStateChanged) {
120128
ImmutableSortedMap<DocumentKey, Document> results = emptyDocumentMap();
121129
Map<DocumentKey, MutableDocument> recalculateDocuments = new HashMap<>();
122-
for (Map.Entry<DocumentKey, MutableDocument> entry : docs.entrySet()) {
123-
Overlay overlay = documentOverlayCache.getOverlay(entry.getKey());
130+
for (MutableDocument doc : docs.values()) {
131+
Overlay overlay =
132+
memoizedOverlays.containsKey(doc.getKey())
133+
? memoizedOverlays.get(doc.getKey())
134+
: documentOverlayCache.getOverlay(doc.getKey());
124135
// Recalculate an overlay if the document's existence state is changed due to a remote
125136
// event *and* the overlay is a PatchMutation. This is because document existence state
126137
// can change if some patch mutation's preconditions are met.
127138
// NOTE: we recalculate when `overlay` is null as well, because there might be a patch
128139
// mutation whose precondition does not match before the change (hence overlay==null),
129140
// but would now match.
130-
if (existenceStateChanged.contains(entry.getKey())
141+
if (existenceStateChanged.contains(doc.getKey())
131142
&& (overlay == null || overlay.getMutation() instanceof PatchMutation)) {
132-
recalculateDocuments.put(entry.getKey(), docs.get(entry.getKey()));
143+
recalculateDocuments.put(doc.getKey(), doc);
133144
} else if (overlay != null) {
134-
overlay.getMutation().applyToLocalView(entry.getValue(), null, Timestamp.now());
145+
overlay.getMutation().applyToLocalView(doc, null, Timestamp.now());
135146
}
136147
}
137148

@@ -190,14 +201,6 @@ void recalculateAndSaveOverlays(Set<DocumentKey> documentKeys) {
190201
recalculateAndSaveOverlays(docs);
191202
}
192203

193-
/** Gets the local view of the next {@code count} documents based on their read time. */
194-
ImmutableSortedMap<DocumentKey, Document> getDocuments(
195-
String collectionGroup, IndexOffset offset, int count) {
196-
Map<DocumentKey, MutableDocument> docs =
197-
remoteDocumentCache.getAll(collectionGroup, offset, count);
198-
return getLocalViewOfDocuments(docs, new HashSet<>());
199-
}
200-
201204
/**
202205
* Performs a query against the local view of all documents.
203206
*
@@ -250,7 +253,43 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection
250253
return results;
251254
}
252255

253-
/** Queries the remote documents and overlays by doing a full collection scan. */
256+
/**
257+
* Given a collection group, returns the next documents that follow the provided offset, along
258+
* with an updated batch ID.
259+
*
260+
* <p>The documents returned by this method are ordered by remote version from the provided
261+
* offset. If there are no more remote documents after the provided offset, documents with
262+
* mutations in order of batch id from the offset are returned. Since all documents in a batch are
263+
* returned together, the total number of documents returned can exceed {@code count}.
264+
*
265+
* @param collectionGroup The collection group for the documents.
266+
* @param offset The offset to index into.
267+
* @param count The number of documents to return
268+
* @return A LocalDocumentsRResult with the documents that follow the provided offset and the last
269+
* processed batch id.
270+
*/
271+
LocalDocumentsResult getNextDocuments(String collectionGroup, IndexOffset offset, int count) {
272+
Map<DocumentKey, MutableDocument> docs =
273+
remoteDocumentCache.getAll(collectionGroup, offset, count);
274+
Map<DocumentKey, Overlay> overlays =
275+
count - docs.size() > 0
276+
? documentOverlayCache.getOverlays(
277+
collectionGroup, offset.getLargestBatchId(), count - docs.size())
278+
: Collections.emptyMap();
279+
280+
int largestBatchId = -1;
281+
for (Overlay overlay : overlays.values()) {
282+
if (!docs.containsKey(overlay.getKey())) {
283+
docs.put(overlay.getKey(), createBaseDocument(overlay.getKey(), overlay));
284+
}
285+
largestBatchId = Math.max(largestBatchId, overlay.getLargestBatchId());
286+
}
287+
288+
ImmutableSortedMap<DocumentKey, Document> localDocs =
289+
computeView(docs, overlays, Collections.emptySet());
290+
return new LocalDocumentsResult(largestBatchId, localDocs);
291+
}
292+
254293
private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollectionQuery(
255294
Query query, IndexOffset offset) {
256295
Map<DocumentKey, MutableDocument> remoteDocuments =
@@ -281,4 +320,11 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection
281320

282321
return results;
283322
}
323+
324+
/** Returns a base document that can be used to apply `overlay`. */
325+
private MutableDocument createBaseDocument(DocumentKey key, @Nullable Overlay overlay) {
326+
return (overlay == null || overlay.getMutation() instanceof PatchMutation)
327+
? remoteDocumentCache.get(key)
328+
: MutableDocument.newInvalidDocument(key);
329+
}
284330
}

firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public ImmutableSortedMap<DocumentKey, Document> handleUserChange(User user) {
236236
}
237237

238238
/** Accepts locally generated Mutations and commits them to storage. */
239-
public LocalWriteResult writeLocally(List<Mutation> mutations) {
239+
public LocalDocumentsResult writeLocally(List<Mutation> mutations) {
240240
Timestamp localWriteTime = Timestamp.now();
241241

242242
// TODO: Call queryEngine.handleDocumentChange() appropriately.
@@ -277,7 +277,7 @@ public LocalWriteResult writeLocally(List<Mutation> mutations) {
277277
mutationQueue.addMutationBatch(localWriteTime, baseMutations, mutations);
278278
Map<DocumentKey, Mutation> overlays = batch.applyToLocalDocumentSet(documents);
279279
documentOverlayCache.saveOverlays(batch.getBatchId(), overlays);
280-
return new LocalWriteResult(batch.getBatchId(), documents);
280+
return new LocalDocumentsResult(batch.getBatchId(), documents);
281281
});
282282
}
283283

firebase-firestore/src/main/java/com/google/firebase/firestore/local/MemoryRemoteDocumentCache.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,9 @@ final class MemoryRemoteDocumentCache implements RemoteDocumentCache {
3737
private ImmutableSortedMap<DocumentKey, Document> docs;
3838
/** Manages the collection group index. */
3939
private IndexManager indexManager;
40-
/** The latest read time of any document in the cache. */
41-
private SnapshotVersion latestReadTime;
4240

4341
MemoryRemoteDocumentCache() {
4442
docs = emptyDocumentMap();
45-
latestReadTime = SnapshotVersion.NONE;
4643
}
4744

4845
@Override
@@ -57,7 +54,6 @@ public void add(MutableDocument document, SnapshotVersion readTime) {
5754
!readTime.equals(SnapshotVersion.NONE),
5855
"Cannot add document to the RemoteDocumentCache with a read time of zero");
5956
docs = docs.insert(document.getKey(), document.mutableCopy().withReadTime(readTime));
60-
latestReadTime = readTime.compareTo(latestReadTime) > 0 ? readTime : latestReadTime;
6157

6258
indexManager.addToCollectionParentIndex(document.getKey().getCollectionPath());
6359
}
@@ -132,11 +128,6 @@ public Map<DocumentKey, MutableDocument> getAll(ResourcePath collection, IndexOf
132128
return result;
133129
}
134130

135-
@Override
136-
public SnapshotVersion getLatestReadTime() {
137-
return latestReadTime;
138-
}
139-
140131
Iterable<Document> getDocuments() {
141132
return new DocumentIterable();
142133
}

firebase-firestore/src/main/java/com/google/firebase/firestore/local/QueryEngine.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,9 @@ && needsRefill(
160160
}
161161

162162
return appendRemainingResults(
163-
previousResults, query, IndexOffset.create(lastLimboFreeSnapshotVersion));
163+
previousResults,
164+
query,
165+
IndexOffset.create(lastLimboFreeSnapshotVersion, FieldIndex.INITIAL_LARGEST_BATCH_ID));
164166
}
165167

166168
/** Applies the query filter and sorting to the provided documents. */

firebase-firestore/src/main/java/com/google/firebase/firestore/local/RemoteDocumentCache.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,4 @@ interface RemoteDocumentCache {
8484
* @return A newly created map with the set of documents in the collection.
8585
*/
8686
Map<DocumentKey, MutableDocument> getAll(ResourcePath collection, IndexOffset offset);
87-
88-
/** Returns the latest read time of any document in the cache. */
89-
SnapshotVersion getLatestReadTime();
9087
}

0 commit comments

Comments
 (0)