1 | use crate::stream::{FuturesUnordered, StreamExt}; |
2 | use alloc::collections::binary_heap::{BinaryHeap, PeekMut}; |
3 | use core::cmp::Ordering; |
4 | use core::fmt::{self, Debug}; |
5 | use core::iter::FromIterator; |
6 | use core::pin::Pin; |
7 | use futures_core::future::Future; |
8 | use futures_core::ready; |
9 | use futures_core::stream::Stream; |
10 | use futures_core::{ |
11 | task::{Context, Poll}, |
12 | FusedStream, |
13 | }; |
14 | use pin_project_lite::pin_project; |
15 | |
16 | pin_project! { |
17 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
18 | #[derive(Debug)] |
19 | struct OrderWrapper<T> { |
20 | #[pin] |
21 | data: T, // A future or a future's output |
22 | index: isize, |
23 | } |
24 | } |
25 | |
26 | impl<T> PartialEq for OrderWrapper<T> { |
27 | fn eq(&self, other: &Self) -> bool { |
28 | self.index == other.index |
29 | } |
30 | } |
31 | |
32 | impl<T> Eq for OrderWrapper<T> {} |
33 | |
34 | impl<T> PartialOrd for OrderWrapper<T> { |
35 | fn partial_cmp(&self, other: &Self) -> Option<Ordering> { |
36 | Some(self.cmp(other)) |
37 | } |
38 | } |
39 | |
40 | impl<T> Ord for OrderWrapper<T> { |
41 | fn cmp(&self, other: &Self) -> Ordering { |
42 | // BinaryHeap is a max heap, so compare backwards here. |
43 | other.index.cmp(&self.index) |
44 | } |
45 | } |
46 | |
47 | impl<T> Future for OrderWrapper<T> |
48 | where |
49 | T: Future, |
50 | { |
51 | type Output = OrderWrapper<T::Output>; |
52 | |
53 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
54 | let index: isize = self.index; |
55 | self.project().data.poll(cx).map(|output: ::Output| OrderWrapper { data: output, index }) |
56 | } |
57 | } |
58 | |
59 | /// An unbounded queue of futures. |
60 | /// |
61 | /// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO order |
62 | /// on top of the set of futures. While futures in the set will race to |
63 | /// completion in parallel, results will only be returned in the order their |
64 | /// originating futures were added to the queue. |
65 | /// |
66 | /// Futures are pushed into this queue and their realized values are yielded in |
67 | /// order. This structure is optimized to manage a large number of futures. |
68 | /// Futures managed by `FuturesOrdered` will only be polled when they generate |
69 | /// notifications. This reduces the required amount of work needed to coordinate |
70 | /// large numbers of futures. |
71 | /// |
72 | /// When a `FuturesOrdered` is first created, it does not contain any futures. |
73 | /// Calling `poll` in this state will result in `Poll::Ready(None))` to be |
74 | /// returned. Futures are submitted to the queue using `push`; however, the |
75 | /// future will **not** be polled at this point. `FuturesOrdered` will only |
76 | /// poll managed futures when `FuturesOrdered::poll` is called. As such, it |
77 | /// is important to call `poll` after pushing new futures. |
78 | /// |
79 | /// If `FuturesOrdered::poll` returns `Poll::Ready(None)` this means that |
80 | /// the queue is currently not managing any futures. A future may be submitted |
81 | /// to the queue at a later time. At that point, a call to |
82 | /// `FuturesOrdered::poll` will either return the future's resolved value |
83 | /// **or** `Poll::Pending` if the future has not yet completed. When |
84 | /// multiple futures are submitted to the queue, `FuturesOrdered::poll` will |
85 | /// return `Poll::Pending` until the first future completes, even if |
86 | /// some of the later futures have already completed. |
87 | /// |
88 | /// Note that you can create a ready-made `FuturesOrdered` via the |
89 | /// [`collect`](Iterator::collect) method, or you can start with an empty queue |
90 | /// with the `FuturesOrdered::new` constructor. |
91 | /// |
92 | /// This type is only available when the `std` or `alloc` feature of this |
93 | /// library is activated, and it is activated by default. |
94 | #[must_use = "streams do nothing unless polled" ] |
95 | pub struct FuturesOrdered<T: Future> { |
96 | in_progress_queue: FuturesUnordered<OrderWrapper<T>>, |
97 | queued_outputs: BinaryHeap<OrderWrapper<T::Output>>, |
98 | next_incoming_index: isize, |
99 | next_outgoing_index: isize, |
100 | } |
101 | |
102 | impl<T: Future> Unpin for FuturesOrdered<T> {} |
103 | |
104 | impl<Fut: Future> FuturesOrdered<Fut> { |
105 | /// Constructs a new, empty `FuturesOrdered` |
106 | /// |
107 | /// The returned `FuturesOrdered` does not contain any futures and, in this |
108 | /// state, `FuturesOrdered::poll_next` will return `Poll::Ready(None)`. |
109 | pub fn new() -> Self { |
110 | Self { |
111 | in_progress_queue: FuturesUnordered::new(), |
112 | queued_outputs: BinaryHeap::new(), |
113 | next_incoming_index: 0, |
114 | next_outgoing_index: 0, |
115 | } |
116 | } |
117 | |
118 | /// Returns the number of futures contained in the queue. |
119 | /// |
120 | /// This represents the total number of in-flight futures, both |
121 | /// those currently processing and those that have completed but |
122 | /// which are waiting for earlier futures to complete. |
123 | pub fn len(&self) -> usize { |
124 | self.in_progress_queue.len() + self.queued_outputs.len() |
125 | } |
126 | |
127 | /// Returns `true` if the queue contains no futures |
128 | pub fn is_empty(&self) -> bool { |
129 | self.in_progress_queue.is_empty() && self.queued_outputs.is_empty() |
130 | } |
131 | |
132 | /// Push a future into the queue. |
133 | /// |
134 | /// This function submits the given future to the internal set for managing. |
135 | /// This function will not call `poll` on the submitted future. The caller |
136 | /// must ensure that `FuturesOrdered::poll` is called in order to receive |
137 | /// task notifications. |
138 | #[deprecated (note = "use `push_back` instead" )] |
139 | pub fn push(&mut self, future: Fut) { |
140 | self.push_back(future); |
141 | } |
142 | |
143 | /// Pushes a future to the back of the queue. |
144 | /// |
145 | /// This function submits the given future to the internal set for managing. |
146 | /// This function will not call `poll` on the submitted future. The caller |
147 | /// must ensure that `FuturesOrdered::poll` is called in order to receive |
148 | /// task notifications. |
149 | pub fn push_back(&mut self, future: Fut) { |
150 | let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; |
151 | self.next_incoming_index += 1; |
152 | self.in_progress_queue.push(wrapped); |
153 | } |
154 | |
155 | /// Pushes a future to the front of the queue. |
156 | /// |
157 | /// This function submits the given future to the internal set for managing. |
158 | /// This function will not call `poll` on the submitted future. The caller |
159 | /// must ensure that `FuturesOrdered::poll` is called in order to receive |
160 | /// task notifications. This future will be the next future to be returned |
161 | /// complete. |
162 | pub fn push_front(&mut self, future: Fut) { |
163 | let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 }; |
164 | self.next_outgoing_index -= 1; |
165 | self.in_progress_queue.push(wrapped); |
166 | } |
167 | } |
168 | |
169 | impl<Fut: Future> Default for FuturesOrdered<Fut> { |
170 | fn default() -> Self { |
171 | Self::new() |
172 | } |
173 | } |
174 | |
175 | impl<Fut: Future> Stream for FuturesOrdered<Fut> { |
176 | type Item = Fut::Output; |
177 | |
178 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
179 | let this = &mut *self; |
180 | |
181 | // Check to see if we've already received the next value |
182 | if let Some(next_output) = this.queued_outputs.peek_mut() { |
183 | if next_output.index == this.next_outgoing_index { |
184 | this.next_outgoing_index += 1; |
185 | return Poll::Ready(Some(PeekMut::pop(next_output).data)); |
186 | } |
187 | } |
188 | |
189 | loop { |
190 | match ready!(this.in_progress_queue.poll_next_unpin(cx)) { |
191 | Some(output) => { |
192 | if output.index == this.next_outgoing_index { |
193 | this.next_outgoing_index += 1; |
194 | return Poll::Ready(Some(output.data)); |
195 | } else { |
196 | this.queued_outputs.push(output) |
197 | } |
198 | } |
199 | None => return Poll::Ready(None), |
200 | } |
201 | } |
202 | } |
203 | |
204 | fn size_hint(&self) -> (usize, Option<usize>) { |
205 | let len = self.len(); |
206 | (len, Some(len)) |
207 | } |
208 | } |
209 | |
210 | impl<Fut: Future> Debug for FuturesOrdered<Fut> { |
211 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
212 | write!(f, "FuturesOrdered {{ ... }}" ) |
213 | } |
214 | } |
215 | |
216 | impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> { |
217 | fn from_iter<T>(iter: T) -> Self |
218 | where |
219 | T: IntoIterator<Item = Fut>, |
220 | { |
221 | let acc: FuturesOrdered = Self::new(); |
222 | iter.into_iter().fold(init:acc, |mut acc: FuturesOrdered, item: Fut| { |
223 | acc.push_back(future:item); |
224 | acc |
225 | }) |
226 | } |
227 | } |
228 | |
229 | impl<Fut: Future> FusedStream for FuturesOrdered<Fut> { |
230 | fn is_terminated(&self) -> bool { |
231 | self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty() |
232 | } |
233 | } |
234 | |
235 | impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> { |
236 | fn extend<I>(&mut self, iter: I) |
237 | where |
238 | I: IntoIterator<Item = Fut>, |
239 | { |
240 | for item: Fut in iter { |
241 | self.push_back(future:item); |
242 | } |
243 | } |
244 | } |
245 | |