Skip to content

Schema migration that drops held write acks #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.google.firebase.firestore.core;

import static com.google.firebase.firestore.util.Assert.fail;

import com.google.android.gms.tasks.Task;
import com.google.android.gms.tasks.Tasks;
import com.google.firebase.firestore.FirebaseFirestoreException;
Expand Down Expand Up @@ -41,8 +43,6 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

import static com.google.firebase.firestore.util.Assert.fail;

/**
* Internal transaction object responsible for accumulating the mutations to perform and the base
* versions for any documents read.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static String databaseName(String persistenceKey, DatabaseId databaseId)
public SQLitePersistence(
Context context, String persistenceKey, DatabaseId databaseId, LocalSerializer serializer) {
String databaseName = databaseName(persistenceKey, databaseId);
this.opener = new OpenHelper(context, databaseName);
this.opener = new OpenHelper(context, serializer, databaseName);
this.serializer = serializer;
this.queryCache = new SQLiteQueryCache(this, this.serializer);
this.remoteDocumentCache = new SQLiteRemoteDocumentCache(this, this.serializer);
Expand Down Expand Up @@ -194,10 +194,12 @@ <T> T runTransaction(String action, Supplier<T> operation) {
*/
private static class OpenHelper extends SQLiteOpenHelper {

private final LocalSerializer serializer;
private boolean configured;

OpenHelper(Context context, String databaseName) {
OpenHelper(Context context, LocalSerializer serializer, String databaseName) {
super(context, databaseName, null, SQLiteSchema.VERSION);
this.serializer = serializer;
}

@Override
Expand All @@ -223,13 +225,13 @@ private void ensureConfigured(SQLiteDatabase db) {
@Override
public void onCreate(SQLiteDatabase db) {
ensureConfigured(db);
new SQLiteSchema(db).runMigrations(0);
new SQLiteSchema(db, serializer).runMigrations(0);
}

@Override
public void onUpgrade(SQLiteDatabase db, int oldVersion, int newVersion) {
ensureConfigured(db);
new SQLiteSchema(db).runMigrations(oldVersion);
new SQLiteSchema(db, serializer).runMigrations(oldVersion);
}

@Override
Expand Down Expand Up @@ -286,7 +288,7 @@ int execute(SQLiteStatement statement, Object... args) {
* chaining further methods off the query.
*/
Query query(String sql) {
return new Query(sql);
return new Query(db, sql);
}

/**
Expand Down Expand Up @@ -326,11 +328,13 @@ Query query(String sql) {
* return result;
* </pre>
*/
class Query {
static class Query {
private final SQLiteDatabase db;
private final String sql;
private CursorFactory cursorFactory;

private Query(String sql) {
Query(SQLiteDatabase db, String sql) {
this.db = db;
this.sql = sql;
}

Expand Down Expand Up @@ -464,7 +468,7 @@ private Cursor startQuery() {
* This method bridges the gap by examining the types of the bindArgs and calling to the
* appropriate bind method on the program.
*/
private void bind(SQLiteProgram program, Object[] bindArgs) {
private static void bind(SQLiteProgram program, Object[] bindArgs) {
for (int i = 0; i < bindArgs.length; i++) {
Object arg = bindArgs[i];
if (arg == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,18 @@

package com.google.firebase.firestore.local;

import static com.google.firebase.firestore.util.Assert.fail;
import static com.google.firebase.firestore.util.Assert.hardAssert;

import android.content.ContentValues;
import android.database.DatabaseUtils;
import android.database.sqlite.SQLiteDatabase;
import android.database.sqlite.SQLiteStatement;
import com.google.common.base.Preconditions;
import com.google.firebase.firestore.model.DocumentKey;
import com.google.firebase.firestore.model.mutation.Mutation;
import com.google.firebase.firestore.model.mutation.MutationBatch;
import com.google.protobuf.InvalidProtocolBufferException;

/**
* Migrates schemas from version 0 (empty) to whatever the current version is.
Expand All @@ -34,12 +43,14 @@ class SQLiteSchema {
* The version of the schema. Increase this by one for each migration added to runMigrations
* below.
*/
static final int VERSION = (Persistence.INDEXING_SUPPORT_ENABLED) ? 6 : 5;
static final int VERSION = (Persistence.INDEXING_SUPPORT_ENABLED) ? 7 : 6;

private final SQLiteDatabase db;
private final LocalSerializer serializer;

SQLiteSchema(SQLiteDatabase db) {
SQLiteSchema(SQLiteDatabase db, LocalSerializer serializer) {
this.db = db;
this.serializer = serializer;
}

void runMigrations() {
Expand Down Expand Up @@ -90,9 +101,12 @@ void runMigrations(int fromVersion, int toVersion) {
}

if (fromVersion < 6 && toVersion >= 6) {
if (Persistence.INDEXING_SUPPORT_ENABLED) {
createLocalDocumentsCollectionIndex();
}
removeAcknowledgedMutations();
}

if (fromVersion < 7 && toVersion >= 7) {
Preconditions.checkState(Persistence.INDEXING_SUPPORT_ENABLED);
createLocalDocumentsCollectionIndex();
}
}

Expand Down Expand Up @@ -122,6 +136,61 @@ private void createMutationQueue() {
+ "PRIMARY KEY (uid, path, batch_id))");
}

private void removeAcknowledgedMutations() {
SQLitePersistence.Query mutationQueuesQuery =
new SQLitePersistence.Query(
db, "SELECT uid, last_acknowledged_batch_id FROM mutation_queues");

mutationQueuesQuery.forEach(
mutationQueueEntry -> {
String uid = mutationQueueEntry.getString(0);
long lastAcknowledgedBatchId = mutationQueueEntry.getLong(1);

SQLitePersistence.Query mutationsQuery =
new SQLitePersistence.Query(
db, "SELECT mutations FROM mutations WHERE uid = ? AND batch_id <= ?")
.binding(uid, lastAcknowledgedBatchId);
mutationsQuery.forEach(
value -> {
try {
MutationBatch batch =
serializer.decodeMutationBatch(
com.google.firebase.firestore.proto.WriteBatch.parseFrom(
value.getBlob(0)));
removeMutationBatch(uid, batch);
} catch (InvalidProtocolBufferException e) {
throw fail("MutationBatch failed to parse: %s", e);
}
});
});
}

private void removeMutationBatch(String uid, MutationBatch batch) {
int batchId = batch.getBatchId();

SQLiteStatement mutationDeleter =
db.compileStatement("DELETE FROM mutations WHERE uid = ? AND batch_id = ?");
mutationDeleter.bindString(1, uid);
mutationDeleter.bindLong(2, batchId);
int deleted = mutationDeleter.executeUpdateDelete();
hardAssert(deleted != 0, "Mutation batch (%s, %d) did not exist", uid, batch.getBatchId());

SQLiteStatement indexDeleter =
db.compileStatement(
"DELETE FROM document_mutations WHERE uid = ? AND path = ? AND batch_id = ?");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the path? Don't we want to delete all rows from a batch? Could we do DELETE FROM document_mutations WHERE uid = ? AND batch_id = ? and then maybe not even need the serializer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whaaaaaa. Nice. I didn't even think about that.


for (Mutation mutation : batch.getMutations()) {
DocumentKey key = mutation.getKey();
String path = EncodedPath.encode(key.getPath());
indexDeleter.bindString(1, uid);
indexDeleter.bindString(2, path);
indexDeleter.bindLong(3, batchId);
deleted = indexDeleter.executeUpdateDelete();
hardAssert(
deleted != 0, "Index entry (%s, %s, %d) did not exist", uid, key, batch.getBatchId());
}
}

private void createQueryCache() {
// A cache of targets and associated metadata
db.execSQL(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import android.database.sqlite.SQLiteDatabase;
import android.database.sqlite.SQLiteOpenHelper;
import com.google.firebase.firestore.model.DatabaseId;
import com.google.firebase.firestore.model.ResourcePath;
import com.google.firebase.firestore.proto.WriteBatch;
import com.google.firebase.firestore.remote.RemoteSerializer;
import com.google.firestore.v1beta1.Document;
import com.google.firestore.v1beta1.Write;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -51,7 +56,9 @@ public void onCreate(SQLiteDatabase db) {}
public void onUpgrade(SQLiteDatabase db, int oldVersion, int newVersion) {}
};
db = opener.getWritableDatabase();
schema = new SQLiteSchema(db);
schema =
new SQLiteSchema(
db, new LocalSerializer(new RemoteSerializer(DatabaseId.forProject("projectId"))));
}

@After
Expand Down Expand Up @@ -123,6 +130,68 @@ public void testDatabaseName() {
"[DEFAULT]", DatabaseId.forDatabase("my-project", "my-database")));
}

@Test
public void dropsHeldWriteAcks() {
// This test creates a database with schema version 5 that has two users, both of which have
// acknowledged mutations that haven't yet been removed from IndexedDb ("heldWriteAcks").
// Schema version 6 removes heldWriteAcks, and as such these mutations are deleted.
schema.runMigrations(0, 5);

// User 'userA' has two acknowledged mutations and one that is pending.
// User 'userB' has one acknowledged mutation and one that is pending.
addMutationBatch(db, 1, "userA", "docs/foo");
addMutationBatch(db, 2, "userA", "docs/foo");
addMutationBatch(db, 3, "userB", "docs/bar", "doc/baz");
addMutationBatch(db, 4, "userB", "docs/pending");
addMutationBatch(db, 5, "userA", "docs/pending");

// Populate the mutation queues' metadata
db.execSQL(
"INSERT INTO mutation_queues (uid, last_acknowledged_batch_id) VALUES (?, ?)",
new Object[] {"userA", 2});
db.execSQL(
"INSERT INTO mutation_queues (uid, last_acknowledged_batch_id) VALUES (?, ?)",
new Object[] {"userB", 3});
db.execSQL(
"INSERT INTO mutation_queues (uid, last_acknowledged_batch_id) VALUES (?, ?)",
new Object[] {"userC", -1});

schema.runMigrations(5, 6);

// Verify that all but the two pending mutations have been cleared by the migration.
new SQLitePersistence.Query(db, "SELECT COUNT(*) FROM mutations")
.first(value -> assertEquals(2, value.getInt(0)));

// Verify that we still have two index entries for the pending documents
new SQLitePersistence.Query(db, "SELECT COUNT(*) FROM document_mutations")
.first(value -> assertEquals(2, value.getInt(0)));

// Verify that we still have one metadata entry for each existing queue
new SQLitePersistence.Query(db, "SELECT COUNT(*) FROM mutation_queues")
.first(value -> assertEquals(3, value.getInt(0)));
}

private void addMutationBatch(SQLiteDatabase db, int batchId, String uid, String... docs) {
WriteBatch.Builder write = WriteBatch.newBuilder();
write.setBatchId(batchId);

for (String doc : docs) {
db.execSQL(
"INSERT INTO document_mutations (uid, path, batch_id) VALUES (?, ?, ?)",
new Object[] {uid, EncodedPath.encode(ResourcePath.fromString(doc)), batchId});

write.addWrites(
Write.newBuilder()
.setUpdate(
Document.newBuilder()
.setName("projects/projectId/databases/(default)/documents/" + doc)));
}

db.execSQL(
"INSERT INTO mutations (uid, batch_id, mutations) VALUES (?,?,?)",
new Object[] {uid, batchId, write.build().toByteArray()});
}

private void assertNoResultsForQuery(String query, String[] args) {
Cursor cursor = null;
try {
Expand Down