1use crate::Stream;
2
3use core::cmp;
4use core::fmt;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7use pin_project_lite::pin_project;
8
pin_project! {
    /// Stream for the [`take`](super::StreamExt::take) method.
    ///
    /// Wraps an inner stream and yields at most a fixed number of its items.
    /// Once that budget is used up, or the inner stream ends, this stream
    /// reports end-of-stream without polling the inner stream again.
    #[must_use = "streams do nothing unless polled"]
    pub struct Take<St> {
        // The wrapped stream. Marked `#[pin]` so the projection hands out a
        // pinned reference that can be polled in place.
        #[pin]
        stream: St,
        // Number of items that may still be yielded. Decremented per item and
        // forced to 0 when the inner stream returns `None`.
        remaining: usize,
    }
}
18
19impl<St> fmt::Debug for Take<St>
20where
21 St: fmt::Debug,
22{
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 f.debug_struct("Take")
25 .field("stream", &self.stream)
26 .finish()
27 }
28}
29
30impl<St> Take<St> {
31 pub(super) fn new(stream: St, remaining: usize) -> Self {
32 Self { stream, remaining }
33 }
34}
35
36impl<St> Stream for Take<St>
37where
38 St: Stream,
39{
40 type Item = St::Item;
41
42 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43 if *self.as_mut().project().remaining > 0 {
44 self.as_mut().project().stream.poll_next(cx).map(|ready| {
45 match &ready {
46 Some(_) => {
47 *self.as_mut().project().remaining -= 1;
48 }
49 None => {
50 *self.as_mut().project().remaining = 0;
51 }
52 }
53 ready
54 })
55 } else {
56 Poll::Ready(None)
57 }
58 }
59
60 fn size_hint(&self) -> (usize, Option<usize>) {
61 if self.remaining == 0 {
62 return (0, Some(0));
63 }
64
65 let (lower, upper) = self.stream.size_hint();
66
67 let lower = cmp::min(lower, self.remaining);
68
69 let upper = match upper {
70 Some(x) if x < self.remaining => Some(x),
71 _ => Some(self.remaining),
72 };
73
74 (lower, upper)
75 }
76}
77