Skip to content

awaitPendingWrites initial revision #689

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
7 commits merged into from
Aug 8, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static com.google.firebase.firestore.AccessHelper.getAsyncQueue;
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.newTestSettings;
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.provider;
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.testChangeUserTo;
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.testCollection;
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.testCollectionWithDocs;
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.testDocument;
Expand Down Expand Up @@ -44,6 +45,7 @@
import com.google.firebase.Timestamp;
import com.google.firebase.firestore.FirebaseFirestoreException.Code;
import com.google.firebase.firestore.Query.Direction;
import com.google.firebase.firestore.auth.User;
import com.google.firebase.firestore.testutil.EventAccumulator;
import com.google.firebase.firestore.testutil.IntegrationTestUtil;
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
Expand Down Expand Up @@ -1096,4 +1098,48 @@ public void testShutdownCalledMultipleTimes() {

expectError(() -> waitFor(reference.get()), expectedMessage);
}

@Test
public void testWaitForPendingWritesResolves() {
DocumentReference documentReference = testCollection("abc").document("123");
FirebaseFirestore firestore = documentReference.getFirestore();
Map<String, Object> data = map("foo", "bar");

waitFor(firestore.disableNetwork());
Task<Void> awaitsPendingWrites1 = firestore.waitForPendingWrites();
Task<Void> pendingWrite = documentReference.set(data);
Task<Void> awaitsPendingWrites2 = firestore.waitForPendingWrites();

// `awaitsPendingWrites1` completes immediately because there are no pending writes at
// the time it is created.
waitFor(awaitsPendingWrites1);
assertTrue(awaitsPendingWrites1.isComplete() && awaitsPendingWrites1.isSuccessful());
assertTrue(!pendingWrite.isComplete());
assertTrue(!awaitsPendingWrites2.isComplete());

firestore.enableNetwork();
waitFor(awaitsPendingWrites2);
assertTrue(awaitsPendingWrites2.isComplete() && awaitsPendingWrites2.isSuccessful());
}

@Test
public void testWaitForPendingWritesFailsWhenUserChanges() {
DocumentReference documentReference = testCollection("abc").document("123");
FirebaseFirestore firestore = documentReference.getFirestore();
Map<String, Object> data = map("foo", "bar");

// Prevent pending writes receiving acknowledgement.
waitFor(firestore.disableNetwork());
Task<Void> pendingWrite = documentReference.set(data);
Task<Void> awaitsPendingWrites = firestore.waitForPendingWrites();
assertTrue(!pendingWrite.isComplete());
assertTrue(!awaitsPendingWrites.isComplete());

testChangeUserTo(new User("new user"));

assertTrue(!pendingWrite.isComplete());
assertEquals(
"'waitForPendingWrites' task is cancelled due to User change.",
waitForException(awaitsPendingWrites).getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@
import com.google.firebase.firestore.MetadataChanges;
import com.google.firebase.firestore.QuerySnapshot;
import com.google.firebase.firestore.auth.EmptyCredentialsProvider;
import com.google.firebase.firestore.auth.User;
import com.google.firebase.firestore.core.DatabaseInfo;
import com.google.firebase.firestore.local.Persistence;
import com.google.firebase.firestore.model.DatabaseId;
import com.google.firebase.firestore.testutil.provider.FirestoreProvider;
import com.google.firebase.firestore.util.AsyncQueue;
import com.google.firebase.firestore.util.Listener;
import com.google.firebase.firestore.util.Logger;
import com.google.firebase.firestore.util.Logger.Level;
import java.util.ArrayList;
Expand All @@ -53,6 +55,34 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class MockCredentialsProvider extends EmptyCredentialsProvider {

private static MockCredentialsProvider instance;

public static MockCredentialsProvider instance() {
if (MockCredentialsProvider.instance == null) {
MockCredentialsProvider.instance = new MockCredentialsProvider();
}
return MockCredentialsProvider.instance;
}

private MockCredentialsProvider() {
super();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This super() call is not needed, is it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not.

}

@Override
public void setChangeListener(Listener<User> changeListener) {
super.setChangeListener(changeListener);
this.listener = changeListener;
}

public void changeUserTo(User user) {
listener.onValue(user);
}

private Listener<User> listener;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Move to top of class.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

/** A set of helper methods for tests */
public class IntegrationTestUtil {

Expand Down Expand Up @@ -239,7 +269,7 @@ public static FirebaseFirestore testFirestore(
context,
databaseId,
persistenceKey,
new EmptyCredentialsProvider(),
MockCredentialsProvider.instance(),
asyncQueue,
/*firebaseApp=*/ null,
/*instanceRegistry=*/ (dbId) -> {});
Expand Down Expand Up @@ -409,4 +439,8 @@ public static Map<String, Object> toDataMap(QuerySnapshot qrySnap) {
public static boolean isRunningAgainstEmulator() {
return CONNECT_TO_EMULATOR;
}

public static void testChangeUserTo(User user) {
MockCredentialsProvider.instance().changeUserTo(user);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,16 @@ Task<Void> shutdown() {
return shutdownInternal();
}

/**
* Wait until all pending writes existed at the time of calling are sent to the backend.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bunch of comments on this:

  • Phrase comments with an implicit "this method" at the beginning
  • missing "that" between "writes existed"
  • This doesn't just wait until writes are sent, but also waits for them to complete

[This method ...] Waits until all pending writes that existed at the time of calling are have been successfully written to the server. The task will will not resolve while the client is offline.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

*
* @return A {@link Task} which resolves when all pending writes are sent to the backend. If there
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't @link items that are already in the signature of the method. Use @link for anything related that's not part of the signature.

Use {@code Task} to highlight that it's a type without the extra visual noise that comes from an actual link.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* is a Firebase user change, the return {@link Task} will resolve to an exception.
*/
Task<Void> waitForPendingWrites() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think we should move this comment to a doc (it could be the API proposal doc) and do some collaborative editing:

  • pending writes that existed at the time of calling could be clearer.
  • written to the server is an unusual way of phrasing the right thing.
  • I think we should move the comment on the user change to a separate paragraph and move it out of the @return section
  • Tasks on Android don't resolve, they complete.

@mikelehen usually has a ton of good feedback on these comments, and his feedback tends to be much more valuable than what I can provide.

Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API proposal doc would be a good place to collaborate on this.

Note that the "written to the server" language is used in the iOS documentation already which is why I suggested it: https://github.com/firebase/firebase-ios-sdk/blob/master/Firestore/Source/Public/FIRDocumentReference.h#L117. I'm definitely open to finding a better way to succinctly describe this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using Sebastian's version now, looks great.

return client.waitForPendingWrites();
}

@VisibleForTesting
AsyncQueue getAsyncQueue() {
return asyncQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,18 @@ public <TResult> Task<TResult> transaction(
() -> syncEngine.transaction(asyncQueue, updateFunction, retries));
}

/**
* Returns a task resolves when all the pending writes at the time when this method is called
* received server acknowledgement. An acknowledgement can be either acceptance or rejections.
*/
public Task<Void> waitForPendingWrites() {
this.verifyNotShutdown();

final TaskCompletionSource<Void> source = new TaskCompletionSource<>();
asyncQueue.enqueueAndForget(() -> syncEngine.registerPendingWritesTask(source));
return source.getTask();
}

private void initialize(Context context, User user, boolean usePersistence, long cacheSizeBytes) {
// Note: The initialization work must all be synchronous (we can't dispatch more work) since
// external write/listen operations could get queued to run before that subsequent work
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.android.gms.tasks.TaskCompletionSource;
import com.google.android.gms.tasks.Tasks;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.firebase.database.collection.ImmutableSortedMap;
import com.google.firebase.database.collection.ImmutableSortedSet;
import com.google.firebase.firestore.FirebaseFirestoreException;
Expand All @@ -39,6 +40,7 @@
import com.google.firebase.firestore.model.NoDocument;
import com.google.firebase.firestore.model.SnapshotVersion;
import com.google.firebase.firestore.model.mutation.Mutation;
import com.google.firebase.firestore.model.mutation.MutationBatch;
import com.google.firebase.firestore.model.mutation.MutationBatchResult;
import com.google.firebase.firestore.remote.Datastore;
import com.google.firebase.firestore.remote.RemoteEvent;
Expand Down Expand Up @@ -133,6 +135,9 @@ interface SyncEngineCallback {
/** Stores user completion blocks, indexed by user and batch ID. */
private final Map<User, Map<Integer, TaskCompletionSource<Void>>> mutationUserCallbacks;

/** Stores user callbacks waiting for all pending writes to be acknowledged. */
private final Map<Integer, List<TaskCompletionSource<Void>>> pendingWritesCallbacks;

/** Used for creating the target IDs for the listens used to resolve limbo documents. */
private final TargetIdGenerator targetIdGenerator;

Expand All @@ -154,6 +159,8 @@ public SyncEngine(LocalStore localStore, RemoteStore remoteStore, User initialUs
mutationUserCallbacks = new HashMap<>();
targetIdGenerator = TargetIdGenerator.forSyncEngine();
currentUser = initialUser;

pendingWritesCallbacks = new HashMap<>();
}

public void setCallback(SyncEngineCallback callback) {
Expand Down Expand Up @@ -407,6 +414,8 @@ public void handleSuccessfulWrite(MutationBatchResult mutationBatchResult) {
// they consistently happen before listen events.
notifyUser(mutationBatchResult.getBatch().getBatchId(), /*status=*/ null);

resolvePendingWriteTasks(mutationBatchResult.getBatch().getBatchId());

ImmutableSortedMap<DocumentKey, MaybeDocument> changes =
localStore.acknowledgeBatch(mutationBatchResult);

Expand All @@ -427,9 +436,63 @@ public void handleRejectedWrite(int batchId, Status status) {
// they consistently happen before listen events.
notifyUser(batchId, status);

resolvePendingWriteTasks(batchId);

emitNewSnapsAndNotifyLocalStore(changes, /*remoteEvent=*/ null);
}

/**
* Takes a snapshot of current mutation queue, and register a user task which will resolve when
* all those mutations are either accepted or rejected by the server.
*/
public void registerPendingWritesTask(TaskCompletionSource<Void> userTask) {
if (!remoteStore.canUseNetwork()) {
Logger.debug(
TAG,
"The network is disabled. The task returned by 'awaitPendingWrites()' will not"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nit: We usually leave the space at the end of the line. This helps to align comments with more than two lines.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

+ " complete until the network is enabled.");
}

int largestPendingBatchId = localStore.getHighestUnacknowledgedBatchId();

if (largestPendingBatchId == MutationBatch.UNKNOWN) {
// Complete the task right away if there is no pending writes at the moment.
userTask.setResult(null);
return;
}

if (pendingWritesCallbacks.containsKey(largestPendingBatchId)) {
pendingWritesCallbacks.get(largestPendingBatchId).add(userTask);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you don't need a multi-map for this case. You should be able to do the following:

pendingWritesCallbacks.get(largestPendingBatchId).getTask().onCompleteListener(() -> userTask.getResult())

Note that this introduces a dependency on other listeners that a user might already have added to the task, but that is likely fine.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO., i found this harder to understand, and i don't immediately see benefits of chaining tasks this way.

I can be wrong though, but I'd like to see if more people find it better this way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine to stick with the multi-map.

} else {
pendingWritesCallbacks.put(largestPendingBatchId, Lists.newArrayList(userTask));
}
}

/** Resolves tasks waiting for this batch id to get acknowledged by server, if there is any. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/is/are/

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

private void resolvePendingWriteTasks(int batchId) {
if (pendingWritesCallbacks.containsKey(batchId)) {
for (TaskCompletionSource<Void> task : pendingWritesCallbacks.get(batchId)) {
task.setResult(null);
}

pendingWritesCallbacks.remove(batchId);
}
}

private void failOutstandingPendingWritesAwaitingTasks() {
for (Map.Entry<Integer, List<TaskCompletionSource<Void>>> entry :
pendingWritesCallbacks.entrySet()) {
for (TaskCompletionSource<Void> task : entry.getValue()) {
task.setException(
new FirebaseFirestoreException(
"'waitForPendingWrites' task is cancelled due to User change.",
FirebaseFirestoreException.Code.CANCELLED));
}
}

pendingWritesCallbacks.clear();
}

/** Resolves the task corresponding to this write result. */
private void notifyUser(int batchId, @Nullable Status status) {
Map<Integer, TaskCompletionSource<Void>> userTasks = mutationUserCallbacks.get(currentUser);
Expand Down Expand Up @@ -562,6 +625,8 @@ public void handleCredentialChange(User user) {
currentUser = user;

if (userChanged) {
// Fails tasks waiting for pending writes requested by previous user.
failOutstandingPendingWritesAwaitingTasks();
// Notify local store and emit any resulting events from swapping out the mutation queue.
ImmutableSortedMap<DocumentKey, MaybeDocument> changes = localStore.handleUserChange(user);
emitNewSnapsAndNotifyLocalStore(changes, /*remoteEvent=*/ null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,14 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> rejectBatch(int batchId) {
});
}

/**
* Returns the largest (latest) batch id in mutation queue that is pending server response.
* Returns {@link MutationBatch#UNKNOWN} if the queue is empty.
*/
public int getHighestUnacknowledgedBatchId() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would advocate for being lazy here and name this consistently between the mutation batch API and the local store API.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return mutationQueue.getHighestUnacknowledgedBatchId();
}

/** Returns the last recorded stream token for the current user. */
public ByteString getLastStreamToken() {
return mutationQueue.getLastStreamToken();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
return queue.size() > index ? queue.get(index) : null;
}

@Override
public int getHighestUnacknowledgedBatchId() {
return queue.isEmpty() ? MutationBatch.UNKNOWN : nextBatchId - 1;
}

@Override
public List<MutationBatch> getAllMutationBatches() {
return Collections.unmodifiableList(queue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ MutationBatch addMutationBatch(
@Nullable
MutationBatch getNextMutationBatchAfterBatchId(int batchId);

/**
* @return The largest (latest) batch id in mutation queue for the current user that is pending
* server response, {@link MutationBatch#UNKNOWN} if the queue is empty.
*/
int getHighestUnacknowledgedBatchId();

/** Returns all mutation batches in the mutation queue. */
// TODO: PERF: Current consumer only needs mutated keys; if we can provide that
// cheaply, we should replace this.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,13 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
.firstValue(row -> decodeInlineMutationBatch(row.getInt(0), row.getBlob(1)));
}

@Override
public int getHighestUnacknowledgedBatchId() {
return db.query("SELECT IFNULL(MAX(batch_id), ?) FROM mutations WHERE uid = ?")
.binding(MutationBatch.UNKNOWN, uid)
.firstValue(row -> row.getInt(0));
}

@Override
public List<MutationBatch> getAllMutationBatches() {
List<MutationBatch> result = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ private void handleWatchStreamClose(Status status) {
}
}

private boolean canUseNetwork() {
public boolean canUseNetwork() {
// PORTING NOTE: This method exists mostly because web also has to take into account primary
// vs. secondary state.
return networkEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1143,4 +1143,21 @@ public void testHandlesPatchMutationWithTransformThenRemoteEvent() {
assertChanged(doc("foo/bar", 1, map("sum", 1), Document.DocumentState.LOCAL_MUTATIONS));
assertContains(doc("foo/bar", 1, map("sum", 1), Document.DocumentState.LOCAL_MUTATIONS));
}

@Test
public void testGetHighestUnacknowledgedBatchId() {
assertEquals(-1, localStore.getHighestUnacknowledgedBatchId());

writeMutation(setMutation("foo/bar", map("abc", 123)));
assertEquals(1, localStore.getHighestUnacknowledgedBatchId());

writeMutation(patchMutation("foo/bar", map("abc", 321)));
assertEquals(2, localStore.getHighestUnacknowledgedBatchId());

acknowledgeMutation(1);
assertEquals(2, localStore.getHighestUnacknowledgedBatchId());

rejectMutation();
assertEquals(-1, localStore.getHighestUnacknowledgedBatchId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/-1/MutationBatch.UNKNOWN/

}
}