Skip to content

Commit cc70a21

Browse files
authored
task: add join_all method to JoinSet (#6784)
Adds join_all method to JoinSet. join_all consumes JoinSet and awaits the completion of all tasks on it, returning the results of the tasks in a vec. An error or panic in the task will cause join_all to panic, canceling all other tasks. Fixes: #6664
1 parent 1ac8dff commit cc70a21

File tree

2 files changed

+114
-1
lines changed

2 files changed

+114
-1
lines changed

tokio/src/task/join_set.rs

+74-1
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
//! of spawned tasks and allows asynchronously awaiting the output of those
55
//! tasks as they complete. See the documentation for the [`JoinSet`] type for
66
//! details.
7-
use std::fmt;
87
use std::future::Future;
98
use std::pin::Pin;
109
use std::task::{Context, Poll};
10+
use std::{fmt, panic};
1111

1212
use crate::runtime::Handle;
1313
#[cfg(tokio_unstable)]
@@ -374,6 +374,79 @@ impl<T: 'static> JoinSet<T> {
374374
while self.join_next().await.is_some() {}
375375
}
376376

377+
/// Awaits the completion of all tasks in this `JoinSet`, returning a vector of their results.
378+
///
379+
/// The results will be stored in the order they completed not the order they were spawned.
380+
/// This is a convenience method that is equivalent to calling [`join_next`] in
381+
/// a loop. If any tasks on the `JoinSet` fail with an [`JoinError`], then this call
382+
/// to `join_all` will panic and all remaining tasks on the `JoinSet` are
383+
/// cancelled. To handle errors in any other way, manually call [`join_next`]
384+
/// in a loop.
385+
///
386+
/// # Examples
387+
///
388+
/// Spawn multiple tasks and `join_all` them.
389+
///
390+
/// ```
391+
/// use tokio::task::JoinSet;
392+
/// use std::time::Duration;
393+
///
394+
/// #[tokio::main]
395+
/// async fn main() {
396+
/// let mut set = JoinSet::new();
397+
///
398+
/// for i in 0..3 {
399+
/// set.spawn(async move {
400+
/// tokio::time::sleep(Duration::from_secs(3 - i)).await;
401+
/// i
402+
/// });
403+
/// }
404+
///
405+
/// let output = set.join_all().await;
406+
/// assert_eq!(output, vec![2, 1, 0]);
407+
/// }
408+
/// ```
409+
///
410+
/// Equivalent implementation of `join_all`, using [`join_next`] and loop.
411+
///
412+
/// ```
413+
/// use tokio::task::JoinSet;
414+
/// use std::panic;
415+
///
416+
/// #[tokio::main]
417+
/// async fn main() {
418+
/// let mut set = JoinSet::new();
419+
///
420+
/// for i in 0..3 {
421+
/// set.spawn(async move {i});
422+
/// }
423+
///
424+
/// let mut output = Vec::new();
425+
/// while let Some(res) = set.join_next().await{
426+
/// match res {
427+
/// Ok(t) => output.push(t),
428+
/// Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
429+
/// Err(err) => panic!("{err}"),
430+
/// }
431+
/// }
432+
/// assert_eq!(output.len(),3);
433+
/// }
434+
/// ```
435+
/// [`join_next`]: fn@Self::join_next
436+
/// [`JoinError::id`]: fn@crate::task::JoinError::id
437+
pub async fn join_all(mut self) -> Vec<T> {
438+
let mut output = Vec::with_capacity(self.len());
439+
440+
while let Some(res) = self.join_next().await {
441+
match res {
442+
Ok(t) => output.push(t),
443+
Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
444+
Err(err) => panic!("{err}"),
445+
}
446+
}
447+
output
448+
}
449+
377450
/// Aborts all tasks on this `JoinSet`.
378451
///
379452
/// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete

tokio/tests/task_join_set.rs

+40
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,46 @@ fn runtime_gone() {
156156
.is_cancelled());
157157
}
158158

159+
#[tokio::test]
160+
async fn join_all() {
161+
let mut set: JoinSet<i32> = JoinSet::new();
162+
163+
for _ in 0..5 {
164+
set.spawn(async { 1 });
165+
}
166+
let res: Vec<i32> = set.join_all().await;
167+
168+
assert_eq!(res.len(), 5);
169+
for itm in res.into_iter() {
170+
assert_eq!(itm, 1)
171+
}
172+
}
173+
174+
#[cfg(panic = "unwind")]
175+
#[tokio::test(start_paused = true)]
176+
async fn task_panics() {
177+
let mut set: JoinSet<()> = JoinSet::new();
178+
179+
let (tx, mut rx) = oneshot::channel();
180+
assert_eq!(set.len(), 0);
181+
182+
set.spawn(async move {
183+
tokio::time::sleep(Duration::from_secs(2)).await;
184+
tx.send(()).unwrap();
185+
});
186+
assert_eq!(set.len(), 1);
187+
188+
set.spawn(async {
189+
tokio::time::sleep(Duration::from_secs(1)).await;
190+
panic!();
191+
});
192+
assert_eq!(set.len(), 2);
193+
194+
let panic = tokio::spawn(set.join_all()).await.unwrap_err();
195+
assert!(rx.try_recv().is_err());
196+
assert!(panic.is_panic());
197+
}
198+
159199
#[tokio::test(start_paused = true)]
160200
async fn abort_all() {
161201
let mut set: JoinSet<()> = JoinSet::new();

0 commit comments

Comments
 (0)