Skip to content

Commit dcd2a01

Browse files
committed
Add a minimal Futures executor
This change adds `std::thread::block_on_future`, which represents a minimal Futures executor. It is modelled after futures-rs `futures::executor::block_on`, which blocks the current thread until the Future had been driven to completion.
1 parent fae75cd commit dcd2a01

File tree

3 files changed

+474
-0
lines changed

3 files changed

+474
-0
lines changed

src/libstd/tests/block_on_future.rs

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
//! Tests for the block_on_future function
2+
3+
#![feature(block_on_future)]
4+
5+
use std::future::Future;
6+
use std::task::{Context, Poll, Waker};
7+
use std::pin::Pin;
8+
use std::sync::{Arc, Mutex};
9+
use std::thread::{block_on_future, JoinHandle, spawn};
10+
11+
struct WakeFromRemoteThreadFuture {
12+
was_polled: bool,
13+
wake_by_ref: bool,
14+
join_handle: Option<JoinHandle<()>>,
15+
}
16+
17+
impl WakeFromRemoteThreadFuture {
18+
fn new(wake_by_ref: bool) -> Self {
19+
WakeFromRemoteThreadFuture {
20+
was_polled: false,
21+
wake_by_ref,
22+
join_handle: None,
23+
}
24+
}
25+
}
26+
27+
impl Future for WakeFromRemoteThreadFuture {
28+
type Output = ();
29+
30+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
31+
if !self.was_polled {
32+
self.was_polled = true;
33+
let waker = cx.waker().clone();
34+
let wake_by_ref = self.wake_by_ref;
35+
self.join_handle = Some(spawn(move || {
36+
if wake_by_ref {
37+
waker.wake();
38+
} else {
39+
waker.wake_by_ref();
40+
}
41+
}));
42+
Poll::Pending
43+
} else {
44+
if let Some(handle) = self.join_handle.take() {
45+
handle.join().unwrap();
46+
}
47+
Poll::Ready(())
48+
}
49+
}
50+
}
51+
52+
struct Yield {
53+
iterations: usize,
54+
}
55+
56+
impl Yield {
57+
fn new(iterations: usize) -> Self {
58+
Yield {
59+
iterations
60+
}
61+
}
62+
}
63+
64+
impl Future for Yield {
65+
type Output = ();
66+
67+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
68+
if self.iterations == 0 {
69+
Poll::Ready(())
70+
} else {
71+
self.iterations -= 1;
72+
cx.waker().wake_by_ref();
73+
Poll::Pending
74+
}
75+
}
76+
}
77+
78+
struct NeverReady {
79+
}
80+
81+
impl NeverReady {
82+
fn new() -> Self {
83+
NeverReady {}
84+
}
85+
}
86+
87+
impl Future for NeverReady {
88+
type Output = ();
89+
90+
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
91+
Poll::Pending
92+
}
93+
}
94+
95+
struct WakerStore {
96+
waker: Option<Waker>,
97+
}
98+
99+
struct StoreWakerFuture {
100+
store: Arc<Mutex<WakerStore>>,
101+
}
102+
103+
impl StoreWakerFuture {
104+
fn new(store: Arc<Mutex<WakerStore>>) -> Self {
105+
StoreWakerFuture {
106+
store
107+
}
108+
}
109+
}
110+
111+
impl Future for StoreWakerFuture {
112+
type Output = ();
113+
114+
fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
115+
(*self.store.lock().unwrap()).waker = Some(cx.waker().clone());
116+
Poll::Ready(())
117+
}
118+
}
119+
120+
struct WakeFromPreviouslyStoredWakerFuture {
121+
store: Arc<Mutex<WakerStore>>,
122+
was_polled: bool,
123+
join_handle: Option<JoinHandle<()>>,
124+
}
125+
126+
impl WakeFromPreviouslyStoredWakerFuture {
127+
fn new(store: Arc<Mutex<WakerStore>>) -> Self {
128+
WakeFromPreviouslyStoredWakerFuture {
129+
store,
130+
was_polled: false,
131+
join_handle: None,
132+
}
133+
}
134+
}
135+
136+
impl Future for WakeFromPreviouslyStoredWakerFuture {
137+
type Output = ();
138+
139+
fn poll(mut self: core::pin::Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
140+
if !self.was_polled {
141+
self.was_polled = true;
142+
// Don't take the waker from Context but from the side channel
143+
let waker = self.store.lock().unwrap().waker.clone().take().unwrap();
144+
self.join_handle = Some(spawn(move || {
145+
waker.wake();
146+
}));
147+
Poll::Pending
148+
} else {
149+
if let Some(handle) = self.join_handle.take() {
150+
handle.join().unwrap();
151+
}
152+
Poll::Ready(())
153+
}
154+
}
155+
}
156+
157+
#[test]
158+
fn wake_from_local_thread() {
159+
block_on_future(async {
160+
Yield::new(10).await;
161+
});
162+
}
163+
164+
#[test]
165+
fn wake_from_foreign_thread() {
166+
block_on_future(async {
167+
WakeFromRemoteThreadFuture::new(false).await;
168+
});
169+
}
170+
171+
#[test]
172+
fn wake_by_ref_from_foreign_thread() {
173+
block_on_future(async {
174+
WakeFromRemoteThreadFuture::new(true).await;
175+
});
176+
}
177+
178+
#[test]
179+
fn wake_from_multiple_threads() {
180+
block_on_future(async {
181+
WakeFromRemoteThreadFuture::new(false).await;
182+
WakeFromRemoteThreadFuture::new(true).await;
183+
});
184+
}
185+
186+
#[test]
187+
fn wake_local_remote_local() {
188+
block_on_future(async {
189+
Yield::new(10).await;
190+
WakeFromRemoteThreadFuture::new(false).await;
191+
Yield::new(20).await;
192+
WakeFromRemoteThreadFuture::new(true).await;
193+
});
194+
}
195+
196+
#[test]
197+
fn returns_result_from_task() {
198+
let result = block_on_future(async {
199+
let x = 42i32;
200+
Yield::new(10).await;
201+
x
202+
});
203+
assert_eq!(42, result);
204+
}
205+
206+
#[test]
207+
#[should_panic]
208+
fn panics_if_waker_was_not_cloned_and_task_is_not_ready() {
209+
block_on_future(async {
210+
NeverReady::new().await;
211+
});
212+
}
213+
214+
#[test]
215+
fn does_not_panic_if_waker_is_cloned_and_used_a_lot_later() {
216+
let store = Arc::new(Mutex::new(WakerStore {
217+
waker: None,
218+
}));
219+
220+
block_on_future(async {
221+
StoreWakerFuture::new(store.clone()).await;
222+
Yield::new(10).await;
223+
// Multiple wakes from an outdated waker - because it can
224+
// have been cloned multiple times.
225+
WakeFromPreviouslyStoredWakerFuture::new(store.clone()).await;
226+
WakeFromPreviouslyStoredWakerFuture::new(store.clone()).await;
227+
WakeFromPreviouslyStoredWakerFuture::new(store).await;
228+
});
229+
}

0 commit comments

Comments
 (0)