Skip to content

Drop acknowledged mutations in schema migration #1818

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 15, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions Firestore/Example/Tests/Local/FSTLevelDBMigrationsTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
#include <string>
#include <vector>

#import "Firestore/Protos/objc/firestore/local/Mutation.pbobjc.h"
#import "Firestore/Protos/objc/firestore/local/Target.pbobjc.h"
#import "Firestore/Source/Local/FSTLevelDB.h"
#import "Firestore/Source/Local/FSTLevelDBMutationQueue.h"
#import "Firestore/Source/Local/FSTLevelDBQueryCache.h"

#include "Firestore/core/src/firebase/firestore/local/leveldb_key.h"
#include "Firestore/core/src/firebase/firestore/local/leveldb_migrations.h"
#include "Firestore/core/src/firebase/firestore/model/document_key.h"
#include "Firestore/core/src/firebase/firestore/util/ordered_code.h"
#include "Firestore/core/src/firebase/firestore/util/status.h"
#include "Firestore/core/test/firebase/firestore/testutil/testutil.h"
Expand All @@ -38,6 +40,7 @@
NS_ASSUME_NONNULL_BEGIN

using firebase::firestore::FirestoreErrorCode;
using firebase::firestore::local::LevelDbDocumentMutationKey;
using firebase::firestore::local::LevelDbDocumentTargetKey;
using firebase::firestore::local::LevelDbMigrations;
using firebase::firestore::local::LevelDbMutationKey;
Expand All @@ -47,6 +50,7 @@
using firebase::firestore::local::LevelDbTargetKey;
using firebase::firestore::local::LevelDbTransaction;
using firebase::firestore::model::BatchId;
using firebase::firestore::model::DocumentKey;
using firebase::firestore::model::TargetId;
using firebase::firestore::testutil::Key;
using firebase::firestore::util::OrderedCode;
Expand Down Expand Up @@ -199,6 +203,111 @@ - (void)testDropsTheQueryCacheWithThousandsOfEntries {
}
}

- (void)testRemovesMutationBatches {
std::string emptyBuffer;
DocumentKey testWriteFoo = DocumentKey::FromPathString("docs/foo");
DocumentKey testWriteBar = DocumentKey::FromPathString("docs/bar");
DocumentKey testWriteBaz = DocumentKey::FromPathString("docs/baz");
DocumentKey testWritePending = DocumentKey::FromPathString("docs/pending");
// Do everything up until the mutation batch migration.
LevelDbMigrations::RunMigrations(_db.get(), 3);
// Set up data
{
LevelDbTransaction transaction(_db.get(), "Setup Foo");
// User 'foo' has two acknowledged mutations and one that is pending.
FSTPBMutationQueue *fooQueue = [[FSTPBMutationQueue alloc] init];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional:

With a minor helper in here I think you could make these use nanopb firestore_client_MutationQueue messages instead. Here would be:

firestore_client_MutationQueue foo_queue{};
foo_queue.last_acknowledged_batch_id = 2;
std::string foo_key = LevelDbMutationQueueKey::Key("foo");
transaction.put(foo_key, Write(firestore_client_MutationQueue_fields, &foo_queue));

The helper would look like:

std::string Write(const pb_field_t fields[], const void* src_struct) {
  std::string result;
  Writer writer = Writer::Wrap(&result);
  writer.WriteNanopbMessage(fields, src_struct);
  return result;
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially tried including the nanopb headers here, but I ran into a build issue of some headers not being found. So, I ended up going w/ the ObjC protos like the rest of the file. Sadly, it looks like we can't port this test yet, as it has a bunch of ObjC dependencies.

fooQueue.lastAcknowledgedBatchId = 2;
std::string fooKey = LevelDbMutationQueueKey::Key("foo");
transaction.Put(fooKey, fooQueue);

FSTPBWriteBatch *fooBatch1 = [[FSTPBWriteBatch alloc] init];
fooBatch1.batchId = 1;
std::string fooBatchKey1 = LevelDbMutationKey::Key("foo", 1);
transaction.Put(fooBatchKey1, fooBatch1);
transaction.Put(LevelDbDocumentMutationKey::Key("foo", testWriteFoo, 1), emptyBuffer);

FSTPBWriteBatch *fooBatch2 = [[FSTPBWriteBatch alloc] init];
fooBatch2.batchId = 2;
std::string fooBatchKey2 = LevelDbMutationKey::Key("foo", 2);
transaction.Put(fooBatchKey2, fooBatch2);
transaction.Put(LevelDbDocumentMutationKey::Key("foo", testWriteFoo, 2), emptyBuffer);

FSTPBWriteBatch *fooBatch3 = [[FSTPBWriteBatch alloc] init];
fooBatch3.batchId = 5;
std::string fooBatchKey3 = LevelDbMutationKey::Key("foo", 5);
transaction.Put(fooBatchKey3, fooBatch3);
transaction.Put(LevelDbDocumentMutationKey::Key("foo", testWritePending, 5), emptyBuffer);

transaction.Commit();
}

{
LevelDbTransaction transaction(_db.get(), "Setup Bar");
// User 'bar' has one acknowledged mutation and one that is pending
FSTPBMutationQueue *barQueue = [[FSTPBMutationQueue alloc] init];
barQueue.lastAcknowledgedBatchId = 3;
std::string barKey = LevelDbMutationQueueKey::Key("bar");
transaction.Put(barKey, barQueue);

FSTPBWriteBatch *barBatch1 = [[FSTPBWriteBatch alloc] init];
barBatch1.batchId = 3;
std::string barBatchKey1 = LevelDbMutationKey::Key("bar", 3);
transaction.Put(barBatchKey1, barBatch1);
transaction.Put(LevelDbDocumentMutationKey::Key("bar", testWriteBar, 3), emptyBuffer);
transaction.Put(LevelDbDocumentMutationKey::Key("bar", testWriteBaz, 3), emptyBuffer);

FSTPBWriteBatch *barBatch2 = [[FSTPBWriteBatch alloc] init];
barBatch2.batchId = 4;
std::string barBatchKey2 = LevelDbMutationKey::Key("bar", 4);
transaction.Put(barBatchKey2, barBatch2);
transaction.Put(LevelDbDocumentMutationKey::Key("bar", testWritePending, 4), emptyBuffer);

transaction.Commit();
}

{
LevelDbTransaction transaction(_db.get(), "Setup Empty");
// User 'empty' has no mutations
FSTPBMutationQueue *emptyQueue = [[FSTPBMutationQueue alloc] init];
emptyQueue.lastAcknowledgedBatchId = -1;
std::string emptyKey = LevelDbMutationQueueKey::Key("empty");
transaction.Put(emptyKey, emptyQueue);

transaction.Commit();
}

LevelDbMigrations::RunMigrations(_db.get(), 4);

{
// Verify
std::string buffer;
LevelDbTransaction transaction(_db.get(), "Verify");
auto it = transaction.NewIterator();
// verify that we deleted the correct batches
XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("foo", 1), &buffer).IsNotFound());
XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("foo", 2), &buffer).IsNotFound());
XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("foo", 5), &buffer).ok());

XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("bar", 3), &buffer).IsNotFound());
XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("bar", 4), &buffer).ok());

// verify document associations have been removed
XCTAssertTrue(transaction.Get(LevelDbDocumentMutationKey::Key("foo", testWriteFoo, 1), &buffer)
.IsNotFound());
XCTAssertTrue(transaction.Get(LevelDbDocumentMutationKey::Key("foo", testWriteFoo, 2), &buffer)
.IsNotFound());
XCTAssertTrue(
transaction.Get(LevelDbDocumentMutationKey::Key("foo", testWritePending, 5), &buffer).ok());

XCTAssertTrue(transaction.Get(LevelDbDocumentMutationKey::Key("bar", testWriteBar, 3), &buffer)
.IsNotFound());
XCTAssertTrue(transaction.Get(LevelDbDocumentMutationKey::Key("bar", testWriteBaz, 3), &buffer)
.IsNotFound());
XCTAssertTrue(
transaction.Get(LevelDbDocumentMutationKey::Key("bar", testWritePending, 4), &buffer).ok());
}
}

/**
* Creates the name of a dummy entry to make sure the iteration is correctly bounded.
*/
Expand Down
2 changes: 1 addition & 1 deletion Firestore/core/src/firebase/firestore/local/leveldb_key.cc
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ std::string LevelDbMutationQueueKey::Key(absl::string_view user_id) {
return writer.result();
}

bool LevelDbMutationQueueKey::Decode(leveldb::Slice key) {
bool LevelDbMutationQueueKey::Decode(absl::string_view key) {
Reader reader{key};
reader.ReadTableNameMatching(kMutationQueuesTable);
user_id_ = reader.ReadUserId();
Expand Down
5 changes: 4 additions & 1 deletion Firestore/core/src/firebase/firestore/local/leveldb_key.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class LevelDbMutationKey {
* returned, this instance is in an undefined state until the next call to
* `Decode()`.
*/
ABSL_MUST_USE_RESULT
bool Decode(absl::string_view key);

/** The user that owns the mutation batches. */
Expand Down Expand Up @@ -184,6 +185,7 @@ class LevelDbDocumentMutationKey {
* returned, this instance is in an undefined state until the next call to
* `Decode()`.
*/
ABSL_MUST_USE_RESULT
bool Decode(absl::string_view key);

/** The user that owns the mutation batches. */
Expand Down Expand Up @@ -235,7 +237,8 @@ class LevelDbMutationQueueKey {
* returned, this instance is in an undefined state until the next call to
* `Decode()`.
*/
bool Decode(leveldb::Slice key);
ABSL_MUST_USE_RESULT
bool Decode(absl::string_view key);

const std::string& user_id() const {
return user_id_;
Expand Down
71 changes: 71 additions & 0 deletions Firestore/core/src/firebase/firestore/local/leveldb_migrations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
#include <string>
#include <utility>

#include "Firestore/Protos/nanopb/firestore/local/mutation.nanopb.h"
#include "Firestore/Protos/nanopb/firestore/local/target.nanopb.h"
#include "Firestore/core/src/firebase/firestore/local/leveldb_key.h"
#include "Firestore/core/src/firebase/firestore/nanopb/reader.h"
#include "Firestore/core/src/firebase/firestore/nanopb/writer.h"
#include "absl/strings/match.h"

Expand All @@ -32,6 +34,7 @@ using leveldb::Iterator;
using leveldb::Slice;
using leveldb::Status;
using leveldb::WriteOptions;
using nanopb::Reader;
using nanopb::Writer;

namespace {
Expand Down Expand Up @@ -110,6 +113,71 @@ void ClearQueryCache(leveldb::DB* db) {
transaction.Commit();
}

void RemoveMutationDocuments(LevelDbTransaction *transaction,
absl::string_view user_id,
int32_t last_acknowledged_batch_id) {
LevelDbDocumentMutationKey doc_key;
std::string prefix = LevelDbDocumentMutationKey::KeyPrefix(user_id);

auto it = transaction->NewIterator();
it->Seek(prefix);
for (; it->Valid() && absl::StartsWith(it->key(), prefix); it->Next()) {
HARD_ASSERT(doc_key.Decode(it->key()),
"Failed to decode document mutation key");
if (doc_key.batch_id() <= last_acknowledged_batch_id) {
transaction->Delete(it->key());
}
}
}

void RemoveMutationBatches(LevelDbTransaction *transaction,
absl::string_view user_id,
int32_t last_acknowledged_batch_id) {
LevelDbMutationKey key;
std::string mutations_key = LevelDbMutationKey::KeyPrefix(user_id);
std::string last_key =
LevelDbMutationKey::Key(user_id, last_acknowledged_batch_id);
auto it = transaction->NewIterator();
it->Seek(mutations_key);
for (; it->Valid() && absl::StartsWith(it->key(), mutations_key) &&
it->key() <= last_key;
it->Next()) {
transaction->Delete(it->key());
}
}

/** Migration 4. */
void RemoveAcknowledgedMutations(leveldb::DB* db) {
LevelDbTransaction transaction(db, "remove acknowledged mutations");
std::string mutation_queue_start = LevelDbMutationQueueKey::KeyPrefix();

LevelDbMutationQueueKey key;

auto it = transaction.NewIterator();
it->Seek(mutation_queue_start);
for (; it->Valid() && absl::StartsWith(it->key(), mutation_queue_start);
it->Next()) {
HARD_ASSERT(key.Decode(it->key()), "Failed to decode mutation queue key");
firestore_client_MutationQueue mutation_queue
firestore_client_MutationQueue_init_zero;
size_t byte_len = it->value().size();
const char* bytes = it->value().data();
// const uint8_t *bytes = static_cast<const uint8_t *>(it->value().data());
Reader reader =
Reader::Wrap(reinterpret_cast<const uint8_t*>(bytes), byte_len);
reader.ReadNanopbMessage(firestore_client_MutationQueue_fields,
&mutation_queue);
HARD_ASSERT(reader.status().ok(), "Failed to deserialize MutationQueue");
RemoveMutationBatches(&transaction, key.user_id(),
mutation_queue.last_acknowledged_batch_id);
RemoveMutationDocuments(&transaction, key.user_id(),
mutation_queue.last_acknowledged_batch_id);
}

SaveVersion(4, &transaction);
transaction.Commit();
}

} // namespace

LevelDbMigrations::SchemaVersion LevelDbMigrations::ReadSchemaVersion(
Expand Down Expand Up @@ -139,6 +207,9 @@ void LevelDbMigrations::RunMigrations(leveldb::DB* db,
if (from_version < 3 && to_version >= 3) {
ClearQueryCache(db);
}
if (from_version < 4 && to_version >= 4) {
RemoveAcknowledgedMutations(db);
}
}

} // namespace local
Expand Down