1 | use crate::stream::{FuturesUnordered, StreamExt}; |
2 | use core::fmt; |
3 | use core::mem; |
4 | use core::num::NonZeroUsize; |
5 | use core::pin::Pin; |
6 | use futures_core::future::{FusedFuture, Future}; |
7 | use futures_core::stream::TryStream; |
8 | use futures_core::task::{Context, Poll}; |
9 | use pin_project_lite::pin_project; |
10 | |
pin_project! {
    /// Future for the
    /// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent)
    /// method.
    ///
    /// Pulls items from a fallible stream, turns each `Ok` item into a future
    /// with the closure `f`, and drives those futures together, optionally
    /// capping how many are in flight at once. It resolves with the first
    /// error observed, or with `Ok(())` once the stream has ended and every
    /// spawned future has finished.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct TryForEachConcurrent<St, Fut, F> {
        // Source stream; cleared to `None` once it is exhausted or after an
        // error has been returned.
        #[pin]
        stream: Option<St>,
        // Closure invoked on every `Ok` item to build the future to run.
        f: F,
        // Futures created by `f` that have not completed yet.
        futures: FuturesUnordered<Fut>,
        // Maximum number of in-flight futures; `None` means no limit
        // (a requested limit of 0 is stored as `None` by `new`).
        limit: Option<NonZeroUsize>,
    }
}
24 | |
25 | impl<St, Fut, F> fmt::Debug for TryForEachConcurrent<St, Fut, F> |
26 | where |
27 | St: fmt::Debug, |
28 | Fut: fmt::Debug, |
29 | { |
30 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
31 | f&mut DebugStruct<'_, '_>.debug_struct("TryForEachConcurrent" ) |
32 | .field("stream" , &self.stream) |
33 | .field("futures" , &self.futures) |
34 | .field(name:"limit" , &self.limit) |
35 | .finish() |
36 | } |
37 | } |
38 | |
39 | impl<St, Fut, F> FusedFuture for TryForEachConcurrent<St, Fut, F> |
40 | where |
41 | St: TryStream, |
42 | F: FnMut(St::Ok) -> Fut, |
43 | Fut: Future<Output = Result<(), St::Error>>, |
44 | { |
45 | fn is_terminated(&self) -> bool { |
46 | self.stream.is_none() && self.futures.is_empty() |
47 | } |
48 | } |
49 | |
50 | impl<St, Fut, F> TryForEachConcurrent<St, Fut, F> |
51 | where |
52 | St: TryStream, |
53 | F: FnMut(St::Ok) -> Fut, |
54 | Fut: Future<Output = Result<(), St::Error>>, |
55 | { |
56 | pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self { |
57 | Self { |
58 | stream: Some(stream), |
59 | // Note: `limit` = 0 gets ignored. |
60 | limit: limit.and_then(NonZeroUsize::new), |
61 | f, |
62 | futures: FuturesUnordered::new(), |
63 | } |
64 | } |
65 | } |
66 | |
67 | impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> |
68 | where |
69 | St: TryStream, |
70 | F: FnMut(St::Ok) -> Fut, |
71 | Fut: Future<Output = Result<(), St::Error>>, |
72 | { |
73 | type Output = Result<(), St::Error>; |
74 | |
75 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
76 | let mut this = self.project(); |
77 | loop { |
78 | let mut made_progress_this_iter = false; |
79 | |
80 | // Check if we've already created a number of futures greater than `limit` |
81 | if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) { |
82 | let poll_res = match this.stream.as_mut().as_pin_mut() { |
83 | Some(stream) => stream.try_poll_next(cx), |
84 | None => Poll::Ready(None), |
85 | }; |
86 | |
87 | let elem = match poll_res { |
88 | Poll::Ready(Some(Ok(elem))) => { |
89 | made_progress_this_iter = true; |
90 | Some(elem) |
91 | } |
92 | Poll::Ready(None) => { |
93 | this.stream.set(None); |
94 | None |
95 | } |
96 | Poll::Pending => None, |
97 | Poll::Ready(Some(Err(e))) => { |
98 | // Empty the stream and futures so that we know |
99 | // the future has completed. |
100 | this.stream.set(None); |
101 | drop(mem::replace(this.futures, FuturesUnordered::new())); |
102 | return Poll::Ready(Err(e)); |
103 | } |
104 | }; |
105 | |
106 | if let Some(elem) = elem { |
107 | this.futures.push((this.f)(elem)); |
108 | } |
109 | } |
110 | |
111 | match this.futures.poll_next_unpin(cx) { |
112 | Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true, |
113 | Poll::Ready(None) => { |
114 | if this.stream.is_none() { |
115 | return Poll::Ready(Ok(())); |
116 | } |
117 | } |
118 | Poll::Pending => {} |
119 | Poll::Ready(Some(Err(e))) => { |
120 | // Empty the stream and futures so that we know |
121 | // the future has completed. |
122 | this.stream.set(None); |
123 | drop(mem::replace(this.futures, FuturesUnordered::new())); |
124 | return Poll::Ready(Err(e)); |
125 | } |
126 | } |
127 | |
128 | if !made_progress_this_iter { |
129 | return Poll::Pending; |
130 | } |
131 | } |
132 | } |
133 | } |
134 | |