1 | use futures::channel::mpsc; |
2 | use futures::executor::{block_on, block_on_stream}; |
3 | use futures::future::{self, FutureExt}; |
4 | use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt}; |
5 | use futures::task::Poll; |
6 | use futures_test::task::noop_context; |
7 | |
/// Checks when a `SelectAll` reports `is_terminated()`: only after a poll has
/// returned `None`, and no longer once a new stream has been pushed.
#[test]
fn is_terminated() {
    let mut cx = noop_context();
    let mut tasks = SelectAll::new();

    // A freshly created set is not terminated; polling it while empty yields
    // `None`, after which it reports terminated.
    assert!(!tasks.is_terminated());
    assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
    assert!(tasks.is_terminated());

    // Test that the sentinel value doesn't leak
    assert!(tasks.is_empty());
    assert_eq!(tasks.len(), 0);

    tasks.push(future::ready(1).into_stream());

    assert!(!tasks.is_empty());
    assert_eq!(tasks.len(), 1);

    // Pushing a stream makes the set live again. It stays live after the last
    // item is yielded and terminates only once the final `None` is observed.
    assert!(!tasks.is_terminated());
    assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
    assert!(!tasks.is_terminated());
    assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
    assert!(tasks.is_terminated());
}
32 | |
/// Merges two always-ready streams of different lengths and pins the exact
/// order in which `select_all` hands out their items.
// NOTE(review): the name presumably refers to upstream issue #1626 — confirm.
#[test]
fn issue_1626() {
    let short = stream::iter(0..=2);
    let long = stream::iter(10..=14);

    let mut merged = block_on_stream(stream::select_all(vec![short, long]));

    // Items alternate between the two inputs while both still have data; once
    // the shorter input runs out, the rest of the longer one follows.
    for &expected in &[0, 10, 1, 11, 2, 12, 13, 14] {
        assert_eq!(merged.next(), Some(expected));
    }
    assert_eq!(merged.next(), None);
}
50 | |
/// Drives `select_all` over three unbounded channel receivers and checks the
/// order in which queued items come out, then that the merged stream ends
/// once every sender has been dropped.
#[test]
fn works_1() {
    let (tx_a, rx_a) = mpsc::unbounded::<u32>();
    let (tx_b, rx_b) = mpsc::unbounded::<u32>();
    let (tx_c, rx_c) = mpsc::unbounded::<u32>();

    let mut merged = block_on_stream(select_all(vec![rx_a, rx_b, rx_c]));

    // Two identical rounds: `b` is fed before `a`, yet `a`'s item is expected
    // to come out first.
    for _ in 0..2 {
        tx_b.unbounded_send(99).unwrap();
        tx_a.unbounded_send(33).unwrap();
        assert_eq!(Some(33), merged.next());
        assert_eq!(Some(99), merged.next());
    }

    // With a single item in flight, it is delivered whichever channel holds it.
    tx_c.unbounded_send(42).unwrap();
    assert_eq!(Some(42), merged.next());
    tx_a.unbounded_send(43).unwrap();
    assert_eq!(Some(43), merged.next());

    // Closing every sender lets the merged stream finish.
    drop(tx_a);
    drop(tx_b);
    drop(tx_c);
    assert_eq!(None, merged.next());
}
79 | |
/// Checks that `SelectAll::clear` empties the set, that the set can be
/// refilled and cleared again, and that clearing a terminated set makes it
/// report not-terminated.
#[test]
fn clear() {
    // `stream::iter` is given the `Vec` directly here, as the other tests in
    // this file do, so no explicit `.into_iter()` is needed.
    let mut tasks = select_all(vec![stream::iter(vec![1]), stream::iter(vec![2])]);

    assert_eq!(block_on(tasks.next()), Some(1));
    assert!(!tasks.is_empty());

    // Clearing drops the streams that are still held.
    tasks.clear();
    assert!(tasks.is_empty());

    // The set is reusable after a clear.
    tasks.push(stream::iter(vec![3]));
    assert!(!tasks.is_empty());

    tasks.clear();
    assert!(tasks.is_empty());

    // Polling the empty set terminates it; a further clear resets that state.
    assert_eq!(block_on(tasks.next()), None);
    assert!(tasks.is_terminated());
    tasks.clear();
    assert!(!tasks.is_terminated());
}
102 | |
/// Checks `SelectAll::iter_mut`: the iterator's `len()` counts down as items
/// are taken, and it tracks `SelectAll::len()` as the set is polled.
#[test]
fn iter_mut() {
    let mut pending_set: SelectAll<_> =
        vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
            .into_iter()
            .collect();

    // Three never-ready streams: the iterator hands out exactly three items.
    let mut members = pending_set.iter_mut();
    for remaining in (1..=3).rev() {
        assert_eq!(members.len(), remaining);
        assert!(members.next().is_some());
    }
    assert_eq!(members.len(), 0);
    assert!(members.next().is_none());

    let mut set: SelectAll<_> =
        vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
            .into_iter()
            .collect();

    // After the first item has been yielded, two streams are left in the set.
    assert_eq!(set.len(), 3);
    assert_eq!(block_on(set.next()), Some(1));
    assert_eq!(set.len(), 2);
    let mut members = set.iter_mut();
    for remaining in (1..=2).rev() {
        assert_eq!(members.len(), remaining);
        assert!(members.next().is_some());
    }
    assert_eq!(members.len(), 0);
    assert!(members.next().is_none());

    // The count is still two after the second item and drops to zero once the
    // set has yielded `None`.
    assert_eq!(block_on(set.next()), Some(2));
    assert_eq!(set.len(), 2);
    assert_eq!(block_on(set.next()), None);
    let mut members = set.iter_mut();
    assert_eq!(members.len(), 0);
    assert!(members.next().is_none());
}
142 | |
/// Checks `SelectAll::iter`: the shared-reference iterator's `len()` counts
/// down as items are taken, and it tracks `SelectAll::len()` as the set is
/// polled.
#[test]
fn iter() {
    let pending_set: SelectAll<_> =
        vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
            .into_iter()
            .collect();

    // Three never-ready streams: the iterator hands out exactly three items.
    let mut members = pending_set.iter();
    for remaining in (1..=3).rev() {
        assert_eq!(members.len(), remaining);
        assert!(members.next().is_some());
    }
    assert_eq!(members.len(), 0);
    assert!(members.next().is_none());

    let mut set: SelectAll<_> =
        vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
            .into_iter()
            .collect();

    // After the first item has been yielded, two streams are left in the set.
    assert_eq!(set.len(), 3);
    assert_eq!(block_on(set.next()), Some(1));
    assert_eq!(set.len(), 2);
    let mut members = set.iter();
    for remaining in (1..=2).rev() {
        assert_eq!(members.len(), remaining);
        assert!(members.next().is_some());
    }
    assert_eq!(members.len(), 0);
    assert!(members.next().is_none());

    // The count is still two after the second item and drops to zero once the
    // set has yielded `None`.
    assert_eq!(block_on(set.next()), Some(2));
    assert_eq!(set.len(), 2);
    assert_eq!(block_on(set.next()), None);
    let mut members = set.iter();
    assert_eq!(members.len(), 0);
    assert!(members.next().is_none());
}
181 | |
/// Checks that consuming a `SelectAll` with `into_iter` yields one item per
/// contained stream and that the iterator's `len()` counts down to zero.
#[test]
fn into_iter() {
    let pending_set: SelectAll<_> =
        vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
            .into_iter()
            .collect();

    let mut members = pending_set.into_iter();
    for remaining in (1..=3).rev() {
        assert_eq!(members.len(), remaining);
        assert!(members.next().is_some());
    }

    // Once all three have been taken, the iterator is exhausted.
    assert_eq!(members.len(), 0);
    assert!(members.next().is_none());
}
198 | |