1use tokio_stream::{self as stream, pending, Stream, StreamExt, StreamMap};
2use tokio_test::{assert_ok, assert_pending, assert_ready, task};
3
4mod support {
5 pub(crate) mod mpsc;
6}
7
8use support::mpsc;
9
10use std::pin::Pin;
11
// Asserts that the given poll expression is `Poll::Ready(Some(..))` and
// evaluates to the wrapped item. Panics if the ready value is `None`; a
// pending poll is rejected by the inner `assert_ready!`.
macro_rules! assert_ready_some {
    ($($poll:tt)*) => {
        if let Some(item) = assert_ready!($($poll)*) {
            item
        } else {
            panic!("expected `Some`, got `None`")
        }
    };
}
20
// Asserts that the given poll expression is `Poll::Ready(None)`. Panics,
// printing the unexpected item with `{:?}`, if the ready value is `Some`; a
// pending poll is rejected by the inner `assert_ready!`.
macro_rules! assert_ready_none {
    ($($poll:tt)*) => {
        if let Some(item) = assert_ready!($($poll)*) {
            panic!("expected `None`, got `Some({:?})`", item);
        }
    };
}
29
30#[tokio::test]
31async fn empty() {
32 let mut map = StreamMap::<&str, stream::Pending<()>>::new();
33
34 assert_eq!(map.len(), 0);
35 assert!(map.is_empty());
36
37 assert!(map.next().await.is_none());
38 assert!(map.next().await.is_none());
39
40 assert!(map.remove("foo").is_none());
41}
42
43#[tokio::test]
44async fn single_entry() {
45 let mut map = task::spawn(StreamMap::new());
46 let (tx, rx) = mpsc::unbounded_channel_stream();
47 let rx = Box::pin(rx);
48
49 assert_ready_none!(map.poll_next());
50
51 assert!(map.insert("foo", rx).is_none());
52 assert!(map.contains_key("foo"));
53 assert!(!map.contains_key("bar"));
54
55 assert_eq!(map.len(), 1);
56 assert!(!map.is_empty());
57
58 assert_pending!(map.poll_next());
59
60 assert_ok!(tx.send(1));
61
62 assert!(map.is_woken());
63 let (k, v) = assert_ready_some!(map.poll_next());
64 assert_eq!(k, "foo");
65 assert_eq!(v, 1);
66
67 assert_pending!(map.poll_next());
68
69 assert_ok!(tx.send(2));
70
71 assert!(map.is_woken());
72 let (k, v) = assert_ready_some!(map.poll_next());
73 assert_eq!(k, "foo");
74 assert_eq!(v, 2);
75
76 assert_pending!(map.poll_next());
77 drop(tx);
78 assert!(map.is_woken());
79 assert_ready_none!(map.poll_next());
80}
81
82#[tokio::test]
83async fn multiple_entries() {
84 let mut map = task::spawn(StreamMap::new());
85 let (tx1, rx1) = mpsc::unbounded_channel_stream();
86 let (tx2, rx2) = mpsc::unbounded_channel_stream();
87
88 let rx1 = Box::pin(rx1);
89 let rx2 = Box::pin(rx2);
90
91 map.insert("foo", rx1);
92 map.insert("bar", rx2);
93
94 assert_pending!(map.poll_next());
95
96 assert_ok!(tx1.send(1));
97
98 assert!(map.is_woken());
99 let (k, v) = assert_ready_some!(map.poll_next());
100 assert_eq!(k, "foo");
101 assert_eq!(v, 1);
102
103 assert_pending!(map.poll_next());
104
105 assert_ok!(tx2.send(2));
106
107 assert!(map.is_woken());
108 let (k, v) = assert_ready_some!(map.poll_next());
109 assert_eq!(k, "bar");
110 assert_eq!(v, 2);
111
112 assert_pending!(map.poll_next());
113
114 assert_ok!(tx1.send(3));
115 assert_ok!(tx2.send(4));
116
117 assert!(map.is_woken());
118
119 // Given the randomization, there is no guarantee what order the values will
120 // be received in.
121 let mut v = (0..2)
122 .map(|_| assert_ready_some!(map.poll_next()))
123 .collect::<Vec<_>>();
124
125 assert_pending!(map.poll_next());
126
127 v.sort_unstable();
128 assert_eq!(v[0].0, "bar");
129 assert_eq!(v[0].1, 4);
130 assert_eq!(v[1].0, "foo");
131 assert_eq!(v[1].1, 3);
132
133 drop(tx1);
134 assert!(map.is_woken());
135 assert_pending!(map.poll_next());
136 drop(tx2);
137
138 assert_ready_none!(map.poll_next());
139}
140
141#[tokio::test]
142async fn insert_remove() {
143 let mut map = task::spawn(StreamMap::new());
144 let (tx, rx) = mpsc::unbounded_channel_stream();
145
146 let rx = Box::pin(rx);
147
148 assert_ready_none!(map.poll_next());
149
150 assert!(map.insert("foo", rx).is_none());
151 let rx = map.remove("foo").unwrap();
152
153 assert_ok!(tx.send(1));
154
155 assert!(!map.is_woken());
156 assert_ready_none!(map.poll_next());
157
158 assert!(map.insert("bar", rx).is_none());
159
160 let v = assert_ready_some!(map.poll_next());
161 assert_eq!(v.0, "bar");
162 assert_eq!(v.1, 1);
163
164 assert!(map.remove("bar").is_some());
165 assert_ready_none!(map.poll_next());
166
167 assert!(map.is_empty());
168 assert_eq!(0, map.len());
169}
170
171#[tokio::test]
172async fn replace() {
173 let mut map = task::spawn(StreamMap::new());
174 let (tx1, rx1) = mpsc::unbounded_channel_stream();
175 let (tx2, rx2) = mpsc::unbounded_channel_stream();
176
177 let rx1 = Box::pin(rx1);
178 let rx2 = Box::pin(rx2);
179
180 assert!(map.insert("foo", rx1).is_none());
181
182 assert_pending!(map.poll_next());
183
184 let _rx1 = map.insert("foo", rx2).unwrap();
185
186 assert_pending!(map.poll_next());
187
188 tx1.send(1).unwrap();
189 assert_pending!(map.poll_next());
190
191 tx2.send(2).unwrap();
192 assert!(map.is_woken());
193 let v = assert_ready_some!(map.poll_next());
194 assert_eq!(v.0, "foo");
195 assert_eq!(v.1, 2);
196}
197
// With three iterator-backed streams of 1, 2 and 3 items, the map's size
// hint is expected to be exactly 6 for both the lower and the upper bound.
#[test]
fn size_hint_with_upper() {
    let mut streams = StreamMap::new();

    for (key, items) in vec![("a", vec![1]), ("b", vec![1, 2]), ("c", vec![1, 2, 3])] {
        streams.insert(key, stream::iter(items));
    }

    assert!(!streams.is_empty());
    assert_eq!(streams.len(), 3);

    assert_eq!(streams.size_hint(), (6, Some(6)));
}
212
// When one of the entries is a never-ready `pending()` stream, the map's
// size hint is expected to keep the lower bound of the other entries (1 + 2)
// and report no upper bound.
#[test]
fn size_hint_without_upper() {
    let mut streams = StreamMap::new();

    streams.insert("a", pin_box(stream::iter(vec![1])));
    streams.insert("b", pin_box(stream::iter(vec![1, 2])));
    streams.insert("c", pin_box(pending()));

    let (lower, upper) = streams.size_hint();
    assert_eq!((lower, upper), (3, None));
}
224
// A map built with `new()` is expected to report zero capacity and no keys.
#[test]
fn new_capacity_zero() {
    let streams: StreamMap<&str, stream::Pending<()>> = StreamMap::new();

    assert_eq!(streams.capacity(), 0);
    assert_eq!(streams.keys().count(), 0);
}
232
// A map built with `with_capacity(10)` is expected to report a capacity of
// at least 10 while still holding no keys.
#[test]
fn with_capacity() {
    let streams: StreamMap<&str, stream::Pending<()>> = StreamMap::with_capacity(10);

    assert!(streams.capacity() >= 10);
    assert_eq!(streams.keys().count(), 0);
}
240
// `keys()` is expected to visit every inserted key exactly once; the result
// is sorted before comparing because no iteration order is assumed.
#[test]
fn iter_keys() {
    let mut streams = StreamMap::new();

    for key in vec!["a", "b", "c"] {
        streams.insert(key, pending::<i32>());
    }

    let mut seen: Vec<_> = streams.keys().collect();
    seen.sort_unstable();

    assert_eq!(seen, vec![&"a", &"b", &"c"]);
}
254
// `values()` is expected to expose every stored stream by shared reference;
// each stream is identified here by the lower bound of its size hint.
#[test]
fn iter_values() {
    let mut streams = StreamMap::new();

    for (key, items) in vec![("a", vec![1]), ("b", vec![1, 2]), ("c", vec![1, 2, 3])] {
        streams.insert(key, stream::iter(items));
    }

    let mut lower_bounds: Vec<usize> = streams
        .values()
        .map(|entry| {
            let (lower, _upper) = entry.size_hint();
            lower
        })
        .collect();

    // No iteration order is assumed, so sort before comparing.
    lower_bounds.sort_unstable();

    assert_eq!(lower_bounds, vec![1, 2, 3]);
}
269
// `values_mut()` is expected to expose every stored stream by mutable
// reference; each stream is identified here by the lower bound of its size
// hint.
#[test]
fn iter_values_mut() {
    let mut streams = StreamMap::new();

    for (key, items) in vec![("a", vec![1]), ("b", vec![1, 2]), ("c", vec![1, 2, 3])] {
        streams.insert(key, stream::iter(items));
    }

    let mut lower_bounds: Vec<usize> = streams
        .values_mut()
        .map(|entry: &mut _| {
            let (lower, _upper) = entry.size_hint();
            lower
        })
        .collect();

    // No iteration order is assumed, so sort before comparing.
    lower_bounds.sort_unstable();

    assert_eq!(lower_bounds, vec![1, 2, 3]);
}
287
// After `clear()` a previously populated map is expected to report
// end-of-stream on the next poll and to be empty.
#[test]
fn clear() {
    let mut streams = task::spawn(StreamMap::new());

    for (key, items) in vec![("a", vec![1]), ("b", vec![1, 2]), ("c", vec![1, 2, 3])] {
        streams.insert(key, stream::iter(items));
    }

    // The populated map has at least one item ready; its value is irrelevant.
    let _first = assert_ready_some!(streams.poll_next());

    streams.clear();

    assert_ready_none!(streams.poll_next());
    assert!(streams.is_empty());
}
303
// A map keyed by owned `String`s can be queried with a borrowed `&str`.
#[test]
fn contains_key_borrow() {
    let mut streams = StreamMap::new();
    let owned_key = String::from("foo");
    streams.insert(owned_key, pending::<()>());

    assert!(streams.contains_key("foo"));
}
311
// With two finished streams, one stream holding a single item, and one
// never-ready stream, the first poll is expected to yield that single item.
#[test]
fn one_ready_many_none() {
    // The map involves randomness, so repeat the scenario many times.
    for _round in 0..100 {
        let mut streams = task::spawn(StreamMap::new());

        for key in 0..2 {
            streams.insert(key, pin_box(stream::empty()));
        }
        streams.insert(2, pin_box(stream::once("hello")));
        streams.insert(3, pin_box(stream::pending()));

        let (key, item) = assert_ready_some!(streams.poll_next());
        assert_eq!(key, 2);
        assert_eq!(item, "hello");
    }
}
327
328fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> {
329 Box::pin(s)
330}
331