1 | use core::cmp; |
2 | use core::pin::Pin; |
3 | use futures_core::ready; |
4 | use futures_core::stream::{FusedStream, Stream}; |
5 | use futures_core::task::{Context, Poll}; |
6 | #[cfg (feature = "sink" )] |
7 | use futures_sink::Sink; |
8 | use pin_project_lite::pin_project; |
9 | |
10 | pin_project! { |
11 | /// Stream for the [`take`](super::StreamExt::take) method. |
12 | #[derive(Debug)] |
13 | #[must_use = "streams do nothing unless polled" ] |
14 | pub struct Take<St> { |
15 | #[pin] |
16 | stream: St, |
17 | remaining: usize, |
18 | } |
19 | } |
20 | |
21 | impl<St: Stream> Take<St> { |
22 | pub(super) fn new(stream: St, n: usize) -> Self { |
23 | Self { stream, remaining: n } |
24 | } |
25 | |
26 | delegate_access_inner!(stream, St, ()); |
27 | } |
28 | |
29 | impl<St> Stream for Take<St> |
30 | where |
31 | St: Stream, |
32 | { |
33 | type Item = St::Item; |
34 | |
35 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { |
36 | if self.remaining == 0 { |
37 | Poll::Ready(None) |
38 | } else { |
39 | let this = self.project(); |
40 | let next = ready!(this.stream.poll_next(cx)); |
41 | if next.is_some() { |
42 | *this.remaining -= 1; |
43 | } else { |
44 | *this.remaining = 0; |
45 | } |
46 | Poll::Ready(next) |
47 | } |
48 | } |
49 | |
50 | fn size_hint(&self) -> (usize, Option<usize>) { |
51 | if self.remaining == 0 { |
52 | return (0, Some(0)); |
53 | } |
54 | |
55 | let (lower, upper) = self.stream.size_hint(); |
56 | |
57 | let lower = cmp::min(lower, self.remaining); |
58 | |
59 | let upper = match upper { |
60 | Some(x) if x < self.remaining => Some(x), |
61 | _ => Some(self.remaining), |
62 | }; |
63 | |
64 | (lower, upper) |
65 | } |
66 | } |
67 | |
68 | impl<St> FusedStream for Take<St> |
69 | where |
70 | St: FusedStream, |
71 | { |
72 | fn is_terminated(&self) -> bool { |
73 | self.remaining == 0 || self.stream.is_terminated() |
74 | } |
75 | } |
76 | |
// When the wrapped stream is also a sink, expose that sink through `Take`
// by forwarding to the `stream` field.
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for Take<St>
where
    St: Stream + Sink<Item>,
{
    type Error = St::Error;

    delegate_sink!(stream, Item);
}
87 | |