1 | use core::pin::Pin; |
2 | use futures_core::ready; |
3 | use futures_core::stream::{FusedStream, Stream}; |
4 | use futures_core::task::{Context, Poll}; |
5 | #[cfg (feature = "sink" )] |
6 | use futures_sink::Sink; |
7 | use pin_project_lite::pin_project; |
8 | |
9 | pin_project! { |
10 | /// Stream for the [`fuse`](super::StreamExt::fuse) method. |
11 | #[derive(Debug)] |
12 | #[must_use = "streams do nothing unless polled" ] |
13 | pub struct Fuse<St> { |
14 | #[pin] |
15 | stream: St, |
16 | done: bool, |
17 | } |
18 | } |
19 | |
20 | impl<St> Fuse<St> { |
21 | pub(super) fn new(stream: St) -> Self { |
22 | Self { stream, done: false } |
23 | } |
24 | |
25 | /// Returns whether the underlying stream has finished or not. |
26 | /// |
27 | /// If this method returns `true`, then all future calls to poll are |
28 | /// guaranteed to return `None`. If this returns `false`, then the |
29 | /// underlying stream is still in use. |
30 | pub fn is_done(&self) -> bool { |
31 | self.done |
32 | } |
33 | |
34 | delegate_access_inner!(stream, St, ()); |
35 | } |
36 | |
37 | impl<S: Stream> FusedStream for Fuse<S> { |
38 | fn is_terminated(&self) -> bool { |
39 | self.done |
40 | } |
41 | } |
42 | |
43 | impl<S: Stream> Stream for Fuse<S> { |
44 | type Item = S::Item; |
45 | |
46 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { |
47 | let this = self.project(); |
48 | |
49 | if *this.done { |
50 | return Poll::Ready(None); |
51 | } |
52 | |
53 | let item = ready!(this.stream.poll_next(cx)); |
54 | if item.is_none() { |
55 | *this.done = true; |
56 | } |
57 | Poll::Ready(item) |
58 | } |
59 | |
60 | fn size_hint(&self) -> (usize, Option<usize>) { |
61 | if self.done { |
62 | (0, Some(0)) |
63 | } else { |
64 | self.stream.size_hint() |
65 | } |
66 | } |
67 | } |
68 | |
69 | // Forwarding impl of Sink from the underlying stream |
70 | #[cfg (feature = "sink" )] |
71 | impl<S: Stream + Sink<Item>, Item> Sink<Item> for Fuse<S> { |
72 | type Error = S::Error; |
73 | |
74 | delegate_sink!(stream, Item); |
75 | } |
76 | |