1 | use crate::stream::{FuturesUnordered, StreamExt}; |
2 | use core::fmt; |
3 | use core::num::NonZeroUsize; |
4 | use core::pin::Pin; |
5 | use futures_core::future::{FusedFuture, Future}; |
6 | use futures_core::stream::Stream; |
7 | use futures_core::task::{Context, Poll}; |
8 | use pin_project_lite::pin_project; |
9 | |
10 | pin_project! { |
11 | /// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent) |
12 | /// method. |
13 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
14 | pub struct ForEachConcurrent<St, Fut, F> { |
15 | #[pin] |
16 | stream: Option<St>, |
17 | f: F, |
18 | futures: FuturesUnordered<Fut>, |
19 | limit: Option<NonZeroUsize>, |
20 | } |
21 | } |
22 | |
23 | impl<St, Fut, F> fmt::Debug for ForEachConcurrent<St, Fut, F> |
24 | where |
25 | St: fmt::Debug, |
26 | Fut: fmt::Debug, |
27 | { |
28 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
29 | f.debug_struct("ForEachConcurrent" ) |
30 | .field("stream" , &self.stream) |
31 | .field("futures" , &self.futures) |
32 | .field("limit" , &self.limit) |
33 | .finish() |
34 | } |
35 | } |
36 | |
37 | impl<St, Fut, F> ForEachConcurrent<St, Fut, F> |
38 | where |
39 | St: Stream, |
40 | F: FnMut(St::Item) -> Fut, |
41 | Fut: Future<Output = ()>, |
42 | { |
43 | pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self { |
44 | Self { |
45 | stream: Some(stream), |
46 | // Note: `limit` = 0 gets ignored. |
47 | limit: limit.and_then(NonZeroUsize::new), |
48 | f, |
49 | futures: FuturesUnordered::new(), |
50 | } |
51 | } |
52 | } |
53 | |
54 | impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F> |
55 | where |
56 | St: Stream, |
57 | F: FnMut(St::Item) -> Fut, |
58 | Fut: Future<Output = ()>, |
59 | { |
60 | fn is_terminated(&self) -> bool { |
61 | self.stream.is_none() && self.futures.is_empty() |
62 | } |
63 | } |
64 | |
65 | impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F> |
66 | where |
67 | St: Stream, |
68 | F: FnMut(St::Item) -> Fut, |
69 | Fut: Future<Output = ()>, |
70 | { |
71 | type Output = (); |
72 | |
73 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
74 | let mut this = self.project(); |
75 | loop { |
76 | let mut made_progress_this_iter = false; |
77 | |
78 | // Check if we've already created a number of futures greater than `limit` |
79 | if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) { |
80 | let mut stream_completed = false; |
81 | let elem = if let Some(stream) = this.stream.as_mut().as_pin_mut() { |
82 | match stream.poll_next(cx) { |
83 | Poll::Ready(Some(elem)) => { |
84 | made_progress_this_iter = true; |
85 | Some(elem) |
86 | } |
87 | Poll::Ready(None) => { |
88 | stream_completed = true; |
89 | None |
90 | } |
91 | Poll::Pending => None, |
92 | } |
93 | } else { |
94 | None |
95 | }; |
96 | if stream_completed { |
97 | this.stream.set(None); |
98 | } |
99 | if let Some(elem) = elem { |
100 | this.futures.push((this.f)(elem)); |
101 | } |
102 | } |
103 | |
104 | match this.futures.poll_next_unpin(cx) { |
105 | Poll::Ready(Some(())) => made_progress_this_iter = true, |
106 | Poll::Ready(None) => { |
107 | if this.stream.is_none() { |
108 | return Poll::Ready(()); |
109 | } |
110 | } |
111 | Poll::Pending => {} |
112 | } |
113 | |
114 | if !made_progress_this_iter { |
115 | return Poll::Pending; |
116 | } |
117 | } |
118 | } |
119 | } |
120 | |