Skip to content

Commit 8b832df

Browse files
authored
Improvements to the CrashlyticsWorker (#6143)
Added a method to submit a task followed by a continuation to the worker. This is useful for making a continuation happen right after the submitted task, even if other tasks were submitted to the worker in the meantime. This will be useful for fetching settings from a network executor, then continuing on the common worker to parse the settings json. Also added a method to race two tasks on the worker. This is useful for things like waiting for an for explicit data collection enable call, or automatic data collection being enabled. Both methods have their behaviour fully documented in the javadoc and tested in unit tests.
1 parent d0fea0e commit 8b832df

File tree

2 files changed

+319
-0
lines changed

2 files changed

+319
-0
lines changed

firebase-crashlytics/src/androidTest/java/com/google/firebase/crashlytics/internal/CrashlyticsWorkerTest.java

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.junit.Assert.assertThrows;
2121

2222
import com.google.android.gms.tasks.Task;
23+
import com.google.android.gms.tasks.TaskCompletionSource;
2324
import com.google.android.gms.tasks.Tasks;
2425
import com.google.firebase.concurrent.TestOnlyExecutors;
2526
import java.io.IOException;
@@ -421,6 +422,258 @@ public void submitTaskWhenThreadPoolFull() {
421422
assertThrows(TimeoutException.class, () -> Tasks.await(task, 30, TimeUnit.MILLISECONDS));
422423
}
423424

425+
@Test
426+
public void submitTaskThatReturnsWithContinuation() throws Exception {
427+
Task<String> result =
428+
crashlyticsWorker.submitTask(
429+
() -> Tasks.forResult(1337),
430+
task -> Tasks.forResult(Integer.toString(task.getResult())));
431+
432+
assertThat(Tasks.await(result)).isEqualTo("1337");
433+
}
434+
435+
@Test
436+
public void submitTaskThatThrowsWithContinuation() throws Exception {
437+
Task<String> result =
438+
crashlyticsWorker.submitTask(
439+
() -> Tasks.forException(new IndexOutOfBoundsException("Sometimes we look too far.")),
440+
task -> {
441+
if (task.getException() != null) {
442+
return Tasks.forResult("Task threw.");
443+
}
444+
return Tasks.forResult("I dunno how I got here?");
445+
});
446+
447+
assertThat(Tasks.await(result)).isEqualTo("Task threw.");
448+
}
449+
450+
@Test
451+
public void submitTaskWithContinuationThatThrows() throws Exception {
452+
Task<String> result =
453+
crashlyticsWorker.submitTask(
454+
() -> Tasks.forResult(7), task -> Tasks.forException(new IOException()));
455+
456+
ExecutionException thrown = assertThrows(ExecutionException.class, () -> Tasks.await(result));
457+
458+
assertThat(thrown).hasCauseThat().isInstanceOf(IOException.class);
459+
460+
// Verify the worker still executes tasks after the continuation threw.
461+
assertThat(Tasks.await(crashlyticsWorker.submit(() -> 42))).isEqualTo(42);
462+
}
463+
464+
@Test
465+
public void submitTaskThatCancelsWithContinuation() throws Exception {
466+
Task<String> result =
467+
crashlyticsWorker.submitTask(
468+
Tasks::forCanceled,
469+
task -> Tasks.forResult(task.isCanceled() ? "Task cancelled." : "What?"));
470+
471+
assertThat(Tasks.await(result)).isEqualTo("Task cancelled.");
472+
}
473+
474+
@Test
475+
public void submitTaskWithContinuationThatCancels() throws Exception {
476+
Task<String> result =
477+
crashlyticsWorker.submitTask(() -> Tasks.forResult(7), task -> Tasks.forCanceled());
478+
479+
assertThrows(CancellationException.class, () -> Tasks.await(result));
480+
481+
// Verify the worker still executes tasks after the continuation was cancelled.
482+
assertThat(Tasks.await(crashlyticsWorker.submit(() -> "jk"))).isEqualTo("jk");
483+
}
484+
485+
@Test
486+
public void submitTaskWithContinuationExecutesInOrder() throws Exception {
487+
// The integers added to the list represent the order they should be executed in.
488+
List<Integer> list = new ArrayList<>();
489+
490+
// Start the chain which adds 1, then kicks off tasks to add 6 & 7 later, but adds 2 before
491+
// executing the newly added tasks in the continuation.
492+
crashlyticsWorker.submitTask(
493+
() -> {
494+
list.add(1);
495+
496+
// Sleep to give time for the tasks 3, 4, 5 to be submitted.
497+
sleep(300);
498+
499+
// We added the 1 and will add 2 in the continuation. And 3, 4, 5 have been submitted.
500+
crashlyticsWorker.submit(() -> list.add(6));
501+
crashlyticsWorker.submit(() -> list.add(7));
502+
503+
return Tasks.forResult(1);
504+
},
505+
task -> {
506+
// When the task 1 completes the next number to add is 2. Because all the other tasks are
507+
// just submitted, not executed yet.
508+
list.add(2);
509+
return Tasks.forResult("a");
510+
});
511+
512+
// Submit tasks to add 3, 4, 5 since we just added 1 and know a continuation will add the 2.
513+
crashlyticsWorker.submit(() -> list.add(3));
514+
crashlyticsWorker.submit(() -> list.add(4));
515+
crashlyticsWorker.submit(() -> list.add(5));
516+
517+
crashlyticsWorker.await();
518+
519+
// Verify the list is complete and in order.
520+
assertThat(list).isInOrder();
521+
assertThat(list).hasSize(7);
522+
}
523+
524+
@Test
525+
public void raceReturnsFirstResult() throws Exception {
526+
// Create 2 tasks on different workers to race.
527+
Task<String> task1 =
528+
new CrashlyticsWorker(TestOnlyExecutors.background())
529+
.submit(
530+
() -> {
531+
sleep(200);
532+
return "first";
533+
});
534+
Task<String> task2 =
535+
new CrashlyticsWorker(TestOnlyExecutors.background())
536+
.submit(
537+
() -> {
538+
sleep(400);
539+
return "slow";
540+
});
541+
542+
Task<String> task = crashlyticsWorker.race(task1, task2);
543+
String result = Tasks.await(task);
544+
545+
assertThat(result).isEqualTo("first");
546+
}
547+
548+
@Test
549+
public void raceReturnsFirstException() {
550+
// Create 2 tasks on different workers to race.
551+
Task<String> task1 =
552+
new CrashlyticsWorker(TestOnlyExecutors.background())
553+
.submitTask(
554+
() -> {
555+
sleep(200);
556+
return Tasks.forException(new ArithmeticException());
557+
});
558+
Task<String> task2 =
559+
new CrashlyticsWorker(TestOnlyExecutors.background())
560+
.submitTask(
561+
() -> {
562+
sleep(400);
563+
return Tasks.forException(new IllegalStateException());
564+
});
565+
566+
Task<String> task = crashlyticsWorker.race(task1, task2);
567+
ExecutionException thrown = assertThrows(ExecutionException.class, () -> Tasks.await(task));
568+
569+
// The first task throws an ArithmeticException.
570+
assertThat(thrown).hasCauseThat().isInstanceOf(ArithmeticException.class);
571+
}
572+
573+
@Test
574+
public void raceFirstCancelsReturnsSecondResult() throws Exception {
575+
// Create 2 tasks on different workers to race.
576+
Task<String> task1 =
577+
new CrashlyticsWorker(TestOnlyExecutors.background())
578+
.submitTask(
579+
() -> {
580+
sleep(200);
581+
return Tasks.forCanceled();
582+
});
583+
Task<String> task2 =
584+
new CrashlyticsWorker(TestOnlyExecutors.background())
585+
.submitTask(
586+
() -> {
587+
sleep(400);
588+
return Tasks.forResult("I am slow but didn't cancel.");
589+
});
590+
591+
Task<String> task = crashlyticsWorker.race(task1, task2);
592+
String result = Tasks.await(task);
593+
594+
assertThat(result).isEqualTo("I am slow but didn't cancel.");
595+
}
596+
597+
@Test
598+
public void raceBothCancel() {
599+
// Create 2 tasks on different workers to race.
600+
Task<String> task1 =
601+
new CrashlyticsWorker(TestOnlyExecutors.background())
602+
.submitTask(
603+
() -> {
604+
sleep(200);
605+
return Tasks.forCanceled();
606+
});
607+
Task<String> task2 =
608+
new CrashlyticsWorker(TestOnlyExecutors.background())
609+
.submitTask(
610+
() -> {
611+
sleep(400);
612+
return Tasks.forCanceled();
613+
});
614+
615+
Task<String> task = crashlyticsWorker.race(task1, task2);
616+
617+
// Both cancelled, so cancel the race result.
618+
assertThrows(CancellationException.class, () -> Tasks.await(task));
619+
}
620+
621+
@Test
622+
public void raceTasksOnSameWorker() throws Exception {
623+
// Create 2 tasks on this worker to race.
624+
Task<String> task1 =
625+
crashlyticsWorker.submit(
626+
() -> {
627+
sleep(200);
628+
return "first";
629+
});
630+
Task<String> task2 =
631+
crashlyticsWorker.submit(
632+
() -> {
633+
sleep(300);
634+
return "second";
635+
});
636+
637+
Task<String> task = crashlyticsWorker.race(task1, task2);
638+
String result = Tasks.await(task);
639+
640+
// The first task is submitted to this worker first, so will always be first.
641+
assertThat(result).isEqualTo("first");
642+
}
643+
644+
@Test
645+
public void raceTaskOneOnSameWorkerAnotherNeverCompletes() throws Exception {
646+
// Create a task on this worker, and another that never completes, to race.
647+
Task<String> task1 = crashlyticsWorker.submit(() -> "first");
648+
Task<String> task2 = new TaskCompletionSource<String>().getTask();
649+
650+
Task<String> task = crashlyticsWorker.race(task1, task2);
651+
String result = Tasks.await(task);
652+
653+
assertThat(result).isEqualTo("first");
654+
}
655+
656+
@Test
657+
public void raceTaskOneOnSameWorkerAnotherOtherThatCompletesFirst() throws Exception {
658+
// Add a decoy task to the worker to take up some time.
659+
crashlyticsWorker.submitTask(
660+
() -> {
661+
sleep(200);
662+
return Tasks.forResult(null);
663+
});
664+
665+
// Create a task on this worker, and another, to race.
666+
Task<String> task1 = crashlyticsWorker.submit(() -> "same worker");
667+
TaskCompletionSource<String> task2 = new TaskCompletionSource<>();
668+
task2.trySetResult("other");
669+
670+
Task<String> task = crashlyticsWorker.race(task1, task2.getTask());
671+
String result = Tasks.await(task);
672+
673+
// The other tasks completes first because the first task is queued up later on the worker.
674+
assertThat(result).isEqualTo("other");
675+
}
676+
424677
private static void sleep(long millis) {
425678
try {
426679
Thread.sleep(millis);

firebase-crashlytics/src/main/java/com/google/firebase/crashlytics/internal/CrashlyticsWorker.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717
package com.google.firebase.crashlytics.internal;
1818

1919
import androidx.annotation.VisibleForTesting;
20+
import com.google.android.gms.tasks.CancellationTokenSource;
21+
import com.google.android.gms.tasks.Continuation;
2022
import com.google.android.gms.tasks.Task;
23+
import com.google.android.gms.tasks.TaskCompletionSource;
2124
import com.google.android.gms.tasks.Tasks;
25+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
2226
import java.util.concurrent.Callable;
2327
import java.util.concurrent.ExecutionException;
2428
import java.util.concurrent.ExecutorService;
2529
import java.util.concurrent.TimeUnit;
2630
import java.util.concurrent.TimeoutException;
31+
import java.util.concurrent.atomic.AtomicBoolean;
2732

2833
/**
2934
* Helper for executing tasks sequentially on the given executor service.
@@ -57,9 +62,12 @@ public ExecutorService getExecutor() {
5762
/**
5863
* Submits a <code>Callable</code> task for asynchronous execution on the executor.
5964
*
65+
* <p>A blocking callable will block an underlying thread.
66+
*
6067
* <p>Returns a <code>Task</code> which will be resolved upon successful completion of the
6168
* callable, or throws an <code>ExecutionException</code> if the callable throws an exception.
6269
*/
70+
@CanIgnoreReturnValue
6371
public <T> Task<T> submit(Callable<T> callable) {
6472
synchronized (tailLock) {
6573
// Do not propagate a cancellation.
@@ -76,9 +84,12 @@ public <T> Task<T> submit(Callable<T> callable) {
7684
/**
7785
* Submits a <code>Runnable</code> task for asynchronous execution on the executor.
7886
*
87+
* <p>A blocking runnable will block an underlying thread.
88+
*
7989
* <p>Returns a <code>Task</code> which will be resolved with null upon successful completion of
8090
* the runnable, or throws an <code>ExecutionException</code> if the runnable throws an exception.
8191
*/
92+
@CanIgnoreReturnValue
8293
public Task<Void> submit(Runnable runnable) {
8394
synchronized (tailLock) {
8495
// Do not propagate a cancellation.
@@ -108,6 +119,7 @@ public Task<Void> submit(Runnable runnable) {
108119
* returned by the callable, throws an <code>ExecutionException</code> if the callable throws an
109120
* exception, or throws a <code>CancellationException</code> if the task is cancelled.
110121
*/
122+
@CanIgnoreReturnValue
111123
public <T> Task<T> submitTask(Callable<Task<T>> callable) {
112124
synchronized (tailLock) {
113125
// Chain the new callable task onto the queue's tail, regardless of cancellation.
@@ -117,6 +129,60 @@ public <T> Task<T> submitTask(Callable<Task<T>> callable) {
117129
}
118130
}
119131

132+
/**
133+
* Submits a <code>Callable</code> <code>Task</code> followed by a <code>Continuation</code> for
134+
* asynchronous execution on the executor.
135+
*
136+
* <p>This is useful for submitting a task that must be immediately followed by another task,
137+
* regardless of more tasks being submitted in parallel. For example, settings.
138+
*
139+
* <p>Returns a <code>Task</code> which will be resolved upon successful completion of the Task
140+
* returned by the callable and continued by the continuation, throws an <code>ExecutionException
141+
* </code> if either task throws an exception, or throws a <code>CancellationException</code> if
142+
* either task is cancelled.
143+
*/
144+
@CanIgnoreReturnValue
145+
public <T, R> Task<R> submitTask(
146+
Callable<Task<T>> callable, Continuation<T, Task<R>> continuation) {
147+
synchronized (tailLock) {
148+
// Chain the new callable task and continuation onto the queue's tail.
149+
Task<R> result =
150+
tail.continueWithTask(executor, task -> callable.call())
151+
.continueWithTask(executor, continuation);
152+
tail = result;
153+
return result;
154+
}
155+
}
156+
157+
/**
158+
* Returns a task that is resolved when either of the given tasks is resolved.
159+
*
160+
* <p>When both tasks are cancelled, the returned task will be cancelled.
161+
*/
162+
public <T> Task<T> race(Task<T> task1, Task<T> task2) {
163+
CancellationTokenSource cancellation = new CancellationTokenSource();
164+
TaskCompletionSource<T> result = new TaskCompletionSource<>(cancellation.getToken());
165+
166+
AtomicBoolean otherTaskCancelled = new AtomicBoolean(false);
167+
168+
Continuation<T, Task<Void>> continuation =
169+
task -> {
170+
if (task.isSuccessful()) {
171+
result.trySetResult(task.getResult());
172+
} else if (task.getException() != null) {
173+
result.trySetException(task.getException());
174+
} else if (otherTaskCancelled.getAndSet(true)) {
175+
cancellation.cancel();
176+
}
177+
return Tasks.forResult(null);
178+
};
179+
180+
task1.continueWithTask(executor, continuation);
181+
task2.continueWithTask(executor, continuation);
182+
183+
return result.getTask();
184+
}
185+
120186
/**
121187
* Blocks until all current pending tasks have completed, up to 30 seconds. Useful for testing.
122188
*

0 commit comments

Comments
 (0)