Skip to content

Commit 334714d

Browse files
Schema migration that drops held write acks
1 parent 9a30621 commit 334714d

File tree

4 files changed

+158
-16
lines changed

4 files changed

+158
-16
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/core/Transaction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package com.google.firebase.firestore.core;
1616

17+
import static com.google.firebase.firestore.util.Assert.fail;
18+
1719
import com.google.android.gms.tasks.Task;
1820
import com.google.android.gms.tasks.Tasks;
1921
import com.google.firebase.firestore.FirebaseFirestoreException;
@@ -41,8 +43,6 @@
4143
import java.util.concurrent.TimeUnit;
4244
import javax.annotation.Nullable;
4345

44-
import static com.google.firebase.firestore.util.Assert.fail;
45-
4646
/**
4747
* Internal transaction object responsible for accumulating the mutations to perform and the base
4848
* versions for any documents read.

firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLitePersistence.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public static String databaseName(String persistenceKey, DatabaseId databaseId)
7979
public SQLitePersistence(
8080
Context context, String persistenceKey, DatabaseId databaseId, LocalSerializer serializer) {
8181
String databaseName = databaseName(persistenceKey, databaseId);
82-
this.opener = new OpenHelper(context, databaseName);
82+
this.opener = new OpenHelper(context, serializer, databaseName);
8383
this.serializer = serializer;
8484
this.queryCache = new SQLiteQueryCache(this, this.serializer);
8585
this.remoteDocumentCache = new SQLiteRemoteDocumentCache(this, this.serializer);
@@ -194,10 +194,12 @@ <T> T runTransaction(String action, Supplier<T> operation) {
194194
*/
195195
private static class OpenHelper extends SQLiteOpenHelper {
196196

197+
private final LocalSerializer serializer;
197198
private boolean configured;
198199

199-
OpenHelper(Context context, String databaseName) {
200+
OpenHelper(Context context, LocalSerializer serializer, String databaseName) {
200201
super(context, databaseName, null, SQLiteSchema.VERSION);
202+
this.serializer = serializer;
201203
}
202204

203205
@Override
@@ -223,13 +225,13 @@ private void ensureConfigured(SQLiteDatabase db) {
223225
@Override
224226
public void onCreate(SQLiteDatabase db) {
225227
ensureConfigured(db);
226-
new SQLiteSchema(db).runMigrations(0);
228+
new SQLiteSchema(db, serializer).runMigrations(0);
227229
}
228230

229231
@Override
230232
public void onUpgrade(SQLiteDatabase db, int oldVersion, int newVersion) {
231233
ensureConfigured(db);
232-
new SQLiteSchema(db).runMigrations(oldVersion);
234+
new SQLiteSchema(db, serializer).runMigrations(oldVersion);
233235
}
234236

235237
@Override
@@ -286,7 +288,7 @@ int execute(SQLiteStatement statement, Object... args) {
286288
* chaining further methods off the query.
287289
*/
288290
Query query(String sql) {
289-
return new Query(sql);
291+
return new Query(db, sql);
290292
}
291293

292294
/**
@@ -326,11 +328,13 @@ Query query(String sql) {
326328
* return result;
327329
* </pre>
328330
*/
329-
class Query {
331+
static class Query {
332+
private final SQLiteDatabase db;
330333
private final String sql;
331334
private CursorFactory cursorFactory;
332335

333-
private Query(String sql) {
336+
Query(SQLiteDatabase db, String sql) {
337+
this.db = db;
334338
this.sql = sql;
335339
}
336340

@@ -464,7 +468,7 @@ private Cursor startQuery() {
464468
* This method bridges the gap by examining the types of the bindArgs and calling to the
465469
* appropriate bind method on the program.
466470
*/
467-
private void bind(SQLiteProgram program, Object[] bindArgs) {
471+
private static void bind(SQLiteProgram program, Object[] bindArgs) {
468472
for (int i = 0; i < bindArgs.length; i++) {
469473
Object arg = bindArgs[i];
470474
if (arg == null) {

firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteSchema.java

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,18 @@
1414

1515
package com.google.firebase.firestore.local;
1616

17+
import static com.google.firebase.firestore.util.Assert.fail;
18+
import static com.google.firebase.firestore.util.Assert.hardAssert;
19+
1720
import android.content.ContentValues;
1821
import android.database.DatabaseUtils;
1922
import android.database.sqlite.SQLiteDatabase;
23+
import android.database.sqlite.SQLiteStatement;
24+
import com.google.common.base.Preconditions;
25+
import com.google.firebase.firestore.model.DocumentKey;
26+
import com.google.firebase.firestore.model.mutation.Mutation;
27+
import com.google.firebase.firestore.model.mutation.MutationBatch;
28+
import com.google.protobuf.InvalidProtocolBufferException;
2029

2130
/**
2231
* Migrates schemas from version 0 (empty) to whatever the current version is.
@@ -34,12 +43,14 @@ class SQLiteSchema {
3443
* The version of the schema. Increase this by one for each migration added to runMigrations
3544
* below.
3645
*/
37-
static final int VERSION = (Persistence.INDEXING_SUPPORT_ENABLED) ? 6 : 5;
46+
static final int VERSION = (Persistence.INDEXING_SUPPORT_ENABLED) ? 7 : 6;
3847

3948
private final SQLiteDatabase db;
49+
private final LocalSerializer serializer;
4050

41-
SQLiteSchema(SQLiteDatabase db) {
51+
SQLiteSchema(SQLiteDatabase db, LocalSerializer serializer) {
4252
this.db = db;
53+
this.serializer = serializer;
4354
}
4455

4556
void runMigrations() {
@@ -90,9 +101,12 @@ void runMigrations(int fromVersion, int toVersion) {
90101
}
91102

92103
if (fromVersion < 6 && toVersion >= 6) {
93-
if (Persistence.INDEXING_SUPPORT_ENABLED) {
94-
createLocalDocumentsCollectionIndex();
95-
}
104+
removeAcknowledgedMutations();
105+
}
106+
107+
if (fromVersion < 7 && toVersion >= 7) {
108+
Preconditions.checkState(Persistence.INDEXING_SUPPORT_ENABLED);
109+
createLocalDocumentsCollectionIndex();
96110
}
97111
}
98112

@@ -122,6 +136,61 @@ private void createMutationQueue() {
122136
+ "PRIMARY KEY (uid, path, batch_id))");
123137
}
124138

139+
private void removeAcknowledgedMutations() {
140+
SQLitePersistence.Query mutationQueuesQuery =
141+
new SQLitePersistence.Query(
142+
db, "SELECT uid, last_acknowledged_batch_id FROM mutation_queues");
143+
144+
mutationQueuesQuery.forEach(
145+
mutationQueueEntry -> {
146+
String uid = mutationQueueEntry.getString(0);
147+
long lastAcknowledgedBatchId = mutationQueueEntry.getLong(1);
148+
149+
SQLitePersistence.Query mutationsQuery =
150+
new SQLitePersistence.Query(
151+
db, "SELECT mutations FROM mutations WHERE uid = ? AND batch_id <= ?")
152+
.binding(uid, lastAcknowledgedBatchId);
153+
mutationsQuery.forEach(
154+
value -> {
155+
try {
156+
MutationBatch batch =
157+
serializer.decodeMutationBatch(
158+
com.google.firebase.firestore.proto.WriteBatch.parseFrom(
159+
value.getBlob(0)));
160+
removeMutationBatch(uid, batch);
161+
} catch (InvalidProtocolBufferException e) {
162+
throw fail("MutationBatch failed to parse: %s", e);
163+
}
164+
});
165+
});
166+
}
167+
168+
private void removeMutationBatch(String uid, MutationBatch batch) {
169+
int batchId = batch.getBatchId();
170+
171+
SQLiteStatement mutationDeleter =
172+
db.compileStatement("DELETE FROM mutations WHERE uid = ? AND batch_id = ?");
173+
mutationDeleter.bindString(1, uid);
174+
mutationDeleter.bindLong(2, batchId);
175+
int deleted = mutationDeleter.executeUpdateDelete();
176+
hardAssert(deleted != 0, "Mutation batch (%s, %d) did not exist", uid, batch.getBatchId());
177+
178+
SQLiteStatement indexDeleter =
179+
db.compileStatement(
180+
"DELETE FROM document_mutations WHERE uid = ? AND path = ? AND batch_id = ?");
181+
182+
for (Mutation mutation : batch.getMutations()) {
183+
DocumentKey key = mutation.getKey();
184+
String path = EncodedPath.encode(key.getPath());
185+
indexDeleter.bindString(1, uid);
186+
indexDeleter.bindString(2, path);
187+
indexDeleter.bindLong(3, batchId);
188+
deleted = indexDeleter.executeUpdateDelete();
189+
hardAssert(
190+
deleted != 0, "Index entry (%s, %s, %d) did not exist", uid, key, batch.getBatchId());
191+
}
192+
}
193+
125194
private void createQueryCache() {
126195
// A cache of targets and associated metadata
127196
db.execSQL(

firebase-firestore/src/test/java/com/google/firebase/firestore/local/SQLiteSchemaTest.java

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@
2222
import android.database.sqlite.SQLiteDatabase;
2323
import android.database.sqlite.SQLiteOpenHelper;
2424
import com.google.firebase.firestore.model.DatabaseId;
25+
import com.google.firebase.firestore.model.ResourcePath;
26+
import com.google.firebase.firestore.proto.WriteBatch;
27+
import com.google.firebase.firestore.remote.RemoteSerializer;
28+
import com.google.firestore.v1beta1.Document;
29+
import com.google.firestore.v1beta1.Write;
2530
import org.junit.After;
2631
import org.junit.Before;
2732
import org.junit.Test;
@@ -51,7 +56,9 @@ public void onCreate(SQLiteDatabase db) {}
5156
public void onUpgrade(SQLiteDatabase db, int oldVersion, int newVersion) {}
5257
};
5358
db = opener.getWritableDatabase();
54-
schema = new SQLiteSchema(db);
59+
schema =
60+
new SQLiteSchema(
61+
db, new LocalSerializer(new RemoteSerializer(DatabaseId.forProject("projectId"))));
5562
}
5663

5764
@After
@@ -123,6 +130,68 @@ public void testDatabaseName() {
123130
"[DEFAULT]", DatabaseId.forDatabase("my-project", "my-database")));
124131
}
125132

133+
@Test
134+
public void dropsHeldWriteAcks() {
135+
// This test creates a database with schema version 5 that has two users, both of which have
136+
// acknowledged mutations that haven't yet been removed from IndexedDb ("heldWriteAcks").
137+
// Schema version 6 removes heldWriteAcks, and as such these mutations are deleted.
138+
schema.runMigrations(0, 5);
139+
140+
// User 'userA' has two acknowledged mutations and one that is pending.
141+
// User 'userB' has one acknowledged mutation and one that is pending.
142+
addMutationBatch(db, 1, "userA", "docs/foo");
143+
addMutationBatch(db, 2, "userA", "docs/foo");
144+
addMutationBatch(db, 3, "userB", "docs/bar", "doc/baz");
145+
addMutationBatch(db, 4, "userB", "docs/pending");
146+
addMutationBatch(db, 5, "userA", "docs/pending");
147+
148+
// Populate the mutation queues' metadata
149+
db.execSQL(
150+
"INSERT INTO mutation_queues (uid, last_acknowledged_batch_id) VALUES (?, ?)",
151+
new Object[] {"userA", 2});
152+
db.execSQL(
153+
"INSERT INTO mutation_queues (uid, last_acknowledged_batch_id) VALUES (?, ?)",
154+
new Object[] {"userB", 3});
155+
db.execSQL(
156+
"INSERT INTO mutation_queues (uid, last_acknowledged_batch_id) VALUES (?, ?)",
157+
new Object[] {"userC", -1});
158+
159+
schema.runMigrations(5, 6);
160+
161+
// Verify that all but the two pending mutations have been cleared by the migration.
162+
new SQLitePersistence.Query(db, "SELECT COUNT(*) FROM mutations")
163+
.first(value -> assertEquals(2, value.getInt(0)));
164+
165+
// Verify that we still have two index entries for the pending documents
166+
new SQLitePersistence.Query(db, "SELECT COUNT(*) FROM document_mutations")
167+
.first(value -> assertEquals(2, value.getInt(0)));
168+
169+
// Verify that we still have one metadata entry for each existing queue
170+
new SQLitePersistence.Query(db, "SELECT COUNT(*) FROM mutation_queues")
171+
.first(value -> assertEquals(3, value.getInt(0)));
172+
}
173+
174+
private void addMutationBatch(SQLiteDatabase db, int batchId, String uid, String... docs) {
175+
WriteBatch.Builder write = WriteBatch.newBuilder();
176+
write.setBatchId(batchId);
177+
178+
for (String doc : docs) {
179+
db.execSQL(
180+
"INSERT INTO document_mutations (uid, path, batch_id) VALUES (?, ?, ?)",
181+
new Object[] {uid, EncodedPath.encode(ResourcePath.fromString(doc)), batchId});
182+
183+
write.addWrites(
184+
Write.newBuilder()
185+
.setUpdate(
186+
Document.newBuilder()
187+
.setName("projects/projectId/databases/(default)/documents/" + doc)));
188+
}
189+
190+
db.execSQL(
191+
"INSERT INTO mutations (uid, batch_id, mutations) VALUES (?,?,?)",
192+
new Object[] {uid, batchId, write.build().toByteArray()});
193+
}
194+
126195
private void assertNoResultsForQuery(String query, String[] args) {
127196
Cursor cursor = null;
128197
try {

0 commit comments

Comments
 (0)