1 | use futures::channel::oneshot; |
2 | use futures::executor::{block_on, block_on_stream}; |
3 | use futures::future::{self, join, Future, FutureExt, TryFutureExt}; |
4 | use futures::stream::{FuturesOrdered, StreamExt}; |
5 | use futures::task::Poll; |
6 | use futures_test::task::noop_context; |
7 | use std::any::Any; |
8 | |
9 | #[test] |
10 | fn works_1() { |
11 | let (a_tx, a_rx) = oneshot::channel::<i32>(); |
12 | let (b_tx, b_rx) = oneshot::channel::<i32>(); |
13 | let (c_tx, c_rx) = oneshot::channel::<i32>(); |
14 | |
15 | let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesOrdered<_>>(); |
16 | |
17 | b_tx.send(99).unwrap(); |
18 | assert!(stream.poll_next_unpin(&mut noop_context()).is_pending()); |
19 | |
20 | a_tx.send(33).unwrap(); |
21 | c_tx.send(33).unwrap(); |
22 | |
23 | let mut iter = block_on_stream(stream); |
24 | assert_eq!(Some(Ok(33)), iter.next()); |
25 | assert_eq!(Some(Ok(99)), iter.next()); |
26 | assert_eq!(Some(Ok(33)), iter.next()); |
27 | assert_eq!(None, iter.next()); |
28 | } |
29 | |
30 | #[test] |
31 | fn works_2() { |
32 | let (a_tx, a_rx) = oneshot::channel::<i32>(); |
33 | let (b_tx, b_rx) = oneshot::channel::<i32>(); |
34 | let (c_tx, c_rx) = oneshot::channel::<i32>(); |
35 | |
36 | let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()] |
37 | .into_iter() |
38 | .collect::<FuturesOrdered<_>>(); |
39 | |
40 | let mut cx = noop_context(); |
41 | a_tx.send(33).unwrap(); |
42 | b_tx.send(33).unwrap(); |
43 | assert!(stream.poll_next_unpin(&mut cx).is_ready()); |
44 | assert!(stream.poll_next_unpin(&mut cx).is_pending()); |
45 | c_tx.send(33).unwrap(); |
46 | assert!(stream.poll_next_unpin(&mut cx).is_ready()); |
47 | } |
48 | |
49 | #[test] |
50 | fn test_push_front() { |
51 | let (a_tx, a_rx) = oneshot::channel::<i32>(); |
52 | let (b_tx, b_rx) = oneshot::channel::<i32>(); |
53 | let (c_tx, c_rx) = oneshot::channel::<i32>(); |
54 | let (d_tx, d_rx) = oneshot::channel::<i32>(); |
55 | |
56 | let mut stream = FuturesOrdered::new(); |
57 | |
58 | let mut cx = noop_context(); |
59 | |
60 | stream.push_back(a_rx); |
61 | stream.push_back(b_rx); |
62 | stream.push_back(c_rx); |
63 | |
64 | a_tx.send(1).unwrap(); |
65 | b_tx.send(2).unwrap(); |
66 | c_tx.send(3).unwrap(); |
67 | |
68 | // 1 and 2 should be received in order |
69 | assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); |
70 | assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); |
71 | |
72 | stream.push_front(d_rx); |
73 | d_tx.send(4).unwrap(); |
74 | |
75 | // we pushed `d_rx` to the front and sent 4, so we should recieve 4 next |
76 | // and then 3 after it |
77 | assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx)); |
78 | assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); |
79 | } |
80 | |
81 | #[test] |
82 | fn test_push_back() { |
83 | let (a_tx, a_rx) = oneshot::channel::<i32>(); |
84 | let (b_tx, b_rx) = oneshot::channel::<i32>(); |
85 | let (c_tx, c_rx) = oneshot::channel::<i32>(); |
86 | let (d_tx, d_rx) = oneshot::channel::<i32>(); |
87 | |
88 | let mut stream = FuturesOrdered::new(); |
89 | |
90 | let mut cx = noop_context(); |
91 | |
92 | stream.push_back(a_rx); |
93 | stream.push_back(b_rx); |
94 | stream.push_back(c_rx); |
95 | |
96 | a_tx.send(1).unwrap(); |
97 | b_tx.send(2).unwrap(); |
98 | c_tx.send(3).unwrap(); |
99 | |
100 | // All results should be received in order |
101 | |
102 | assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); |
103 | assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); |
104 | |
105 | stream.push_back(d_rx); |
106 | d_tx.send(4).unwrap(); |
107 | |
108 | assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); |
109 | assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx)); |
110 | } |
111 | |
112 | #[test] |
113 | fn from_iterator() { |
114 | let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)] |
115 | .into_iter() |
116 | .collect::<FuturesOrdered<_>>(); |
117 | assert_eq!(stream.len(), 3); |
118 | assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]); |
119 | } |
120 | |
121 | #[test] |
122 | fn queue_never_unblocked() { |
123 | let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>(); |
124 | let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>(); |
125 | let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>(); |
126 | |
127 | let mut stream = vec![ |
128 | Box::new(a_rx) as Box<dyn Future<Output = _> + Unpin>, |
129 | Box::new( |
130 | future::try_select(b_rx, c_rx) |
131 | .map_err(|e| e.factor_first().0) |
132 | .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>)), |
133 | ) as _, |
134 | ] |
135 | .into_iter() |
136 | .collect::<FuturesOrdered<_>>(); |
137 | |
138 | let cx = &mut noop_context(); |
139 | for _ in 0..10 { |
140 | assert!(stream.poll_next_unpin(cx).is_pending()); |
141 | } |
142 | |
143 | b_tx.send(Box::new(())).unwrap(); |
144 | assert!(stream.poll_next_unpin(cx).is_pending()); |
145 | c_tx.send(Box::new(())).unwrap(); |
146 | assert!(stream.poll_next_unpin(cx).is_pending()); |
147 | assert!(stream.poll_next_unpin(cx).is_pending()); |
148 | } |
149 | |
150 | #[test] |
151 | fn test_push_front_negative() { |
152 | let (a_tx, a_rx) = oneshot::channel::<i32>(); |
153 | let (b_tx, b_rx) = oneshot::channel::<i32>(); |
154 | let (c_tx, c_rx) = oneshot::channel::<i32>(); |
155 | |
156 | let mut stream = FuturesOrdered::new(); |
157 | |
158 | let mut cx = noop_context(); |
159 | |
160 | stream.push_front(a_rx); |
161 | stream.push_front(b_rx); |
162 | stream.push_front(c_rx); |
163 | |
164 | a_tx.send(1).unwrap(); |
165 | b_tx.send(2).unwrap(); |
166 | c_tx.send(3).unwrap(); |
167 | |
168 | // These should all be recieved in reverse order |
169 | assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); |
170 | assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); |
171 | assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); |
172 | } |
173 | |