1 | use crate::stream::StreamExt; |
2 | use core::pin::Pin; |
3 | use futures_core::future::{FusedFuture, Future}; |
4 | use futures_core::ready; |
5 | use futures_core::stream::FusedStream; |
6 | use futures_core::task::{Context, Poll}; |
7 | |
/// Future returned by the
/// [`select_next_some`](super::StreamExt::select_next_some) method.
///
/// It holds an exclusive borrow of the stream it was created from for the
/// whole lifetime `'a`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SelectNextSome<'a, St>
where
    St: ?Sized,
{
    /// The borrowed stream that items are pulled from.
    stream: &'a mut St,
}
15 | |
16 | impl<'a, St: ?Sized> SelectNextSome<'a, St> { |
17 | pub(super) fn new(stream: &'a mut St) -> Self { |
18 | Self { stream } |
19 | } |
20 | } |
21 | |
22 | impl<St: ?Sized + FusedStream + Unpin> FusedFuture for SelectNextSome<'_, St> { |
23 | fn is_terminated(&self) -> bool { |
24 | self.stream.is_terminated() |
25 | } |
26 | } |
27 | |
28 | impl<St: ?Sized + FusedStream + Unpin> Future for SelectNextSome<'_, St> { |
29 | type Output = St::Item; |
30 | |
31 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
32 | assert!(!self.stream.is_terminated(), "SelectNextSome polled after terminated" ); |
33 | |
34 | if let Some(item: ::Item) = ready!(self.stream.poll_next_unpin(cx)) { |
35 | Poll::Ready(item) |
36 | } else { |
37 | debug_assert!(self.stream.is_terminated()); |
38 | cx.waker().wake_by_ref(); |
39 | Poll::Pending |
40 | } |
41 | } |
42 | } |
43 | |