1use futures::channel::oneshot;
2use futures::executor::{block_on, block_on_stream};
3use futures::future::{self, join, Future, FutureExt, TryFutureExt};
4use futures::stream::{FuturesOrdered, StreamExt};
5use futures::task::Poll;
6use futures_test::task::noop_context;
7use std::any::Any;
8
9#[test]
10fn works_1() {
11 let (a_tx, a_rx) = oneshot::channel::<i32>();
12 let (b_tx, b_rx) = oneshot::channel::<i32>();
13 let (c_tx, c_rx) = oneshot::channel::<i32>();
14
15 let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesOrdered<_>>();
16
17 b_tx.send(99).unwrap();
18 assert!(stream.poll_next_unpin(&mut noop_context()).is_pending());
19
20 a_tx.send(33).unwrap();
21 c_tx.send(33).unwrap();
22
23 let mut iter = block_on_stream(stream);
24 assert_eq!(Some(Ok(33)), iter.next());
25 assert_eq!(Some(Ok(99)), iter.next());
26 assert_eq!(Some(Ok(33)), iter.next());
27 assert_eq!(None, iter.next());
28}
29
30#[test]
31fn works_2() {
32 let (a_tx, a_rx) = oneshot::channel::<i32>();
33 let (b_tx, b_rx) = oneshot::channel::<i32>();
34 let (c_tx, c_rx) = oneshot::channel::<i32>();
35
36 let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()]
37 .into_iter()
38 .collect::<FuturesOrdered<_>>();
39
40 let mut cx = noop_context();
41 a_tx.send(33).unwrap();
42 b_tx.send(33).unwrap();
43 assert!(stream.poll_next_unpin(&mut cx).is_ready());
44 assert!(stream.poll_next_unpin(&mut cx).is_pending());
45 c_tx.send(33).unwrap();
46 assert!(stream.poll_next_unpin(&mut cx).is_ready());
47}
48
49#[test]
50fn test_push_front() {
51 let (a_tx, a_rx) = oneshot::channel::<i32>();
52 let (b_tx, b_rx) = oneshot::channel::<i32>();
53 let (c_tx, c_rx) = oneshot::channel::<i32>();
54 let (d_tx, d_rx) = oneshot::channel::<i32>();
55
56 let mut stream = FuturesOrdered::new();
57
58 let mut cx = noop_context();
59
60 stream.push_back(a_rx);
61 stream.push_back(b_rx);
62 stream.push_back(c_rx);
63
64 a_tx.send(1).unwrap();
65 b_tx.send(2).unwrap();
66 c_tx.send(3).unwrap();
67
68 // 1 and 2 should be received in order
69 assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
70 assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
71
72 stream.push_front(d_rx);
73 d_tx.send(4).unwrap();
74
75 // we pushed `d_rx` to the front and sent 4, so we should recieve 4 next
76 // and then 3 after it
77 assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
78 assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
79}
80
81#[test]
82fn test_push_back() {
83 let (a_tx, a_rx) = oneshot::channel::<i32>();
84 let (b_tx, b_rx) = oneshot::channel::<i32>();
85 let (c_tx, c_rx) = oneshot::channel::<i32>();
86 let (d_tx, d_rx) = oneshot::channel::<i32>();
87
88 let mut stream = FuturesOrdered::new();
89
90 let mut cx = noop_context();
91
92 stream.push_back(a_rx);
93 stream.push_back(b_rx);
94 stream.push_back(c_rx);
95
96 a_tx.send(1).unwrap();
97 b_tx.send(2).unwrap();
98 c_tx.send(3).unwrap();
99
100 // All results should be received in order
101
102 assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
103 assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
104
105 stream.push_back(d_rx);
106 d_tx.send(4).unwrap();
107
108 assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
109 assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
110}
111
112#[test]
113fn from_iterator() {
114 let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)]
115 .into_iter()
116 .collect::<FuturesOrdered<_>>();
117 assert_eq!(stream.len(), 3);
118 assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
119}
120
121#[test]
122fn queue_never_unblocked() {
123 let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>();
124 let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>();
125 let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>();
126
127 let mut stream = vec![
128 Box::new(a_rx) as Box<dyn Future<Output = _> + Unpin>,
129 Box::new(
130 future::try_select(b_rx, c_rx)
131 .map_err(|e| e.factor_first().0)
132 .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>)),
133 ) as _,
134 ]
135 .into_iter()
136 .collect::<FuturesOrdered<_>>();
137
138 let cx = &mut noop_context();
139 for _ in 0..10 {
140 assert!(stream.poll_next_unpin(cx).is_pending());
141 }
142
143 b_tx.send(Box::new(())).unwrap();
144 assert!(stream.poll_next_unpin(cx).is_pending());
145 c_tx.send(Box::new(())).unwrap();
146 assert!(stream.poll_next_unpin(cx).is_pending());
147 assert!(stream.poll_next_unpin(cx).is_pending());
148}
149
150#[test]
151fn test_push_front_negative() {
152 let (a_tx, a_rx) = oneshot::channel::<i32>();
153 let (b_tx, b_rx) = oneshot::channel::<i32>();
154 let (c_tx, c_rx) = oneshot::channel::<i32>();
155
156 let mut stream = FuturesOrdered::new();
157
158 let mut cx = noop_context();
159
160 stream.push_front(a_rx);
161 stream.push_front(b_rx);
162 stream.push_front(c_rx);
163
164 a_tx.send(1).unwrap();
165 b_tx.send(2).unwrap();
166 c_tx.send(3).unwrap();
167
168 // These should all be recieved in reverse order
169 assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
170 assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
171 assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
172}
173