1use core::cmp;
2use core::pin::Pin;
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use futures_core::task::{Context, Poll};
6#[cfg(feature = "sink")]
7use futures_sink::Sink;
8use pin_project_lite::pin_project;
9
10pin_project! {
11 /// Stream for the [`take`](super::StreamExt::take) method.
12 #[derive(Debug)]
13 #[must_use = "streams do nothing unless polled"]
14 pub struct Take<St> {
15 #[pin]
16 stream: St,
17 remaining: usize,
18 }
19}
20
21impl<St: Stream> Take<St> {
22 pub(super) fn new(stream: St, n: usize) -> Self {
23 Self { stream, remaining: n }
24 }
25
26 delegate_access_inner!(stream, St, ());
27}
28
29impl<St> Stream for Take<St>
30where
31 St: Stream,
32{
33 type Item = St::Item;
34
35 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
36 if self.remaining == 0 {
37 Poll::Ready(None)
38 } else {
39 let this = self.project();
40 let next = ready!(this.stream.poll_next(cx));
41 if next.is_some() {
42 *this.remaining -= 1;
43 } else {
44 *this.remaining = 0;
45 }
46 Poll::Ready(next)
47 }
48 }
49
50 fn size_hint(&self) -> (usize, Option<usize>) {
51 if self.remaining == 0 {
52 return (0, Some(0));
53 }
54
55 let (lower, upper) = self.stream.size_hint();
56
57 let lower = cmp::min(lower, self.remaining);
58
59 let upper = match upper {
60 Some(x) if x < self.remaining => Some(x),
61 _ => Some(self.remaining),
62 };
63
64 (lower, upper)
65 }
66}
67
68impl<St> FusedStream for Take<St>
69where
70 St: FusedStream,
71{
72 fn is_terminated(&self) -> bool {
73 self.remaining == 0 || self.stream.is_terminated()
74 }
75}
76
// `Take` forwards the `Sink` implementation of the stream it wraps.
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for Take<St>
where
    St: Stream + Sink<Item>,
{
    type Error = St::Error;

    // NOTE(review): `delegate_sink!` is defined elsewhere; presumably it expands to
    // the `Sink` methods forwarding to the `stream` field — confirm against the macro.
    delegate_sink!(stream, Item);
}
87