Skip to content

Commit 7709793

Browse files
author
Greg Soltis
authored
Drop acknowledged mutations in schema migration (#1818)
Drop acknowledged mutations in schema migration as part of removing held write acks. Port of firebase/firebase-js-sdk#1149
1 parent f151178 commit 7709793

File tree

7 files changed

+208
-3
lines changed

7 files changed

+208
-3
lines changed

Firestore/Example/Tests/Local/FSTLevelDBMigrationsTests.mm

+109
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
#include <string>
2121
#include <vector>
2222

23+
#import "Firestore/Protos/objc/firestore/local/Mutation.pbobjc.h"
2324
#import "Firestore/Protos/objc/firestore/local/Target.pbobjc.h"
2425
#import "Firestore/Source/Local/FSTLevelDB.h"
2526
#import "Firestore/Source/Local/FSTLevelDBMutationQueue.h"
2627
#import "Firestore/Source/Local/FSTLevelDBQueryCache.h"
2728

2829
#include "Firestore/core/src/firebase/firestore/local/leveldb_key.h"
2930
#include "Firestore/core/src/firebase/firestore/local/leveldb_migrations.h"
31+
#include "Firestore/core/src/firebase/firestore/model/document_key.h"
3032
#include "Firestore/core/src/firebase/firestore/util/ordered_code.h"
3133
#include "Firestore/core/src/firebase/firestore/util/status.h"
3234
#include "Firestore/core/test/firebase/firestore/testutil/testutil.h"
@@ -38,6 +40,7 @@
3840
NS_ASSUME_NONNULL_BEGIN
3941

4042
using firebase::firestore::FirestoreErrorCode;
43+
using firebase::firestore::local::LevelDbDocumentMutationKey;
4144
using firebase::firestore::local::LevelDbDocumentTargetKey;
4245
using firebase::firestore::local::LevelDbMigrations;
4346
using firebase::firestore::local::LevelDbMutationKey;
@@ -47,6 +50,7 @@
4750
using firebase::firestore::local::LevelDbTargetKey;
4851
using firebase::firestore::local::LevelDbTransaction;
4952
using firebase::firestore::model::BatchId;
53+
using firebase::firestore::model::DocumentKey;
5054
using firebase::firestore::model::TargetId;
5155
using firebase::firestore::testutil::Key;
5256
using firebase::firestore::util::OrderedCode;
@@ -199,6 +203,111 @@ - (void)testDropsTheQueryCacheWithThousandsOfEntries {
199203
}
200204
}
201205

206+
- (void)testRemovesMutationBatches {
207+
std::string emptyBuffer;
208+
DocumentKey testWriteFoo = DocumentKey::FromPathString("docs/foo");
209+
DocumentKey testWriteBar = DocumentKey::FromPathString("docs/bar");
210+
DocumentKey testWriteBaz = DocumentKey::FromPathString("docs/baz");
211+
DocumentKey testWritePending = DocumentKey::FromPathString("docs/pending");
212+
// Do everything up until the mutation batch migration.
213+
LevelDbMigrations::RunMigrations(_db.get(), 3);
214+
// Set up data
215+
{
216+
LevelDbTransaction transaction(_db.get(), "Setup Foo");
217+
// User 'foo' has two acknowledged mutations and one that is pending.
218+
FSTPBMutationQueue *fooQueue = [[FSTPBMutationQueue alloc] init];
219+
fooQueue.lastAcknowledgedBatchId = 2;
220+
std::string fooKey = LevelDbMutationQueueKey::Key("foo");
221+
transaction.Put(fooKey, fooQueue);
222+
223+
FSTPBWriteBatch *fooBatch1 = [[FSTPBWriteBatch alloc] init];
224+
fooBatch1.batchId = 1;
225+
std::string fooBatchKey1 = LevelDbMutationKey::Key("foo", 1);
226+
transaction.Put(fooBatchKey1, fooBatch1);
227+
transaction.Put(LevelDbDocumentMutationKey::Key("foo", testWriteFoo, 1), emptyBuffer);
228+
229+
FSTPBWriteBatch *fooBatch2 = [[FSTPBWriteBatch alloc] init];
230+
fooBatch2.batchId = 2;
231+
std::string fooBatchKey2 = LevelDbMutationKey::Key("foo", 2);
232+
transaction.Put(fooBatchKey2, fooBatch2);
233+
transaction.Put(LevelDbDocumentMutationKey::Key("foo", testWriteFoo, 2), emptyBuffer);
234+
235+
FSTPBWriteBatch *fooBatch3 = [[FSTPBWriteBatch alloc] init];
236+
fooBatch3.batchId = 5;
237+
std::string fooBatchKey3 = LevelDbMutationKey::Key("foo", 5);
238+
transaction.Put(fooBatchKey3, fooBatch3);
239+
transaction.Put(LevelDbDocumentMutationKey::Key("foo", testWritePending, 5), emptyBuffer);
240+
241+
transaction.Commit();
242+
}
243+
244+
{
245+
LevelDbTransaction transaction(_db.get(), "Setup Bar");
246+
// User 'bar' has one acknowledged mutation and one that is pending
247+
FSTPBMutationQueue *barQueue = [[FSTPBMutationQueue alloc] init];
248+
barQueue.lastAcknowledgedBatchId = 3;
249+
std::string barKey = LevelDbMutationQueueKey::Key("bar");
250+
transaction.Put(barKey, barQueue);
251+
252+
FSTPBWriteBatch *barBatch1 = [[FSTPBWriteBatch alloc] init];
253+
barBatch1.batchId = 3;
254+
std::string barBatchKey1 = LevelDbMutationKey::Key("bar", 3);
255+
transaction.Put(barBatchKey1, barBatch1);
256+
transaction.Put(LevelDbDocumentMutationKey::Key("bar", testWriteBar, 3), emptyBuffer);
257+
transaction.Put(LevelDbDocumentMutationKey::Key("bar", testWriteBaz, 3), emptyBuffer);
258+
259+
FSTPBWriteBatch *barBatch2 = [[FSTPBWriteBatch alloc] init];
260+
barBatch2.batchId = 4;
261+
std::string barBatchKey2 = LevelDbMutationKey::Key("bar", 4);
262+
transaction.Put(barBatchKey2, barBatch2);
263+
transaction.Put(LevelDbDocumentMutationKey::Key("bar", testWritePending, 4), emptyBuffer);
264+
265+
transaction.Commit();
266+
}
267+
268+
{
269+
LevelDbTransaction transaction(_db.get(), "Setup Empty");
270+
// User 'empty' has no mutations
271+
FSTPBMutationQueue *emptyQueue = [[FSTPBMutationQueue alloc] init];
272+
emptyQueue.lastAcknowledgedBatchId = -1;
273+
std::string emptyKey = LevelDbMutationQueueKey::Key("empty");
274+
transaction.Put(emptyKey, emptyQueue);
275+
276+
transaction.Commit();
277+
}
278+
279+
LevelDbMigrations::RunMigrations(_db.get(), 4);
280+
281+
{
282+
// Verify
283+
std::string buffer;
284+
LevelDbTransaction transaction(_db.get(), "Verify");
285+
auto it = transaction.NewIterator();
286+
// verify that we deleted the correct batches
287+
XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("foo", 1), &buffer).IsNotFound());
288+
XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("foo", 2), &buffer).IsNotFound());
289+
XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("foo", 5), &buffer).ok());
290+
291+
XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("bar", 3), &buffer).IsNotFound());
292+
XCTAssertTrue(transaction.Get(LevelDbMutationKey::Key("bar", 4), &buffer).ok());
293+
294+
// verify document associations have been removed
295+
XCTAssertTrue(transaction.Get(LevelDbDocumentMutationKey::Key("foo", testWriteFoo, 1), &buffer)
296+
.IsNotFound());
297+
XCTAssertTrue(transaction.Get(LevelDbDocumentMutationKey::Key("foo", testWriteFoo, 2), &buffer)
298+
.IsNotFound());
299+
XCTAssertTrue(
300+
transaction.Get(LevelDbDocumentMutationKey::Key("foo", testWritePending, 5), &buffer).ok());
301+
302+
XCTAssertTrue(transaction.Get(LevelDbDocumentMutationKey::Key("bar", testWriteBar, 3), &buffer)
303+
.IsNotFound());
304+
XCTAssertTrue(transaction.Get(LevelDbDocumentMutationKey::Key("bar", testWriteBaz, 3), &buffer)
305+
.IsNotFound());
306+
XCTAssertTrue(
307+
transaction.Get(LevelDbDocumentMutationKey::Key("bar", testWritePending, 4), &buffer).ok());
308+
}
309+
}
310+
202311
/**
203312
* Creates the name of a dummy entry to make sure the iteration is correctly bounded.
204313
*/

Firestore/Source/Local/FSTLevelDBQueryCache.mm

+1-1
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ - (void)enumerateOrphanedDocumentsUsingBlock:
193193
LevelDbDocumentTargetKey key;
194194
BOOL stop = NO;
195195
for (; !stop && it->Valid() && absl::StartsWith(it->key(), documentTargetPrefix); it->Next()) {
196-
key.Decode(it->key());
196+
HARD_ASSERT(key.Decode(it->key()), "Failed to decode DocumentTarget key");
197197
if (key.IsSentinel()) {
198198
// if nextToReport is non-zero, report it, this is a new key so the last one
199199
// must be not be a member of any targets.

Firestore/core/src/firebase/firestore/local/leveldb_key.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,7 @@ std::string LevelDbMutationQueueKey::Key(absl::string_view user_id) {
656656
return writer.result();
657657
}
658658

659-
bool LevelDbMutationQueueKey::Decode(leveldb::Slice key) {
659+
bool LevelDbMutationQueueKey::Decode(absl::string_view key) {
660660
Reader reader{key};
661661
reader.ReadTableNameMatching(kMutationQueuesTable);
662662
user_id_ = reader.ReadUserId();

Firestore/core/src/firebase/firestore/local/leveldb_key.h

+10-1
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ class LevelDbMutationKey {
121121
* returned, this instance is in an undefined state until the next call to
122122
* `Decode()`.
123123
*/
124+
ABSL_MUST_USE_RESULT
124125
bool Decode(absl::string_view key);
125126

126127
/** The user that owns the mutation batches. */
@@ -184,6 +185,7 @@ class LevelDbDocumentMutationKey {
184185
* returned, this instance is in an undefined state until the next call to
185186
* `Decode()`.
186187
*/
188+
ABSL_MUST_USE_RESULT
187189
bool Decode(absl::string_view key);
188190

189191
/** The user that owns the mutation batches. */
@@ -235,7 +237,8 @@ class LevelDbMutationQueueKey {
235237
* returned, this instance is in an undefined state until the next call to
236238
* `Decode()`.
237239
*/
238-
bool Decode(leveldb::Slice key);
240+
ABSL_MUST_USE_RESULT
241+
bool Decode(absl::string_view key);
239242

240243
const std::string& user_id() const {
241244
return user_id_;
@@ -259,6 +262,7 @@ class LevelDbTargetGlobalKey {
259262
* Decodes the contents of a target global key, essentially just verifying
260263
* that the key has the correct table name.
261264
*/
265+
ABSL_MUST_USE_RESULT
262266
bool Decode(leveldb::Slice key);
263267
};
264268

@@ -281,6 +285,7 @@ class LevelDbTargetKey {
281285
* returned, this instance is in an undefined state until the next call to
282286
* `Decode()`.
283287
*/
288+
ABSL_MUST_USE_RESULT
284289
bool Decode(leveldb::Slice key);
285290

286291
model::TargetId target_id() {
@@ -322,6 +327,7 @@ class LevelDbQueryTargetKey {
322327
* returned, this instance is in an undefined state until the next call to
323328
* `Decode()`.
324329
*/
330+
ABSL_MUST_USE_RESULT
325331
bool Decode(absl::string_view key);
326332

327333
/** The canonical_id derived from the query. */
@@ -370,6 +376,7 @@ class LevelDbTargetDocumentKey {
370376
* returned, this instance is in an undefined state until the next call to
371377
* `Decode()`.
372378
*/
379+
ABSL_MUST_USE_RESULT
373380
bool Decode(absl::string_view key);
374381

375382
/** The target_id identifying a target. */
@@ -424,6 +431,7 @@ class LevelDbDocumentTargetKey {
424431
* returned, this instance is in an undefined state until the next call to
425432
* `Decode()`.
426433
*/
434+
ABSL_MUST_USE_RESULT
427435
bool Decode(absl::string_view key);
428436

429437
/** The target_id identifying a target. */
@@ -487,6 +495,7 @@ class LevelDbRemoteDocumentKey {
487495
* returned, this instance is in an undefined state until the next call to
488496
* `Decode()`.
489497
*/
498+
ABSL_MUST_USE_RESULT
490499
bool Decode(absl::string_view key);
491500

492501
/** The path to the document, as encoded in the key. */

Firestore/core/src/firebase/firestore/local/leveldb_migrations.cc

+72
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
#include <string>
2020
#include <utility>
2121

22+
#include "Firestore/Protos/nanopb/firestore/local/mutation.nanopb.h"
2223
#include "Firestore/Protos/nanopb/firestore/local/target.nanopb.h"
2324
#include "Firestore/core/src/firebase/firestore/local/leveldb_key.h"
25+
#include "Firestore/core/src/firebase/firestore/nanopb/reader.h"
2426
#include "Firestore/core/src/firebase/firestore/nanopb/writer.h"
2527
#include "absl/strings/match.h"
2628

@@ -32,6 +34,7 @@ using leveldb::Iterator;
3234
using leveldb::Slice;
3335
using leveldb::Status;
3436
using leveldb::WriteOptions;
37+
using nanopb::Reader;
3538
using nanopb::Writer;
3639

3740
namespace {
@@ -110,6 +113,72 @@ void ClearQueryCache(leveldb::DB* db) {
110113
transaction.Commit();
111114
}
112115

116+
/**
117+
* Removes document associations for the given user's mutation queue for
118+
* any mutation with a `batch_id` less than or equal to
119+
* `last_acknowledged_batch_id`.
120+
*/
121+
void RemoveMutationDocuments(LevelDbTransaction* transaction,
122+
absl::string_view user_id,
123+
int32_t last_acknowledged_batch_id) {
124+
LevelDbDocumentMutationKey doc_key;
125+
std::string prefix = LevelDbDocumentMutationKey::KeyPrefix(user_id);
126+
127+
auto it = transaction->NewIterator();
128+
it->Seek(prefix);
129+
for (; it->Valid() && absl::StartsWith(it->key(), prefix); it->Next()) {
130+
HARD_ASSERT(doc_key.Decode(it->key()),
131+
"Failed to decode document mutation key");
132+
if (doc_key.batch_id() <= last_acknowledged_batch_id) {
133+
transaction->Delete(it->key());
134+
}
135+
}
136+
}
137+
138+
/**
139+
* Removes mutation batches for the given user with a `batch_id` less than
140+
* or equal to `last_acknowledged_batch_id`
141+
*/
142+
void RemoveMutationBatches(LevelDbTransaction* transaction,
143+
absl::string_view user_id,
144+
int32_t last_acknowledged_batch_id) {
145+
std::string mutations_key = LevelDbMutationKey::KeyPrefix(user_id);
146+
std::string last_key =
147+
LevelDbMutationKey::Key(user_id, last_acknowledged_batch_id);
148+
auto it = transaction->NewIterator();
149+
it->Seek(mutations_key);
150+
for (; it->Valid() && it->key() <= last_key; it->Next()) {
151+
transaction->Delete(it->key());
152+
}
153+
}
154+
155+
/** Migration 4. */
156+
void RemoveAcknowledgedMutations(leveldb::DB* db) {
157+
LevelDbTransaction transaction(db, "remove acknowledged mutations");
158+
std::string mutation_queue_start = LevelDbMutationQueueKey::KeyPrefix();
159+
160+
LevelDbMutationQueueKey key;
161+
162+
auto it = transaction.NewIterator();
163+
it->Seek(mutation_queue_start);
164+
for (; it->Valid() && absl::StartsWith(it->key(), mutation_queue_start);
165+
it->Next()) {
166+
HARD_ASSERT(key.Decode(it->key()), "Failed to decode mutation queue key");
167+
firestore_client_MutationQueue mutation_queue{};
168+
Reader reader = Reader::Wrap(it->value());
169+
reader.ReadNanopbMessage(firestore_client_MutationQueue_fields,
170+
&mutation_queue);
171+
HARD_ASSERT(reader.status().ok(), "Failed to deserialize MutationQueue");
172+
RemoveMutationBatches(&transaction, key.user_id(),
173+
mutation_queue.last_acknowledged_batch_id);
174+
RemoveMutationDocuments(&transaction, key.user_id(),
175+
mutation_queue.last_acknowledged_batch_id);
176+
}
177+
178+
SaveVersion(4, &transaction);
179+
transaction.Commit();
180+
}
181+
113182
} // namespace
114183

115184
LevelDbMigrations::SchemaVersion LevelDbMigrations::ReadSchemaVersion(
@@ -139,6 +208,9 @@ void LevelDbMigrations::RunMigrations(leveldb::DB* db,
139208
if (from_version < 3 && to_version >= 3) {
140209
ClearQueryCache(db);
141210
}
211+
if (from_version < 4 && to_version >= 4) {
212+
RemoveAcknowledgedMutations(db);
213+
}
142214
}
143215

144216
} // namespace local

Firestore/core/src/firebase/firestore/nanopb/reader.cc

+6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ Reader Reader::Wrap(const uint8_t* bytes, size_t length) {
3030
return Reader{pb_istream_from_buffer(bytes, length)};
3131
}
3232

33+
Reader Reader::Wrap(absl::string_view string_view) {
34+
return Reader{pb_istream_from_buffer(
35+
reinterpret_cast<const uint8_t*>(string_view.data()),
36+
string_view.size())};
37+
}
38+
3339
uint32_t Reader::ReadTag() {
3440
Tag tag;
3541
if (!status_.ok()) return 0;

Firestore/core/src/firebase/firestore/nanopb/reader.h

+9
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,15 @@ class Reader {
5959
*/
6060
static Reader Wrap(const uint8_t* bytes, size_t length);
6161

62+
/**
63+
* Creates an input stream from bytes backing the string_view. Note that
64+
* the backing buffer must remain valid for the lifetime of this Reader.
65+
*
66+
* (This is roughly equivalent to the nanopb function
67+
* pb_istream_from_buffer())
68+
*/
69+
static Reader Wrap(absl::string_view);
70+
6271
/**
6372
* Reads a message type from the input stream.
6473
*

0 commit comments

Comments
 (0)