1use crate::Stream;
2use pin_project_lite::pin_project;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
pin_project! {
    /// A `Stream` that wraps the values in an `Option`.
    ///
    /// Whenever the wrapped stream yields an item, this stream yields that item
    /// wrapped in `Some`. When the inner stream ends, this stream first yields
    /// a single `None` item to signal the close, and then it ends as well.
    ///
    /// # Example
    ///
    /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`.
    ///
    /// ```
    /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose};
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let mut map = StreamMap::new();
    ///     let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
    ///     let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
    ///     map.insert(0, stream);
    ///     map.insert(1, stream2);
    ///     while let Some((key, val)) = map.next().await {
    ///         match val {
    ///             Some(val) => println!("got {val:?} from stream {key:?}"),
    ///             None => println!("stream {key:?} closed"),
    ///         }
    ///     }
    /// }
    /// ```
    #[must_use = "streams do nothing unless polled"]
    pub struct StreamNotifyClose<S> {
        // `Some` while the wrapped stream is still live. It is reset to `None`
        // once the wrapped stream reports its end, so that stream is never
        // polled again afterwards.
        #[pin]
        inner: Option<S>,
    }
}
41
42impl<S> StreamNotifyClose<S> {
43 /// Create a new `StreamNotifyClose`.
44 pub fn new(stream: S) -> Self {
45 Self {
46 inner: Some(stream),
47 }
48 }
49
50 /// Get back the inner `Stream`.
51 ///
52 /// Returns `None` if the stream has reached its end.
53 pub fn into_inner(self) -> Option<S> {
54 self.inner
55 }
56}
57
58impl<S> Stream for StreamNotifyClose<S>
59where
60 S: Stream,
61{
62 type Item = Option<S::Item>;
63
64 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65 // We can't invoke poll_next after it ended, so we unset the inner stream as a marker.
66 match self
67 .as_mut()
68 .project()
69 .inner
70 .as_pin_mut()
71 .map(|stream| S::poll_next(stream, cx))
72 {
73 Some(Poll::Ready(Some(item))) => Poll::Ready(Some(Some(item))),
74 Some(Poll::Ready(None)) => {
75 self.project().inner.set(None);
76 Poll::Ready(Some(None))
77 }
78 Some(Poll::Pending) => Poll::Pending,
79 None => Poll::Ready(None),
80 }
81 }
82
83 #[inline]
84 fn size_hint(&self) -> (usize, Option<usize>) {
85 if let Some(inner) = &self.inner {
86 // We always return +1 because when there's stream there's atleast one more item.
87 let (l, u) = inner.size_hint();
88 (l.saturating_add(1), u.and_then(|u| u.checked_add(1)))
89 } else {
90 (0, Some(0))
91 }
92 }
93}
94