diff --git a/Firestore/Example/Tests/Local/FSTLevelDBMigrationsTests.mm b/Firestore/Example/Tests/Local/FSTLevelDBMigrationsTests.mm index 5f25935ea41..818948b7fbf 100644 --- a/Firestore/Example/Tests/Local/FSTLevelDBMigrationsTests.mm +++ b/Firestore/Example/Tests/Local/FSTLevelDBMigrationsTests.mm @@ -20,6 +20,7 @@ #include #include +#import "Firestore/Protos/objc/firestore/local/Mutation.pbobjc.h" #import "Firestore/Protos/objc/firestore/local/Target.pbobjc.h" #import "Firestore/Source/Local/FSTLevelDB.h" #import "Firestore/Source/Local/FSTLevelDBMutationQueue.h" @@ -27,6 +28,7 @@ #include "Firestore/core/src/firebase/firestore/local/leveldb_key.h" #include "Firestore/core/src/firebase/firestore/local/leveldb_migrations.h" +#include "Firestore/core/src/firebase/firestore/model/document_key.h" #include "Firestore/core/src/firebase/firestore/util/ordered_code.h" #include "Firestore/core/src/firebase/firestore/util/status.h" #include "Firestore/core/test/firebase/firestore/testutil/testutil.h" @@ -38,6 +40,7 @@ NS_ASSUME_NONNULL_BEGIN using firebase::firestore::FirestoreErrorCode; +using firebase::firestore::local::LevelDbDocumentMutationKey; using firebase::firestore::local::LevelDbDocumentTargetKey; using firebase::firestore::local::LevelDbMigrations; using firebase::firestore::local::LevelDbMutationKey; @@ -47,6 +50,7 @@ using firebase::firestore::local::LevelDbTargetKey; using firebase::firestore::local::LevelDbTransaction; using firebase::firestore::model::BatchId; +using firebase::firestore::model::DocumentKey; using firebase::firestore::model::TargetId; using firebase::firestore::testutil::Key; using firebase::firestore::util::OrderedCode; @@ -199,6 +203,111 @@ - (void)testDropsTheQueryCacheWithThousandsOfEntries { } } +- (void)testRemovesMutationBatches { + std::string emptyBuffer; + DocumentKey testWriteFoo = DocumentKey::FromPathString("docs/foo"); + DocumentKey testWriteBar = DocumentKey::FromPathString("docs/bar"); + DocumentKey testWriteBaz = DocumentKey::FromPathString("docs/baz"); + DocumentKey testWritePending = DocumentKey::FromPathString("docs/pending"); + // Do everything up until the mutation batch migration. + LevelDbMigrations::RunMigrations(_db.get(), 3); + // Set up data + { + LevelDbTransaction transaction(_db.get(), "Setup Foo"); + // User 'foo' has two acknowledged mutations and one that is pending. + FSTPBMutationQueue *fooQueue = [[FSTPBMutationQueue alloc] init]; + fooQueue.lastAcknowledgedBatchId = 2; + std::string fooKey = LevelDbMutationQueueKey::Key("foo"); + transaction.Put(fooKey, fooQueue); + + FSTPBWriteBatch *fooBatch1 = [[FSTPBWriteBatch alloc] init]; + fooBatch1.batchId = 1; + std::string fooBatchKey1 = LevelDbMutationKey::Key("foo", 1); + transaction.Put(fooBatchKey1, fooBatch1); + transaction.Put(LevelDbDocumentMutationKey::Key("foo", testWriteFoo, 1), emptyBuffer); + + FSTPBWriteBatch *fooBatch2 = [[FSTPBWriteBatch alloc] init]; + fooBatch2.batchId = 2; + std::string fooBatchKey2 = LevelDbMutationKey::Key("foo", 2); + transaction.Put(fooBatchKey2, fooBatch2); + transaction.Put(LevelDbDocumentMutationKey::Key("foo", testWriteFoo, 2), emptyBuffer); + + FSTPBWriteBatch *fooBatch3 = [[FSTPBWriteBatch alloc] init]; + fooBatch3.batchId = 5; + std::string fooBatchKey3 = LevelDbMutationKey::Key("foo", 5); + transaction.Put(fooBatchKey3, fooBatch3); + transaction.Put(LevelDbDocumentMutationKey::Key("foo", testWritePending, 5), emptyBuffer); + + transaction.Commit(); + } + + { + LevelDbTransaction transaction(_db.get(), "Setup Bar"); + // User 'bar' has one acknowledged mutation and one that is pending + FSTPBMutationQueue *barQueue = [[FSTPBMutationQueue alloc] init]; + barQueue.lastAcknowledgedBatchId = 3; + std::string barKey = LevelDbMutationQueueKey::Key("bar"); + transaction.Put(barKey, barQueue); + + FSTPBWriteBatch *barBatch1 = [[FSTPBWriteBatch alloc] init]; + barBatch1.batchId = 3; + std::string barBatchKey1 = LevelDbMutationKey::Key("bar", 3); + transaction.Put(barBatchKey1, barBatch1); + transaction.Put(LevelDbDocumentMutationKey::Key("bar", testWriteBar, 3), emptyBuffer); + transaction.Put(LevelDbDocumentMutationKey::Key("bar", testWriteBaz, 3), emptyBuffer); + + FSTPBWriteBatch *barBatch2 = [[FSTPBWriteBatch alloc] init]; + barBatch2.batchId = 4; + std::string barBatchKey2 = LevelDbMutationKey::Key("bar", 4); + transaction.Put(barBatchKey2, barBatch2); + transaction.Put(LevelDbDocumentMutationKey::Key("bar", testWritePending, 4), emptyBuffer); + + transaction.Commit(); + } + + { + LevelDbTransaction transaction(_db.get(), "Setup Empty"); + // User 'empty' has no mutations + FSTPBMutationQueue *emptyQueue = [[FSTPBMutationQueue alloc] init]; + emptyQueue.lastAcknowledgedBatchId = -1; + std::string emptyKey = LevelDbMutationQueueKey::Key("empty"); + transaction.Put(emptyKey, emptyQueue); + + transaction.Commit(); + } + + LevelDbMigrations::RunMigrations(_db.get(), 4); + + { + // Verify + std::string buffer; + LevelDbTransaction transaction(_db.get(), "Verify"); + auto it = transaction.NewIterator(); + // verify that we deleted the correct batches + XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("foo", 1), &buffer).IsNotFound()); + XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("foo", 2), &buffer).IsNotFound()); + XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("foo", 5), &buffer).ok()); + + XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("bar", 3), &buffer).IsNotFound()); + XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("bar", 4), &buffer).ok()); + + // verify document associations have been removed + XCTAssertTrue(transaction.Get(LevelDbDocumentMutationKey::Key("foo", testWriteFoo, 1), &buffer) + .IsNotFound()); + XCTAssertTrue(transaction.Get(LevelDbDocumentMutationKey::Key("foo", testWriteFoo, 2), &buffer) + .IsNotFound()); + XCTAssertTrue( + transaction.Get(LevelDbDocumentMutationKey::Key("foo", testWritePending, 5), &buffer).ok()); + + XCTAssertTrue(transaction.Get(LevelDbDocumentMutationKey::Key("bar", testWriteBar, 3), &buffer) + .IsNotFound()); + XCTAssertTrue(transaction.Get(LevelDbDocumentMutationKey::Key("bar", testWriteBaz, 3), &buffer) + .IsNotFound()); + XCTAssertTrue( + transaction.Get(LevelDbDocumentMutationKey::Key("bar", testWritePending, 4), &buffer).ok()); + } +} + /** * Creates the name of a dummy entry to make sure the iteration is correctly bounded. */ diff --git a/Firestore/Source/Local/FSTLevelDBQueryCache.mm b/Firestore/Source/Local/FSTLevelDBQueryCache.mm index 978aa0d79eb..62b562553f0 100644 --- a/Firestore/Source/Local/FSTLevelDBQueryCache.mm +++ b/Firestore/Source/Local/FSTLevelDBQueryCache.mm @@ -193,7 +193,7 @@ - (void)enumerateOrphanedDocumentsUsingBlock: LevelDbDocumentTargetKey key; BOOL stop = NO; for (; !stop && it->Valid() && absl::StartsWith(it->key(), documentTargetPrefix); it->Next()) { - key.Decode(it->key()); + HARD_ASSERT(key.Decode(it->key()), "Failed to decode DocumentTarget key"); if (key.IsSentinel()) { // if nextToReport is non-zero, report it, this is a new key so the last one // must be not be a member of any targets. diff --git a/Firestore/core/src/firebase/firestore/local/leveldb_key.cc b/Firestore/core/src/firebase/firestore/local/leveldb_key.cc index c82a53f1dd8..a6d6bbe7bdf 100644 --- a/Firestore/core/src/firebase/firestore/local/leveldb_key.cc +++ b/Firestore/core/src/firebase/firestore/local/leveldb_key.cc @@ -656,7 +656,7 @@ std::string LevelDbMutationQueueKey::Key(absl::string_view user_id) { return writer.result(); } -bool LevelDbMutationQueueKey::Decode(leveldb::Slice key) { +bool LevelDbMutationQueueKey::Decode(absl::string_view key) { Reader reader{key}; reader.ReadTableNameMatching(kMutationQueuesTable); user_id_ = reader.ReadUserId(); diff --git a/Firestore/core/src/firebase/firestore/local/leveldb_key.h b/Firestore/core/src/firebase/firestore/local/leveldb_key.h index c31abefd3c0..c57769f95a2 100644 --- a/Firestore/core/src/firebase/firestore/local/leveldb_key.h +++ b/Firestore/core/src/firebase/firestore/local/leveldb_key.h @@ -121,6 +121,7 @@ class LevelDbMutationKey { * returned, this instance is in an undefined state until the next call to * `Decode()`. */ + ABSL_MUST_USE_RESULT bool Decode(absl::string_view key); /** The user that owns the mutation batches. */ @@ -184,6 +185,7 @@ class LevelDbDocumentMutationKey { * returned, this instance is in an undefined state until the next call to * `Decode()`. */ + ABSL_MUST_USE_RESULT bool Decode(absl::string_view key); /** The user that owns the mutation batches. */ @@ -235,7 +237,8 @@ class LevelDbMutationQueueKey { * returned, this instance is in an undefined state until the next call to * `Decode()`. */ - bool Decode(leveldb::Slice key); + ABSL_MUST_USE_RESULT + bool Decode(absl::string_view key); const std::string& user_id() const { return user_id_; @@ -259,6 +262,7 @@ class LevelDbTargetGlobalKey { * Decodes the contents of a target global key, essentially just verifying * that the key has the correct table name. */ + ABSL_MUST_USE_RESULT bool Decode(leveldb::Slice key); }; @@ -281,6 +285,7 @@ class LevelDbTargetKey { * returned, this instance is in an undefined state until the next call to * `Decode()`. */ + ABSL_MUST_USE_RESULT bool Decode(leveldb::Slice key); model::TargetId target_id() { @@ -322,6 +327,7 @@ class LevelDbQueryTargetKey { * returned, this instance is in an undefined state until the next call to * `Decode()`. */ + ABSL_MUST_USE_RESULT bool Decode(absl::string_view key); /** The canonical_id derived from the query. */ @@ -370,6 +376,7 @@ class LevelDbTargetDocumentKey { * returned, this instance is in an undefined state until the next call to * `Decode()`. */ + ABSL_MUST_USE_RESULT bool Decode(absl::string_view key); /** The target_id identifying a target. */ @@ -424,6 +431,7 @@ class LevelDbDocumentTargetKey { * returned, this instance is in an undefined state until the next call to * `Decode()`. */ + ABSL_MUST_USE_RESULT bool Decode(absl::string_view key); /** The target_id identifying a target. */ @@ -487,6 +495,7 @@ class LevelDbRemoteDocumentKey { * returned, this instance is in an undefined state until the next call to * `Decode()`. */ + ABSL_MUST_USE_RESULT bool Decode(absl::string_view key); /** The path to the document, as encoded in the key. */ diff --git a/Firestore/core/src/firebase/firestore/local/leveldb_migrations.cc b/Firestore/core/src/firebase/firestore/local/leveldb_migrations.cc index dc4d1594e56..6e8b97035e7 100644 --- a/Firestore/core/src/firebase/firestore/local/leveldb_migrations.cc +++ b/Firestore/core/src/firebase/firestore/local/leveldb_migrations.cc @@ -19,8 +19,10 @@ #include #include +#include "Firestore/Protos/nanopb/firestore/local/mutation.nanopb.h" #include "Firestore/Protos/nanopb/firestore/local/target.nanopb.h" #include "Firestore/core/src/firebase/firestore/local/leveldb_key.h" +#include "Firestore/core/src/firebase/firestore/nanopb/reader.h" #include "Firestore/core/src/firebase/firestore/nanopb/writer.h" #include "absl/strings/match.h" @@ -32,6 +34,7 @@ using leveldb::Iterator; using leveldb::Slice; using leveldb::Status; using leveldb::WriteOptions; +using nanopb::Reader; using nanopb::Writer; namespace { @@ -110,6 +113,72 @@ void ClearQueryCache(leveldb::DB* db) { transaction.Commit(); } +/** + * Removes document associations for the given user's mutation queue for + * any mutation with a `batch_id` less than or equal to + * `last_acknowledged_batch_id`. + */ +void RemoveMutationDocuments(LevelDbTransaction* transaction, + absl::string_view user_id, + int32_t last_acknowledged_batch_id) { + LevelDbDocumentMutationKey doc_key; + std::string prefix = LevelDbDocumentMutationKey::KeyPrefix(user_id); + + auto it = transaction->NewIterator(); + it->Seek(prefix); + for (; it->Valid() && absl::StartsWith(it->key(), prefix); it->Next()) { + HARD_ASSERT(doc_key.Decode(it->key()), + "Failed to decode document mutation key"); + if (doc_key.batch_id() <= last_acknowledged_batch_id) { + transaction->Delete(it->key()); + } + } +} + +/** + * Removes mutation batches for the given user with a `batch_id` less than + * or equal to `last_acknowledged_batch_id` + */ +void RemoveMutationBatches(LevelDbTransaction* transaction, + absl::string_view user_id, + int32_t last_acknowledged_batch_id) { + std::string mutations_key = LevelDbMutationKey::KeyPrefix(user_id); + std::string last_key = + LevelDbMutationKey::Key(user_id, last_acknowledged_batch_id); + auto it = transaction->NewIterator(); + it->Seek(mutations_key); + for (; it->Valid() && it->key() <= last_key; it->Next()) { + transaction->Delete(it->key()); + } +} + +/** Migration 4. */ +void RemoveAcknowledgedMutations(leveldb::DB* db) { + LevelDbTransaction transaction(db, "remove acknowledged mutations"); + std::string mutation_queue_start = LevelDbMutationQueueKey::KeyPrefix(); + + LevelDbMutationQueueKey key; + + auto it = transaction.NewIterator(); + it->Seek(mutation_queue_start); + for (; it->Valid() && absl::StartsWith(it->key(), mutation_queue_start); + it->Next()) { + HARD_ASSERT(key.Decode(it->key()), "Failed to decode mutation queue key"); + firestore_client_MutationQueue mutation_queue{}; + Reader reader = Reader::Wrap(it->value()); + reader.ReadNanopbMessage(firestore_client_MutationQueue_fields, + &mutation_queue); + HARD_ASSERT(reader.status().ok(), "Failed to deserialize MutationQueue"); + RemoveMutationBatches(&transaction, key.user_id(), + mutation_queue.last_acknowledged_batch_id); + RemoveMutationDocuments(&transaction, key.user_id(), + mutation_queue.last_acknowledged_batch_id); + } + + SaveVersion(4, &transaction); + transaction.Commit(); +} + } // namespace LevelDbMigrations::SchemaVersion LevelDbMigrations::ReadSchemaVersion( @@ -139,6 +208,9 @@ void LevelDbMigrations::RunMigrations(leveldb::DB* db, if (from_version < 3 && to_version >= 3) { ClearQueryCache(db); } + if (from_version < 4 && to_version >= 4) { + RemoveAcknowledgedMutations(db); + } } } // namespace local diff --git a/Firestore/core/src/firebase/firestore/nanopb/reader.cc b/Firestore/core/src/firebase/firestore/nanopb/reader.cc index ae8f4cdef96..fdc49330d92 100644 --- a/Firestore/core/src/firebase/firestore/nanopb/reader.cc +++ b/Firestore/core/src/firebase/firestore/nanopb/reader.cc @@ -30,6 +30,12 @@ Reader Reader::Wrap(const uint8_t* bytes, size_t length) { return Reader{pb_istream_from_buffer(bytes, length)}; } +Reader Reader::Wrap(absl::string_view string_view) { + return Reader{pb_istream_from_buffer( + reinterpret_cast(string_view.data()), + string_view.size())}; +} + uint32_t Reader::ReadTag() { Tag tag; if (!status_.ok()) return 0; diff --git a/Firestore/core/src/firebase/firestore/nanopb/reader.h b/Firestore/core/src/firebase/firestore/nanopb/reader.h index af5ecd4cd3c..30b3b04e6f2 100644 --- a/Firestore/core/src/firebase/firestore/nanopb/reader.h +++ b/Firestore/core/src/firebase/firestore/nanopb/reader.h @@ -59,6 +59,15 @@ class Reader { */ static Reader Wrap(const uint8_t* bytes, size_t length); + /** + * Creates an input stream from bytes backing the string_view. Note that + * the backing buffer must remain valid for the lifetime of this Reader. + * + * (This is roughly equivalent to the nanopb function + * pb_istream_from_buffer()) + */ + static Reader Wrap(absl::string_view); + /** * Reads a message type from the input stream. *