Skip to content

Commit 01cd87e

Browse files
Schema migration that drops held write acks
1 parent 9a30621 commit 01cd87e

File tree

4 files changed

+225
-16
lines changed

4 files changed

+225
-16
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/core/Transaction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package com.google.firebase.firestore.core;
1616

17+
import static com.google.firebase.firestore.util.Assert.fail;
18+
1719
import com.google.android.gms.tasks.Task;
1820
import com.google.android.gms.tasks.Tasks;
1921
import com.google.firebase.firestore.FirebaseFirestoreException;
@@ -41,8 +43,6 @@
4143
import java.util.concurrent.TimeUnit;
4244
import javax.annotation.Nullable;
4345

44-
import static com.google.firebase.firestore.util.Assert.fail;
45-
4646
/**
4747
* Internal transaction object responsible for accumulating the mutations to perform and the base
4848
* versions for any documents read.

firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLitePersistence.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public static String databaseName(String persistenceKey, DatabaseId databaseId)
7979
public SQLitePersistence(
8080
Context context, String persistenceKey, DatabaseId databaseId, LocalSerializer serializer) {
8181
String databaseName = databaseName(persistenceKey, databaseId);
82-
this.opener = new OpenHelper(context, databaseName);
82+
this.opener = new OpenHelper(context, serializer, databaseName);
8383
this.serializer = serializer;
8484
this.queryCache = new SQLiteQueryCache(this, this.serializer);
8585
this.remoteDocumentCache = new SQLiteRemoteDocumentCache(this, this.serializer);
@@ -195,9 +195,11 @@ <T> T runTransaction(String action, Supplier<T> operation) {
195195
private static class OpenHelper extends SQLiteOpenHelper {
196196

197197
private boolean configured;
198+
private final LocalSerializer serializer;
198199

199-
OpenHelper(Context context, String databaseName) {
200+
OpenHelper(Context context, LocalSerializer serializer, String databaseName) {
200201
super(context, databaseName, null, SQLiteSchema.VERSION);
202+
this.serializer = serializer;
201203
}
202204

203205
@Override
@@ -223,13 +225,13 @@ private void ensureConfigured(SQLiteDatabase db) {
223225
@Override
224226
public void onCreate(SQLiteDatabase db) {
225227
ensureConfigured(db);
226-
new SQLiteSchema(db).runMigrations(0);
228+
new SQLiteSchema(db, serializer).runMigrations(0);
227229
}
228230

229231
@Override
230232
public void onUpgrade(SQLiteDatabase db, int oldVersion, int newVersion) {
231233
ensureConfigured(db);
232-
new SQLiteSchema(db).runMigrations(oldVersion);
234+
new SQLiteSchema(db, serializer).runMigrations(oldVersion);
233235
}
234236

235237
@Override
@@ -286,7 +288,7 @@ int execute(SQLiteStatement statement, Object... args) {
286288
* chaining further methods off the query.
287289
*/
288290
Query query(String sql) {
289-
return new Query(sql);
291+
return new Query(db, sql);
290292
}
291293

292294
/**
@@ -326,11 +328,13 @@ Query query(String sql) {
326328
* return result;
327329
* </pre>
328330
*/
329-
class Query {
331+
static class Query {
332+
private final SQLiteDatabase db;
330333
private final String sql;
331334
private CursorFactory cursorFactory;
332335

333-
private Query(String sql) {
336+
Query(SQLiteDatabase db, String sql) {
337+
this.db = db;
334338
this.sql = sql;
335339
}
336340

@@ -464,7 +468,7 @@ private Cursor startQuery() {
464468
* This method bridges the gap by examining the types of the bindArgs and calling to the
465469
* appropriate bind method on the program.
466470
*/
467-
private void bind(SQLiteProgram program, Object[] bindArgs) {
471+
private static void bind(SQLiteProgram program, Object[] bindArgs) {
468472
for (int i = 0; i < bindArgs.length; i++) {
469473
Object arg = bindArgs[i];
470474
if (arg == null) {

firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteSchema.java

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,18 @@
1414

1515
package com.google.firebase.firestore.local;
1616

17+
import static com.google.firebase.firestore.util.Assert.fail;
18+
import static com.google.firebase.firestore.util.Assert.hardAssert;
19+
1720
import android.content.ContentValues;
1821
import android.database.DatabaseUtils;
1922
import android.database.sqlite.SQLiteDatabase;
23+
import android.database.sqlite.SQLiteStatement;
24+
import com.google.common.base.Preconditions;
25+
import com.google.firebase.firestore.model.DocumentKey;
26+
import com.google.firebase.firestore.model.mutation.Mutation;
27+
import com.google.firebase.firestore.model.mutation.MutationBatch;
28+
import com.google.protobuf.InvalidProtocolBufferException;
2029

2130
/**
2231
* Migrates schemas from version 0 (empty) to whatever the current version is.
@@ -34,12 +43,14 @@ class SQLiteSchema {
3443
* The version of the schema. Increase this by one for each migration added to runMigrations
3544
* below.
3645
*/
37-
static final int VERSION = (Persistence.INDEXING_SUPPORT_ENABLED) ? 6 : 5;
46+
static final int VERSION = (Persistence.INDEXING_SUPPORT_ENABLED) ? 7 : 6;
3847

3948
private final SQLiteDatabase db;
49+
private final LocalSerializer serializer;
4050

41-
SQLiteSchema(SQLiteDatabase db) {
51+
SQLiteSchema(SQLiteDatabase db, LocalSerializer serializer) {
4252
this.db = db;
53+
this.serializer = serializer;
4354
}
4455

4556
void runMigrations() {
@@ -90,9 +101,12 @@ void runMigrations(int fromVersion, int toVersion) {
90101
}
91102

92103
if (fromVersion < 6 && toVersion >= 6) {
93-
if (Persistence.INDEXING_SUPPORT_ENABLED) {
94-
createLocalDocumentsCollectionIndex();
95-
}
104+
removeAcknowledgedMutations();
105+
}
106+
107+
if (fromVersion < 7 && toVersion >= 7) {
108+
Preconditions.checkState(Persistence.INDEXING_SUPPORT_ENABLED);
109+
createLocalDocumentsCollectionIndex();
96110
}
97111
}
98112

@@ -122,6 +136,66 @@ private void createMutationQueue() {
122136
+ "PRIMARY KEY (uid, path, batch_id))");
123137
}
124138

139+
private void removeAcknowledgedMutations() {
140+
SQLitePersistence.Query mutationQueuesQuery =
141+
new SQLitePersistence.Query(
142+
db, "SELECT uid, last_acknowledged_batch_id FROM mutation_queues");
143+
144+
mutationQueuesQuery.forEach(
145+
mutationQueueEntry -> {
146+
String uid = mutationQueueEntry.getString(0);
147+
long lastAcknowledgedBatchId = mutationQueueEntry.getLong(1);
148+
SQLitePersistence.Query mutationsQuery =
149+
new SQLitePersistence.Query(
150+
db, "SELECT mutations FROM mutations WHERE uid = ? AND batch_id <= ?")
151+
.binding(uid, lastAcknowledgedBatchId);
152+
153+
mutationsQuery.forEach(
154+
value -> {
155+
try {
156+
MutationBatch batch =
157+
serializer.decodeMutationBatch(
158+
com.google.firebase.firestore.proto.WriteBatch.parseFrom(
159+
value.getBlob(0)));
160+
161+
int batchId = batch.getBatchId();
162+
163+
SQLiteStatement mutationDeleter =
164+
db.compileStatement("DELETE FROM mutations WHERE uid = ? AND batch_id = ?");
165+
mutationDeleter.bindString(1, uid);
166+
mutationDeleter.bindLong(2, batchId);
167+
int deleted = mutationDeleter.executeUpdateDelete();
168+
hardAssert(
169+
deleted != 0,
170+
"Mutation batch (%s, %d) did not exist",
171+
uid,
172+
batch.getBatchId());
173+
174+
SQLiteStatement indexDeleter =
175+
db.compileStatement(
176+
"DELETE FROM document_mutations WHERE uid = ? AND path = ? AND batch_id = ?");
177+
178+
for (Mutation mutation : batch.getMutations()) {
179+
DocumentKey key = mutation.getKey();
180+
String path = EncodedPath.encode(key.getPath());
181+
indexDeleter.bindString(1, uid);
182+
indexDeleter.bindString(2, path);
183+
indexDeleter.bindLong(3, batchId);
184+
deleted = indexDeleter.executeUpdateDelete();
185+
hardAssert(
186+
deleted != 0,
187+
"Index entry (%s, %s, %d) did not exist",
188+
uid,
189+
key,
190+
batch.getBatchId());
191+
}
192+
} catch (InvalidProtocolBufferException e) {
193+
throw fail("MutationBatch failed to parse: %s", e);
194+
}
195+
});
196+
});
197+
}
198+
125199
private void createQueryCache() {
126200
// A cache of targets and associated metadata
127201
db.execSQL(

firebase-firestore/src/test/java/com/google/firebase/firestore/local/SQLiteSchemaTest.java

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@
2222
import android.database.sqlite.SQLiteDatabase;
2323
import android.database.sqlite.SQLiteOpenHelper;
2424
import com.google.firebase.firestore.model.DatabaseId;
25+
import com.google.firebase.firestore.model.ResourcePath;
26+
import com.google.firebase.firestore.proto.WriteBatch;
27+
import com.google.firebase.firestore.remote.RemoteSerializer;
28+
import com.google.firestore.v1beta1.Document;
29+
import com.google.firestore.v1beta1.Write;
2530
import org.junit.After;
2631
import org.junit.Before;
2732
import org.junit.Test;
@@ -51,7 +56,9 @@ public void onCreate(SQLiteDatabase db) {}
5156
public void onUpgrade(SQLiteDatabase db, int oldVersion, int newVersion) {}
5257
};
5358
db = opener.getWritableDatabase();
54-
schema = new SQLiteSchema(db);
59+
schema =
60+
new SQLiteSchema(
61+
db, new LocalSerializer(new RemoteSerializer(DatabaseId.forProject("projectId"))));
5562
}
5663

5764
@After
@@ -123,6 +130,130 @@ public void testDatabaseName() {
123130
"[DEFAULT]", DatabaseId.forDatabase("my-project", "my-database")));
124131
}
125132

133+
@Test
134+
public void dropsHeldWriteAcks() {
135+
// This test creates a database with schema version 4 that has two users,
136+
// both of which have acknowledged mutations that haven't yet been removed
137+
// from IndexedDb ("heldWriteAcks"). Schema version 5 removes heldWriteAcks,
138+
// and as such these mutations are deleted.
139+
schema.runMigrations(0, 5);
140+
141+
Write testWriteFoo =
142+
Write.newBuilder()
143+
.setUpdate(
144+
Document.newBuilder()
145+
.setName("projects/projectId/databases/(default)/documents/docs/foo"))
146+
.build();
147+
Write testWriteBar =
148+
Write.newBuilder()
149+
.setUpdate(
150+
Document.newBuilder()
151+
.setName("projects/projectId/databases/(default)/documents/docs/bar"))
152+
.build();
153+
Write testWriteBaz =
154+
Write.newBuilder()
155+
.setUpdate(
156+
Document.newBuilder()
157+
.setName("projects/projectId/databases/(default)/documents/docs/baz"))
158+
.build();
159+
Write testWritePending =
160+
Write.newBuilder()
161+
.setUpdate(
162+
Document.newBuilder()
163+
.setName("projects/projectId/databases/(default)/documents/docs/pending"))
164+
.build();
165+
166+
// User 'foo' has two acknowledged mutations and one that is pending.
167+
db.execSQL(
168+
"INSERT INTO mutations (uid, batch_id, mutations) VALUES (?,?,?)",
169+
new Object[] {
170+
"userA",
171+
1,
172+
WriteBatch.newBuilder().setBatchId(1).addWrites(testWriteFoo).build().toByteArray()
173+
});
174+
db.execSQL(
175+
"INSERT INTO mutations (uid, batch_id, mutations) VALUES (?,?,?)",
176+
new Object[] {
177+
"userA",
178+
2,
179+
WriteBatch.newBuilder().setBatchId(2).addWrites(testWriteFoo).build().toByteArray()
180+
});
181+
db.execSQL(
182+
"INSERT INTO mutations (uid, batch_id, mutations) VALUES (?,?,?)",
183+
new Object[] {
184+
"userA",
185+
5,
186+
WriteBatch.newBuilder().setBatchId(5).addWrites(testWritePending).build().toByteArray()
187+
});
188+
189+
// User 'bar' has one acknowledged mutation and one that is pending.
190+
db.execSQL(
191+
"INSERT INTO mutations (uid, batch_id, mutations) VALUES (?,?,?)",
192+
new Object[] {
193+
"userB",
194+
3,
195+
WriteBatch.newBuilder()
196+
.setBatchId(3)
197+
.addWrites(testWriteBar)
198+
.addWrites(testWriteBaz)
199+
.build()
200+
.toByteArray()
201+
});
202+
db.execSQL(
203+
"INSERT INTO mutations (uid, batch_id, mutations) VALUES (?,?,?)",
204+
new Object[] {
205+
"userB",
206+
4,
207+
WriteBatch.newBuilder().setBatchId(1).addWrites(testWritePending).build().toByteArray()
208+
});
209+
210+
// Create the document indices
211+
db.execSQL(
212+
"INSERT INTO document_mutations (uid, path, batch_id) VALUES (?, ?, ?)",
213+
new Object[] {"userA", EncodedPath.encode(ResourcePath.fromString("docs/foo")), 1});
214+
db.execSQL(
215+
"INSERT INTO document_mutations (uid, path, batch_id) VALUES (?, ?, ?)",
216+
new Object[] {"userA", EncodedPath.encode(ResourcePath.fromString("docs/foo")), 2});
217+
db.execSQL(
218+
"INSERT INTO document_mutations (uid, path, batch_id) VALUES (?, ?, ?)",
219+
new Object[] {"userA", EncodedPath.encode(ResourcePath.fromString("docs/pending")), 5});
220+
db.execSQL(
221+
"INSERT INTO document_mutations (uid, path, batch_id) VALUES (?, ?, ?)",
222+
new Object[] {"userB", EncodedPath.encode(ResourcePath.fromString("docs/bar")), 3});
223+
db.execSQL(
224+
"INSERT INTO document_mutations (uid, path, batch_id) VALUES (?, ?, ?)",
225+
new Object[] {"userB", EncodedPath.encode(ResourcePath.fromString("docs/baz")), 3});
226+
db.execSQL(
227+
"INSERT INTO document_mutations (uid, path, batch_id) VALUES (?, ?, ?)",
228+
new Object[] {"userB", EncodedPath.encode(ResourcePath.fromString("docs/pending")), 4});
229+
230+
// Populate the mutation queues' metadata
231+
db.execSQL(
232+
"INSERT INTO mutation_queues (uid, last_acknowledged_batch_id) VALUES (?, ?)",
233+
new Object[] {"userA", 2});
234+
db.execSQL(
235+
"INSERT INTO mutation_queues (uid, last_acknowledged_batch_id) VALUES (?, ?)",
236+
new Object[] {"userB", 3});
237+
db.execSQL(
238+
"INSERT INTO mutation_queues (uid, last_acknowledged_batch_id) VALUES (?, ?)",
239+
new Object[] {"userC", -1});
240+
241+
schema.runMigrations(5, 6);
242+
243+
// Verify that all but the two pending mutations have been cleared
244+
// by the migration.
245+
new SQLitePersistence.Query(db, "SELECT COUNT(*) FROM mutations")
246+
.first(value -> assertEquals(2, value.getInt(0)));
247+
248+
// Verify that we still have two index entries for the pending documents
249+
new SQLitePersistence.Query(db, "SELECT COUNT(*) FROM document_mutations")
250+
.first(value -> assertEquals(2, value.getInt(0)));
251+
252+
// Verify that we still have one metadata entry for each existing queue
253+
new SQLitePersistence.Query(db, "SELECT COUNT(*) FROM mutation_queues")
254+
.first(value -> assertEquals(3, value.getInt(0)));
255+
}
256+
126257
private void assertNoResultsForQuery(String query, String[] args) {
127258
Cursor cursor = null;
128259
try {

0 commit comments

Comments
 (0)