Skip to content

Add largest batch ID #3331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection
Query query, IndexOffset offset) {
Map<DocumentKey, MutableDocument> remoteDocuments =
remoteDocumentCache.getAll(query.getPath(), offset);
Map<DocumentKey, Overlay> overlays = documentOverlayCache.getOverlays(query.getPath(), -1);
Map<DocumentKey, Overlay> overlays =
documentOverlayCache.getOverlays(query.getPath(), offset.getLargestBatchId());

// As documents might match the query because of their overlay we need to include documents
// for all overlays in the initial document set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ public void start() {
// Fetch all index states if persisted for the user. These states contain per user information
// on how up to date the index is.
db.query(
"SELECT index_id, sequence_number, read_time_seconds, read_time_nanos, document_key "
+ "FROM index_state WHERE uid = ?")
"SELECT index_id, sequence_number, read_time_seconds, read_time_nanos, document_key, "
+ "largest_batch_id FROM index_state WHERE uid = ?")
.binding(uid)
.forEach(
row -> {
Expand All @@ -117,8 +117,11 @@ public void start() {
new SnapshotVersion(new Timestamp(row.getLong(2), row.getInt(3)));
DocumentKey documentKey =
DocumentKey.fromPath(EncodedPath.decodeResourcePath(row.getString(4)));
int largestBatchId = row.getInt(5);
indexStates.put(
indexId, FieldIndex.IndexState.create(sequenceNumber, readTime, documentKey));
indexId,
FieldIndex.IndexState.create(
sequenceNumber, readTime, documentKey, largestBatchId));
});

// Fetch all indices and combine with user's index state if available.
Expand Down Expand Up @@ -660,13 +663,15 @@ public void updateCollectionGroup(String collectionGroup, FieldIndex.IndexOffset
FieldIndex.IndexState.create(memoizedMaxSequenceNumber, offset));
db.execute(
"REPLACE INTO index_state (index_id, uid, sequence_number, "
+ "read_time_seconds, read_time_nanos, document_key) VALUES(?, ?, ?, ?, ?, ?)",
+ "read_time_seconds, read_time_nanos, document_key, largest_batch_id) "
+ "VALUES(?, ?, ?, ?, ?, ?, ?)",
fieldIndex.getIndexId(),
uid,
memoizedMaxSequenceNumber,
offset.getReadTime().getTimestamp().getSeconds(),
offset.getReadTime().getTimestamp().getNanoseconds(),
EncodedPath.encode(offset.getDocumentKey().getPath()));
EncodedPath.encode(offset.getDocumentKey().getPath()),
offset.getLargestBatchId());
memoizeIndex(updatedIndex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ private void createFieldIndex() {
+ "read_time_seconds INTEGER, " // Read time of last processed document
+ "read_time_nanos INTEGER, "
+ "document_key TEXT, " // Key of the last processed document
+ "largest_batch_id INTEGER, " // Largest mutation batch id that was processed
+ "PRIMARY KEY (index_id, uid))");

// The index entry table stores the encoded entries for all fields.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@ public abstract class FieldIndex {
/** An ID for an index that has not yet been added to persistence. */
public static final int UNKNOWN_ID = -1;

/** The initial mutation batch id for each index. Gets updated during index backfill. */
public static final int INITIAL_LARGEST_BATCH_ID = -1;

/** The initial sequence number for each index. Gets updated during index backfill. */
public static final int INITIAL_SEQUENCE_NUMBER = 0;

/** The state of an index that has not yet been backfilled. */
public static IndexState INITIAL_STATE =
IndexState.create(INITIAL_SEQUENCE_NUMBER, SnapshotVersion.NONE, DocumentKey.empty());
IndexState.create(INITIAL_SEQUENCE_NUMBER, IndexOffset.NONE);

/** Compares indexes by collection group and segments. Ignores update time and index ID. */
public static final Comparator<FieldIndex> SEMANTIC_COMPARATOR =
Expand Down Expand Up @@ -100,8 +103,11 @@ public static IndexState create(long sequenceNumber, IndexOffset offset) {
}

public static IndexState create(
long sequenceNumber, SnapshotVersion readTime, DocumentKey documentKey) {
return create(sequenceNumber, IndexOffset.create(readTime, documentKey));
long sequenceNumber,
SnapshotVersion readTime,
DocumentKey documentKey,
int largestBatchId) {
return create(sequenceNumber, IndexOffset.create(readTime, documentKey, largestBatchId));
}

/**
Expand All @@ -116,17 +122,20 @@ public static IndexState create(
/** Stores the latest read time and document that were processed for an index. */
@AutoValue
public abstract static class IndexOffset implements Comparable<IndexOffset> {
public static final IndexOffset NONE =
create(SnapshotVersion.NONE, DocumentKey.empty(), INITIAL_LARGEST_BATCH_ID);

public static final Comparator<MutableDocument> DOCUMENT_COMPARATOR =
(l, r) -> IndexOffset.fromDocument(l).compareTo(IndexOffset.fromDocument(r));

public static final IndexOffset NONE = create(SnapshotVersion.NONE, DocumentKey.empty());

/**
* Creates an offset that matches all documents with a read time higher than {@code readTime} or
* with a key higher than {@code documentKey} for equal read times.
* with a key higher than {@code documentKey} for equal read times. The largest batch ID is used
* as a final tie breaker.
*/
public static IndexOffset create(SnapshotVersion readTime, DocumentKey documentKey) {
return new AutoValue_FieldIndex_IndexOffset(readTime, documentKey);
public static IndexOffset create(
SnapshotVersion readTime, DocumentKey key, int largestBatchId) {
return new AutoValue_FieldIndex_IndexOffset(readTime, key, largestBatchId);
}

/**
Expand All @@ -145,12 +154,12 @@ public static IndexOffset create(SnapshotVersion readTime) {
successorNanos == 1e9
? new Timestamp(successorSeconds + 1, 0)
: new Timestamp(successorSeconds, successorNanos));
return new AutoValue_FieldIndex_IndexOffset(successor, DocumentKey.empty());
return create(successor, DocumentKey.empty(), INITIAL_LARGEST_BATCH_ID);
}

/** Creates a new offset based on the provided document. */
public static IndexOffset fromDocument(Document document) {
return new AutoValue_FieldIndex_IndexOffset(document.getReadTime(), document.getKey());
return create(document.getReadTime(), document.getKey(), INITIAL_LARGEST_BATCH_ID);
}

/**
Expand All @@ -164,10 +173,17 @@ public static IndexOffset fromDocument(Document document) {
*/
public abstract DocumentKey getDocumentKey();

/*
* Returns the largest mutation batch id that's been processed by Firestore.
*/
public abstract int getLargestBatchId();

public int compareTo(IndexOffset other) {
int cmp = getReadTime().compareTo(other.getReadTime());
if (cmp != 0) return cmp;
return getDocumentKey().compareTo(other.getDocumentKey());
cmp = getDocumentKey().compareTo(other.getDocumentKey());
if (cmp != 0) return cmp;
return Integer.compare(getLargestBatchId(), other.getLargestBatchId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ private void addFieldIndex(String collectionGroup, String fieldName, SnapshotVer
fieldIndex(
collectionGroup,
FieldIndex.UNKNOWN_ID,
FieldIndex.IndexState.create(0, version, DocumentKey.empty()),
FieldIndex.IndexState.create(0, version, DocumentKey.empty(), -1),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: use INITIAL_LARGEST_BATCH_ID here and throughout

fieldName,
FieldIndex.Segment.Kind.ASCENDING);
indexManager.addFieldIndex(fieldIndex);
Expand All @@ -300,7 +300,7 @@ private void addFieldIndex(String collectionGroup, String fieldName, long sequen
fieldIndex(
collectionGroup,
FieldIndex.UNKNOWN_ID,
FieldIndex.IndexState.create(sequenceNumber, SnapshotVersion.NONE, DocumentKey.empty()),
FieldIndex.IndexState.create(sequenceNumber, IndexOffset.NONE),
fieldName,
FieldIndex.Segment.Kind.ASCENDING);
indexManager.addFieldIndex(fieldIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void testGetAllFromSinceReadTimeAndDocumentKey() {

ResourcePath collection = path("b");
Map<DocumentKey, MutableDocument> results =
remoteDocumentCache.getAll(collection, IndexOffset.create(version(11), key("b/b")));
remoteDocumentCache.getAll(collection, IndexOffset.create(version(11), key("b/b"), -1));
assertThat(results.values()).containsExactly(doc("b/c", 3, DOC_DATA), doc("b/d", 4, DOC_DATA));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,19 +582,18 @@ public void testAdvancedQueries() {
}

@Test
public void testUpdateTime() {
indexManager.addFieldIndex(
fieldIndex(
"coll1",
1,
IndexState.create(-1, version(20), DocumentKey.empty()),
"value",
Kind.ASCENDING));
public void testPersistsIndexOffset() {
indexManager.addFieldIndex(fieldIndex("coll1", "value", Kind.ASCENDING));
IndexOffset offset = IndexOffset.create(version(20), key("coll/doc"), 42);
indexManager.updateCollectionGroup("coll1", offset);

indexManager = persistence.getIndexManager(User.UNAUTHENTICATED);
indexManager.start();

Collection<FieldIndex> indexes = indexManager.getFieldIndexes("coll1");
assertEquals(indexes.size(), 1);
FieldIndex index = indexes.iterator().next();
assertEquals(index.getIndexState().getOffset().getReadTime(), version(20));
assertEquals(offset, index.getIndexState().getOffset());
}

@Test
Expand Down Expand Up @@ -638,19 +637,9 @@ public void testGetFieldIndexes() {
@Test
public void testDeleteFieldIndexRemovesEntryFromCollectionGroup() {
indexManager.addFieldIndex(
fieldIndex(
"coll1",
1,
IndexState.create(1, version(30), DocumentKey.empty()),
"value",
Kind.ASCENDING));
fieldIndex("coll1", 1, IndexState.create(1, IndexOffset.NONE), "value", Kind.ASCENDING));
indexManager.addFieldIndex(
fieldIndex(
"coll2",
2,
IndexState.create(2, version(0), DocumentKey.empty()),
"value",
Kind.CONTAINS));
fieldIndex("coll2", 2, IndexState.create(2, IndexOffset.NONE), "value", Kind.CONTAINS));
String collectionGroup = indexManager.getNextCollectionGroupToUpdate();
assertEquals("coll1", collectionGroup);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void comparatorIgnoresIndexState() {
FieldIndex indexOriginal = fieldIndex("collA", 1, FieldIndex.INITIAL_STATE);
FieldIndex indexSame = fieldIndex("collA", 1, FieldIndex.INITIAL_STATE);
FieldIndex indexDifferent =
fieldIndex("collA", 1, IndexState.create(1, version(2), DocumentKey.empty()));
fieldIndex("collA", 1, IndexState.create(1, version(2), DocumentKey.empty(), -1));
assertEquals(0, SEMANTIC_COMPARATOR.compare(indexOriginal, indexSame));
assertEquals(0, SEMANTIC_COMPARATOR.compare(indexOriginal, indexDifferent));
}
Expand Down Expand Up @@ -94,10 +94,10 @@ public void comparatorIncludesSegmentsLength() {

@Test
public void indexOffsetComparator() {
IndexOffset docAOffset = IndexOffset.create(version(1), key("foo/a"));
IndexOffset docBOffset = IndexOffset.create(version(1), key("foo/b"));
IndexOffset docAOffset = IndexOffset.create(version(1), key("foo/a"), -1);
IndexOffset docBOffset = IndexOffset.create(version(1), key("foo/b"), -1);
IndexOffset version1Offset = IndexOffset.create(version(1));
IndexOffset docCOffset = IndexOffset.create(version(2), key("foo/c"));
IndexOffset docCOffset = IndexOffset.create(version(2), key("foo/c"), -1);
IndexOffset version2Offset = IndexOffset.create(version(2));

assertEquals(-1, docAOffset.compareTo(docBOffset));
Expand All @@ -110,7 +110,7 @@ public void indexOffsetComparator() {
@Test
public void indexOffsetAdvancesSeconds() {
IndexOffset actualSuccessor = IndexOffset.create(version(1, (int) 1e9 - 1));
IndexOffset expectedSuccessor = IndexOffset.create(version(2, 0), DocumentKey.empty());
IndexOffset expectedSuccessor = IndexOffset.create(version(2, 0), DocumentKey.empty(), -1);
assertEquals(expectedSuccessor, actualSuccessor);
}
}