1 | use tokio_stream::{self as stream, Stream, StreamExt}; |
2 | use tokio_test::{assert_pending, assert_ready, task}; |
3 | |
// Shared test-support code. `mpsc` is declared here without a body, so its
// contents come from a separate source file that is not shown in this file.
mod support {
    // Provides `unbounded_channel_stream`, which the tests below use to build
    // sender/stream pairs.
    pub(crate) mod mpsc;
}
7 | |
8 | use support::mpsc; |
9 | |
10 | #[tokio::test ] |
11 | async fn basic_usage() { |
12 | let one = stream::iter(vec![1, 2, 3]); |
13 | let two = stream::iter(vec![4, 5, 6]); |
14 | |
15 | let mut stream = one.chain(two); |
16 | |
17 | assert_eq!(stream.size_hint(), (6, Some(6))); |
18 | assert_eq!(stream.next().await, Some(1)); |
19 | |
20 | assert_eq!(stream.size_hint(), (5, Some(5))); |
21 | assert_eq!(stream.next().await, Some(2)); |
22 | |
23 | assert_eq!(stream.size_hint(), (4, Some(4))); |
24 | assert_eq!(stream.next().await, Some(3)); |
25 | |
26 | assert_eq!(stream.size_hint(), (3, Some(3))); |
27 | assert_eq!(stream.next().await, Some(4)); |
28 | |
29 | assert_eq!(stream.size_hint(), (2, Some(2))); |
30 | assert_eq!(stream.next().await, Some(5)); |
31 | |
32 | assert_eq!(stream.size_hint(), (1, Some(1))); |
33 | assert_eq!(stream.next().await, Some(6)); |
34 | |
35 | assert_eq!(stream.size_hint(), (0, Some(0))); |
36 | assert_eq!(stream.next().await, None); |
37 | |
38 | assert_eq!(stream.size_hint(), (0, Some(0))); |
39 | assert_eq!(stream.next().await, None); |
40 | } |
41 | |
42 | #[tokio::test ] |
43 | async fn pending_first() { |
44 | let (tx1, rx1) = mpsc::unbounded_channel_stream(); |
45 | let (tx2, rx2) = mpsc::unbounded_channel_stream(); |
46 | |
47 | let mut stream = task::spawn(rx1.chain(rx2)); |
48 | assert_eq!(stream.size_hint(), (0, None)); |
49 | |
50 | assert_pending!(stream.poll_next()); |
51 | |
52 | tx2.send(2).unwrap(); |
53 | assert!(!stream.is_woken()); |
54 | |
55 | assert_pending!(stream.poll_next()); |
56 | |
57 | tx1.send(1).unwrap(); |
58 | assert!(stream.is_woken()); |
59 | assert_eq!(Some(1), assert_ready!(stream.poll_next())); |
60 | |
61 | assert_pending!(stream.poll_next()); |
62 | |
63 | drop(tx1); |
64 | |
65 | assert_eq!(stream.size_hint(), (0, None)); |
66 | |
67 | assert!(stream.is_woken()); |
68 | assert_eq!(Some(2), assert_ready!(stream.poll_next())); |
69 | |
70 | assert_eq!(stream.size_hint(), (0, None)); |
71 | |
72 | drop(tx2); |
73 | |
74 | assert_eq!(stream.size_hint(), (0, None)); |
75 | assert_eq!(None, assert_ready!(stream.poll_next())); |
76 | } |
77 | |
/// Checks the size hint of a chain whose two halves each report
/// `usize::MAX` for both bounds. The sum of those bounds does not fit in a
/// `usize`; the expected combined hint is `(usize::MAX, None)`.
#[test]
fn size_overflow() {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Stream stub that only answers `size_hint`.
    struct MaxHint;

    impl Stream for MaxHint {
        type Item = ();

        // Only `size_hint` is exercised by this test; polling is not expected.
        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            panic!()
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            let max = usize::MAX;
            (max, Some(max))
        }
    }

    let chained = MaxHint.chain(MaxHint);
    assert_eq!(chained.size_hint(), (usize::MAX, None));
}
101 | |