diff --git a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/FirestoreTest.java b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/FirestoreTest.java index 71a50100837..3b2c9d5c3e7 100644 --- a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/FirestoreTest.java +++ b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/FirestoreTest.java @@ -17,6 +17,7 @@ import static com.google.firebase.firestore.AccessHelper.getAsyncQueue; import static com.google.firebase.firestore.testutil.IntegrationTestUtil.newTestSettings; import static com.google.firebase.firestore.testutil.IntegrationTestUtil.provider; +import static com.google.firebase.firestore.testutil.IntegrationTestUtil.testChangeUserTo; import static com.google.firebase.firestore.testutil.IntegrationTestUtil.testCollection; import static com.google.firebase.firestore.testutil.IntegrationTestUtil.testCollectionWithDocs; import static com.google.firebase.firestore.testutil.IntegrationTestUtil.testDocument; @@ -44,6 +45,7 @@ import com.google.firebase.Timestamp; import com.google.firebase.firestore.FirebaseFirestoreException.Code; import com.google.firebase.firestore.Query.Direction; +import com.google.firebase.firestore.auth.User; import com.google.firebase.firestore.testutil.EventAccumulator; import com.google.firebase.firestore.testutil.IntegrationTestUtil; import com.google.firebase.firestore.util.AsyncQueue.TimerId; @@ -1096,4 +1098,62 @@ public void testShutdownCalledMultipleTimes() { expectError(() -> waitFor(reference.get()), expectedMessage); } + + @Test + public void testWaitForPendingWritesResolves() { + DocumentReference documentReference = testCollection("abc").document("123"); + FirebaseFirestore firestore = documentReference.getFirestore(); + Map data = map("foo", "bar"); + + waitFor(firestore.disableNetwork()); + Task awaitsPendingWrites1 = firestore.waitForPendingWrites(); + Task pendingWrite = documentReference.set(data); + Task awaitsPendingWrites2 = firestore.waitForPendingWrites(); + + // `awaitsPendingWrites1` completes immediately because there are no pending writes at + // the time it is created. + waitFor(awaitsPendingWrites1); + assertTrue(awaitsPendingWrites1.isComplete() && awaitsPendingWrites1.isSuccessful()); + assertTrue(!pendingWrite.isComplete()); + assertTrue(!awaitsPendingWrites2.isComplete()); + + firestore.enableNetwork(); + waitFor(awaitsPendingWrites2); + assertTrue(awaitsPendingWrites2.isComplete() && awaitsPendingWrites2.isSuccessful()); + } + + @Test + public void testWaitForPendingWritesFailsWhenUserChanges() { + DocumentReference documentReference = testCollection("abc").document("123"); + FirebaseFirestore firestore = documentReference.getFirestore(); + Map data = map("foo", "bar"); + + // Prevent pending writes receiving acknowledgement. + waitFor(firestore.disableNetwork()); + Task pendingWrite = documentReference.set(data); + Task awaitsPendingWrites = firestore.waitForPendingWrites(); + assertTrue(!pendingWrite.isComplete()); + assertTrue(!awaitsPendingWrites.isComplete()); + + testChangeUserTo(new User("new user")); + + assertTrue(!pendingWrite.isComplete()); + assertEquals( + "'waitForPendingWrites' task is cancelled due to User change.", + waitForException(awaitsPendingWrites).getMessage()); + } + + @Test + public void testPendingWriteTaskResolveWhenOfflineIfThereIsNoPending() { + DocumentReference documentReference = testCollection("abc").document("123"); + FirebaseFirestore firestore = documentReference.getFirestore(); + Map data = map("foo", "bar"); + + // Prevent pending writes receiving acknowledgement. + waitFor(firestore.disableNetwork()); + Task awaitsPendingWrites = firestore.waitForPendingWrites(); + waitFor(awaitsPendingWrites); + + assertTrue(awaitsPendingWrites.isComplete() && awaitsPendingWrites.isSuccessful()); + } } diff --git a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/testutil/IntegrationTestUtil.java b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/testutil/IntegrationTestUtil.java index 0971434e999..f3c54c8e12b 100644 --- a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/testutil/IntegrationTestUtil.java +++ b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/testutil/IntegrationTestUtil.java @@ -36,11 +36,13 @@ import com.google.firebase.firestore.MetadataChanges; import com.google.firebase.firestore.QuerySnapshot; import com.google.firebase.firestore.auth.EmptyCredentialsProvider; +import com.google.firebase.firestore.auth.User; import com.google.firebase.firestore.core.DatabaseInfo; import com.google.firebase.firestore.local.Persistence; import com.google.firebase.firestore.model.DatabaseId; import com.google.firebase.firestore.testutil.provider.FirestoreProvider; import com.google.firebase.firestore.util.AsyncQueue; +import com.google.firebase.firestore.util.Listener; import com.google.firebase.firestore.util.Logger; import com.google.firebase.firestore.util.Logger.Level; import java.util.ArrayList; @@ -53,6 +55,31 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +class MockCredentialsProvider extends EmptyCredentialsProvider { + + private static MockCredentialsProvider instance; + private Listener listener; + + public static MockCredentialsProvider instance() { + if (MockCredentialsProvider.instance == null) { + MockCredentialsProvider.instance = new MockCredentialsProvider(); + } + return MockCredentialsProvider.instance; + } + + private MockCredentialsProvider() {} + + @Override + public void setChangeListener(Listener changeListener) { + super.setChangeListener(changeListener); + this.listener = changeListener; + } + + public void changeUserTo(User user) { + listener.onValue(user); + } +} + /** A set of helper methods for tests */ public class IntegrationTestUtil { @@ -239,7 +266,7 @@ public static FirebaseFirestore testFirestore( context, databaseId, persistenceKey, - new EmptyCredentialsProvider(), + MockCredentialsProvider.instance(), asyncQueue, /*firebaseApp=*/ null, /*instanceRegistry=*/ (dbId) -> {}); @@ -409,4 +436,8 @@ public static Map toDataMap(QuerySnapshot qrySnap) { public static boolean isRunningAgainstEmulator() { return CONNECT_TO_EMULATOR; } + + public static void testChangeUserTo(User user) { + MockCredentialsProvider.instance().changeUserTo(user); + } } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestore.java index efd1877425a..d1b3099b20d 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestore.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestore.java @@ -362,6 +362,24 @@ Task shutdown() { return shutdownInternal(); } + /** + * Waits until all currently pending writes for the active user have been acknowledged by the + * backend. + * + *

The returned Task completes immediately if there are no outstanding writes. Otherwise, the + * Task waits for all previously issued writes (including those written in a previous app + * session), but it does not wait for writes that were added after the method is called. If you + * wish to wait for additional writes, you have to call {@code waitForPendingWrites()} again. + * + *

Any outstanding {@code waitForPendingWrites()} Tasks are cancelled during user changes. + * + * @return A {@code Task} which resolves when all currently pending writes have been acknowledged + * by the backend. + */ + Task waitForPendingWrites() { + return client.waitForPendingWrites(); + } + @VisibleForTesting AsyncQueue getAsyncQueue() { return asyncQueue; diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/FirestoreClient.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/FirestoreClient.java index fc85ac5cb21..fb8b23c7948 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/FirestoreClient.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/FirestoreClient.java @@ -224,6 +224,18 @@ public Task transaction( () -> syncEngine.transaction(asyncQueue, updateFunction, retries)); } + /** + * Returns a task resolves when all the pending writes at the time when this method is called + * received server acknowledgement. An acknowledgement can be either acceptance or rejections. + */ + public Task waitForPendingWrites() { + this.verifyNotShutdown(); + + final TaskCompletionSource source = new TaskCompletionSource<>(); + asyncQueue.enqueueAndForget(() -> syncEngine.registerPendingWritesTask(source)); + return source.getTask(); + } + private void initialize(Context context, User user, boolean usePersistence, long cacheSizeBytes) { // Note: The initialization work must all be synchronous (we can't dispatch more work) since // external write/listen operations could get queued to run before that subsequent work diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java index b11fc91c7fd..e03694637e4 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java @@ -23,6 +23,7 @@ import com.google.android.gms.tasks.TaskCompletionSource; import com.google.android.gms.tasks.Tasks; import com.google.common.base.Function; +import com.google.common.collect.Lists; import com.google.firebase.database.collection.ImmutableSortedMap; import com.google.firebase.database.collection.ImmutableSortedSet; import com.google.firebase.firestore.FirebaseFirestoreException; @@ -39,6 +40,7 @@ import com.google.firebase.firestore.model.NoDocument; import com.google.firebase.firestore.model.SnapshotVersion; import com.google.firebase.firestore.model.mutation.Mutation; +import com.google.firebase.firestore.model.mutation.MutationBatch; import com.google.firebase.firestore.model.mutation.MutationBatchResult; import com.google.firebase.firestore.remote.Datastore; import com.google.firebase.firestore.remote.RemoteEvent; @@ -133,6 +135,9 @@ interface SyncEngineCallback { /** Stores user completion blocks, indexed by user and batch ID. */ private final Map>> mutationUserCallbacks; + /** Stores user callbacks waiting for all pending writes to be acknowledged. */ + private final Map>> pendingWritesCallbacks; + /** Used for creating the target IDs for the listens used to resolve limbo documents. */ private final TargetIdGenerator targetIdGenerator; @@ -154,6 +159,8 @@ public SyncEngine(LocalStore localStore, RemoteStore remoteStore, User initialUs mutationUserCallbacks = new HashMap<>(); targetIdGenerator = TargetIdGenerator.forSyncEngine(); currentUser = initialUser; + + pendingWritesCallbacks = new HashMap<>(); } public void setCallback(SyncEngineCallback callback) { @@ -407,6 +414,8 @@ public void handleSuccessfulWrite(MutationBatchResult mutationBatchResult) { // they consistently happen before listen events. notifyUser(mutationBatchResult.getBatch().getBatchId(), /*status=*/ null); + resolvePendingWriteTasks(mutationBatchResult.getBatch().getBatchId()); + ImmutableSortedMap changes = localStore.acknowledgeBatch(mutationBatchResult); @@ -427,9 +436,63 @@ public void handleRejectedWrite(int batchId, Status status) { // they consistently happen before listen events. notifyUser(batchId, status); + resolvePendingWriteTasks(batchId); + emitNewSnapsAndNotifyLocalStore(changes, /*remoteEvent=*/ null); } + /** + * Takes a snapshot of current mutation queue, and register a user task which will resolve when + * all those mutations are either accepted or rejected by the server. + */ + public void registerPendingWritesTask(TaskCompletionSource userTask) { + if (!remoteStore.canUseNetwork()) { + Logger.debug( + TAG, + "The network is disabled. The task returned by 'awaitPendingWrites()' will not " + + "complete until the network is enabled."); + } + + int largestPendingBatchId = localStore.getHighestUnacknowledgedBatchId(); + + if (largestPendingBatchId == MutationBatch.UNKNOWN) { + // Complete the task right away if there is no pending writes at the moment. + userTask.setResult(null); + return; + } + + if (pendingWritesCallbacks.containsKey(largestPendingBatchId)) { + pendingWritesCallbacks.get(largestPendingBatchId).add(userTask); + } else { + pendingWritesCallbacks.put(largestPendingBatchId, Lists.newArrayList(userTask)); + } + } + + /** Resolves tasks waiting for this batch id to get acknowledged by server, if there are any. */ + private void resolvePendingWriteTasks(int batchId) { + if (pendingWritesCallbacks.containsKey(batchId)) { + for (TaskCompletionSource task : pendingWritesCallbacks.get(batchId)) { + task.setResult(null); + } + + pendingWritesCallbacks.remove(batchId); + } + } + + private void failOutstandingPendingWritesAwaitingTasks() { + for (Map.Entry>> entry : + pendingWritesCallbacks.entrySet()) { + for (TaskCompletionSource task : entry.getValue()) { + task.setException( + new FirebaseFirestoreException( + "'waitForPendingWrites' task is cancelled due to User change.", + FirebaseFirestoreException.Code.CANCELLED)); + } + } + + pendingWritesCallbacks.clear(); + } + /** Resolves the task corresponding to this write result. */ private void notifyUser(int batchId, @Nullable Status status) { Map> userTasks = mutationUserCallbacks.get(currentUser); @@ -562,6 +625,8 @@ public void handleCredentialChange(User user) { currentUser = user; if (userChanged) { + // Fails tasks waiting for pending writes requested by previous user. + failOutstandingPendingWritesAwaitingTasks(); // Notify local store and emit any resulting events from swapping out the mutation queue. ImmutableSortedMap changes = localStore.handleUserChange(user); emitNewSnapsAndNotifyLocalStore(changes, /*remoteEvent=*/ null); diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java index 0e92e76c2c2..8d2d2aafecb 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java @@ -282,6 +282,14 @@ public ImmutableSortedMap rejectBatch(int batchId) { }); } + /** + * Returns the largest (latest) batch id in mutation queue that is pending server response. + * Returns {@link MutationBatch#UNKNOWN} if the queue is empty. + */ + public int getHighestUnacknowledgedBatchId() { + return mutationQueue.getHighestUnacknowledgedBatchId(); + } + /** Returns the last recorded stream token for the current user. */ public ByteString getLastStreamToken() { return mutationQueue.getLastStreamToken(); diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/MemoryMutationQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/MemoryMutationQueue.java index 40330618760..f113580f422 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/MemoryMutationQueue.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/MemoryMutationQueue.java @@ -187,6 +187,11 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) { return queue.size() > index ? queue.get(index) : null; } + @Override + public int getHighestUnacknowledgedBatchId() { + return queue.isEmpty() ? MutationBatch.UNKNOWN : nextBatchId - 1; + } + @Override public List getAllMutationBatches() { return Collections.unmodifiableList(queue); diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/MutationQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/MutationQueue.java index 0279914b96c..c400845a4b8 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/MutationQueue.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/MutationQueue.java @@ -73,6 +73,12 @@ MutationBatch addMutationBatch( @Nullable MutationBatch getNextMutationBatchAfterBatchId(int batchId); + /** + * @return The largest (latest) batch id in mutation queue for the current user that is pending + * server response, {@link MutationBatch#UNKNOWN} if the queue is empty. + */ + int getHighestUnacknowledgedBatchId(); + /** Returns all mutation batches in the mutation queue. */ // TODO: PERF: Current consumer only needs mutated keys; if we can provide that // cheaply, we should replace this. diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java index c6f208d4c59..3f7fe49c79f 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java @@ -249,6 +249,13 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) { .firstValue(row -> decodeInlineMutationBatch(row.getInt(0), row.getBlob(1))); } + @Override + public int getHighestUnacknowledgedBatchId() { + return db.query("SELECT IFNULL(MAX(batch_id), ?) FROM mutations WHERE uid = ?") + .binding(MutationBatch.UNKNOWN, uid) + .firstValue(row -> row.getInt(0)); + } + @Override public List getAllMutationBatches() { List result = new ArrayList<>(); diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java index a964a00d72b..476d88db342 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java @@ -478,7 +478,7 @@ private void handleWatchStreamClose(Status status) { } } - private boolean canUseNetwork() { + public boolean canUseNetwork() { // PORTING NOTE: This method exists mostly because web also has to take into account primary // vs. secondary state. return networkEnabled; diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalStoreTestCase.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalStoreTestCase.java index 93e45b30848..8a6d93fce30 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalStoreTestCase.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalStoreTestCase.java @@ -1143,4 +1143,21 @@ public void testHandlesPatchMutationWithTransformThenRemoteEvent() { assertChanged(doc("foo/bar", 1, map("sum", 1), Document.DocumentState.LOCAL_MUTATIONS)); assertContains(doc("foo/bar", 1, map("sum", 1), Document.DocumentState.LOCAL_MUTATIONS)); } + + @Test + public void testGetHighestUnacknowledgedBatchId() { + assertEquals(MutationBatch.UNKNOWN, localStore.getHighestUnacknowledgedBatchId()); + + writeMutation(setMutation("foo/bar", map("abc", 123))); + assertEquals(1, localStore.getHighestUnacknowledgedBatchId()); + + writeMutation(patchMutation("foo/bar", map("abc", 321))); + assertEquals(2, localStore.getHighestUnacknowledgedBatchId()); + + acknowledgeMutation(1); + assertEquals(2, localStore.getHighestUnacknowledgedBatchId()); + + rejectMutation(); + assertEquals(MutationBatch.UNKNOWN, localStore.getHighestUnacknowledgedBatchId()); + } }