-
Notifications
You must be signed in to change notification settings - Fork 339
add Stream::last #347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add Stream::last #347
Changes from 5 commits
504ef07
041abb4
d120c58
54aaf29
5055821
cd5e439
5445b2b
6b9043a
4b60659
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
use std::pin::Pin; | ||
|
||
use crate::future::Future; | ||
use crate::stream::Stream; | ||
use crate::task::{Context, Poll}; | ||
|
||
#[doc(hidden)] | ||
#[allow(missing_debug_implementations)] | ||
pub struct LastFuture<S, T> { | ||
stream: S, | ||
last: Option<T>, | ||
} | ||
|
||
impl<S, T> LastFuture<S, T> { | ||
pin_utils::unsafe_pinned!(stream: S); | ||
pin_utils::unsafe_pinned!(last: Option<T>); | ||
|
||
pub(crate) fn new(stream: S) -> Self { | ||
LastFuture { stream, last: None } | ||
} | ||
} | ||
|
||
impl<S> Future for LastFuture<S, S::Item> | ||
where | ||
S: Stream + Unpin + Sized, | ||
S::Item: Copy, | ||
{ | ||
type Output = Option<S::Item>; | ||
|
||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); | ||
|
||
match next { | ||
Some(new) => { | ||
cx.waker().wake_by_ref(); | ||
*self.as_mut().last() = Some(new); | ||
starsheriff marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Poll::Pending | ||
} | ||
None => Poll::Ready(self.last), | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -36,6 +36,7 @@ mod fuse; | |||||
mod ge; | ||||||
mod gt; | ||||||
mod inspect; | ||||||
mod last; | ||||||
mod le; | ||||||
mod lt; | ||||||
mod map; | ||||||
|
@@ -62,6 +63,7 @@ use find::FindFuture; | |||||
use find_map::FindMapFuture; | ||||||
use fold::FoldFuture; | ||||||
use for_each::ForEachFuture; | ||||||
use last::LastFuture; | ||||||
use ge::GeFuture; | ||||||
use gt::GtFuture; | ||||||
use le::LeFuture; | ||||||
|
@@ -456,6 +458,38 @@ extension_trait! { | |||||
Inspect::new(self, f) | ||||||
} | ||||||
|
||||||
#[doc = r#" | ||||||
Returns the last element of the stream. | ||||||
|
||||||
# Examples | ||||||
|
||||||
Basic usage: | ||||||
|
||||||
``` | ||||||
# fn main() { async_std::task::block_on(async { | ||||||
# | ||||||
use std::collections::VecDeque; | ||||||
|
||||||
use async_std::prelude::*; | ||||||
|
||||||
let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect(); | ||||||
|
||||||
let last = s.last().await; | ||||||
assert_eq!(last, Some(3)); | ||||||
# | ||||||
# }) } | ||||||
``` | ||||||
|
||||||
"#] | ||||||
fn last( | ||||||
self, | ||||||
) -> impl Future<Output = Option<Self::Item>> + '_ [LastFuture<Self, Self::Item>] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right, the lifetime is not needed. I got a bit lost in the type signature I guess 😬 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No worries, thanks! :) |
||||||
where | ||||||
Self: Sized, | ||||||
{ | ||||||
LastFuture::new(self) | ||||||
} | ||||||
|
||||||
#[doc = r#" | ||||||
Transforms this `Stream` into a "fused" `Stream` such that after the first time | ||||||
`poll` returns `Poll::Ready(None)`, all future calls to `poll` will also return | ||||||
|
Uh oh!
There was an error while loading. Please reload this page.