1 | use crate::Stream; |
2 | use pin_project_lite::pin_project; |
3 | use std::pin::Pin; |
4 | use std::task::{Context, Poll}; |
5 | |
6 | pin_project! { |
7 | /// A `Stream` that wraps the values in an `Option`. |
8 | /// |
9 | /// Whenever the wrapped stream yields an item, this stream yields that item |
10 | /// wrapped in `Some`. When the inner stream ends, then this stream first |
11 | /// yields a `None` item, and then this stream will also end. |
12 | /// |
13 | /// # Example |
14 | /// |
15 | /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`. |
16 | /// |
17 | /// ``` |
18 | /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose}; |
19 | /// |
20 | /// #[tokio::main] |
21 | /// async fn main() { |
22 | /// let mut map = StreamMap::new(); |
23 | /// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); |
24 | /// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); |
25 | /// map.insert(0, stream); |
26 | /// map.insert(1, stream2); |
27 | /// while let Some((key, val)) = map.next().await { |
28 | /// match val { |
29 | /// Some(val) => println!("got {val:?} from stream {key:?}"), |
30 | /// None => println!("stream {key:?} closed"), |
31 | /// } |
32 | /// } |
33 | /// } |
34 | /// ``` |
35 | #[must_use = "streams do nothing unless polled" ] |
36 | pub struct StreamNotifyClose<S> { |
37 | #[pin] |
38 | inner: Option<S>, |
39 | } |
40 | } |
41 | |
42 | impl<S> StreamNotifyClose<S> { |
43 | /// Create a new `StreamNotifyClose`. |
44 | pub fn new(stream: S) -> Self { |
45 | Self { |
46 | inner: Some(stream), |
47 | } |
48 | } |
49 | |
50 | /// Get back the inner `Stream`. |
51 | /// |
52 | /// Returns `None` if the stream has reached its end. |
53 | pub fn into_inner(self) -> Option<S> { |
54 | self.inner |
55 | } |
56 | } |
57 | |
58 | impl<S> Stream for StreamNotifyClose<S> |
59 | where |
60 | S: Stream, |
61 | { |
62 | type Item = Option<S::Item>; |
63 | |
64 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
65 | // We can't invoke poll_next after it ended, so we unset the inner stream as a marker. |
66 | match self |
67 | .as_mut() |
68 | .project() |
69 | .inner |
70 | .as_pin_mut() |
71 | .map(|stream| S::poll_next(stream, cx)) |
72 | { |
73 | Some(Poll::Ready(Some(item))) => Poll::Ready(Some(Some(item))), |
74 | Some(Poll::Ready(None)) => { |
75 | self.project().inner.set(None); |
76 | Poll::Ready(Some(None)) |
77 | } |
78 | Some(Poll::Pending) => Poll::Pending, |
79 | None => Poll::Ready(None), |
80 | } |
81 | } |
82 | |
83 | #[inline ] |
84 | fn size_hint(&self) -> (usize, Option<usize>) { |
85 | if let Some(inner) = &self.inner { |
86 | // We always return +1 because when there's stream there's atleast one more item. |
87 | let (l, u) = inner.size_hint(); |
88 | (l.saturating_add(1), u.and_then(|u| u.checked_add(1))) |
89 | } else { |
90 | (0, Some(0)) |
91 | } |
92 | } |
93 | } |
94 | |