From 3c82166c38b1a088a3170270051d7609ac1b8bb9 Mon Sep 17 00:00:00 2001 From: Matthew Robertson Date: Wed, 31 Jul 2024 13:26:28 -0400 Subject: [PATCH 1/4] Add method to submit a task with a continuation to worker --- .../internal/CrashlyticsWorkerTest.java | 99 +++++++++++++++++++ .../internal/CrashlyticsWorker.java | 30 ++++++ 2 files changed, 129 insertions(+) diff --git a/firebase-crashlytics/src/androidTest/java/com/google/firebase/crashlytics/internal/CrashlyticsWorkerTest.java b/firebase-crashlytics/src/androidTest/java/com/google/firebase/crashlytics/internal/CrashlyticsWorkerTest.java index a64b20d5f56..fbfdac2f64b 100644 --- a/firebase-crashlytics/src/androidTest/java/com/google/firebase/crashlytics/internal/CrashlyticsWorkerTest.java +++ b/firebase-crashlytics/src/androidTest/java/com/google/firebase/crashlytics/internal/CrashlyticsWorkerTest.java @@ -421,6 +421,105 @@ public void submitTaskWhenThreadPoolFull() { assertThrows(TimeoutException.class, () -> Tasks.await(task, 30, TimeUnit.MILLISECONDS)); } + @Test + public void submitTaskThatReturnsWithContinuation() throws Exception { + Task result = + crashlyticsWorker.submitTask( + () -> Tasks.forResult(1337), + task -> Tasks.forResult(Integer.toString(task.getResult()))); + + assertThat(Tasks.await(result)).isEqualTo("1337"); + } + + @Test + public void submitTaskThatThrowsWithContinuation() throws Exception { + Task result = + crashlyticsWorker.submitTask( + () -> Tasks.forException(new IndexOutOfBoundsException("Sometimes we look too far.")), + task -> { + if (task.getException() != null) { + return Tasks.forResult("Task threw."); + } + return Tasks.forResult("I dunno how I got here?"); + }); + + assertThat(Tasks.await(result)).isEqualTo("Task threw."); + } + + @Test + public void submitTaskWithContinuationThatThrows() throws Exception { + Task result = + crashlyticsWorker.submitTask( + () -> Tasks.forResult(7), task -> Tasks.forException(new IOException())); + + ExecutionException thrown = assertThrows(ExecutionException.class, () -> Tasks.await(result)); + + assertThat(thrown).hasCauseThat().isInstanceOf(IOException.class); + + // Verify the worker still executes tasks after the continuation threw. + assertThat(Tasks.await(crashlyticsWorker.submit(() -> 42))).isEqualTo(42); + } + + @Test + public void submitTaskThatCancelsWithContinuation() throws Exception { + Task result = + crashlyticsWorker.submitTask( + Tasks::forCanceled, + task -> Tasks.forResult(task.isCanceled() ? "Task cancelled." : "What?")); + + assertThat(Tasks.await(result)).isEqualTo("Task cancelled."); + } + + @Test + public void submitTaskWithContinuationThatCancels() throws Exception { + Task result = + crashlyticsWorker.submitTask(() -> Tasks.forResult(7), task -> Tasks.forCanceled()); + + assertThrows(CancellationException.class, () -> Tasks.await(result)); + + // Verify the worker still executes tasks after the continuation was cancelled. + assertThat(Tasks.await(crashlyticsWorker.submit(() -> "jk"))).isEqualTo("jk"); + } + + @Test + public void submitTaskWithContinuationExecutesInOrder() throws Exception { + // The integers added to the list represent the order they should be executed in. + List list = new ArrayList<>(); + + // Start the chain which adds 1, then kicks off tasks to add 6 & 7 later, but adds 2 before + // executing the newly added tasks in the continuation. + crashlyticsWorker.submitTask( + () -> { + list.add(1); + + // Sleep to give time for the tasks 3, 4, 5 to be submitted. + sleep(300); + + // We added the 1 and will add 2 in the continuation. And 3, 4, 5 have been submitted. + crashlyticsWorker.submit(() -> list.add(6)); + crashlyticsWorker.submit(() -> list.add(7)); + + return Tasks.forResult(1); + }, + task -> { + // When the task 1 completes the next number to add is 2. Because all the other tasks are + // just submitted, not executed yet. + list.add(2); + return Tasks.forResult("a"); + }); + + // Submit tasks to add 3, 4, 5 since we just added 1 and know a continuation will add the 2. + crashlyticsWorker.submit(() -> list.add(3)); + crashlyticsWorker.submit(() -> list.add(4)); + crashlyticsWorker.submit(() -> list.add(5)); + + crashlyticsWorker.await(); + + // Verify the list is complete and in order. + assertThat(list).isInOrder(); + assertThat(list).hasSize(7); + } + private static void sleep(long millis) { try { Thread.sleep(millis); diff --git a/firebase-crashlytics/src/main/java/com/google/firebase/crashlytics/internal/CrashlyticsWorker.java b/firebase-crashlytics/src/main/java/com/google/firebase/crashlytics/internal/CrashlyticsWorker.java index e48fba16c48..ebc4c049a64 100644 --- a/firebase-crashlytics/src/main/java/com/google/firebase/crashlytics/internal/CrashlyticsWorker.java +++ b/firebase-crashlytics/src/main/java/com/google/firebase/crashlytics/internal/CrashlyticsWorker.java @@ -17,8 +17,10 @@ package com.google.firebase.crashlytics.internal; import androidx.annotation.VisibleForTesting; +import com.google.android.gms.tasks.Continuation; import com.google.android.gms.tasks.Task; import com.google.android.gms.tasks.Tasks; +import com.google.errorprone.annotations.CanIgnoreReturnValue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -60,6 +62,7 @@ public ExecutorService getExecutor() { *

Returns a Task which will be resolved upon successful completion of the * callable, or throws an ExecutionException if the callable throws an exception. */ + @CanIgnoreReturnValue public Task submit(Callable callable) { synchronized (tailLock) { // Do not propagate a cancellation. @@ -79,6 +82,7 @@ public Task submit(Callable callable) { *

Returns a Task which will be resolved with null upon successful completion of * the runnable, or throws an ExecutionException if the runnable throws an exception. */ + @CanIgnoreReturnValue public Task submit(Runnable runnable) { synchronized (tailLock) { // Do not propagate a cancellation. @@ -108,6 +112,7 @@ public Task submit(Runnable runnable) { * returned by the callable, throws an ExecutionException if the callable throws an * exception, or throws a CancellationException if the task is cancelled. */ + @CanIgnoreReturnValue public Task submitTask(Callable> callable) { synchronized (tailLock) { // Chain the new callable task onto the queue's tail, regardless of cancellation. @@ -117,6 +122,31 @@ public Task submitTask(Callable> callable) { } } + /** + * Submits a Callable Task followed by a Continuation for + * asynchronous execution on the executor. + * + *

This is useful for submitting a task that must be immediately followed by another task, + * regardless of more tasks being submitted in parallel. For example, settings. + * + *

Returns a Task which will be resolved upon successful completion of the Task + * returned by the callable and continued by the continuation, throws an ExecutionException + * if either task throws an exception, or throws a CancellationException if + * either task is cancelled. + */ + @CanIgnoreReturnValue + public Task submitTask( + Callable> callable, Continuation> continuation) { + synchronized (tailLock) { + // Chain the new callable task and continuation onto the queue's tail. + Task result = + tail.continueWithTask(executor, task -> callable.call()) + .continueWithTask(executor, continuation); + tail = result; + return result; + } + } + /** * Blocks until all current pending tasks have completed, up to 30 seconds. Useful for testing. * From 44c63278ef1a22c34a129f7981811a26f6f8f5db Mon Sep 17 00:00:00 2001 From: Matthew Robertson Date: Wed, 31 Jul 2024 14:52:56 -0400 Subject: [PATCH 2/4] Add method to race tasks --- .../internal/CrashlyticsWorkerTest.java | 144 ++++++++++++++++++ .../internal/CrashlyticsWorker.java | 32 ++++ 2 files changed, 176 insertions(+) diff --git a/firebase-crashlytics/src/androidTest/java/com/google/firebase/crashlytics/internal/CrashlyticsWorkerTest.java b/firebase-crashlytics/src/androidTest/java/com/google/firebase/crashlytics/internal/CrashlyticsWorkerTest.java index fbfdac2f64b..aee8f91fc9c 100644 --- a/firebase-crashlytics/src/androidTest/java/com/google/firebase/crashlytics/internal/CrashlyticsWorkerTest.java +++ b/firebase-crashlytics/src/androidTest/java/com/google/firebase/crashlytics/internal/CrashlyticsWorkerTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertThrows; import com.google.android.gms.tasks.Task; +import com.google.android.gms.tasks.TaskCompletionSource; import com.google.android.gms.tasks.Tasks; import com.google.firebase.concurrent.TestOnlyExecutors; import java.io.IOException; @@ -520,6 +521,149 @@ public void submitTaskWithContinuationExecutesInOrder() throws Exception { assertThat(list).hasSize(7); } + @Test + public void raceReturnsFirstResult() throws Exception { + // Create 2 tasks on different workers to race. + Task task1 = + new CrashlyticsWorker(TestOnlyExecutors.background()) + .submit( + () -> { + sleep(200); + return "first"; + }); + Task task2 = + new CrashlyticsWorker(TestOnlyExecutors.background()) + .submit( + () -> { + sleep(400); + return "slow"; + }); + + Task task = crashlyticsWorker.race(task1, task2); + String result = Tasks.await(task); + + assertThat(result).isEqualTo("first"); + } + + @Test + public void raceReturnsFirstException() { + // Create 2 tasks on different workers to race. + Task task1 = + new CrashlyticsWorker(TestOnlyExecutors.background()) + .submitTask( + () -> { + sleep(200); + return Tasks.forException(new ArithmeticException()); + }); + Task task2 = + new CrashlyticsWorker(TestOnlyExecutors.background()) + .submitTask( + () -> { + sleep(400); + return Tasks.forException(new IllegalStateException()); + }); + + Task task = crashlyticsWorker.race(task1, task2); + ExecutionException thrown = assertThrows(ExecutionException.class, () -> Tasks.await(task)); + + // The first task throws an ArithmeticException. + assertThat(thrown).hasCauseThat().isInstanceOf(ArithmeticException.class); + } + + @Test + public void raceFirstCancelsReturnsSecondResult() throws Exception { + // Create 2 tasks on different workers to race. + Task task1 = + new CrashlyticsWorker(TestOnlyExecutors.background()) + .submitTask( + () -> { + sleep(200); + return Tasks.forCanceled(); + }); + Task task2 = + new CrashlyticsWorker(TestOnlyExecutors.background()) + .submitTask( + () -> { + sleep(400); + return Tasks.forResult("I am slow but didn't cancel."); + }); + + Task task = crashlyticsWorker.race(task1, task2); + String result = Tasks.await(task); + + assertThat(result).isEqualTo("I am slow but didn't cancel."); + } + + @Test + public void raceBothCancel() { + // Create 2 tasks on different workers to race. + Task task1 = + new CrashlyticsWorker(TestOnlyExecutors.background()) + .submitTask( + () -> { + sleep(200); + return Tasks.forCanceled(); + }); + Task task2 = + new CrashlyticsWorker(TestOnlyExecutors.background()) + .submitTask( + () -> { + sleep(400); + return Tasks.forCanceled(); + }); + + Task task = crashlyticsWorker.race(task1, task2); + + // Both cancelled, so cancel the race result. + assertThrows(CancellationException.class, () -> Tasks.await(task)); + } + + @Test + public void raceTasksOnSameWorker() throws Exception { + // Create 2 tasks on this worker to race. + Task task1 = crashlyticsWorker.submit(() -> "first"); + Task task2 = crashlyticsWorker.submit(() -> "second"); + + Task task = crashlyticsWorker.race(task1, task2); + String result = Tasks.await(task); + + // The first task is submitted to this worker first, so will always be first. + assertThat(result).isEqualTo("first"); + } + + @Test + public void raceTaskOneOnSameWorkerAnotherNeverCompletes() throws Exception { + // Create a task on this worker, and another that never completes, to race. + Task task1 = crashlyticsWorker.submit(() -> "first"); + Task task2 = new TaskCompletionSource().getTask(); + + Task task = crashlyticsWorker.race(task1, task2); + String result = Tasks.await(task); + + assertThat(result).isEqualTo("first"); + } + + @Test + public void raceTaskOneOnSameWorkerAnotherOtherThatCompletesFirst() throws Exception { + // Add a decoy task to the worker to take up some time. + crashlyticsWorker.submitTask( + () -> { + sleep(200); + return Tasks.forResult(null); + }); + + // Create a task on this worker, and another, to race. + Task task1 = crashlyticsWorker.submit(() -> "same worker"); + TaskCompletionSource task2 = new TaskCompletionSource<>(); + task2.trySetResult("other"); + + Task task = crashlyticsWorker.race(task1, task2.getTask()); + String result = Tasks.await(task); + + // The other tasks completes first because the first task is queued up later on the worker. + assertThat(result).isEqualTo("other"); + } + private static void sleep(long millis) { try { Thread.sleep(millis); diff --git a/firebase-crashlytics/src/main/java/com/google/firebase/crashlytics/internal/CrashlyticsWorker.java b/firebase-crashlytics/src/main/java/com/google/firebase/crashlytics/internal/CrashlyticsWorker.java index ebc4c049a64..b4380041a21 100644 --- a/firebase-crashlytics/src/main/java/com/google/firebase/crashlytics/internal/CrashlyticsWorker.java +++ b/firebase-crashlytics/src/main/java/com/google/firebase/crashlytics/internal/CrashlyticsWorker.java @@ -17,8 +17,10 @@ package com.google.firebase.crashlytics.internal; import androidx.annotation.VisibleForTesting; +import com.google.android.gms.tasks.CancellationTokenSource; import com.google.android.gms.tasks.Continuation; import com.google.android.gms.tasks.Task; +import com.google.android.gms.tasks.TaskCompletionSource; import com.google.android.gms.tasks.Tasks; import com.google.errorprone.annotations.CanIgnoreReturnValue; import java.util.concurrent.Callable; @@ -26,6 +28,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; /** * Helper for executing tasks sequentially on the given executor service. @@ -147,6 +150,35 @@ public Task submitTask( } } + /** + * Returns a task that is resolved when either of the given tasks is resolved. + * + *

When both tasks are cancelled, the returned task will be cancelled. + */ + public Task race(Task task1, Task task2) { + CancellationTokenSource cancellation = new CancellationTokenSource(); + TaskCompletionSource result = new TaskCompletionSource<>(cancellation.getToken()); + + AtomicBoolean otherTaskCancelled = new AtomicBoolean(false); + + Continuation> continuation = + task -> { + if (task.isSuccessful()) { + result.trySetResult(task.getResult()); + } else if (task.getException() != null) { + result.trySetException(task.getException()); + } else if (otherTaskCancelled.getAndSet(true)) { + cancellation.cancel(); + } + return Tasks.forResult(null); + }; + + task1.continueWithTask(executor, continuation); + task2.continueWithTask(executor, continuation); + + return result.getTask(); + } + /** * Blocks until all current pending tasks have completed, up to 30 seconds. Useful for testing. * From 007d7398aa7f04bd7bd85fa4c89e8e4be3e076a3 Mon Sep 17 00:00:00 2001 From: Matthew Robertson Date: Wed, 31 Jul 2024 15:09:13 -0400 Subject: [PATCH 3/4] Fix test --- .../internal/CrashlyticsWorkerTest.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/firebase-crashlytics/src/androidTest/java/com/google/firebase/crashlytics/internal/CrashlyticsWorkerTest.java b/firebase-crashlytics/src/androidTest/java/com/google/firebase/crashlytics/internal/CrashlyticsWorkerTest.java index aee8f91fc9c..1f102d57f26 100644 --- a/firebase-crashlytics/src/androidTest/java/com/google/firebase/crashlytics/internal/CrashlyticsWorkerTest.java +++ b/firebase-crashlytics/src/androidTest/java/com/google/firebase/crashlytics/internal/CrashlyticsWorkerTest.java @@ -621,8 +621,18 @@ public void raceBothCancel() { @Test public void raceTasksOnSameWorker() throws Exception { // Create 2 tasks on this worker to race. - Task task1 = crashlyticsWorker.submit(() -> "first"); - Task task2 = crashlyticsWorker.submit(() -> "second"); + Task task1 = + crashlyticsWorker.submit( + () -> { + sleep(200); + return "first"; + }); + Task task2 = + crashlyticsWorker.submit( + () -> { + sleep(300); + return "second"; + }); Task task = crashlyticsWorker.race(task1, task2); String result = Tasks.await(task); From a9ab00e1c1b43c60bd72198a3154f15a48f7d0a4 Mon Sep 17 00:00:00 2001 From: Matthew Robertson Date: Wed, 31 Jul 2024 15:40:19 -0400 Subject: [PATCH 4/4] Javadoc --- .../firebase/crashlytics/internal/CrashlyticsWorker.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/firebase-crashlytics/src/main/java/com/google/firebase/crashlytics/internal/CrashlyticsWorker.java b/firebase-crashlytics/src/main/java/com/google/firebase/crashlytics/internal/CrashlyticsWorker.java index b4380041a21..7714bd92adb 100644 --- a/firebase-crashlytics/src/main/java/com/google/firebase/crashlytics/internal/CrashlyticsWorker.java +++ b/firebase-crashlytics/src/main/java/com/google/firebase/crashlytics/internal/CrashlyticsWorker.java @@ -62,6 +62,8 @@ public ExecutorService getExecutor() { /** * Submits a Callable task for asynchronous execution on the executor. * + *

A blocking callable will block an underlying thread. + * *

Returns a Task which will be resolved upon successful completion of the * callable, or throws an ExecutionException if the callable throws an exception. */ @@ -82,6 +84,8 @@ public Task submit(Callable callable) { /** * Submits a Runnable task for asynchronous execution on the executor. * + *

A blocking runnable will block an underlying thread. + * *

Returns a Task which will be resolved with null upon successful completion of * the runnable, or throws an ExecutionException if the runnable throws an exception. */