| 1 | use crate::Stream; |
| 2 | use pin_project_lite::pin_project; |
| 3 | use std::pin::Pin; |
| 4 | use std::task::{Context, Poll}; |
| 5 | |
| 6 | pin_project! { |
| 7 | /// A `Stream` that wraps the values in an `Option`. |
| 8 | /// |
| 9 | /// Whenever the wrapped stream yields an item, this stream yields that item |
| 10 | /// wrapped in `Some`. When the inner stream ends, then this stream first |
| 11 | /// yields a `None` item, and then this stream will also end. |
| 12 | /// |
| 13 | /// # Example |
| 14 | /// |
| 15 | /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`. |
| 16 | /// |
| 17 | /// ``` |
| 18 | /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose}; |
| 19 | /// |
| 20 | /// #[tokio::main] |
| 21 | /// async fn main() { |
| 22 | /// let mut map = StreamMap::new(); |
| 23 | /// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); |
| 24 | /// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); |
| 25 | /// map.insert(0, stream); |
| 26 | /// map.insert(1, stream2); |
| 27 | /// while let Some((key, val)) = map.next().await { |
| 28 | /// match val { |
| 29 | /// Some(val) => println!("got {val:?} from stream {key:?}"), |
| 30 | /// None => println!("stream {key:?} closed"), |
| 31 | /// } |
| 32 | /// } |
| 33 | /// } |
| 34 | /// ``` |
| 35 | #[must_use = "streams do nothing unless polled" ] |
| 36 | pub struct StreamNotifyClose<S> { |
| 37 | #[pin] |
| 38 | inner: Option<S>, |
| 39 | } |
| 40 | } |
| 41 | |
| 42 | impl<S> StreamNotifyClose<S> { |
| 43 | /// Create a new `StreamNotifyClose`. |
| 44 | pub fn new(stream: S) -> Self { |
| 45 | Self { |
| 46 | inner: Some(stream), |
| 47 | } |
| 48 | } |
| 49 | |
| 50 | /// Get back the inner `Stream`. |
| 51 | /// |
| 52 | /// Returns `None` if the stream has reached its end. |
| 53 | pub fn into_inner(self) -> Option<S> { |
| 54 | self.inner |
| 55 | } |
| 56 | } |
| 57 | |
| 58 | impl<S> Stream for StreamNotifyClose<S> |
| 59 | where |
| 60 | S: Stream, |
| 61 | { |
| 62 | type Item = Option<S::Item>; |
| 63 | |
| 64 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 65 | // We can't invoke poll_next after it ended, so we unset the inner stream as a marker. |
| 66 | match self |
| 67 | .as_mut() |
| 68 | .project() |
| 69 | .inner |
| 70 | .as_pin_mut() |
| 71 | .map(|stream| S::poll_next(stream, cx)) |
| 72 | { |
| 73 | Some(Poll::Ready(Some(item))) => Poll::Ready(Some(Some(item))), |
| 74 | Some(Poll::Ready(None)) => { |
| 75 | self.project().inner.set(None); |
| 76 | Poll::Ready(Some(None)) |
| 77 | } |
| 78 | Some(Poll::Pending) => Poll::Pending, |
| 79 | None => Poll::Ready(None), |
| 80 | } |
| 81 | } |
| 82 | |
| 83 | #[inline ] |
| 84 | fn size_hint(&self) -> (usize, Option<usize>) { |
| 85 | if let Some(inner) = &self.inner { |
| 86 | // We always return +1 because when there's stream there's atleast one more item. |
| 87 | let (l, u) = inner.size_hint(); |
| 88 | (l.saturating_add(1), u.and_then(|u| u.checked_add(1))) |
| 89 | } else { |
| 90 | (0, Some(0)) |
| 91 | } |
| 92 | } |
| 93 | } |
| 94 | |