Skip to content

Commit fb02163

Browse files
committed
Cleanup and queue unification
1 parent 57cbb96 commit fb02163

File tree

2 files changed

+143
-92
lines changed

2 files changed

+143
-92
lines changed

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,13 @@ edition = "2018"
66

77
[dependencies]
88
lock_api = "0.1.5"
9-
pin-utils = "0.1.0-alpha.4"
109

1110
[dependencies.futures-preview]
1211
version = "0.3.0-alpha.12"
1312
default-features = false
1413

1514
[dependencies.generational-arena]
16-
version = "0.2.0"
15+
version = "0.2.1"
1716
default_features = false
1817
optional = true
1918

src/alloc_exec.rs

Lines changed: 142 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
use core::task::Poll;
2-
3-
use pin_utils::pin_mut;
1+
use core::{
2+
mem,
3+
pin::Pin,
4+
task::Poll,
5+
};
46

57
use alloc::{
68
collections::VecDeque,
@@ -14,6 +16,7 @@ use alloc::{
1416
use futures::{
1517
future::FutureObj,
1618
task::{
19+
LocalWaker,
1720
Spawn,
1821
SpawnError,
1922
},
@@ -23,6 +26,7 @@ use generational_arena::{
2326
Arena,
2427
Index,
2528
};
29+
2630
use lock_api::{
2731
Mutex,
2832
RawMutex,
@@ -34,22 +38,76 @@ use crate::{
3438
sleep::*,
3539
};
3640

41+
// default registry capacity
42+
const REG_CAP: usize = 16;
43+
44+
// default queue capacity
45+
const QUEUE_CAP: usize = 16;
46+
47+
enum QueueItem<'a> {
48+
Poll(Index),
49+
Spawn(FutureObj<'a, ()>),
50+
}
51+
52+
type Queue<'a> = VecDeque<QueueItem<'a>>;
53+
54+
type QueueHandle<'a, R> = Arc<Mutex<R, Queue<'a>>>;
55+
56+
fn new_queue<'a, R>(capacity: usize) -> QueueHandle<'a, R>
57+
where
58+
R: RawMutex + Send + Sync,
59+
{
60+
Arc::new(Mutex::new(Queue::with_capacity(capacity)))
61+
}
62+
3763
// Super simple Wake implementation
3864
// Sticks the Index into the queue and calls Alarm::ring
39-
struct QueueWaker<R: RawMutex + Send + Sync + 'static, A: Alarm>(
40-
Arc<Mutex<R, VecDeque<Index>>>,
41-
Index,
42-
A,
43-
);
65+
struct QueueWaker<R, A>
66+
where
67+
R: RawMutex + Send + Sync,
68+
{
69+
queue: QueueHandle<'static, R>,
70+
id: Index,
71+
alarm: A,
72+
}
73+
74+
impl<R, A> QueueWaker<R, A>
75+
where
76+
R: RawMutex + Send + Sync,
77+
{
78+
fn new(queue: QueueHandle<'static, R>, id: Index, alarm: A) -> Self {
79+
QueueWaker { queue, id, alarm }
80+
}
81+
}
4482

4583
impl<R, A> Wake for QueueWaker<R, A>
4684
where
47-
R: RawMutex + Send + Sync + 'static,
85+
R: RawMutex + Send + Sync,
4886
A: Alarm,
4987
{
5088
fn wake(arc_self: &Arc<Self>) {
51-
arc_self.0.lock().push_back(arc_self.1);
52-
arc_self.2.ring();
89+
arc_self
90+
.queue
91+
.lock()
92+
.push_back(QueueItem::Poll(arc_self.id));
93+
arc_self.alarm.ring();
94+
}
95+
}
96+
97+
struct Task<'a> {
98+
future: FutureObj<'a, ()>,
99+
waker: Option<LocalWaker>,
100+
}
101+
102+
impl<'a> Task<'a> {
103+
fn new(future: FutureObj<'a, ()>) -> Task<'a> {
104+
Task {
105+
future,
106+
waker: None,
107+
}
108+
}
109+
fn set_waker(&mut self, waker: LocalWaker) {
110+
self.waker = Some(waker);
53111
}
54112
}
55113

@@ -67,39 +125,29 @@ where
67125
// instruction away, but could be beneficial if threads get involved.
68126
pub struct AllocExecutor<'a, R, S>
69127
where
70-
R: RawMutex + Send + Sync + 'static,
128+
R: RawMutex + Send + Sync,
71129
S: Sleep,
72130
{
73-
// Wow, so this is an ugly type. Sorry about that.
74-
// Anyway, we're storing our Wake-implementing type next to its task so that
75-
// we can re-use the exact same Arc every time we poll it. That way we're
76-
// not creating a new allocation on every poll and it gives the Future
77-
// implementations the ability to take advantage of the `will_wake*`
78-
// functions.
79-
registry: Arena<(
80-
FutureObj<'a, ()>,
81-
Option<Arc<QueueWaker<R, <S as Sleep>::Alarm>>>,
82-
)>,
83-
poll_queue: Arc<Mutex<R, VecDeque<Index>>>,
84-
spawn_queue: Arc<Mutex<R, VecDeque<FutureObj<'a, ()>>>>,
131+
registry: Arena<Task<'a>>,
132+
queue: QueueHandle<'a, R>,
85133
alarm: <S as Sleep>::Alarm,
86134
}
87135

88136
/// Spawner for an `AllocExecutor`
89137
///
90138
/// This can be cloned and passed to an async function to allow it to spawn more
91139
/// tasks.
92-
pub struct Spawner<'a, R>(Arc<Mutex<R, VecDeque<FutureObj<'a, ()>>>>)
140+
pub struct Spawner<'a, R>(QueueHandle<'a, R>)
93141
where
94-
R: RawMutex + Send + Sync + 'static;
142+
R: RawMutex + Send + Sync;
95143

96144
impl<'a, R> Spawner<'a, R>
97145
where
98-
R: RawMutex + Send + Sync + 'static,
146+
R: RawMutex + Send + Sync,
99147
{
100148
/// Spawn a `FutureObj` into the corresponding `AllocExecutor`
101149
pub fn spawn_obj(&mut self, future: FutureObj<'a, ()>) {
102-
self.0.lock().push_back(future);
150+
self.0.lock().push_back(QueueItem::Spawn(future));
103151
}
104152

105153
/// Spawn a `Future` into the corresponding `AllocExecutor`
@@ -117,7 +165,7 @@ where
117165

118166
impl<'a, R> Clone for Spawner<'a, R>
119167
where
120-
R: RawMutex + Send + Sync + 'static,
168+
R: RawMutex + Send + Sync,
121169
{
122170
fn clone(&self) -> Self {
123171
Spawner(self.0.clone())
@@ -126,7 +174,7 @@ where
126174

127175
impl<'a, R> Spawn for Spawner<'a, R>
128176
where
129-
R: RawMutex + Send + Sync + 'static,
177+
R: RawMutex + Send + Sync,
130178
{
131179
fn spawn_obj(&mut self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
132180
self.spawn_obj(future);
@@ -143,33 +191,47 @@ where
143191
///
144192
/// Does nothing unless it's `run()`
145193
pub fn new() -> Self {
146-
// TODO(Josh) `with_capacity`?
194+
Self::with_capacity(REG_CAP, QUEUE_CAP)
195+
}
196+
197+
/// Initialize a new `AllocExecutor` with the given capacities.
198+
///
199+
/// Does nothing unless it's `run()`
200+
pub fn with_capacity(registry: usize, queue: usize) -> Self {
147201
AllocExecutor {
148-
registry: Arena::new(),
149-
poll_queue: Arc::new(Mutex::new(Default::default())),
150-
spawn_queue: Arc::new(Mutex::new(Default::default())),
202+
registry: Arena::with_capacity(registry),
203+
queue: new_queue(queue),
151204
alarm: S::make_alarm(),
152205
}
153206
}
154207

155208
/// Get a handle to a `Spawner` that can be passed to `Future` constructors
156209
/// to spawn even *more* `Future`s
157210
pub fn spawner(&self) -> Spawner<'a, R> {
158-
Spawner(self.spawn_queue.clone())
211+
Spawner(self.queue.clone())
159212
}
160213

161214
/// Spawn a `FutureObj` into the executor.
162215
///
163216
/// Thanks to the `'a` lifetime bound, these don't necessarily have to be
164217
/// `'static` `Futures`, so long as they outlive the executor.
165218
pub fn spawn_obj(&mut self, future: FutureObj<'a, ()>) {
166-
let id = self.registry.insert((future, None));
167-
self.registry.get_mut(id).unwrap().1 = Some(Arc::new(QueueWaker(
168-
self.poll_queue.clone(),
169-
id,
170-
self.alarm.clone(),
171-
)));
172-
self.poll_queue.lock().push_back(id);
219+
let id = self.registry.insert(Task::new(future));
220+
221+
// Safety: The QueueWaker only deals in 'static lifetimed things, i.e.
222+
// task `Index`es, only writes to the queue, and will never give anyone
223+
// else this transmuted version.
224+
let static_queue: QueueHandle<'static, R> = unsafe { mem::transmute(self.queue.clone()) };
225+
226+
let queue_waker = Arc::new(QueueWaker::new(static_queue, id, self.alarm.clone()));
227+
228+
// Safety: Our QueueWaker does the exact same thing for local vs
229+
// non-local wake.
230+
let local_waker = unsafe { local_waker(queue_waker) };
231+
self.registry.get_mut(id).unwrap().set_waker(local_waker);
232+
233+
// Insert the newly spawned task into the queue to be polled
234+
self.queue.lock().push_back(QueueItem::Poll(id));
173235
}
174236

175237
/// Spawn a `Future` into the executor.
@@ -183,6 +245,32 @@ where
183245
self.spawn_obj(make_obj(future));
184246
}
185247

248+
/// Polls a task with the given id
249+
///
250+
/// If no such task exists, it's a no-op.
251+
/// If the task returns `Poll::Ready`, it will be removed from the registry.
252+
fn poll_task(&mut self, id: Index) {
253+
// It's possible that the waker is still hanging out somewhere and
254+
// getting called even though its task is gone. If so, we can just
255+
// skip it.
256+
if let Some(Task { future, waker }) = self.registry.get_mut(id) {
257+
let future = Pin::new(future);
258+
259+
let waker = waker
260+
.as_ref()
261+
.expect("waker not set, task spawned incorrectly");
262+
263+
// Safety: Our waker doesn't do anything special for wake_local vs
264+
// wake, so this is safe.
265+
match future.poll(waker) {
266+
Poll::Ready(_) => {
267+
self.registry.remove(id);
268+
}
269+
Poll::Pending => {}
270+
}
271+
}
272+
}
273+
186274
/// Run the executor
187275
///
188276
/// Each loop will poll at most one task from the queue and then check for
@@ -192,61 +280,25 @@ where
192280
/// Once there's nothing to spawn and nothing left in the registry, the
193281
/// executor will return.
194282
pub fn run(&mut self) {
195-
// Cloning these pointers at the start so that we don't anger the borrow
283+
// Cloning this pointer at the start so that we don't anger the borrow
196284
// checking gods.
197-
let poll_queue = self.poll_queue.clone();
198-
let spawn_queue = self.spawn_queue.clone();
285+
let queue = self.queue.clone();
199286

200287
loop {
201-
// This will be the queue length *after* the front is popped.
202-
// We're only going to handle one task per loop so that futures that
203-
// call wake immediately don't starve the spawner. We'll use the
204-
// remaining queue length to decide whether we need to sleep or not.
205-
let (queue_len, front) = {
206-
let mut queue = poll_queue.lock();
207-
let front = queue.pop_front();
208-
let queue_len = queue.len();
209-
(queue_len, front)
210-
};
211-
212-
// It's possible that the waker is still hanging out somewhere and
213-
// getting called even though its task is gone. If so, we can just
214-
// skip it.
215-
if let Some((future, waker, id)) =
216-
front.and_then(|id| self.registry.get_mut(id).map(|(f, w)| (f, w, id)))
217-
{
218-
pin_mut!(future);
219-
220-
let waker = waker.as_ref().expect("waker not set").clone();
221-
222-
// Our waker doesn't do anything special for wake_local vs wake,
223-
// so this is safe.
224-
match future.poll(&unsafe { local_waker(waker) }) {
225-
Poll::Ready(_) => {
226-
self.registry.remove(id);
288+
while let Some(item) = queue.lock().pop_front() {
289+
match item {
290+
QueueItem::Poll(id) => {
291+
self.poll_task(id);
292+
}
293+
QueueItem::Spawn(task) => {
294+
self.spawn_obj(task);
227295
}
228-
Poll::Pending => {}
229296
}
230297
}
231-
232-
let mut spawn_queue = spawn_queue.lock();
233-
if spawn_queue.is_empty() {
234-
// if there's nothing to spawn and nothing left in the task
235-
// registry, there's nothing more to do and we can break.
236-
// However, if the registry isn't empty, we need to know if there
237-
// are more things waiting to be polled before deciding to sleep.
238-
if self.registry.is_empty() {
239-
break;
240-
} else if queue_len == 0 {
241-
S::sleep(&self.alarm);
242-
}
243-
} else {
244-
// If there *are* things to spawn, those will go straight into
245-
// the poll queue, so we don't need to sleep here either.
246-
while let Some(future) = spawn_queue.pop_front() {
247-
self.spawn_obj(future)
248-
}
298+
if self.registry.is_empty() {
299+
break;
249300
}
301+
S::sleep(&self.alarm);
250302
}
251303
}
252304
}

0 commit comments

Comments
 (0)