Skip to content

Commit 05d223b

Browse files
authored
Merge a9ab00e into d0fea0e
2 parents d0fea0e + a9ab00e commit 05d223b

File tree

2 files changed

+319
-0
lines changed

2 files changed

+319
-0
lines changed

firebase-crashlytics/src/androidTest/java/com/google/firebase/crashlytics/internal/CrashlyticsWorkerTest.java

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.junit.Assert.assertThrows;
2121

2222
import com.google.android.gms.tasks.Task;
23+
import com.google.android.gms.tasks.TaskCompletionSource;
2324
import com.google.android.gms.tasks.Tasks;
2425
import com.google.firebase.concurrent.TestOnlyExecutors;
2526
import java.io.IOException;
@@ -421,6 +422,258 @@ public void submitTaskWhenThreadPoolFull() {
421422
assertThrows(TimeoutException.class, () -> Tasks.await(task, 30, TimeUnit.MILLISECONDS));
422423
}
423424

425+
@Test
426+
public void submitTaskThatReturnsWithContinuation() throws Exception {
427+
Task<String> result =
428+
crashlyticsWorker.submitTask(
429+
() -> Tasks.forResult(1337),
430+
task -> Tasks.forResult(Integer.toString(task.getResult())));
431+
432+
assertThat(Tasks.await(result)).isEqualTo("1337");
433+
}
434+
435+
@Test
436+
public void submitTaskThatThrowsWithContinuation() throws Exception {
437+
Task<String> result =
438+
crashlyticsWorker.submitTask(
439+
() -> Tasks.forException(new IndexOutOfBoundsException("Sometimes we look too far.")),
440+
task -> {
441+
if (task.getException() != null) {
442+
return Tasks.forResult("Task threw.");
443+
}
444+
return Tasks.forResult("I dunno how I got here?");
445+
});
446+
447+
assertThat(Tasks.await(result)).isEqualTo("Task threw.");
448+
}
449+
450+
@Test
451+
public void submitTaskWithContinuationThatThrows() throws Exception {
452+
Task<String> result =
453+
crashlyticsWorker.submitTask(
454+
() -> Tasks.forResult(7), task -> Tasks.forException(new IOException()));
455+
456+
ExecutionException thrown = assertThrows(ExecutionException.class, () -> Tasks.await(result));
457+
458+
assertThat(thrown).hasCauseThat().isInstanceOf(IOException.class);
459+
460+
// Verify the worker still executes tasks after the continuation threw.
461+
assertThat(Tasks.await(crashlyticsWorker.submit(() -> 42))).isEqualTo(42);
462+
}
463+
464+
@Test
465+
public void submitTaskThatCancelsWithContinuation() throws Exception {
466+
Task<String> result =
467+
crashlyticsWorker.submitTask(
468+
Tasks::forCanceled,
469+
task -> Tasks.forResult(task.isCanceled() ? "Task cancelled." : "What?"));
470+
471+
assertThat(Tasks.await(result)).isEqualTo("Task cancelled.");
472+
}
473+
474+
@Test
475+
public void submitTaskWithContinuationThatCancels() throws Exception {
476+
Task<String> result =
477+
crashlyticsWorker.submitTask(() -> Tasks.forResult(7), task -> Tasks.forCanceled());
478+
479+
assertThrows(CancellationException.class, () -> Tasks.await(result));
480+
481+
// Verify the worker still executes tasks after the continuation was cancelled.
482+
assertThat(Tasks.await(crashlyticsWorker.submit(() -> "jk"))).isEqualTo("jk");
483+
}
484+
485+
@Test
486+
public void submitTaskWithContinuationExecutesInOrder() throws Exception {
487+
// The integers added to the list represent the order they should be executed in.
488+
List<Integer> list = new ArrayList<>();
489+
490+
// Start the chain which adds 1, then kicks off tasks to add 6 & 7 later, but adds 2 before
491+
// executing the newly added tasks in the continuation.
492+
crashlyticsWorker.submitTask(
493+
() -> {
494+
list.add(1);
495+
496+
// Sleep to give time for the tasks 3, 4, 5 to be submitted.
497+
sleep(300);
498+
499+
// We added the 1 and will add 2 in the continuation. And 3, 4, 5 have been submitted.
500+
crashlyticsWorker.submit(() -> list.add(6));
501+
crashlyticsWorker.submit(() -> list.add(7));
502+
503+
return Tasks.forResult(1);
504+
},
505+
task -> {
506+
// When the task 1 completes the next number to add is 2. Because all the other tasks are
507+
// just submitted, not executed yet.
508+
list.add(2);
509+
return Tasks.forResult("a");
510+
});
511+
512+
// Submit tasks to add 3, 4, 5 since we just added 1 and know a continuation will add the 2.
513+
crashlyticsWorker.submit(() -> list.add(3));
514+
crashlyticsWorker.submit(() -> list.add(4));
515+
crashlyticsWorker.submit(() -> list.add(5));
516+
517+
crashlyticsWorker.await();
518+
519+
// Verify the list is complete and in order.
520+
assertThat(list).isInOrder();
521+
assertThat(list).hasSize(7);
522+
}
523+
524+
@Test
525+
public void raceReturnsFirstResult() throws Exception {
526+
// Create 2 tasks on different workers to race.
527+
Task<String> task1 =
528+
new CrashlyticsWorker(TestOnlyExecutors.background())
529+
.submit(
530+
() -> {
531+
sleep(200);
532+
return "first";
533+
});
534+
Task<String> task2 =
535+
new CrashlyticsWorker(TestOnlyExecutors.background())
536+
.submit(
537+
() -> {
538+
sleep(400);
539+
return "slow";
540+
});
541+
542+
Task<String> task = crashlyticsWorker.race(task1, task2);
543+
String result = Tasks.await(task);
544+
545+
assertThat(result).isEqualTo("first");
546+
}
547+
548+
@Test
549+
public void raceReturnsFirstException() {
550+
// Create 2 tasks on different workers to race.
551+
Task<String> task1 =
552+
new CrashlyticsWorker(TestOnlyExecutors.background())
553+
.submitTask(
554+
() -> {
555+
sleep(200);
556+
return Tasks.forException(new ArithmeticException());
557+
});
558+
Task<String> task2 =
559+
new CrashlyticsWorker(TestOnlyExecutors.background())
560+
.submitTask(
561+
() -> {
562+
sleep(400);
563+
return Tasks.forException(new IllegalStateException());
564+
});
565+
566+
Task<String> task = crashlyticsWorker.race(task1, task2);
567+
ExecutionException thrown = assertThrows(ExecutionException.class, () -> Tasks.await(task));
568+
569+
// The first task throws an ArithmeticException.
570+
assertThat(thrown).hasCauseThat().isInstanceOf(ArithmeticException.class);
571+
}
572+
573+
@Test
574+
public void raceFirstCancelsReturnsSecondResult() throws Exception {
575+
// Create 2 tasks on different workers to race.
576+
Task<String> task1 =
577+
new CrashlyticsWorker(TestOnlyExecutors.background())
578+
.submitTask(
579+
() -> {
580+
sleep(200);
581+
return Tasks.forCanceled();
582+
});
583+
Task<String> task2 =
584+
new CrashlyticsWorker(TestOnlyExecutors.background())
585+
.submitTask(
586+
() -> {
587+
sleep(400);
588+
return Tasks.forResult("I am slow but didn't cancel.");
589+
});
590+
591+
Task<String> task = crashlyticsWorker.race(task1, task2);
592+
String result = Tasks.await(task);
593+
594+
assertThat(result).isEqualTo("I am slow but didn't cancel.");
595+
}
596+
597+
@Test
598+
public void raceBothCancel() {
599+
// Create 2 tasks on different workers to race.
600+
Task<String> task1 =
601+
new CrashlyticsWorker(TestOnlyExecutors.background())
602+
.submitTask(
603+
() -> {
604+
sleep(200);
605+
return Tasks.forCanceled();
606+
});
607+
Task<String> task2 =
608+
new CrashlyticsWorker(TestOnlyExecutors.background())
609+
.submitTask(
610+
() -> {
611+
sleep(400);
612+
return Tasks.forCanceled();
613+
});
614+
615+
Task<String> task = crashlyticsWorker.race(task1, task2);
616+
617+
// Both cancelled, so cancel the race result.
618+
assertThrows(CancellationException.class, () -> Tasks.await(task));
619+
}
620+
621+
@Test
622+
public void raceTasksOnSameWorker() throws Exception {
623+
// Create 2 tasks on this worker to race.
624+
Task<String> task1 =
625+
crashlyticsWorker.submit(
626+
() -> {
627+
sleep(200);
628+
return "first";
629+
});
630+
Task<String> task2 =
631+
crashlyticsWorker.submit(
632+
() -> {
633+
sleep(300);
634+
return "second";
635+
});
636+
637+
Task<String> task = crashlyticsWorker.race(task1, task2);
638+
String result = Tasks.await(task);
639+
640+
// The first task is submitted to this worker first, so will always be first.
641+
assertThat(result).isEqualTo("first");
642+
}
643+
644+
@Test
645+
public void raceTaskOneOnSameWorkerAnotherNeverCompletes() throws Exception {
646+
// Create a task on this worker, and another that never completes, to race.
647+
Task<String> task1 = crashlyticsWorker.submit(() -> "first");
648+
Task<String> task2 = new TaskCompletionSource<String>().getTask();
649+
650+
Task<String> task = crashlyticsWorker.race(task1, task2);
651+
String result = Tasks.await(task);
652+
653+
assertThat(result).isEqualTo("first");
654+
}
655+
656+
@Test
657+
public void raceTaskOneOnSameWorkerAnotherOtherThatCompletesFirst() throws Exception {
658+
// Add a decoy task to the worker to take up some time.
659+
crashlyticsWorker.submitTask(
660+
() -> {
661+
sleep(200);
662+
return Tasks.forResult(null);
663+
});
664+
665+
// Create a task on this worker, and another, to race.
666+
Task<String> task1 = crashlyticsWorker.submit(() -> "same worker");
667+
TaskCompletionSource<String> task2 = new TaskCompletionSource<>();
668+
task2.trySetResult("other");
669+
670+
Task<String> task = crashlyticsWorker.race(task1, task2.getTask());
671+
String result = Tasks.await(task);
672+
673+
// The other tasks completes first because the first task is queued up later on the worker.
674+
assertThat(result).isEqualTo("other");
675+
}
676+
424677
private static void sleep(long millis) {
425678
try {
426679
Thread.sleep(millis);

firebase-crashlytics/src/main/java/com/google/firebase/crashlytics/internal/CrashlyticsWorker.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717
package com.google.firebase.crashlytics.internal;
1818

1919
import androidx.annotation.VisibleForTesting;
20+
import com.google.android.gms.tasks.CancellationTokenSource;
21+
import com.google.android.gms.tasks.Continuation;
2022
import com.google.android.gms.tasks.Task;
23+
import com.google.android.gms.tasks.TaskCompletionSource;
2124
import com.google.android.gms.tasks.Tasks;
25+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
2226
import java.util.concurrent.Callable;
2327
import java.util.concurrent.ExecutionException;
2428
import java.util.concurrent.ExecutorService;
2529
import java.util.concurrent.TimeUnit;
2630
import java.util.concurrent.TimeoutException;
31+
import java.util.concurrent.atomic.AtomicBoolean;
2732

2833
/**
2934
* Helper for executing tasks sequentially on the given executor service.
@@ -57,9 +62,12 @@ public ExecutorService getExecutor() {
5762
/**
5863
* Submits a <code>Callable</code> task for asynchronous execution on the executor.
5964
*
65+
* <p>A blocking callable will block an underlying thread.
66+
*
6067
* <p>Returns a <code>Task</code> which will be resolved upon successful completion of the
6168
* callable, or throws an <code>ExecutionException</code> if the callable throws an exception.
6269
*/
70+
@CanIgnoreReturnValue
6371
public <T> Task<T> submit(Callable<T> callable) {
6472
synchronized (tailLock) {
6573
// Do not propagate a cancellation.
@@ -76,9 +84,12 @@ public <T> Task<T> submit(Callable<T> callable) {
7684
/**
7785
* Submits a <code>Runnable</code> task for asynchronous execution on the executor.
7886
*
87+
* <p>A blocking runnable will block an underlying thread.
88+
*
7989
* <p>Returns a <code>Task</code> which will be resolved with null upon successful completion of
8090
* the runnable, or throws an <code>ExecutionException</code> if the runnable throws an exception.
8191
*/
92+
@CanIgnoreReturnValue
8293
public Task<Void> submit(Runnable runnable) {
8394
synchronized (tailLock) {
8495
// Do not propagate a cancellation.
@@ -108,6 +119,7 @@ public Task<Void> submit(Runnable runnable) {
108119
* returned by the callable, throws an <code>ExecutionException</code> if the callable throws an
109120
* exception, or throws a <code>CancellationException</code> if the task is cancelled.
110121
*/
122+
@CanIgnoreReturnValue
111123
public <T> Task<T> submitTask(Callable<Task<T>> callable) {
112124
synchronized (tailLock) {
113125
// Chain the new callable task onto the queue's tail, regardless of cancellation.
@@ -117,6 +129,60 @@ public <T> Task<T> submitTask(Callable<Task<T>> callable) {
117129
}
118130
}
119131

132+
/**
133+
* Submits a <code>Callable</code> <code>Task</code> followed by a <code>Continuation</code> for
134+
* asynchronous execution on the executor.
135+
*
136+
* <p>This is useful for submitting a task that must be immediately followed by another task,
137+
* regardless of more tasks being submitted in parallel. For example, settings.
138+
*
139+
* <p>Returns a <code>Task</code> which will be resolved upon successful completion of the Task
140+
* returned by the callable and continued by the continuation, throws an <code>ExecutionException
141+
* </code> if either task throws an exception, or throws a <code>CancellationException</code> if
142+
* either task is cancelled.
143+
*/
144+
@CanIgnoreReturnValue
145+
public <T, R> Task<R> submitTask(
146+
Callable<Task<T>> callable, Continuation<T, Task<R>> continuation) {
147+
synchronized (tailLock) {
148+
// Chain the new callable task and continuation onto the queue's tail.
149+
Task<R> result =
150+
tail.continueWithTask(executor, task -> callable.call())
151+
.continueWithTask(executor, continuation);
152+
tail = result;
153+
return result;
154+
}
155+
}
156+
157+
/**
158+
* Returns a task that is resolved when either of the given tasks is resolved.
159+
*
160+
* <p>When both tasks are cancelled, the returned task will be cancelled.
161+
*/
162+
public <T> Task<T> race(Task<T> task1, Task<T> task2) {
163+
CancellationTokenSource cancellation = new CancellationTokenSource();
164+
TaskCompletionSource<T> result = new TaskCompletionSource<>(cancellation.getToken());
165+
166+
AtomicBoolean otherTaskCancelled = new AtomicBoolean(false);
167+
168+
Continuation<T, Task<Void>> continuation =
169+
task -> {
170+
if (task.isSuccessful()) {
171+
result.trySetResult(task.getResult());
172+
} else if (task.getException() != null) {
173+
result.trySetException(task.getException());
174+
} else if (otherTaskCancelled.getAndSet(true)) {
175+
cancellation.cancel();
176+
}
177+
return Tasks.forResult(null);
178+
};
179+
180+
task1.continueWithTask(executor, continuation);
181+
task2.continueWithTask(executor, continuation);
182+
183+
return result.getTask();
184+
}
185+
120186
/**
121187
* Blocks until all current pending tasks have completed, up to 30 seconds. Useful for testing.
122188
*

0 commit comments

Comments
 (0)