Skip to content

Commit 373d69f

Browse files
WIP
1 parent 275b3eb commit 373d69f

14 files changed

+432
-111
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@
3232
import com.google.firebase.firestore.bundle.BundleMetadata;
3333
import com.google.firebase.firestore.bundle.BundleReader;
3434
import com.google.firebase.firestore.core.ViewSnapshot.SyncState;
35+
import com.google.firebase.firestore.local.LocalDocumentsResult;
3536
import com.google.firebase.firestore.local.LocalStore;
3637
import com.google.firebase.firestore.local.LocalViewChanges;
37-
import com.google.firebase.firestore.local.LocalWriteResult;
3838
import com.google.firebase.firestore.local.QueryPurpose;
3939
import com.google.firebase.firestore.local.QueryResult;
4040
import com.google.firebase.firestore.local.ReferenceSet;
@@ -274,10 +274,10 @@ void stopListening(Query query) {
274274
public void writeMutations(List<Mutation> mutations, TaskCompletionSource<Void> userTask) {
275275
assertCallback("writeMutations");
276276

277-
LocalWriteResult result = localStore.writeLocally(mutations);
278-
addUserCallback(result.getBatchId(), userTask);
277+
LocalDocumentsResult result = localStore.writeLocally(mutations);
278+
addUserCallback(result.getLargestBatchId(), userTask);
279279

280-
emitNewSnapsAndNotifyLocalStore(result.getChanges(), /*remoteEvent=*/ null);
280+
emitNewSnapsAndNotifyLocalStore(result.getDocuments(), /*remoteEvent=*/ null);
281281
remoteStore.fillWritePipeline();
282282
}
283283

firebase-firestore/src/main/java/com/google/firebase/firestore/local/IndexBackfiller.java

Lines changed: 48 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,15 @@
1818

1919
import androidx.annotation.Nullable;
2020
import androidx.annotation.VisibleForTesting;
21-
import com.google.firebase.database.collection.ImmutableSortedMap;
2221
import com.google.firebase.firestore.model.Document;
2322
import com.google.firebase.firestore.model.DocumentKey;
2423
import com.google.firebase.firestore.model.FieldIndex;
2524
import com.google.firebase.firestore.model.FieldIndex.IndexOffset;
25+
import com.google.firebase.firestore.model.SnapshotVersion;
2626
import com.google.firebase.firestore.util.AsyncQueue;
2727
import com.google.firebase.firestore.util.Logger;
2828
import java.util.Collection;
2929
import java.util.HashSet;
30-
import java.util.Iterator;
3130
import java.util.Map;
3231
import java.util.Set;
3332
import java.util.concurrent.TimeUnit;
@@ -52,8 +51,8 @@ public class IndexBackfiller {
5251

5352
public IndexBackfiller(Persistence persistence, AsyncQueue asyncQueue) {
5453
this.persistence = persistence;
55-
this.scheduler = new Scheduler(asyncQueue);
5654
this.remoteDocumentCache = persistence.getRemoteDocumentCache();
55+
this.scheduler = new Scheduler(asyncQueue);
5756
}
5857

5958
public void setLocalDocumentsView(LocalDocumentsView localDocumentsView) {
@@ -110,12 +109,11 @@ public Scheduler getScheduler() {
110109
public int backfill() {
111110
hardAssert(localDocumentsView != null, "setLocalDocumentsView() not called");
112111
hardAssert(indexManager != null, "setIndexManager() not called");
113-
return persistence.runTransaction(
114-
"Backfill Indexes", () -> writeIndexEntries(localDocumentsView));
112+
return persistence.runTransaction("Backfill Indexes", this::writeIndexEntries);
115113
}
116114

117115
/** Writes index entries until the cap is reached. Returns the number of documents processed. */
118-
private int writeIndexEntries(LocalDocumentsView localDocumentsView) {
116+
private int writeIndexEntries() {
119117
Set<String> processedCollectionGroups = new HashSet<>();
120118
int documentsRemaining = maxDocumentsToProcess;
121119
while (documentsRemaining > 0) {
@@ -124,52 +122,43 @@ private int writeIndexEntries(LocalDocumentsView localDocumentsView) {
124122
break;
125123
}
126124
Logger.debug(LOG_TAG, "Processing collection: %s", collectionGroup);
127-
documentsRemaining -=
128-
writeEntriesForCollectionGroup(localDocumentsView, collectionGroup, documentsRemaining);
125+
documentsRemaining -= writeEntriesForCollectionGroup(collectionGroup, documentsRemaining);
129126
processedCollectionGroups.add(collectionGroup);
130127
}
131128
return maxDocumentsToProcess - documentsRemaining;
132129
}
133130

134-
/** Writes entries for the fetched field indexes. */
131+
/**
132+
* Writes entries for the provided collection group. Returns the number of documents processed.
133+
*/
135134
private int writeEntriesForCollectionGroup(
136-
LocalDocumentsView localDocumentsView, String collectionGroup, int entriesRemainingUnderCap) {
137-
// TODO(indexing): Support mutation batch Ids when sorting and writing indexes.
138-
135+
String collectionGroup, int documentsRemainingUnderCap) {
139136
// Use the earliest offset of all field indexes to query the local cache.
140-
IndexOffset existingOffset = getExistingOffset(indexManager.getFieldIndexes(collectionGroup));
141-
ImmutableSortedMap<DocumentKey, Document> documents =
142-
localDocumentsView.getDocuments(collectionGroup, existingOffset, entriesRemainingUnderCap);
143-
indexManager.updateIndexEntries(documents);
137+
Collection<FieldIndex> fieldIndexes = indexManager.getFieldIndexes(collectionGroup);
138+
IndexOffset existingOffset = getExistingOffset(fieldIndexes);
144139

145-
IndexOffset newOffset = getNewOffset(documents, existingOffset);
146-
indexManager.updateCollectionGroup(collectionGroup, newOffset);
140+
LocalDocumentsResult nextDocuments =
141+
localDocumentsView.getNextDocuments(
142+
collectionGroup, existingOffset, documentsRemainingUnderCap);
147143

148-
return documents.size();
149-
}
144+
indexManager.updateIndexEntries(nextDocuments.getDocuments());
145+
IndexOffset newOffset = getNewOffset(existingOffset, nextDocuments);
146+
indexManager.updateCollectionGroup(collectionGroup, newOffset);
150147

151-
/** Returns the lowest offset for the provided index group. */
152-
private IndexOffset getExistingOffset(Collection<FieldIndex> fieldIndexes) {
153-
IndexOffset lowestOffset = null;
154-
for (FieldIndex fieldIndex : fieldIndexes) {
155-
if (lowestOffset == null
156-
|| fieldIndex.getIndexState().getOffset().compareTo(lowestOffset) < 0) {
157-
lowestOffset = fieldIndex.getIndexState().getOffset();
158-
}
159-
}
160-
return lowestOffset == null ? IndexOffset.NONE : lowestOffset;
148+
return nextDocuments.getDocuments().size();
161149
}
162150

163-
/** Returns the offset for the index based on the newly indexed documents. */
164-
private IndexOffset getNewOffset(
165-
ImmutableSortedMap<DocumentKey, Document> documents, IndexOffset currentOffset) {
166-
if (documents.isEmpty()) {
167-
return IndexOffset.create(remoteDocumentCache.getLatestReadTime());
151+
private IndexOffset getNewOffset(IndexOffset existingOffset, LocalDocumentsResult lookupResult) {
152+
if (lookupResult.getDocuments().isEmpty()) {
153+
return IndexOffset.create(
154+
remoteDocumentCache.getLatestReadTime(),
155+
DocumentKey.empty(),
156+
existingOffset.getLargestBatchId());
168157
} else {
169-
IndexOffset latestOffset = currentOffset;
170-
Iterator<Map.Entry<DocumentKey, Document>> it = documents.iterator();
171-
while (it.hasNext()) {
172-
IndexOffset newOffset = IndexOffset.fromDocument(it.next().getValue());
158+
IndexOffset latestOffset = existingOffset;
159+
for (Map.Entry<DocumentKey, Document> entry : lookupResult.getDocuments()) {
160+
IndexOffset newOffset =
161+
IndexOffset.fromDocument(entry.getValue(), lookupResult.getLargestBatchId());
173162
if (newOffset.compareTo(latestOffset) > 0) {
174163
latestOffset = newOffset;
175164
}
@@ -178,6 +167,26 @@ private IndexOffset getNewOffset(
178167
}
179168
}
180169

170+
/** Returns the lowest offset for the provided index group. */
171+
private IndexOffset getExistingOffset(Collection<FieldIndex> fieldIndexes) {
172+
SnapshotVersion readTime = SnapshotVersion.NONE;
173+
DocumentKey key = DocumentKey.empty();
174+
int batchId = 0;
175+
176+
for (FieldIndex fieldIndex : fieldIndexes) {
177+
IndexOffset offset = fieldIndex.getIndexState().getOffset();
178+
if (readTime.compareTo(offset.getReadTime()) > 0) {
179+
readTime = offset.getReadTime();
180+
key = offset.getDocumentKey();
181+
} else if (readTime.compareTo(offset.getReadTime()) == 0) {
182+
key = key.compareTo(offset.getDocumentKey()) < 0 ? key : offset.getDocumentKey();
183+
}
184+
batchId = Math.max(batchId, offset.getLargestBatchId());
185+
}
186+
187+
return IndexOffset.create(readTime, key, batchId);
188+
}
189+
181190
@VisibleForTesting
182191
void setMaxDocumentsToProcess(int newMax) {
183192
maxDocumentsToProcess = newMax;

firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalWriteResult.java renamed to firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalDocumentsResult.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,21 @@
1818
import com.google.firebase.firestore.model.Document;
1919
import com.google.firebase.firestore.model.DocumentKey;
2020

21-
/** The result of a write to the local store. */
22-
public final class LocalWriteResult {
23-
private final int batchId;
21+
/** The result of a applying local mutations. */
22+
public final class LocalDocumentsResult {
23+
private final int largestBatchId;
24+
private final ImmutableSortedMap<DocumentKey, Document> documents;
2425

25-
private final ImmutableSortedMap<DocumentKey, Document> changes;
26-
27-
LocalWriteResult(int batchId, ImmutableSortedMap<DocumentKey, Document> changes) {
28-
this.batchId = batchId;
29-
this.changes = changes;
26+
LocalDocumentsResult(int largestBatchId, ImmutableSortedMap<DocumentKey, Document> documents) {
27+
this.largestBatchId = largestBatchId;
28+
this.documents = documents;
3029
}
3130

32-
public int getBatchId() {
33-
return batchId;
31+
public int getLargestBatchId() {
32+
return largestBatchId;
3433
}
3534

36-
public ImmutableSortedMap<DocumentKey, Document> getChanges() {
37-
return changes;
35+
public ImmutableSortedMap<DocumentKey, Document> getDocuments() {
36+
return documents;
3837
}
3938
}

firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalDocumentsView.java

Lines changed: 71 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import com.google.firebase.firestore.model.mutation.MutationBatch;
3232
import com.google.firebase.firestore.model.mutation.Overlay;
3333
import com.google.firebase.firestore.model.mutation.PatchMutation;
34+
import java.util.ArrayList;
35+
import java.util.Collections;
3436
import java.util.HashMap;
3537
import java.util.HashSet;
3638
import java.util.List;
@@ -112,31 +114,28 @@ ImmutableSortedMap<DocumentKey, Document> getDocuments(Iterable<DocumentKey> key
112114
return getLocalViewOfDocuments(docs, new HashSet<>());
113115
}
114116

115-
/**
116-
* Similar to {@link #getDocuments}, but creates the local view from the given {@code baseDocs}
117-
* without retrieving documents from the local store.
118-
*
119-
* @param docs The documents to apply local mutations to get the local views.
120-
* @param existenceStateChanged The set of document keys whose existence state is changed. This is
121-
* useful to determine if some documents overlay needs to be recalculated.
122-
*/
123-
ImmutableSortedMap<DocumentKey, Document> getLocalViewOfDocuments(
124-
Map<DocumentKey, MutableDocument> docs, Set<DocumentKey> existenceStateChanged) {
117+
private ImmutableSortedMap<DocumentKey, Document> computeView(
118+
Map<DocumentKey, MutableDocument> docs,
119+
Map<DocumentKey, Overlay> memoizedOverlays,
120+
Set<DocumentKey> existenceStateChanged) {
125121
ImmutableSortedMap<DocumentKey, Document> results = emptyDocumentMap();
126122
Map<DocumentKey, MutableDocument> recalculateDocuments = new HashMap<>();
127-
for (Map.Entry<DocumentKey, MutableDocument> entry : docs.entrySet()) {
128-
Overlay overlay = documentOverlayCache.getOverlay(entry.getKey());
123+
for (MutableDocument doc : docs.values()) {
124+
Overlay overlay =
125+
memoizedOverlays.containsKey(doc.getKey())
126+
? memoizedOverlays.get(doc.getKey())
127+
: documentOverlayCache.getOverlay(doc.getKey());
129128
// Recalculate an overlay if the document's existence state is changed due to a remote
130129
// event *and* the overlay is a PatchMutation. This is because document existence state
131130
// can change if some patch mutation's preconditions are met.
132131
// NOTE: we recalculate when `overlay` is null as well, because there might be a patch
133132
// mutation whose precondition does not match before the change (hence overlay==null),
134133
// but would now match.
135-
if (existenceStateChanged.contains(entry.getKey())
134+
if (existenceStateChanged.contains(doc.getKey())
136135
&& (overlay == null || overlay.getMutation() instanceof PatchMutation)) {
137-
recalculateDocuments.put(entry.getKey(), docs.get(entry.getKey()));
136+
recalculateDocuments.put(doc.getKey(), doc);
138137
} else if (overlay != null) {
139-
overlay.getMutation().applyToLocalView(entry.getValue(), null, Timestamp.now());
138+
overlay.getMutation().applyToLocalView(doc, null, Timestamp.now());
140139
}
141140
}
142141

@@ -148,6 +147,19 @@ ImmutableSortedMap<DocumentKey, Document> getLocalViewOfDocuments(
148147
return results;
149148
}
150149

150+
/**
151+
* Similar to {@link #getDocuments}, but creates the local view from the given {@code baseDocs}
152+
* without retrieving documents from the local store.
153+
*
154+
* @param docs The documents to apply local mutations to get the local views.
155+
* @param existenceStateChanged The set of document keys whose existence state is changed. This is
156+
* useful to determine if some documents overlay needs to be recalculated.
157+
*/
158+
ImmutableSortedMap<DocumentKey, Document> getLocalViewOfDocuments(
159+
Map<DocumentKey, MutableDocument> docs, Set<DocumentKey> existenceStateChanged) {
160+
return computeView(docs, Collections.emptyMap(), existenceStateChanged);
161+
}
162+
151163
private void recalculateAndSaveOverlays(Map<DocumentKey, MutableDocument> docs) {
152164
List<MutationBatch> batches =
153165
mutationQueue.getAllMutationBatchesAffectingDocumentKeys(docs.keySet());
@@ -195,14 +207,6 @@ void recalculateAndSaveOverlays(Set<DocumentKey> documentKeys) {
195207
recalculateAndSaveOverlays(docs);
196208
}
197209

198-
/** Gets the local view of the next {@code count} documents based on their read time. */
199-
ImmutableSortedMap<DocumentKey, Document> getDocuments(
200-
String collectionGroup, IndexOffset offset, int count) {
201-
Map<DocumentKey, MutableDocument> docs =
202-
remoteDocumentCache.getAll(collectionGroup, offset, count);
203-
return getLocalViewOfDocuments(docs, new HashSet<>());
204-
}
205-
206210
/**
207211
* Performs a query against the local view of all documents.
208212
*
@@ -255,12 +259,54 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection
255259
return results;
256260
}
257261

258-
/** Queries the remote documents and overlays by doing a full collection scan. */
262+
/**
263+
* Given a collection group, returns the next documents that follow the provided offset, along
264+
* with an updated offset.
265+
*
266+
* <p>The documents returned by this method are ordered by remote version from the provided
267+
* offset. If there are no more remote documents after the provided offset, documents with
268+
* mutations in order of batch id from the offset are returned. Since all documents in a batch are
269+
* returned together, the total number of documents returned can exceed {@code count}.
270+
*
271+
* @param collectionGroup The collection group for the documents.
272+
* @param offset The offset to index into.
273+
* @param count The number of documents to return
274+
* @return A LocalDocumentsRResult with the documents that follow the provided offset and the last
275+
* processed batch id.
276+
*/
277+
LocalDocumentsResult getNextDocuments(String collectionGroup, IndexOffset offset, int count) {
278+
Map<DocumentKey, MutableDocument> docs =
279+
remoteDocumentCache.getAll(collectionGroup, offset, count);
280+
Map<DocumentKey, Overlay> overlays =
281+
count - docs.size() > 0
282+
? documentOverlayCache.getOverlays(
283+
collectionGroup, offset.getLargestBatchId(), count - docs.size())
284+
: Collections.emptyMap();
285+
286+
int largestBatchId = -1;
287+
List<DocumentKey> mutatedKeys = new ArrayList<>();
288+
for (Overlay overlay : overlays.values()) {
289+
if (!docs.containsKey(overlay.getKey())) {
290+
mutatedKeys.add(overlay.getKey());
291+
}
292+
largestBatchId = Math.max(largestBatchId, overlay.getLargestBatchId());
293+
}
294+
295+
if (!mutatedKeys.isEmpty()) {
296+
docs.putAll(remoteDocumentCache.getAll(mutatedKeys));
297+
}
298+
299+
ImmutableSortedMap<DocumentKey, Document> localDocs =
300+
computeView(docs, overlays, Collections.emptySet());
301+
return new LocalDocumentsResult(largestBatchId, localDocs);
302+
}
303+
259304
private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollectionQuery(
260305
Query query, IndexOffset offset) {
261306
Map<DocumentKey, MutableDocument> remoteDocuments =
262307
remoteDocumentCache.getAll(query.getPath(), offset);
263-
Map<DocumentKey, Overlay> overlays = documentOverlayCache.getOverlays(query.getPath(), -1);
308+
Map<DocumentKey, Overlay> overlays =
309+
documentOverlayCache.getOverlays(query.getPath(), offset.getLargestBatchId());
264310

265311
// As documents might match the query because of their overlay we need to include documents
266312
// for all overlays in the initial document set.
@@ -282,7 +328,6 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection
282328
results = results.insert(docEntry.getKey(), docEntry.getValue());
283329
}
284330
}
285-
286331
return results;
287332
}
288333
}

firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public ImmutableSortedMap<DocumentKey, Document> handleUserChange(User user) {
236236
}
237237

238238
/** Accepts locally generated Mutations and commits them to storage. */
239-
public LocalWriteResult writeLocally(List<Mutation> mutations) {
239+
public LocalDocumentsResult writeLocally(List<Mutation> mutations) {
240240
Timestamp localWriteTime = Timestamp.now();
241241

242242
// TODO: Call queryEngine.handleDocumentChange() appropriately.
@@ -277,7 +277,7 @@ public LocalWriteResult writeLocally(List<Mutation> mutations) {
277277
mutationQueue.addMutationBatch(localWriteTime, baseMutations, mutations);
278278
Map<DocumentKey, Mutation> overlays = batch.applyToLocalDocumentSet(documents);
279279
documentOverlayCache.saveOverlays(batch.getBatchId(), overlays);
280-
return new LocalWriteResult(batch.getBatchId(), documents);
280+
return new LocalDocumentsResult(batch.getBatchId(), documents);
281281
});
282282
}
283283

firebase-firestore/src/main/java/com/google/firebase/firestore/local/MemoryRemoteDocumentCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public Map<DocumentKey, MutableDocument> getAll(ResourcePath collection, IndexOf
121121
continue;
122122
}
123123

124-
if (IndexOffset.fromDocument(doc).compareTo(offset) <= 0) {
124+
if (IndexOffset.fromDocument(doc, 0).compareTo(offset) <= 0) {
125125
// The document sorts before the offset.
126126
continue;
127127
}

0 commit comments

Comments
 (0)