Skip to content

Commit 350e3fe

Browse files
committed
Switch from futures-cpupool to tokio-threadpool
This is a breaking change as the server must now be spawned on a multi-threaded `tokio::runtime::Runtime`. Tokio can manage the blocking threads more efficiently than we can with a separate pool.
1 parent 56bdfc9 commit 350e3fe

File tree

3 files changed

+100
-45
lines changed

3 files changed

+100
-45
lines changed

Cargo.toml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "conduit-hyper"
3-
version = "0.1.3"
3+
version = "0.2.0-a.0"
44
authors = ["Justin Geibel <[email protected]>"]
55
license = "MIT OR Apache-2.0"
66
description = "Host a conduit based web application on a hyper server"
@@ -11,8 +11,13 @@ edition = "2018"
1111
[dependencies]
1212
conduit = "0.8"
1313
futures = "0.1"
14-
futures-cpupool = "0.1"
1514
hyper = "0.12"
1615
http = "0.1"
1716
log = "0.4"
1817
semver = "0.5" # Must match version in conduit for now
18+
tokio-threadpool = "0.1.12"
19+
20+
[dev-dependencies]
21+
conduit = "0.8"
22+
conduit-router = "0.8"
23+
tokio = "0.1"

src/lib.rs

Lines changed: 56 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,27 @@ use std::path::{Component, Path, PathBuf};
99
use std::sync::Arc;
1010

1111
use futures::{future, Future, Stream};
12-
use futures_cpupool::CpuPool;
13-
use hyper::{Body, Chunk, Method, Request, Response, Server, StatusCode, Version};
12+
use hyper::{Body, Chunk, Method, Request, Response, StatusCode, Version};
1413
use log::error;
1514

1615
// Consumers of this library need access to this particular version of `semver`
1716
pub use semver;
1817

18+
/// A builder for a `hyper::Server`
19+
#[derive(Debug)]
20+
pub struct Server;
21+
22+
impl Server {
23+
/// Bind a handler to an address
24+
pub fn bind<H: conduit::Handler>(
25+
addr: &SocketAddr,
26+
handler: H,
27+
) -> hyper::Server<hyper::server::conn::AddrIncoming, Service<H>> {
28+
let service = Service::new(handler);
29+
hyper::Server::bind(&addr).serve(service)
30+
}
31+
}
32+
1933
#[derive(Debug)]
2034
struct Parts(http::request::Parts);
2135

@@ -67,7 +81,7 @@ struct ConduitRequest {
6781
parts: Parts,
6882
path: String,
6983
body: Cursor<Chunk>,
70-
extensions: conduit::Extensions,
84+
extensions: conduit::Extensions, // makes struct non-Send
7185
}
7286

7387
impl conduit::Request for ConduitRequest {
@@ -157,8 +171,34 @@ impl conduit::Request for ConduitRequest {
157171
}
158172
}
159173

174+
/// Owned data consumed by the worker thread
175+
///
176+
/// `ConduitRequest` cannot be sent between threads, so the input data is
177+
/// captured on a core thread and taken by the worker thread.
178+
struct RequestInfo(Option<(Parts, Chunk)>);
179+
180+
impl RequestInfo {
181+
/// Save the request info that can be sent between threads
182+
fn new(parts: http::request::Parts, body: Chunk) -> Self {
183+
let tuple = (Parts(parts), body);
184+
Self(Some(tuple))
185+
}
186+
187+
/// Take back the request info
188+
///
189+
/// Call this from the worker thread to obtain ownership of the `Send` data
190+
///
191+
/// # Panics
192+
///
193+
/// Panics if called more than once on a value
194+
fn take(&mut self) -> (Parts, Chunk) {
195+
self.0.take().expect("called take multiple times")
196+
}
197+
}
198+
160199
impl ConduitRequest {
161-
fn new(parts: Parts, body: Chunk) -> ConduitRequest {
200+
fn new(info: &mut RequestInfo) -> Self {
201+
let (parts, body) = info.take();
162202
let path = parts.0.uri.path().to_string();
163203
let path = Path::new(&path);
164204
let path = path
@@ -183,7 +223,7 @@ impl ConduitRequest {
183223
.to_string_lossy()
184224
.to_string(); // non-Unicode is replaced with U+FFFD REPLACEMENT CHARACTER
185225

186-
ConduitRequest {
226+
Self {
187227
parts,
188228
path,
189229
body: Cursor::new(body),
@@ -195,15 +235,13 @@ impl ConduitRequest {
195235
/// Serve a `conduit::Handler` on a thread pool
196236
#[derive(Debug)]
197237
pub struct Service<H> {
198-
pool: CpuPool,
199238
handler: Arc<H>,
200239
}
201240

202241
// #[derive(Clone)] results in cloning a ref, and not the Service
203242
impl<H> Clone for Service<H> {
204243
fn clone(&self) -> Self {
205244
Service {
206-
pool: self.pool.clone(),
207245
handler: self.handler.clone(),
208246
}
209247
}
@@ -230,39 +268,32 @@ impl<H: conduit::Handler> hyper::service::Service for Service<H> {
230268

231269
/// Returns a future which buffers the response body and then calls the conduit handler from a thread pool
232270
fn call(&mut self, request: Request<Self::ReqBody>) -> Self::Future {
233-
let pool = self.pool.clone();
234271
let handler = self.handler.clone();
235272

236273
let (parts, body) = request.into_parts();
237274
let response = body.concat2().and_then(move |full_body| {
238-
pool.spawn_fn(move || {
239-
let mut request = ConduitRequest::new(Parts(parts), full_body);
240-
let response = handler
241-
.call(&mut request)
242-
.map(good_response)
243-
.unwrap_or_else(|e| error_response(e.description()));
244-
245-
Ok(response)
275+
let mut request_info = RequestInfo::new(parts, full_body);
276+
future::poll_fn(move || {
277+
tokio_threadpool::blocking(|| {
278+
let mut request = ConduitRequest::new(&mut request_info);
279+
handler
280+
.call(&mut request)
281+
.map(good_response)
282+
.unwrap_or_else(|e| error_response(e.description()))
283+
})
284+
.map_err(|_| panic!("the threadpool shut down"))
246285
})
247286
});
248287
Box::new(response)
249288
}
250289
}
251290

252291
impl<H: conduit::Handler> Service<H> {
253-
/// Create a multi-threaded `Service` from a `Handler`
254-
pub fn new(handler: H, threads: usize) -> Service<H> {
292+
fn new(handler: H) -> Self {
255293
Service {
256-
pool: CpuPool::new(threads),
257294
handler: Arc::new(handler),
258295
}
259296
}
260-
261-
/// Run the `Service` bound to a given `SocketAddr`
262-
pub fn run(&self, addr: SocketAddr) {
263-
let server = Server::bind(&addr).serve(self.clone());
264-
hyper::rt::run(server.map_err(|e| error!("Server error: {}", e)));
265-
}
266297
}
267298

268299
/// Builds a `hyper::Response` given a `conduit:Response`

src/tests.rs

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,25 @@ fn build_headers(msg: &str) -> HashMap<String, Vec<String>> {
7373
headers
7474
}
7575

76+
fn build_threadpool() -> tokio::runtime::Runtime {
77+
tokio::runtime::Builder::new()
78+
.core_threads(1)
79+
.blocking_threads(1)
80+
.build()
81+
.unwrap()
82+
}
83+
7684
fn simulate_request<H: Handler>(handler: H) -> hyper::Response<hyper::Body> {
7785
use hyper::service::{NewService, Service};
7886

79-
let new_service = super::Service::new(handler, 1);
80-
let mut service = new_service.new_service().wait().unwrap();
81-
service.call(hyper::Request::default()).wait().unwrap()
87+
let mut pool = build_threadpool();
88+
89+
pool.block_on(futures::lazy(|| {
90+
let new_service = super::Service::new(handler);
91+
let mut service = new_service.new_service().wait().unwrap();
92+
service.call(hyper::Request::default()).wait()
93+
}))
94+
.unwrap()
8295
}
8396

8497
fn into_chunk(resp: hyper::Response<hyper::Body>) -> hyper::Chunk {
@@ -122,19 +135,25 @@ fn recover_from_panic() {
122135
fn normalize_path() {
123136
use hyper::service::{NewService, Service};
124137

125-
let new_service = super::Service::new(AssertPathNormalized, 1);
126-
let mut service = new_service.new_service().wait().unwrap();
127-
let req = hyper::Request::put("//removed/.././.././normalized")
128-
.body(hyper::Body::default())
129-
.unwrap();
130-
let resp = service.call(req).wait().unwrap();
131-
assert_eq!(resp.status(), 200);
132-
assert_eq!(resp.headers().len(), 1);
133-
134-
let req = hyper::Request::put("//normalized")
135-
.body(hyper::Body::default())
136-
.unwrap();
137-
let resp = service.call(req).wait().unwrap();
138-
assert_eq!(resp.status(), 200);
139-
assert_eq!(resp.headers().len(), 1);
138+
let mut pool = build_threadpool();
139+
140+
pool.block_on(futures::lazy(|| {
141+
let new_service = super::Service::new(AssertPathNormalized);
142+
let mut service = new_service.new_service().wait().unwrap();
143+
let req = hyper::Request::put("//removed/.././.././normalized")
144+
.body(hyper::Body::default())
145+
.unwrap();
146+
let resp = service.call(req).wait().unwrap();
147+
assert_eq!(resp.status(), 200);
148+
assert_eq!(resp.headers().len(), 1);
149+
150+
let req = hyper::Request::put("//normalized")
151+
.body(hyper::Body::default())
152+
.unwrap();
153+
let resp = service.call(req).wait().unwrap();
154+
assert_eq!(resp.status(), 200);
155+
assert_eq!(resp.headers().len(), 1);
156+
Ok::<_, ()>(())
157+
}))
158+
.unwrap()
140159
}

0 commit comments

Comments
 (0)