From f1325084d1a34305af284802e2899a3c793b6afd Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Wed, 28 Aug 2019 19:26:58 +0300 Subject: [PATCH 1/2] Use async/await syntax in Stdin::read_line and AsyncRead for Stdin --- src/io/stdin.rs | 151 ++++++++++++++---------------------------------- 1 file changed, 43 insertions(+), 108 deletions(-) diff --git a/src/io/stdin.rs b/src/io/stdin.rs index bd4c1118d..527dcdd77 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -1,9 +1,9 @@ use std::io; use std::pin::Pin; -use std::sync::Mutex; +use std::sync::Arc; +use futures::lock::Mutex; use cfg_if::cfg_if; -use futures::future; use futures::io::{AsyncRead, Initializer}; use crate::future::Future; @@ -29,12 +29,7 @@ use crate::task::{blocking, Context, Poll}; /// # Ok(()) }) } /// ``` pub fn stdin() -> Stdin { - Stdin(Mutex::new(State::Idle(Some(Inner { - stdin: io::stdin(), - line: String::new(), - buf: Vec::new(), - last_op: None, - })))) + Stdin(Arc::new(Mutex::new(io::stdin()))) } /// A handle to the standard input of the current process. @@ -45,22 +40,8 @@ pub fn stdin() -> Stdin { /// /// [`stdin`]: fn.stdin.html /// [`std::io::Stdin`]: https://doc.rust-lang.org/std/io/struct.Stdin.html -#[derive(Debug)] -pub struct Stdin(Mutex); - -/// The state of the asynchronous stdin. -/// -/// The stdin can be either idle or busy performing an asynchronous operation. -#[derive(Debug)] -enum State { - /// The stdin is idle. - Idle(Option), - - /// The stdin is blocked on an asynchronous operation. - /// - /// Awaiting this operation will result in the new state of the stdin. - Busy(blocking::JoinHandle), -} +#[derive(Debug, Clone)] +pub struct Stdin(Arc>); /// Inner representation of the asynchronous stdin. #[derive(Debug)] @@ -73,16 +54,6 @@ struct Inner { /// The write buffer. buf: Vec, - - /// The result of the last asynchronous operation on the stdin. - last_op: Option, -} - -/// Possible results of an asynchronous operation on the stdin. -#[derive(Debug)] -enum Operation { - ReadLine(io::Result), - Read(io::Result), } impl Stdin { @@ -102,89 +73,53 @@ impl Stdin { /// # Ok(()) }) } /// ``` pub async fn read_line(&self, buf: &mut String) -> io::Result { - future::poll_fn(|cx| { - let state = &mut *self.0.lock().unwrap(); - - loop { - match state { - State::Idle(opt) => { - let inner = opt.as_mut().unwrap(); - - // Check if the operation has completed. - if let Some(Operation::ReadLine(res)) = inner.last_op.take() { - let n = res?; - - // Copy the read data into the buffer and return. - buf.push_str(&inner.line); - return Poll::Ready(Ok(n)); - } else { - let mut inner = opt.take().unwrap(); - - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - inner.line.clear(); - let res = inner.stdin.read_line(&mut inner.line); - inner.last_op = Some(Operation::ReadLine(res)); - State::Idle(Some(inner)) - })); - } - } - // Poll the asynchronous operation the stdin is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), - } - } - }) - .await + let this = self.clone(); + + let handle = blocking::spawn(async move { + let io = this.0.lock().await; + + let mut line = String::new(); + let res = io.read_line(&mut line); + (res, line) + }); + let (res, line) = handle.await; + + let n = res?; + buf.push_str(&line); + + Ok(n) } } impl AsyncRead for Stdin { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - let state = &mut *self.0.lock().unwrap(); - - loop { - match state { - State::Idle(opt) => { - let inner = opt.as_mut().unwrap(); - - // Check if the operation has completed. - if let Some(Operation::Read(res)) = inner.last_op.take() { - let n = res?; - - // If more data was read than fits into the buffer, let's retry the read - // operation. - if n <= buf.len() { - // Copy the read data into the buffer and return. - buf[..n].copy_from_slice(&inner.buf[..n]); - return Poll::Ready(Ok(n)); - } - } else { - let mut inner = opt.take().unwrap(); - - // Set the length of the inner buffer to the length of the provided buffer. - if inner.buf.len() < buf.len() { - inner.buf.reserve(buf.len() - inner.buf.len()); - } - unsafe { - inner.buf.set_len(buf.len()); - } - - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - let res = io::Read::read(&mut inner.stdin, &mut inner.buf); - inner.last_op = Some(Operation::Read(res)); - State::Idle(Some(inner)) - })); - } - } - // Poll the asynchronous operation the stdin is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + let this = self.clone(); + let len = buf.len(); + + let handle = blocking::spawn(async move { + let mut io = this.0.lock().await; + + let mut inner_buf: Vec = Vec::with_capacity(len); + unsafe { + inner_buf.set_len(len); } - } + let res = io::Read::read(&mut *io, &mut inner_buf); + res.and_then(|n| { + unsafe { + inner_buf.set_len(n); + } + Ok((n, inner_buf)) + }) + }); + pin_utils::pin_mut!(handle); + handle.poll(cx).map_ok(|(n, inner_buf)| { + buf[..n].copy_from_slice(&inner_buf[..n]); + n + }) } #[inline] From 7850380f210037dde18828a7687954b9d0a7af5c Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Wed, 28 Aug 2019 22:36:22 +0300 Subject: [PATCH 2/2] Reuse Stdin::Inner buffers in read_line and poll_read operations --- src/io/stdin.rs | 67 +++++++++++++++++++++++++++++++------------------ 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/src/io/stdin.rs b/src/io/stdin.rs index 527dcdd77..9595973e9 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -1,7 +1,8 @@ +use futures::lock::Mutex; use std::io; use std::pin::Pin; use std::sync::Arc; -use futures::lock::Mutex; +use std::sync::Mutex as StdMutex; use cfg_if::cfg_if; use futures::io::{AsyncRead, Initializer}; @@ -29,7 +30,11 @@ use crate::task::{blocking, Context, Poll}; /// # Ok(()) }) } /// ``` pub fn stdin() -> Stdin { - Stdin(Arc::new(Mutex::new(io::stdin()))) + Stdin(Mutex::new(Arc::new(StdMutex::new(Inner { + stdin: io::stdin(), + line: Default::default(), + buf: Default::default(), + })))) } /// A handle to the standard input of the current process. @@ -40,8 +45,8 @@ pub fn stdin() -> Stdin { /// /// [`stdin`]: fn.stdin.html /// [`std::io::Stdin`]: https://doc.rust-lang.org/std/io/struct.Stdin.html -#[derive(Debug, Clone)] -pub struct Stdin(Arc>); +#[derive(Debug)] +pub struct Stdin(Mutex>>); /// Inner representation of the asynchronous stdin. #[derive(Debug)] @@ -73,19 +78,26 @@ impl Stdin { /// # Ok(()) }) } /// ``` pub async fn read_line(&self, buf: &mut String) -> io::Result { - let this = self.clone(); + let future_lock = self.0.lock().await; + let mutex = future_lock.clone(); + // Start the operation asynchronously. let handle = blocking::spawn(async move { - let io = this.0.lock().await; + let mut guard = mutex.lock().unwrap(); + let inner: &mut Inner = &mut guard; - let mut line = String::new(); - let res = io.read_line(&mut line); - (res, line) + inner.line.clear(); + inner.stdin.read_line(&mut inner.line) }); - let (res, line) = handle.await; + let res = handle.await; let n = res?; - buf.push_str(&line); + + let mutex = future_lock.clone(); + let inner = mutex.lock().unwrap(); + + // Copy the read data into the buffer and return. + buf.push_str(&inner.line); Ok(n) } @@ -97,27 +109,34 @@ impl AsyncRead for Stdin { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - let this = self.clone(); let len = buf.len(); + let future_lock = self.0.lock(); + pin_utils::pin_mut!(future_lock); + let future_lock = futures::ready!(future_lock.poll(cx)); + + let mutex = future_lock.clone(); let handle = blocking::spawn(async move { - let mut io = this.0.lock().await; + let mut guard = mutex.lock().unwrap(); + let inner: &mut Inner = &mut guard; - let mut inner_buf: Vec = Vec::with_capacity(len); + // Set the length of the inner buffer to the length of the provided buffer. + if inner.buf.len() < len { + inner.buf.reserve(len - inner.buf.len()); + } unsafe { - inner_buf.set_len(len); + inner.buf.set_len(len); } - let res = io::Read::read(&mut *io, &mut inner_buf); - res.and_then(|n| { - unsafe { - inner_buf.set_len(n); - } - Ok((n, inner_buf)) - }) + + io::Read::read(&mut inner.stdin, &mut inner.buf) }); pin_utils::pin_mut!(handle); - handle.poll(cx).map_ok(|(n, inner_buf)| { - buf[..n].copy_from_slice(&inner_buf[..n]); + handle.poll(cx).map_ok(|n| { + let mutex = future_lock.clone(); + let inner = mutex.lock().unwrap(); + + // Copy the read data into the buffer and return. + buf[..n].copy_from_slice(&inner.buf[..n]); n }) }