1use crate::stream::{Fuse, FusedStream, FuturesOrdered, StreamExt};
2use core::fmt;
3use core::pin::Pin;
4use futures_core::future::Future;
5use futures_core::ready;
6use futures_core::stream::Stream;
7use futures_core::task::{Context, Poll};
8#[cfg(feature = "sink")]
9use futures_sink::Sink;
10use pin_project_lite::pin_project;
11
12pin_project! {
13 /// Stream for the [`buffered`](super::StreamExt::buffered) method.
14 #[must_use = "streams do nothing unless polled"]
15 pub struct Buffered<St>
16 where
17 St: Stream,
18 St::Item: Future,
19 {
20 #[pin]
21 stream: Fuse<St>,
22 in_progress_queue: FuturesOrdered<St::Item>,
23 max: usize,
24 }
25}
26
27impl<St> fmt::Debug for Buffered<St>
28where
29 St: Stream + fmt::Debug,
30 St::Item: Future,
31{
32 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33 f&mut DebugStruct<'_, '_>.debug_struct("Buffered")
34 .field("stream", &self.stream)
35 .field("in_progress_queue", &self.in_progress_queue)
36 .field(name:"max", &self.max)
37 .finish()
38 }
39}
40
41impl<St> Buffered<St>
42where
43 St: Stream,
44 St::Item: Future,
45{
46 pub(super) fn new(stream: St, n: usize) -> Self {
47 Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesOrdered::new(), max: n }
48 }
49
50 delegate_access_inner!(stream, St, (.));
51}
52
53impl<St> Stream for Buffered<St>
54where
55 St: Stream,
56 St::Item: Future,
57{
58 type Item = <St::Item as Future>::Output;
59
60 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61 let mut this = self.project();
62
63 // First up, try to spawn off as many futures as possible by filling up
64 // our queue of futures.
65 while this.in_progress_queue.len() < *this.max {
66 match this.stream.as_mut().poll_next(cx) {
67 Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut),
68 Poll::Ready(None) | Poll::Pending => break,
69 }
70 }
71
72 // Attempt to pull the next value from the in_progress_queue
73 let res = this.in_progress_queue.poll_next_unpin(cx);
74 if let Some(val) = ready!(res) {
75 return Poll::Ready(Some(val));
76 }
77
78 // If more values are still coming from the stream, we're not done yet
79 if this.stream.is_done() {
80 Poll::Ready(None)
81 } else {
82 Poll::Pending
83 }
84 }
85
86 fn size_hint(&self) -> (usize, Option<usize>) {
87 let queue_len = self.in_progress_queue.len();
88 let (lower, upper) = self.stream.size_hint();
89 let lower = lower.saturating_add(queue_len);
90 let upper = match upper {
91 Some(x) => x.checked_add(queue_len),
92 None => None,
93 };
94 (lower, upper)
95 }
96}
97
98impl<St> FusedStream for Buffered<St>
99where
100 St: Stream,
101 St::Item: Future,
102{
103 fn is_terminated(&self) -> bool {
104 self.stream.is_done() && self.in_progress_queue.is_terminated()
105 }
106}
107
108// Forwarding impl of Sink from the underlying stream
109#[cfg(feature = "sink")]
110impl<S, Item> Sink<Item> for Buffered<S>
111where
112 S: Stream + Sink<Item>,
113 S::Item: Future,
114{
115 type Error = S::Error;
116
117 delegate_sink!(stream, Item);
118}
119