1use futures::channel::mpsc;
2use futures::executor::{block_on, block_on_stream};
3use futures::future::{self, FutureExt};
4use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt};
5use futures::task::Poll;
6use futures_test::task::noop_context;
7
8#[test]
9fn is_terminated() {
10 let mut cx = noop_context();
11 let mut tasks = SelectAll::new();
12
13 assert_eq!(tasks.is_terminated(), false);
14 assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
15 assert_eq!(tasks.is_terminated(), true);
16
17 // Test that the sentinel value doesn't leak
18 assert_eq!(tasks.is_empty(), true);
19 assert_eq!(tasks.len(), 0);
20
21 tasks.push(future::ready(1).into_stream());
22
23 assert_eq!(tasks.is_empty(), false);
24 assert_eq!(tasks.len(), 1);
25
26 assert_eq!(tasks.is_terminated(), false);
27 assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
28 assert_eq!(tasks.is_terminated(), false);
29 assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
30 assert_eq!(tasks.is_terminated(), true);
31}
32
33#[test]
34fn issue_1626() {
35 let a = stream::iter(0..=2);
36 let b = stream::iter(10..=14);
37
38 let mut s = block_on_stream(stream::select_all(vec![a, b]));
39
40 assert_eq!(s.next(), Some(0));
41 assert_eq!(s.next(), Some(10));
42 assert_eq!(s.next(), Some(1));
43 assert_eq!(s.next(), Some(11));
44 assert_eq!(s.next(), Some(2));
45 assert_eq!(s.next(), Some(12));
46 assert_eq!(s.next(), Some(13));
47 assert_eq!(s.next(), Some(14));
48 assert_eq!(s.next(), None);
49}
50
51#[test]
52fn works_1() {
53 let (a_tx, a_rx) = mpsc::unbounded::<u32>();
54 let (b_tx, b_rx) = mpsc::unbounded::<u32>();
55 let (c_tx, c_rx) = mpsc::unbounded::<u32>();
56
57 let streams = vec![a_rx, b_rx, c_rx];
58
59 let mut stream = block_on_stream(select_all(streams));
60
61 b_tx.unbounded_send(99).unwrap();
62 a_tx.unbounded_send(33).unwrap();
63 assert_eq!(Some(33), stream.next());
64 assert_eq!(Some(99), stream.next());
65
66 b_tx.unbounded_send(99).unwrap();
67 a_tx.unbounded_send(33).unwrap();
68 assert_eq!(Some(33), stream.next());
69 assert_eq!(Some(99), stream.next());
70
71 c_tx.unbounded_send(42).unwrap();
72 assert_eq!(Some(42), stream.next());
73 a_tx.unbounded_send(43).unwrap();
74 assert_eq!(Some(43), stream.next());
75
76 drop((a_tx, b_tx, c_tx));
77 assert_eq!(None, stream.next());
78}
79
80#[test]
81fn clear() {
82 let mut tasks =
83 select_all(vec![stream::iter(vec![1].into_iter()), stream::iter(vec![2].into_iter())]);
84
85 assert_eq!(block_on(tasks.next()), Some(1));
86 assert!(!tasks.is_empty());
87
88 tasks.clear();
89 assert!(tasks.is_empty());
90
91 tasks.push(stream::iter(vec![3].into_iter()));
92 assert!(!tasks.is_empty());
93
94 tasks.clear();
95 assert!(tasks.is_empty());
96
97 assert_eq!(block_on(tasks.next()), None);
98 assert!(tasks.is_terminated());
99 tasks.clear();
100 assert!(!tasks.is_terminated());
101}
102
103#[test]
104fn iter_mut() {
105 let mut stream =
106 vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
107 .into_iter()
108 .collect::<SelectAll<_>>();
109
110 let mut iter = stream.iter_mut();
111 assert_eq!(iter.len(), 3);
112 assert!(iter.next().is_some());
113 assert_eq!(iter.len(), 2);
114 assert!(iter.next().is_some());
115 assert_eq!(iter.len(), 1);
116 assert!(iter.next().is_some());
117 assert_eq!(iter.len(), 0);
118 assert!(iter.next().is_none());
119
120 let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
121 .into_iter()
122 .collect::<SelectAll<_>>();
123
124 assert_eq!(stream.len(), 3);
125 assert_eq!(block_on(stream.next()), Some(1));
126 assert_eq!(stream.len(), 2);
127 let mut iter = stream.iter_mut();
128 assert_eq!(iter.len(), 2);
129 assert!(iter.next().is_some());
130 assert_eq!(iter.len(), 1);
131 assert!(iter.next().is_some());
132 assert_eq!(iter.len(), 0);
133 assert!(iter.next().is_none());
134
135 assert_eq!(block_on(stream.next()), Some(2));
136 assert_eq!(stream.len(), 2);
137 assert_eq!(block_on(stream.next()), None);
138 let mut iter = stream.iter_mut();
139 assert_eq!(iter.len(), 0);
140 assert!(iter.next().is_none());
141}
142
143#[test]
144fn iter() {
145 let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
146 .into_iter()
147 .collect::<SelectAll<_>>();
148
149 let mut iter = stream.iter();
150 assert_eq!(iter.len(), 3);
151 assert!(iter.next().is_some());
152 assert_eq!(iter.len(), 2);
153 assert!(iter.next().is_some());
154 assert_eq!(iter.len(), 1);
155 assert!(iter.next().is_some());
156 assert_eq!(iter.len(), 0);
157 assert!(iter.next().is_none());
158
159 let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
160 .into_iter()
161 .collect::<SelectAll<_>>();
162
163 assert_eq!(stream.len(), 3);
164 assert_eq!(block_on(stream.next()), Some(1));
165 assert_eq!(stream.len(), 2);
166 let mut iter = stream.iter();
167 assert_eq!(iter.len(), 2);
168 assert!(iter.next().is_some());
169 assert_eq!(iter.len(), 1);
170 assert!(iter.next().is_some());
171 assert_eq!(iter.len(), 0);
172 assert!(iter.next().is_none());
173
174 assert_eq!(block_on(stream.next()), Some(2));
175 assert_eq!(stream.len(), 2);
176 assert_eq!(block_on(stream.next()), None);
177 let mut iter = stream.iter();
178 assert_eq!(iter.len(), 0);
179 assert!(iter.next().is_none());
180}
181
182#[test]
183fn into_iter() {
184 let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
185 .into_iter()
186 .collect::<SelectAll<_>>();
187
188 let mut iter = stream.into_iter();
189 assert_eq!(iter.len(), 3);
190 assert!(iter.next().is_some());
191 assert_eq!(iter.len(), 2);
192 assert!(iter.next().is_some());
193 assert_eq!(iter.len(), 1);
194 assert!(iter.next().is_some());
195 assert_eq!(iter.len(), 0);
196 assert!(iter.next().is_none());
197}
198