Skip to content

Schema migration that drops held write acks #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.google.firebase.firestore.core;

import static com.google.firebase.firestore.util.Assert.fail;

import com.google.android.gms.tasks.Task;
import com.google.android.gms.tasks.Tasks;
import com.google.firebase.firestore.FirebaseFirestoreException;
Expand Down Expand Up @@ -41,8 +43,6 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

import static com.google.firebase.firestore.util.Assert.fail;

/**
* Internal transaction object responsible for accumulating the mutations to perform and the base
* versions for any documents read.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ int execute(SQLiteStatement statement, Object... args) {
* chaining further methods off the query.
*/
Query query(String sql) {
return new Query(sql);
return new Query(db, sql);
}

/**
Expand Down Expand Up @@ -326,11 +326,13 @@ Query query(String sql) {
* return result;
* </pre>
*/
class Query {
static class Query {
private final SQLiteDatabase db;
private final String sql;
private CursorFactory cursorFactory;

private Query(String sql) {
Query(SQLiteDatabase db, String sql) {
this.db = db;
this.sql = sql;
}

Expand Down Expand Up @@ -464,7 +466,7 @@ private Cursor startQuery() {
* This method bridges the gap by examining the types of the bindArgs and calling to the
* appropriate bind method on the program.
*/
private void bind(SQLiteProgram program, Object[] bindArgs) {
private static void bind(SQLiteProgram program, Object[] bindArgs) {
for (int i = 0; i < bindArgs.length; i++) {
Object arg = bindArgs[i];
if (arg == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@

package com.google.firebase.firestore.local;

import static com.google.firebase.firestore.util.Assert.hardAssert;

import android.content.ContentValues;
import android.database.DatabaseUtils;
import android.database.sqlite.SQLiteDatabase;
import android.database.sqlite.SQLiteStatement;
import com.google.common.base.Preconditions;

/**
* Migrates schemas from version 0 (empty) to whatever the current version is.
Expand All @@ -34,10 +38,11 @@ class SQLiteSchema {
* The version of the schema. Increase this by one for each migration added to runMigrations
* below.
*/
static final int VERSION = (Persistence.INDEXING_SUPPORT_ENABLED) ? 6 : 5;
static final int VERSION = (Persistence.INDEXING_SUPPORT_ENABLED) ? 7 : 6;

private final SQLiteDatabase db;

// PORTING NOTE: The Android client doesn't need to use a serializer to remove held write acks.
SQLiteSchema(SQLiteDatabase db) {
this.db = db;
}
Expand Down Expand Up @@ -90,9 +95,12 @@ void runMigrations(int fromVersion, int toVersion) {
}

if (fromVersion < 6 && toVersion >= 6) {
if (Persistence.INDEXING_SUPPORT_ENABLED) {
createLocalDocumentsCollectionIndex();
}
removeAcknowledgedMutations();
}

if (fromVersion < 7 && toVersion >= 7) {
Preconditions.checkState(Persistence.INDEXING_SUPPORT_ENABLED);
createLocalDocumentsCollectionIndex();
}
}

Expand Down Expand Up @@ -122,6 +130,38 @@ private void createMutationQueue() {
+ "PRIMARY KEY (uid, path, batch_id))");
}

private void removeAcknowledgedMutations() {
SQLitePersistence.Query mutationQueuesQuery =
new SQLitePersistence.Query(
db, "SELECT uid, last_acknowledged_batch_id FROM mutation_queues");

mutationQueuesQuery.forEach(
mutationQueueEntry -> {
String uid = mutationQueueEntry.getString(0);
long lastAcknowledgedBatchId = mutationQueueEntry.getLong(1);

SQLitePersistence.Query mutationsQuery =
new SQLitePersistence.Query(
db, "SELECT batch_id FROM mutations WHERE uid = ? AND batch_id <= ?")
.binding(uid, lastAcknowledgedBatchId);
mutationsQuery.forEach(value -> removeMutationBatch(uid, value.getInt(0)));
});
}

private void removeMutationBatch(String uid, int batchId) {
SQLiteStatement mutationDeleter =
db.compileStatement("DELETE FROM mutations WHERE uid = ? AND batch_id = ?");
mutationDeleter.bindString(1, uid);
mutationDeleter.bindLong(2, batchId);
int deleted = mutationDeleter.executeUpdateDelete();
hardAssert(deleted != 0, "Mutatiohn batch (%s, %d) did not exist", uid, batchId);

// Delete all index entries for this batch
db.execSQL(
"DELETE FROM document_mutations WHERE uid = ? AND batch_id = ?",
new Object[] {uid, batchId});
}

private void createQueryCache() {
// A cache of targets and associated metadata
db.execSQL(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import android.database.sqlite.SQLiteDatabase;
import android.database.sqlite.SQLiteOpenHelper;
import com.google.firebase.firestore.model.DatabaseId;
import com.google.firebase.firestore.model.ResourcePath;
import com.google.firebase.firestore.proto.WriteBatch;
import com.google.firestore.v1beta1.Document;
import com.google.firestore.v1beta1.Write;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -123,6 +127,68 @@ public void testDatabaseName() {
"[DEFAULT]", DatabaseId.forDatabase("my-project", "my-database")));
}

@Test
public void dropsHeldWriteAcks() {
// This test creates a database with schema version 5 that has two users, both of which have
// acknowledged mutations that haven't yet been removed from IndexedDb ("heldWriteAcks").
// Schema version 6 removes heldWriteAcks, and as such these mutations are deleted.
schema.runMigrations(0, 5);

// User 'userA' has two acknowledged mutations and one that is pending.
// User 'userB' has one acknowledged mutation and one that is pending.
addMutationBatch(db, 1, "userA", "docs/foo");
addMutationBatch(db, 2, "userA", "docs/foo");
addMutationBatch(db, 3, "userB", "docs/bar", "doc/baz");
addMutationBatch(db, 4, "userB", "docs/pending");
addMutationBatch(db, 5, "userA", "docs/pending");

// Populate the mutation queues' metadata
db.execSQL(
"INSERT INTO mutation_queues (uid, last_acknowledged_batch_id) VALUES (?, ?)",
new Object[] {"userA", 2});
db.execSQL(
"INSERT INTO mutation_queues (uid, last_acknowledged_batch_id) VALUES (?, ?)",
new Object[] {"userB", 3});
db.execSQL(
"INSERT INTO mutation_queues (uid, last_acknowledged_batch_id) VALUES (?, ?)",
new Object[] {"userC", -1});

schema.runMigrations(5, 6);

// Verify that all but the two pending mutations have been cleared by the migration.
new SQLitePersistence.Query(db, "SELECT COUNT(*) FROM mutations")
.first(value -> assertEquals(2, value.getInt(0)));

// Verify that we still have two index entries for the pending documents
new SQLitePersistence.Query(db, "SELECT COUNT(*) FROM document_mutations")
.first(value -> assertEquals(2, value.getInt(0)));

// Verify that we still have one metadata entry for each existing queue
new SQLitePersistence.Query(db, "SELECT COUNT(*) FROM mutation_queues")
.first(value -> assertEquals(3, value.getInt(0)));
}

private void addMutationBatch(SQLiteDatabase db, int batchId, String uid, String... docs) {
WriteBatch.Builder write = WriteBatch.newBuilder();
write.setBatchId(batchId);

for (String doc : docs) {
db.execSQL(
"INSERT INTO document_mutations (uid, path, batch_id) VALUES (?, ?, ?)",
new Object[] {uid, EncodedPath.encode(ResourcePath.fromString(doc)), batchId});

write.addWrites(
Write.newBuilder()
.setUpdate(
Document.newBuilder()
.setName("projects/projectId/databases/(default)/documents/" + doc)));
}

db.execSQL(
"INSERT INTO mutations (uid, batch_id, mutations) VALUES (?,?,?)",
new Object[] {uid, batchId, write.build().toByteArray()});
}

private void assertNoResultsForQuery(String query, String[] args) {
Cursor cursor = null;
try {
Expand Down