1 | use futures::channel::oneshot; |
2 | use futures::executor::{block_on, block_on_stream}; |
3 | use futures::future::{self, join, Future, FutureExt}; |
4 | use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; |
5 | use futures::task::{Context, Poll}; |
6 | use futures_test::future::FutureTestExt; |
7 | use futures_test::task::noop_context; |
8 | use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending}; |
9 | use std::iter::FromIterator; |
10 | use std::pin::Pin; |
11 | use std::sync::atomic::{AtomicBool, Ordering}; |
12 | |
13 | #[test] |
14 | fn is_terminated() { |
15 | let mut cx = noop_context(); |
16 | let mut tasks = FuturesUnordered::new(); |
17 | |
18 | assert_eq!(tasks.is_terminated(), false); |
19 | assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None)); |
20 | assert_eq!(tasks.is_terminated(), true); |
21 | |
22 | // Test that the sentinel value doesn't leak |
23 | assert_eq!(tasks.is_empty(), true); |
24 | assert_eq!(tasks.len(), 0); |
25 | assert_eq!(tasks.iter_mut().len(), 0); |
26 | |
27 | tasks.push(future::ready(1)); |
28 | |
29 | assert_eq!(tasks.is_empty(), false); |
30 | assert_eq!(tasks.len(), 1); |
31 | assert_eq!(tasks.iter_mut().len(), 1); |
32 | |
33 | assert_eq!(tasks.is_terminated(), false); |
34 | assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1))); |
35 | assert_eq!(tasks.is_terminated(), false); |
36 | assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None)); |
37 | assert_eq!(tasks.is_terminated(), true); |
38 | } |
39 | |
40 | #[test] |
41 | fn works_1() { |
42 | let (a_tx, a_rx) = oneshot::channel::<i32>(); |
43 | let (b_tx, b_rx) = oneshot::channel::<i32>(); |
44 | let (c_tx, c_rx) = oneshot::channel::<i32>(); |
45 | |
46 | let mut iter = |
47 | block_on_stream(vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>()); |
48 | |
49 | b_tx.send(99).unwrap(); |
50 | assert_eq!(Some(Ok(99)), iter.next()); |
51 | |
52 | a_tx.send(33).unwrap(); |
53 | c_tx.send(33).unwrap(); |
54 | assert_eq!(Some(Ok(33)), iter.next()); |
55 | assert_eq!(Some(Ok(33)), iter.next()); |
56 | assert_eq!(None, iter.next()); |
57 | } |
58 | |
59 | #[test] |
60 | fn works_2() { |
61 | let (a_tx, a_rx) = oneshot::channel::<i32>(); |
62 | let (b_tx, b_rx) = oneshot::channel::<i32>(); |
63 | let (c_tx, c_rx) = oneshot::channel::<i32>(); |
64 | |
65 | let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()] |
66 | .into_iter() |
67 | .collect::<FuturesUnordered<_>>(); |
68 | |
69 | a_tx.send(9).unwrap(); |
70 | b_tx.send(10).unwrap(); |
71 | |
72 | let mut cx = noop_context(); |
73 | assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(9)))); |
74 | c_tx.send(20).unwrap(); |
75 | assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(30)))); |
76 | assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(None)); |
77 | } |
78 | |
79 | #[test] |
80 | fn from_iterator() { |
81 | let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)] |
82 | .into_iter() |
83 | .collect::<FuturesUnordered<_>>(); |
84 | assert_eq!(stream.len(), 3); |
85 | assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]); |
86 | } |
87 | |
88 | #[test] |
89 | fn finished_future() { |
90 | let (_a_tx, a_rx) = oneshot::channel::<i32>(); |
91 | let (b_tx, b_rx) = oneshot::channel::<i32>(); |
92 | let (c_tx, c_rx) = oneshot::channel::<i32>(); |
93 | |
94 | let mut stream = vec![ |
95 | Box::new(a_rx) as Box<dyn Future<Output = Result<_, _>> + Unpin>, |
96 | Box::new(future::select(b_rx, c_rx).map(|e| e.factor_first().0)) as _, |
97 | ] |
98 | .into_iter() |
99 | .collect::<FuturesUnordered<_>>(); |
100 | |
101 | let cx = &mut noop_context(); |
102 | for _ in 0..10 { |
103 | assert!(stream.poll_next_unpin(cx).is_pending()); |
104 | } |
105 | |
106 | b_tx.send(12).unwrap(); |
107 | c_tx.send(3).unwrap(); |
108 | assert!(stream.poll_next_unpin(cx).is_ready()); |
109 | assert!(stream.poll_next_unpin(cx).is_pending()); |
110 | assert!(stream.poll_next_unpin(cx).is_pending()); |
111 | } |
112 | |
113 | #[test] |
114 | fn iter_mut_cancel() { |
115 | let (a_tx, a_rx) = oneshot::channel::<i32>(); |
116 | let (b_tx, b_rx) = oneshot::channel::<i32>(); |
117 | let (c_tx, c_rx) = oneshot::channel::<i32>(); |
118 | |
119 | let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>(); |
120 | |
121 | for rx in stream.iter_mut() { |
122 | rx.close(); |
123 | } |
124 | |
125 | let mut iter = block_on_stream(stream); |
126 | |
127 | assert!(a_tx.is_canceled()); |
128 | assert!(b_tx.is_canceled()); |
129 | assert!(c_tx.is_canceled()); |
130 | |
131 | assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); |
132 | assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); |
133 | assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); |
134 | assert_eq!(iter.next(), None); |
135 | } |
136 | |
137 | #[test] |
138 | fn iter_mut_len() { |
139 | let mut stream = |
140 | vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] |
141 | .into_iter() |
142 | .collect::<FuturesUnordered<_>>(); |
143 | |
144 | let mut iter_mut = stream.iter_mut(); |
145 | assert_eq!(iter_mut.len(), 3); |
146 | assert!(iter_mut.next().is_some()); |
147 | assert_eq!(iter_mut.len(), 2); |
148 | assert!(iter_mut.next().is_some()); |
149 | assert_eq!(iter_mut.len(), 1); |
150 | assert!(iter_mut.next().is_some()); |
151 | assert_eq!(iter_mut.len(), 0); |
152 | assert!(iter_mut.next().is_none()); |
153 | } |
154 | |
155 | #[test] |
156 | fn iter_cancel() { |
157 | struct AtomicCancel<F> { |
158 | future: F, |
159 | cancel: AtomicBool, |
160 | } |
161 | |
162 | impl<F: Future + Unpin> Future for AtomicCancel<F> { |
163 | type Output = Option<<F as Future>::Output>; |
164 | |
165 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
166 | if self.cancel.load(Ordering::Relaxed) { |
167 | Poll::Ready(None) |
168 | } else { |
169 | self.future.poll_unpin(cx).map(Some) |
170 | } |
171 | } |
172 | } |
173 | |
174 | impl<F: Future + Unpin> AtomicCancel<F> { |
175 | fn new(future: F) -> Self { |
176 | Self { future, cancel: AtomicBool::new(false) } |
177 | } |
178 | } |
179 | |
180 | let stream = vec![ |
181 | AtomicCancel::new(future::pending::<()>()), |
182 | AtomicCancel::new(future::pending::<()>()), |
183 | AtomicCancel::new(future::pending::<()>()), |
184 | ] |
185 | .into_iter() |
186 | .collect::<FuturesUnordered<_>>(); |
187 | |
188 | for f in stream.iter() { |
189 | f.cancel.store(true, Ordering::Relaxed); |
190 | } |
191 | |
192 | let mut iter = block_on_stream(stream); |
193 | |
194 | assert_eq!(iter.next(), Some(None)); |
195 | assert_eq!(iter.next(), Some(None)); |
196 | assert_eq!(iter.next(), Some(None)); |
197 | assert_eq!(iter.next(), None); |
198 | } |
199 | |
200 | #[test] |
201 | fn iter_len() { |
202 | let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] |
203 | .into_iter() |
204 | .collect::<FuturesUnordered<_>>(); |
205 | |
206 | let mut iter = stream.iter(); |
207 | assert_eq!(iter.len(), 3); |
208 | assert!(iter.next().is_some()); |
209 | assert_eq!(iter.len(), 2); |
210 | assert!(iter.next().is_some()); |
211 | assert_eq!(iter.len(), 1); |
212 | assert!(iter.next().is_some()); |
213 | assert_eq!(iter.len(), 0); |
214 | assert!(iter.next().is_none()); |
215 | } |
216 | |
217 | #[test] |
218 | fn into_iter_cancel() { |
219 | let (a_tx, a_rx) = oneshot::channel::<i32>(); |
220 | let (b_tx, b_rx) = oneshot::channel::<i32>(); |
221 | let (c_tx, c_rx) = oneshot::channel::<i32>(); |
222 | |
223 | let stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>(); |
224 | |
225 | let stream = stream |
226 | .into_iter() |
227 | .map(|mut rx| { |
228 | rx.close(); |
229 | rx |
230 | }) |
231 | .collect::<FuturesUnordered<_>>(); |
232 | |
233 | let mut iter = block_on_stream(stream); |
234 | |
235 | assert!(a_tx.is_canceled()); |
236 | assert!(b_tx.is_canceled()); |
237 | assert!(c_tx.is_canceled()); |
238 | |
239 | assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); |
240 | assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); |
241 | assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); |
242 | assert_eq!(iter.next(), None); |
243 | } |
244 | |
245 | #[test] |
246 | fn into_iter_len() { |
247 | let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] |
248 | .into_iter() |
249 | .collect::<FuturesUnordered<_>>(); |
250 | |
251 | let mut into_iter = stream.into_iter(); |
252 | assert_eq!(into_iter.len(), 3); |
253 | assert!(into_iter.next().is_some()); |
254 | assert_eq!(into_iter.len(), 2); |
255 | assert!(into_iter.next().is_some()); |
256 | assert_eq!(into_iter.len(), 1); |
257 | assert!(into_iter.next().is_some()); |
258 | assert_eq!(into_iter.len(), 0); |
259 | assert!(into_iter.next().is_none()); |
260 | } |
261 | |
262 | #[test] |
263 | fn into_iter_partial() { |
264 | let stream = vec![future::ready(1), future::ready(2), future::ready(3), future::ready(4)] |
265 | .into_iter() |
266 | .collect::<FuturesUnordered<_>>(); |
267 | |
268 | let mut into_iter = stream.into_iter(); |
269 | assert!(into_iter.next().is_some()); |
270 | assert!(into_iter.next().is_some()); |
271 | assert!(into_iter.next().is_some()); |
272 | assert_eq!(into_iter.len(), 1); |
273 | // don't panic when iterator is dropped before completing |
274 | } |
275 | |
276 | #[test] |
277 | fn futures_not_moved_after_poll() { |
278 | // Future that will be ready after being polled twice, |
279 | // asserting that it does not move. |
280 | let fut = future::ready(()).pending_once().assert_unmoved(); |
281 | let mut stream = vec![fut; 3].into_iter().collect::<FuturesUnordered<_>>(); |
282 | assert_stream_pending!(stream); |
283 | assert_stream_next!(stream, ()); |
284 | assert_stream_next!(stream, ()); |
285 | assert_stream_next!(stream, ()); |
286 | assert_stream_done!(stream); |
287 | } |
288 | |
289 | #[test] |
290 | fn len_valid_during_out_of_order_completion() { |
291 | // Complete futures out-of-order and add new futures afterwards to ensure |
292 | // length values remain correct. |
293 | let (a_tx, a_rx) = oneshot::channel::<i32>(); |
294 | let (b_tx, b_rx) = oneshot::channel::<i32>(); |
295 | let (c_tx, c_rx) = oneshot::channel::<i32>(); |
296 | let (d_tx, d_rx) = oneshot::channel::<i32>(); |
297 | |
298 | let mut cx = noop_context(); |
299 | let mut stream = FuturesUnordered::new(); |
300 | assert_eq!(stream.len(), 0); |
301 | |
302 | stream.push(a_rx); |
303 | assert_eq!(stream.len(), 1); |
304 | stream.push(b_rx); |
305 | assert_eq!(stream.len(), 2); |
306 | stream.push(c_rx); |
307 | assert_eq!(stream.len(), 3); |
308 | |
309 | b_tx.send(4).unwrap(); |
310 | assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(4)))); |
311 | assert_eq!(stream.len(), 2); |
312 | |
313 | stream.push(d_rx); |
314 | assert_eq!(stream.len(), 3); |
315 | |
316 | c_tx.send(5).unwrap(); |
317 | assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(5)))); |
318 | assert_eq!(stream.len(), 2); |
319 | |
320 | d_tx.send(6).unwrap(); |
321 | assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(6)))); |
322 | assert_eq!(stream.len(), 1); |
323 | |
324 | a_tx.send(7).unwrap(); |
325 | assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(7)))); |
326 | assert_eq!(stream.len(), 0); |
327 | } |
328 | |
329 | #[test] |
330 | fn polled_only_once_at_most_per_iteration() { |
331 | #[derive(Debug, Clone, Copy, Default)] |
332 | struct F { |
333 | polled: bool, |
334 | } |
335 | |
336 | impl Future for F { |
337 | type Output = (); |
338 | |
339 | fn poll(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> { |
340 | if self.polled { |
341 | panic!("polled twice" ) |
342 | } else { |
343 | self.polled = true; |
344 | Poll::Pending |
345 | } |
346 | } |
347 | } |
348 | |
349 | let cx = &mut noop_context(); |
350 | |
351 | let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 10]); |
352 | assert!(tasks.poll_next_unpin(cx).is_pending()); |
353 | assert_eq!(10, tasks.iter().filter(|f| f.polled).count()); |
354 | |
355 | let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]); |
356 | assert!(tasks.poll_next_unpin(cx).is_pending()); |
357 | assert_eq!(33, tasks.iter().filter(|f| f.polled).count()); |
358 | |
359 | let mut tasks = FuturesUnordered::<F>::new(); |
360 | assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx)); |
361 | } |
362 | |
363 | #[test] |
364 | fn clear() { |
365 | let mut tasks = FuturesUnordered::from_iter(vec![future::ready(1), future::ready(2)]); |
366 | |
367 | assert_eq!(block_on(tasks.next()), Some(1)); |
368 | assert!(!tasks.is_empty()); |
369 | |
370 | tasks.clear(); |
371 | assert!(tasks.is_empty()); |
372 | |
373 | tasks.push(future::ready(3)); |
374 | assert!(!tasks.is_empty()); |
375 | |
376 | tasks.clear(); |
377 | assert!(tasks.is_empty()); |
378 | |
379 | assert_eq!(block_on(tasks.next()), None); |
380 | assert!(tasks.is_terminated()); |
381 | tasks.clear(); |
382 | assert!(!tasks.is_terminated()); |
383 | } |
384 | |