-
Notifications
You must be signed in to change notification settings - Fork 617
awaitPendingWrites initial revision #689
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
74603b8
9d8841e
a683e2f
86e3abf
67fec18
ec2d7d2
a4e4355
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -362,6 +362,18 @@ Task<Void> shutdown() { | |
return shutdownInternal(); | ||
} | ||
|
||
/** | ||
* Wait for server acknowledgement for all pending writes existing at the time of calling this | ||
* method. | ||
* | ||
* <p>Both acceptance and rejection count as server acknowledgement. | ||
* | ||
* @return A {@link Task} which resolves when all pending writes are acknowledged by the server. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We likely want to update the comment a bit:
If you want to, we can tweak this comment in the chatroom. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah. I made some change, but not enough. Especially i don't understand your comment about client restart, and how this still works with a restart. Let's chat tmr. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One of the reasons that this is a heavily requested feature is that it offers users the ability to monitor the state of writes that were issued during an earlier app session.If a user issues a write and then closes the app before the write is committed, it is not very straightforward to figure out whether the write made it to the backend once the app is opened again. Just for reference: This can be done today by either issuing a dummy write and waiting for its completion (one would have to know that we commit writes sequentially) or by issuing gets for all documents that were written and checking the |
||
*/ | ||
Task<Void> awaitPendingWrites() { | ||
return client.awaitPendingWrites(); | ||
} | ||
|
||
@VisibleForTesting | ||
AsyncQueue getAsyncQueue() { | ||
return asyncQueue; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -224,6 +224,24 @@ public <TResult> Task<TResult> transaction( | |
() -> syncEngine.transaction(asyncQueue, updateFunction, retries)); | ||
} | ||
|
||
/** | ||
* Returns a task resolves when all the pending writes at the time when this method is called | ||
* received server acknowledgement. An acknowledgement can be either acceptance or rejections. | ||
*/ | ||
public Task<Void> awaitPendingWrites() { | ||
this.verifyNotShutdown(); | ||
if (!remoteStore.canUseNetwork()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should run on the async queue:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the catch. |
||
Logger.warn( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am a little torn on whether this is a "warning" - it is certainly not a problem in the SDK itself, and technically not even for users of the SDK. It prevents unexpected behavior - so I would advocate to lower the log level here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
LOG_TAG, | ||
"Network is disabled, the Task created to wait for all writes getting" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can simplify this: "The network is disabled. The task returned by awaitPendingWrites() will not complete until the network is enabled." There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
+ " acknowledged by server will not complete until network is enabled."); | ||
} | ||
|
||
final TaskCompletionSource<Void> source = new TaskCompletionSource<>(); | ||
asyncQueue.enqueueAndForget(() -> syncEngine.registerPendingWritesTask(source)); | ||
return source.getTask(); | ||
} | ||
|
||
private void initialize(Context context, User user, boolean usePersistence, long cacheSizeBytes) { | ||
// Note: The initialization work must all be synchronous (we can't dispatch more work) since | ||
// external write/listen operations could get queued to run before that subsequent work | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import com.google.android.gms.tasks.TaskCompletionSource; | ||
import com.google.android.gms.tasks.Tasks; | ||
import com.google.common.base.Function; | ||
import com.google.common.collect.Lists; | ||
import com.google.firebase.database.collection.ImmutableSortedMap; | ||
import com.google.firebase.database.collection.ImmutableSortedSet; | ||
import com.google.firebase.firestore.FirebaseFirestoreException; | ||
|
@@ -133,6 +134,9 @@ interface SyncEngineCallback { | |
/** Stores user completion blocks, indexed by user and batch ID. */ | ||
private final Map<User, Map<Integer, TaskCompletionSource<Void>>> mutationUserCallbacks; | ||
|
||
/** Stores user callbacks waiting for all pending writes to be acknowledged. */ | ||
private final Map<Integer, List<TaskCompletionSource<Void>>> pendingWritesCallbacks; | ||
|
||
/** Used for creating the target IDs for the listens used to resolve limbo documents. */ | ||
private final TargetIdGenerator targetIdGenerator; | ||
|
||
|
@@ -154,6 +158,8 @@ public SyncEngine(LocalStore localStore, RemoteStore remoteStore, User initialUs | |
mutationUserCallbacks = new HashMap<>(); | ||
targetIdGenerator = TargetIdGenerator.forSyncEngine(); | ||
currentUser = initialUser; | ||
|
||
pendingWritesCallbacks = new HashMap<>(); | ||
} | ||
|
||
public void setCallback(SyncEngineCallback callback) { | ||
|
@@ -407,6 +413,8 @@ public void handleSuccessfulWrite(MutationBatchResult mutationBatchResult) { | |
// they consistently happen before listen events. | ||
notifyUser(mutationBatchResult.getBatch().getBatchId(), /*status=*/ null); | ||
|
||
resolveTasksAwaitingForPendingWritesIfAny(mutationBatchResult.getBatch().getBatchId()); | ||
|
||
ImmutableSortedMap<DocumentKey, MaybeDocument> changes = | ||
localStore.acknowledgeBatch(mutationBatchResult); | ||
|
||
|
@@ -427,9 +435,41 @@ public void handleRejectedWrite(int batchId, Status status) { | |
// they consistently happen before listen events. | ||
notifyUser(batchId, status); | ||
|
||
resolveTasksAwaitingForPendingWritesIfAny(batchId); | ||
|
||
emitNewSnapsAndNotifyLocalStore(changes, /*remoteEvent=*/ null); | ||
} | ||
|
||
/** | ||
* Takes a snapshot of current local mutation queue, and register a user task which will resolve | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: remove "local" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
* when all those mutations are either accepted or rejected by the server. | ||
*/ | ||
public void registerPendingWritesTask(TaskCompletionSource<Void> userTask) { | ||
int largestPendingBatchId = localStore.getHighestUnacknowledgedBatchId(); | ||
|
||
if (largestPendingBatchId == 0) { | ||
// Complete the task right away if there is no pending writes at the moment. | ||
userTask.setResult(null); | ||
} | ||
|
||
if (pendingWritesCallbacks.containsKey(largestPendingBatchId)) { | ||
pendingWritesCallbacks.get(largestPendingBatchId).add(userTask); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you don't need a multi-map for this case. You should be able to do the following:
Note that this introduces a dependency on other listeners that a user might already have added to the task, but that is likely fine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO., i found this harder to understand, and i don't immediately see benefits of chaining tasks this way. I can be wrong though, but I'd like to see if more people find it better this way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's fine to stick with the multi-map. |
||
} else { | ||
pendingWritesCallbacks.put(largestPendingBatchId, Lists.newArrayList(userTask)); | ||
This conversation was marked as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
/** Resolves tasks waiting for this batch id to get acknowledged by server, if there is any. */ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/is/are/ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
private void resolveTasksAwaitingForPendingWritesIfAny(int batchId) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: This is a very Objective-C name. Have you considered making this just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
if (pendingWritesCallbacks.containsKey(batchId)) { | ||
for (TaskCompletionSource<Void> task : pendingWritesCallbacks.get(batchId)) { | ||
task.setResult(null); | ||
} | ||
|
||
pendingWritesCallbacks.remove(batchId); | ||
} | ||
} | ||
|
||
/** Resolves the task corresponding to this write result. */ | ||
private void notifyUser(int batchId, @Nullable Status status) { | ||
Map<Integer, TaskCompletionSource<Void>> userTasks = mutationUserCallbacks.get(currentUser); | ||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -282,6 +282,14 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> rejectBatch(int batchId) { | |||
}); | ||||
} | ||||
|
||||
/** | ||||
* Returns the largest (latest) batch id in mutation queue that is pending server response. | ||||
* Returns 0 if the queue is empty. | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we return -1 to clearly differentiate an empty queue and the case where we are writing for the first write? Returning Line 41 in 69f29d4
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When would you need this information, when is checking if it's -1 or 0 helpful? If we don't know a case when this will be useful, i'd like to keep the abstraction simple. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are checking for this constant in If you use -1 (or rather the existing concept of MutationBatch.UNKNOWN) it is apparent that this is a special sentinel value. -1 indicates right away that it is not a valid batch ID. 0, on the other hand, could very well be a valid batch ID. It is not obvious that we decided (or rather that Proto3 decided for us) that the first valid batch ID should be 1. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. Done. |
||||
*/ | ||||
public int getHighestUnacknowledgedBatchId() { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would advocate for being lazy here and name this consistently between the mutation batch API and the local store API. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||
return mutationQueue.getLargestUnacknowledgedBatchId(); | ||||
} | ||||
|
||||
/** Returns the last recorded stream token for the current user. */ | ||||
public ByteString getLastStreamToken() { | ||||
return mutationQueue.getLastStreamToken(); | ||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -187,6 +187,11 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) { | |
return queue.size() > index ? queue.get(index) : null; | ||
} | ||
|
||
@Override | ||
public int getLargestUnacknowledgedBatchId() { | ||
return queue.size() == 0 ? 0 : nextBatchId - 1; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Google style guide generally recommends using isEmpty(), since it always runs in O(1). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
} | ||
|
||
@Override | ||
public List<MutationBatch> getAllMutationBatches() { | ||
return Collections.unmodifiableList(queue); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -249,6 +249,16 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) { | |
.firstValue(row -> decodeInlineMutationBatch(row.getInt(0), row.getBlob(1))); | ||
} | ||
|
||
@Override | ||
public int getLargestUnacknowledgedBatchId() { | ||
if (isEmpty()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: In case the mutation queue is not empty, you are running a very similar query twice. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
return 0; | ||
} | ||
return db.query("SELECT MAX(batch_id) FROM mutations " + "WHERE uid = ?") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: s/" + "// There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
.binding(uid) | ||
.firstValue(row -> row.getInt(0)); | ||
} | ||
|
||
@Override | ||
public List<MutationBatch> getAllMutationBatches() { | ||
List<MutationBatch> result = new ArrayList<>(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1143,4 +1143,21 @@ public void testHandlesPatchMutationWithTransformThenRemoteEvent() { | |
assertChanged(doc("foo/bar", 1, map("sum", 1), Document.DocumentState.LOCAL_MUTATIONS)); | ||
assertContains(doc("foo/bar", 1, map("sum", 1), Document.DocumentState.LOCAL_MUTATIONS)); | ||
} | ||
|
||
@Test | ||
public void testGetHighestUnacknowledgedBatchIdReturnsExpectedResult() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Maybe remove There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
assertEquals(0, localStore.getHighestUnacknowledgedBatchId()); | ||
|
||
writeMutation(setMutation("foo/bar", map("abc", 123))); | ||
assertEquals(1, localStore.getHighestUnacknowledgedBatchId()); | ||
|
||
writeMutation(patchMutation("foo/bar", map("abc", 321))); | ||
assertEquals(2, localStore.getHighestUnacknowledgedBatchId()); | ||
|
||
acknowledgeMutation(1); | ||
assertEquals(2, localStore.getHighestUnacknowledgedBatchId()); | ||
|
||
rejectMutation(); | ||
assertEquals(0, localStore.getHighestUnacknowledgedBatchId()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically, you don't need to wait for
enableNetwork()
or the pending write.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.