1use crate::stream::{Fuse, FuturesUnordered, StreamExt};
2use core::fmt;
3use core::pin::Pin;
4use futures_core::future::Future;
5use futures_core::stream::{FusedStream, Stream};
6use futures_core::task::{Context, Poll};
7#[cfg(feature = "sink")]
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10
11pin_project! {
12 /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered)
13 /// method.
14 #[must_use = "streams do nothing unless polled"]
15 pub struct BufferUnordered<St>
16 where
17 St: Stream,
18 {
19 #[pin]
20 stream: Fuse<St>,
21 in_progress_queue: FuturesUnordered<St::Item>,
22 max: usize,
23 }
24}
25
26impl<St> fmt::Debug for BufferUnordered<St>
27where
28 St: Stream + fmt::Debug,
29{
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 f&mut DebugStruct<'_, '_>.debug_struct("BufferUnordered")
32 .field("stream", &self.stream)
33 .field("in_progress_queue", &self.in_progress_queue)
34 .field(name:"max", &self.max)
35 .finish()
36 }
37}
38
39impl<St> BufferUnordered<St>
40where
41 St: Stream,
42 St::Item: Future,
43{
44 pub(super) fn new(stream: St, n: usize) -> Self {
45 Self {
46 stream: super::Fuse::new(stream),
47 in_progress_queue: FuturesUnordered::new(),
48 max: n,
49 }
50 }
51
52 delegate_access_inner!(stream, St, (.));
53}
54
55impl<St> Stream for BufferUnordered<St>
56where
57 St: Stream,
58 St::Item: Future,
59{
60 type Item = <St::Item as Future>::Output;
61
62 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63 let mut this = self.project();
64
65 // First up, try to spawn off as many futures as possible by filling up
66 // our queue of futures.
67 while this.in_progress_queue.len() < *this.max {
68 match this.stream.as_mut().poll_next(cx) {
69 Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
70 Poll::Ready(None) | Poll::Pending => break,
71 }
72 }
73
74 // Attempt to pull the next value from the in_progress_queue
75 match this.in_progress_queue.poll_next_unpin(cx) {
76 x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
77 Poll::Ready(None) => {}
78 }
79
80 // If more values are still coming from the stream, we're not done yet
81 if this.stream.is_done() {
82 Poll::Ready(None)
83 } else {
84 Poll::Pending
85 }
86 }
87
88 fn size_hint(&self) -> (usize, Option<usize>) {
89 let queue_len = self.in_progress_queue.len();
90 let (lower, upper) = self.stream.size_hint();
91 let lower = lower.saturating_add(queue_len);
92 let upper = match upper {
93 Some(x) => x.checked_add(queue_len),
94 None => None,
95 };
96 (lower, upper)
97 }
98}
99
100impl<St> FusedStream for BufferUnordered<St>
101where
102 St: Stream,
103 St::Item: Future,
104{
105 fn is_terminated(&self) -> bool {
106 self.in_progress_queue.is_terminated() && self.stream.is_terminated()
107 }
108}
109
110// Forwarding impl of Sink from the underlying stream
111#[cfg(feature = "sink")]
112impl<S, Item> Sink<Item> for BufferUnordered<S>
113where
114 S: Stream + Sink<Item>,
115 S::Item: Future,
116{
117 type Error = S::Error;
118
119 delegate_sink!(stream, Item);
120}
121