-
Notifications
You must be signed in to change notification settings - Fork 617
awaitPendingWrites initial revision #689
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
74603b8
9d8841e
a683e2f
86e3abf
67fec18
ec2d7d2
a4e4355
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,11 +36,13 @@ | |
import com.google.firebase.firestore.MetadataChanges; | ||
import com.google.firebase.firestore.QuerySnapshot; | ||
import com.google.firebase.firestore.auth.EmptyCredentialsProvider; | ||
import com.google.firebase.firestore.auth.User; | ||
import com.google.firebase.firestore.core.DatabaseInfo; | ||
import com.google.firebase.firestore.local.Persistence; | ||
import com.google.firebase.firestore.model.DatabaseId; | ||
import com.google.firebase.firestore.testutil.provider.FirestoreProvider; | ||
import com.google.firebase.firestore.util.AsyncQueue; | ||
import com.google.firebase.firestore.util.Listener; | ||
import com.google.firebase.firestore.util.Logger; | ||
import com.google.firebase.firestore.util.Logger.Level; | ||
import java.util.ArrayList; | ||
|
@@ -53,6 +55,34 @@ | |
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
|
||
class MockCredentialsProvider extends EmptyCredentialsProvider { | ||
|
||
private static MockCredentialsProvider instance; | ||
|
||
public static MockCredentialsProvider instance() { | ||
if (MockCredentialsProvider.instance == null) { | ||
MockCredentialsProvider.instance = new MockCredentialsProvider(); | ||
} | ||
return MockCredentialsProvider.instance; | ||
} | ||
|
||
private MockCredentialsProvider() { | ||
super(); | ||
} | ||
|
||
@Override | ||
public void setChangeListener(Listener<User> changeListener) { | ||
super.setChangeListener(changeListener); | ||
this.listener = changeListener; | ||
} | ||
|
||
public void changeUserTo(User user) { | ||
listener.onValue(user); | ||
} | ||
|
||
private Listener<User> listener; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Move to top of class. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
} | ||
|
||
/** A set of helper methods for tests */ | ||
public class IntegrationTestUtil { | ||
|
||
|
@@ -239,7 +269,7 @@ public static FirebaseFirestore testFirestore( | |
context, | ||
databaseId, | ||
persistenceKey, | ||
new EmptyCredentialsProvider(), | ||
MockCredentialsProvider.instance(), | ||
asyncQueue, | ||
/*firebaseApp=*/ null, | ||
/*instanceRegistry=*/ (dbId) -> {}); | ||
|
@@ -409,4 +439,8 @@ public static Map<String, Object> toDataMap(QuerySnapshot qrySnap) { | |
public static boolean isRunningAgainstEmulator() { | ||
return CONNECT_TO_EMULATOR; | ||
} | ||
|
||
public static void testChangeUserTo(User user) { | ||
MockCredentialsProvider.instance().changeUserTo(user); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -362,6 +362,24 @@ Task<Void> shutdown() { | |
return shutdownInternal(); | ||
} | ||
|
||
/** | ||
* Waits until all currently pending writes for the active user have been acknowledged by the | ||
* backend. | ||
* | ||
* <p>The returned Task completes immediately if there are no outstanding writes. Otherwise, the | ||
* Task waits for all previously issued writes (including those written in a previous app | ||
* session), but it does not wait for writes that were added after the method is called. If you | ||
* wish to wait for additional writes, you have to call `waitForPendingWrites()` again. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
* | ||
* <p>Any outstanding `waitForPendingWrites()` Tasks are cancelled during user changes. | ||
* | ||
* @return A {@code Task} which resolves when all currently pending writes have been acknowledged | ||
* by the backend. | ||
*/ | ||
Task<Void> waitForPendingWrites() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I still think we should move this comment to a doc (it could be the API proposal doc) and do some collaborative editing:
@mikelehen usually has a ton of good feedback on these comments, and his feedback tends to be much more valuable than what I can provide. Thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The API proposal doc would be a good place to collaborate on this. Note that the "written to the server" language is used in the iOS documentation already which is why I suggested it: https://github.com/firebase/firebase-ios-sdk/blob/master/Firestore/Source/Public/FIRDocumentReference.h#L117. I'm definitely open to finding a better way to succinctly describe this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using Sebastian's version now, looks great. |
||
return client.waitForPendingWrites(); | ||
} | ||
|
||
@VisibleForTesting | ||
AsyncQueue getAsyncQueue() { | ||
return asyncQueue; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import com.google.android.gms.tasks.TaskCompletionSource; | ||
import com.google.android.gms.tasks.Tasks; | ||
import com.google.common.base.Function; | ||
import com.google.common.collect.Lists; | ||
import com.google.firebase.database.collection.ImmutableSortedMap; | ||
import com.google.firebase.database.collection.ImmutableSortedSet; | ||
import com.google.firebase.firestore.FirebaseFirestoreException; | ||
|
@@ -39,6 +40,7 @@ | |
import com.google.firebase.firestore.model.NoDocument; | ||
import com.google.firebase.firestore.model.SnapshotVersion; | ||
import com.google.firebase.firestore.model.mutation.Mutation; | ||
import com.google.firebase.firestore.model.mutation.MutationBatch; | ||
import com.google.firebase.firestore.model.mutation.MutationBatchResult; | ||
import com.google.firebase.firestore.remote.Datastore; | ||
import com.google.firebase.firestore.remote.RemoteEvent; | ||
|
@@ -133,6 +135,9 @@ interface SyncEngineCallback { | |
/** Stores user completion blocks, indexed by user and batch ID. */ | ||
private final Map<User, Map<Integer, TaskCompletionSource<Void>>> mutationUserCallbacks; | ||
|
||
/** Stores user callbacks waiting for all pending writes to be acknowledged. */ | ||
private final Map<Integer, List<TaskCompletionSource<Void>>> pendingWritesCallbacks; | ||
|
||
/** Used for creating the target IDs for the listens used to resolve limbo documents. */ | ||
private final TargetIdGenerator targetIdGenerator; | ||
|
||
|
@@ -154,6 +159,8 @@ public SyncEngine(LocalStore localStore, RemoteStore remoteStore, User initialUs | |
mutationUserCallbacks = new HashMap<>(); | ||
targetIdGenerator = TargetIdGenerator.forSyncEngine(); | ||
currentUser = initialUser; | ||
|
||
pendingWritesCallbacks = new HashMap<>(); | ||
} | ||
|
||
public void setCallback(SyncEngineCallback callback) { | ||
|
@@ -407,6 +414,8 @@ public void handleSuccessfulWrite(MutationBatchResult mutationBatchResult) { | |
// they consistently happen before listen events. | ||
notifyUser(mutationBatchResult.getBatch().getBatchId(), /*status=*/ null); | ||
|
||
resolvePendingWriteTasks(mutationBatchResult.getBatch().getBatchId()); | ||
|
||
ImmutableSortedMap<DocumentKey, MaybeDocument> changes = | ||
localStore.acknowledgeBatch(mutationBatchResult); | ||
|
||
|
@@ -427,9 +436,63 @@ public void handleRejectedWrite(int batchId, Status status) { | |
// they consistently happen before listen events. | ||
notifyUser(batchId, status); | ||
|
||
resolvePendingWriteTasks(batchId); | ||
|
||
emitNewSnapsAndNotifyLocalStore(changes, /*remoteEvent=*/ null); | ||
} | ||
|
||
/** | ||
* Takes a snapshot of current mutation queue, and register a user task which will resolve when | ||
* all those mutations are either accepted or rejected by the server. | ||
*/ | ||
public void registerPendingWritesTask(TaskCompletionSource<Void> userTask) { | ||
if (!remoteStore.canUseNetwork()) { | ||
Logger.debug( | ||
TAG, | ||
"The network is disabled. The task returned by 'awaitPendingWrites()' will not" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Super nit: We usually leave the space at the end of the line. This helps to align comments with more than two lines. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
+ " complete until the network is enabled."); | ||
} | ||
|
||
int largestPendingBatchId = localStore.getHighestUnacknowledgedBatchId(); | ||
|
||
if (largestPendingBatchId == MutationBatch.UNKNOWN) { | ||
// Complete the task right away if there is no pending writes at the moment. | ||
userTask.setResult(null); | ||
return; | ||
} | ||
|
||
if (pendingWritesCallbacks.containsKey(largestPendingBatchId)) { | ||
pendingWritesCallbacks.get(largestPendingBatchId).add(userTask); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you don't need a multi-map for this case. You should be able to do the following:
Note that this introduces a dependency on other listeners that a user might already have added to the task, but that is likely fine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO., i found this harder to understand, and i don't immediately see benefits of chaining tasks this way. I can be wrong though, but I'd like to see if more people find it better this way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's fine to stick with the multi-map. |
||
} else { | ||
pendingWritesCallbacks.put(largestPendingBatchId, Lists.newArrayList(userTask)); | ||
This conversation was marked as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
/** Resolves tasks waiting for this batch id to get acknowledged by server, if there is any. */ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/is/are/ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
private void resolvePendingWriteTasks(int batchId) { | ||
if (pendingWritesCallbacks.containsKey(batchId)) { | ||
for (TaskCompletionSource<Void> task : pendingWritesCallbacks.get(batchId)) { | ||
task.setResult(null); | ||
} | ||
|
||
pendingWritesCallbacks.remove(batchId); | ||
} | ||
} | ||
|
||
private void failOutstandingPendingWritesAwaitingTasks() { | ||
for (Map.Entry<Integer, List<TaskCompletionSource<Void>>> entry : | ||
pendingWritesCallbacks.entrySet()) { | ||
for (TaskCompletionSource<Void> task : entry.getValue()) { | ||
task.setException( | ||
new FirebaseFirestoreException( | ||
"'waitForPendingWrites' task is cancelled due to User change.", | ||
FirebaseFirestoreException.Code.CANCELLED)); | ||
} | ||
} | ||
|
||
pendingWritesCallbacks.clear(); | ||
} | ||
|
||
/** Resolves the task corresponding to this write result. */ | ||
private void notifyUser(int batchId, @Nullable Status status) { | ||
Map<Integer, TaskCompletionSource<Void>> userTasks = mutationUserCallbacks.get(currentUser); | ||
|
@@ -562,6 +625,8 @@ public void handleCredentialChange(User user) { | |
currentUser = user; | ||
|
||
if (userChanged) { | ||
// Fails tasks waiting for pending writes requested by previous user. | ||
failOutstandingPendingWritesAwaitingTasks(); | ||
// Notify local store and emit any resulting events from swapping out the mutation queue. | ||
ImmutableSortedMap<DocumentKey, MaybeDocument> changes = localStore.handleUserChange(user); | ||
emitNewSnapsAndNotifyLocalStore(changes, /*remoteEvent=*/ null); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -282,6 +282,14 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> rejectBatch(int batchId) { | |
}); | ||
} | ||
|
||
/** | ||
* Returns the largest (latest) batch id in mutation queue that is pending server response. | ||
* Returns {@link MutationBatch#UNKNOWN} if the queue is empty. | ||
*/ | ||
public int getHighestUnacknowledgedBatchId() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would advocate for being lazy here and name this consistently between the mutation batch API and the local store API. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
return mutationQueue.getHighestUnacknowledgedBatchId(); | ||
} | ||
|
||
/** Returns the last recorded stream token for the current user. */ | ||
public ByteString getLastStreamToken() { | ||
return mutationQueue.getLastStreamToken(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1143,4 +1143,21 @@ public void testHandlesPatchMutationWithTransformThenRemoteEvent() { | |
assertChanged(doc("foo/bar", 1, map("sum", 1), Document.DocumentState.LOCAL_MUTATIONS)); | ||
assertContains(doc("foo/bar", 1, map("sum", 1), Document.DocumentState.LOCAL_MUTATIONS)); | ||
} | ||
|
||
@Test | ||
public void testGetHighestUnacknowledgedBatchId() { | ||
assertEquals(-1, localStore.getHighestUnacknowledgedBatchId()); | ||
|
||
writeMutation(setMutation("foo/bar", map("abc", 123))); | ||
assertEquals(1, localStore.getHighestUnacknowledgedBatchId()); | ||
|
||
writeMutation(patchMutation("foo/bar", map("abc", 321))); | ||
assertEquals(2, localStore.getHighestUnacknowledgedBatchId()); | ||
|
||
acknowledgeMutation(1); | ||
assertEquals(2, localStore.getHighestUnacknowledgedBatchId()); | ||
|
||
rejectMutation(); | ||
assertEquals(-1, localStore.getHighestUnacknowledgedBatchId()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/-1/MutationBatch.UNKNOWN/ |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: This super() call is not needed, is it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not.