| 1 | use crate::Stream; |
| 2 | |
| 3 | use core::future::Future; |
| 4 | use core::marker::PhantomPinned; |
| 5 | use core::pin::Pin; |
| 6 | use core::task::{Context, Poll}; |
| 7 | use pin_project_lite::pin_project; |
| 8 | |
pin_project! {
    /// Future for the [`all`](super::StreamExt::all) method.
    ///
    /// Resolves to `true` if the predicate `f` returned `true` for every item
    /// produced before the stream ended, and to `false` as soon as `f`
    /// returns `false` for an item.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them" ]
    pub struct AllFuture<'a, St: ?Sized, F> {
        // Mutably borrowed stream the items are pulled from.
        stream: &'a mut St,
        // Predicate applied to each item.
        f: F,
        // Make this future `!Unpin` for compatibility with async trait methods.
        #[pin]
        _pin: PhantomPinned,
    }
}
| 21 | |
| 22 | impl<'a, St: ?Sized, F> AllFuture<'a, St, F> { |
| 23 | pub(super) fn new(stream: &'a mut St, f: F) -> Self { |
| 24 | Self { |
| 25 | stream, |
| 26 | f, |
| 27 | _pin: PhantomPinned, |
| 28 | } |
| 29 | } |
| 30 | } |
| 31 | |
| 32 | impl<St, F> Future for AllFuture<'_, St, F> |
| 33 | where |
| 34 | St: ?Sized + Stream + Unpin, |
| 35 | F: FnMut(St::Item) -> bool, |
| 36 | { |
| 37 | type Output = bool; |
| 38 | |
| 39 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 40 | let me = self.project(); |
| 41 | let mut stream = Pin::new(me.stream); |
| 42 | |
| 43 | // Take a maximum of 32 items from the stream before yielding. |
| 44 | for _ in 0..32 { |
| 45 | match futures_core::ready!(stream.as_mut().poll_next(cx)) { |
| 46 | Some(v) => { |
| 47 | if !(me.f)(v) { |
| 48 | return Poll::Ready(false); |
| 49 | } |
| 50 | } |
| 51 | None => return Poll::Ready(true), |
| 52 | } |
| 53 | } |
| 54 | |
| 55 | cx.waker().wake_by_ref(); |
| 56 | Poll::Pending |
| 57 | } |
| 58 | } |
| 59 | |