| 1 | use crate::stream::{FuturesUnordered, StreamExt}; |
| 2 | use core::fmt; |
| 3 | use core::mem; |
| 4 | use core::num::NonZeroUsize; |
| 5 | use core::pin::Pin; |
| 6 | use futures_core::future::{FusedFuture, Future}; |
| 7 | use futures_core::stream::TryStream; |
| 8 | use futures_core::task::{Context, Poll}; |
| 9 | use pin_project_lite::pin_project; |
| 10 | |
| 11 | pin_project! { |
| 12 | /// Future for the |
| 13 | /// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent) |
| 14 | /// method. |
| 15 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
| 16 | pub struct TryForEachConcurrent<St, Fut, F> { |
| 17 | #[pin] |
| 18 | stream: Option<St>, |
| 19 | f: F, |
| 20 | futures: FuturesUnordered<Fut>, |
| 21 | limit: Option<NonZeroUsize>, |
| 22 | } |
| 23 | } |
| 24 | |
| 25 | impl<St, Fut, F> fmt::Debug for TryForEachConcurrent<St, Fut, F> |
| 26 | where |
| 27 | St: fmt::Debug, |
| 28 | Fut: fmt::Debug, |
| 29 | { |
| 30 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 31 | f&mut DebugStruct<'_, '_>.debug_struct("TryForEachConcurrent" ) |
| 32 | .field("stream" , &self.stream) |
| 33 | .field("futures" , &self.futures) |
| 34 | .field(name:"limit" , &self.limit) |
| 35 | .finish() |
| 36 | } |
| 37 | } |
| 38 | |
| 39 | impl<St, Fut, F> FusedFuture for TryForEachConcurrent<St, Fut, F> |
| 40 | where |
| 41 | St: TryStream, |
| 42 | F: FnMut(St::Ok) -> Fut, |
| 43 | Fut: Future<Output = Result<(), St::Error>>, |
| 44 | { |
| 45 | fn is_terminated(&self) -> bool { |
| 46 | self.stream.is_none() && self.futures.is_empty() |
| 47 | } |
| 48 | } |
| 49 | |
| 50 | impl<St, Fut, F> TryForEachConcurrent<St, Fut, F> |
| 51 | where |
| 52 | St: TryStream, |
| 53 | F: FnMut(St::Ok) -> Fut, |
| 54 | Fut: Future<Output = Result<(), St::Error>>, |
| 55 | { |
| 56 | pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self { |
| 57 | Self { |
| 58 | stream: Some(stream), |
| 59 | // Note: `limit` = 0 gets ignored. |
| 60 | limit: limit.and_then(NonZeroUsize::new), |
| 61 | f, |
| 62 | futures: FuturesUnordered::new(), |
| 63 | } |
| 64 | } |
| 65 | } |
| 66 | |
| 67 | impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> |
| 68 | where |
| 69 | St: TryStream, |
| 70 | F: FnMut(St::Ok) -> Fut, |
| 71 | Fut: Future<Output = Result<(), St::Error>>, |
| 72 | { |
| 73 | type Output = Result<(), St::Error>; |
| 74 | |
| 75 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 76 | let mut this = self.project(); |
| 77 | loop { |
| 78 | let mut made_progress_this_iter = false; |
| 79 | |
| 80 | // Check if we've already created a number of futures greater than `limit` |
| 81 | if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) { |
| 82 | let poll_res = match this.stream.as_mut().as_pin_mut() { |
| 83 | Some(stream) => stream.try_poll_next(cx), |
| 84 | None => Poll::Ready(None), |
| 85 | }; |
| 86 | |
| 87 | let elem = match poll_res { |
| 88 | Poll::Ready(Some(Ok(elem))) => { |
| 89 | made_progress_this_iter = true; |
| 90 | Some(elem) |
| 91 | } |
| 92 | Poll::Ready(None) => { |
| 93 | this.stream.set(None); |
| 94 | None |
| 95 | } |
| 96 | Poll::Pending => None, |
| 97 | Poll::Ready(Some(Err(e))) => { |
| 98 | // Empty the stream and futures so that we know |
| 99 | // the future has completed. |
| 100 | this.stream.set(None); |
| 101 | drop(mem::replace(this.futures, FuturesUnordered::new())); |
| 102 | return Poll::Ready(Err(e)); |
| 103 | } |
| 104 | }; |
| 105 | |
| 106 | if let Some(elem) = elem { |
| 107 | this.futures.push((this.f)(elem)); |
| 108 | } |
| 109 | } |
| 110 | |
| 111 | match this.futures.poll_next_unpin(cx) { |
| 112 | Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true, |
| 113 | Poll::Ready(None) => { |
| 114 | if this.stream.is_none() { |
| 115 | return Poll::Ready(Ok(())); |
| 116 | } |
| 117 | } |
| 118 | Poll::Pending => {} |
| 119 | Poll::Ready(Some(Err(e))) => { |
| 120 | // Empty the stream and futures so that we know |
| 121 | // the future has completed. |
| 122 | this.stream.set(None); |
| 123 | drop(mem::replace(this.futures, FuturesUnordered::new())); |
| 124 | return Poll::Ready(Err(e)); |
| 125 | } |
| 126 | } |
| 127 | |
| 128 | if !made_progress_this_iter { |
| 129 | return Poll::Pending; |
| 130 | } |
| 131 | } |
| 132 | } |
| 133 | } |
| 134 | |