|
8 | 8 | ))]
|
9 | 9 |
|
10 | 10 | use std::future::Future;
|
11 |
| -use std::sync::{Arc, Mutex}; |
| 11 | +use std::sync::{mpsc, Arc, Mutex}; |
12 | 12 | use std::task::Poll;
|
13 | 13 | use std::thread;
|
14 | 14 | use tokio::macros::support::poll_fn;
|
@@ -295,42 +295,34 @@ fn worker_noop_count() {
|
295 | 295 | }
|
296 | 296 |
|
297 | 297 | #[test]
|
298 |
| -#[ignore] // this test is flaky, see https://github.com/tokio-rs/tokio/issues/6470 |
299 | 298 | fn worker_steal_count() {
|
300 | 299 | // This metric only applies to the multi-threaded runtime.
|
301 |
| - // |
302 |
| - // We use a blocking channel to backup one worker thread. |
303 |
| - use std::sync::mpsc::channel; |
304 |
| - |
305 |
| - let rt = threaded_no_lifo(); |
306 |
| - let metrics = rt.metrics(); |
307 |
| - |
308 |
| - rt.block_on(async { |
309 |
| - let (tx, rx) = channel(); |
| 300 | + for _ in 0..10 { |
| 301 | + let rt = threaded_no_lifo(); |
| 302 | + let metrics = rt.metrics(); |
310 | 303 |
|
311 |
| - // Move to the runtime. |
312 |
| - tokio::spawn(async move { |
313 |
| - // Spawn the task that sends to the channel |
314 |
| - // |
315 |
| - // Since the lifo slot is disabled, this task is stealable. |
316 |
| - tokio::spawn(async move { |
317 |
| - tx.send(()).unwrap(); |
318 |
| - }); |
| 304 | + let successfully_spawned_stealable_task = rt.block_on(async { |
| 305 | + // The call to `try_spawn_stealable_task` may time out, which means |
| 306 | + // that the sending task couldn't be scheduled due to a deadlock in |
| 307 | + // the runtime. |
| 308 | + // This is expected behaviour, we just retry until we succeed or |
| 309 | + // exhaust all tries, the latter causing this test to fail. |
| 310 | + try_spawn_stealable_task().await.is_ok() |
| 311 | + }); |
319 | 312 |
|
320 |
| - // Blocking receive on the channel. |
321 |
| - rx.recv().unwrap(); |
322 |
| - }) |
323 |
| - .await |
324 |
| - .unwrap(); |
325 |
| - }); |
| 313 | + drop(rt); |
326 | 314 |
|
327 |
| - drop(rt); |
| 315 | + if successfully_spawned_stealable_task { |
| 316 | + let n: u64 = (0..metrics.num_workers()) |
| 317 | + .map(|i| metrics.worker_steal_count(i)) |
| 318 | + .sum(); |
328 | 319 |
|
329 |
| - let n: u64 = (0..metrics.num_workers()) |
330 |
| - .map(|i| metrics.worker_steal_count(i)) |
331 |
| - .sum(); |
| 320 | + assert_eq!(1, n); |
| 321 | + return; |
| 322 | + } |
| 323 | + } |
332 | 324 |
|
333 |
| - assert_eq!(1, n); |
| 325 | + panic!("exhausted every try to schedule the stealable task"); |
334 | 326 | }
|
335 | 327 |
|
336 | 328 | #[test]
|
@@ -835,6 +827,30 @@ fn io_driver_ready_count() {
|
835 | 827 | assert_eq!(metrics.io_driver_ready_count(), 1);
|
836 | 828 | }
|
837 | 829 |
|
| 830 | +async fn try_spawn_stealable_task() -> Result<(), mpsc::RecvTimeoutError> { |
| 831 | + // We use a blocking channel to synchronize the tasks. |
| 832 | + let (tx, rx) = mpsc::channel(); |
| 833 | + |
| 834 | + // Make sure we are in the context of the runtime. |
| 835 | + tokio::spawn(async move { |
| 836 | + // Spawn the task that sends to the channel. |
| 837 | + // |
| 838 | + // Note that the runtime needs to have the lifo slot disabled to make |
| 839 | + // this task stealable. |
| 840 | + tokio::spawn(async move { |
| 841 | + tx.send(()).unwrap(); |
| 842 | + }); |
| 843 | + |
| 844 | + // Blocking receive on the channel, timing out if the sending task |
| 845 | + // wasn't scheduled in time. |
| 846 | + rx.recv_timeout(Duration::from_secs(1)) |
| 847 | + }) |
| 848 | + .await |
| 849 | + .unwrap()?; |
| 850 | + |
| 851 | + Ok(()) |
| 852 | +} |
| 853 | + |
838 | 854 | fn current_thread() -> Runtime {
|
839 | 855 | tokio::runtime::Builder::new_current_thread()
|
840 | 856 | .enable_all()
|
|
0 commit comments