1 | use tokio_stream::{self as stream, StreamExt}; |
2 | use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task}; |
3 | |
4 | mod support { |
5 | pub(crate) mod mpsc; |
6 | } |
7 | |
8 | use support::mpsc; |
9 | |
10 | #[allow (clippy::let_unit_value)] |
11 | #[tokio::test ] |
12 | async fn empty_unit() { |
13 | // Drains the stream. |
14 | let mut iter = vec![(), (), ()].into_iter(); |
15 | let _: () = stream::iter(&mut iter).collect().await; |
16 | assert!(iter.next().is_none()); |
17 | } |
18 | |
19 | #[tokio::test ] |
20 | async fn empty_vec() { |
21 | let coll: Vec<u32> = stream::empty().collect().await; |
22 | assert!(coll.is_empty()); |
23 | } |
24 | |
25 | #[tokio::test ] |
26 | async fn empty_box_slice() { |
27 | let coll: Box<[u32]> = stream::empty().collect().await; |
28 | assert!(coll.is_empty()); |
29 | } |
30 | |
31 | #[tokio::test ] |
32 | async fn empty_string() { |
33 | let coll: String = stream::empty::<&str>().collect().await; |
34 | assert!(coll.is_empty()); |
35 | } |
36 | |
37 | #[tokio::test ] |
38 | async fn empty_result() { |
39 | let coll: Result<Vec<u32>, &str> = stream::empty().collect().await; |
40 | assert_eq!(Ok(vec![]), coll); |
41 | } |
42 | |
43 | #[tokio::test ] |
44 | async fn collect_vec_items() { |
45 | let (tx, rx) = mpsc::unbounded_channel_stream(); |
46 | let mut fut = task::spawn(rx.collect::<Vec<i32>>()); |
47 | |
48 | assert_pending!(fut.poll()); |
49 | |
50 | tx.send(1).unwrap(); |
51 | assert!(fut.is_woken()); |
52 | assert_pending!(fut.poll()); |
53 | |
54 | tx.send(2).unwrap(); |
55 | assert!(fut.is_woken()); |
56 | assert_pending!(fut.poll()); |
57 | |
58 | drop(tx); |
59 | assert!(fut.is_woken()); |
60 | let coll = assert_ready!(fut.poll()); |
61 | assert_eq!(vec![1, 2], coll); |
62 | } |
63 | |
64 | #[tokio::test ] |
65 | async fn collect_string_items() { |
66 | let (tx, rx) = mpsc::unbounded_channel_stream(); |
67 | |
68 | let mut fut = task::spawn(rx.collect::<String>()); |
69 | |
70 | assert_pending!(fut.poll()); |
71 | |
72 | tx.send("hello " .to_string()).unwrap(); |
73 | assert!(fut.is_woken()); |
74 | assert_pending!(fut.poll()); |
75 | |
76 | tx.send("world" .to_string()).unwrap(); |
77 | assert!(fut.is_woken()); |
78 | assert_pending!(fut.poll()); |
79 | |
80 | drop(tx); |
81 | assert!(fut.is_woken()); |
82 | let coll = assert_ready!(fut.poll()); |
83 | assert_eq!("hello world" , coll); |
84 | } |
85 | |
86 | #[tokio::test ] |
87 | async fn collect_str_items() { |
88 | let (tx, rx) = mpsc::unbounded_channel_stream(); |
89 | |
90 | let mut fut = task::spawn(rx.collect::<String>()); |
91 | |
92 | assert_pending!(fut.poll()); |
93 | |
94 | tx.send("hello " ).unwrap(); |
95 | assert!(fut.is_woken()); |
96 | assert_pending!(fut.poll()); |
97 | |
98 | tx.send("world" ).unwrap(); |
99 | assert!(fut.is_woken()); |
100 | assert_pending!(fut.poll()); |
101 | |
102 | drop(tx); |
103 | assert!(fut.is_woken()); |
104 | let coll = assert_ready!(fut.poll()); |
105 | assert_eq!("hello world" , coll); |
106 | } |
107 | |
108 | #[tokio::test ] |
109 | async fn collect_results_ok() { |
110 | let (tx, rx) = mpsc::unbounded_channel_stream(); |
111 | |
112 | let mut fut = task::spawn(rx.collect::<Result<String, &str>>()); |
113 | |
114 | assert_pending!(fut.poll()); |
115 | |
116 | tx.send(Ok("hello " )).unwrap(); |
117 | assert!(fut.is_woken()); |
118 | assert_pending!(fut.poll()); |
119 | |
120 | tx.send(Ok("world" )).unwrap(); |
121 | assert!(fut.is_woken()); |
122 | assert_pending!(fut.poll()); |
123 | |
124 | drop(tx); |
125 | assert!(fut.is_woken()); |
126 | let coll = assert_ready_ok!(fut.poll()); |
127 | assert_eq!("hello world" , coll); |
128 | } |
129 | |
130 | #[tokio::test ] |
131 | async fn collect_results_err() { |
132 | let (tx, rx) = mpsc::unbounded_channel_stream(); |
133 | |
134 | let mut fut = task::spawn(rx.collect::<Result<String, &str>>()); |
135 | |
136 | assert_pending!(fut.poll()); |
137 | |
138 | tx.send(Ok("hello " )).unwrap(); |
139 | assert!(fut.is_woken()); |
140 | assert_pending!(fut.poll()); |
141 | |
142 | tx.send(Err("oh no" )).unwrap(); |
143 | assert!(fut.is_woken()); |
144 | let err = assert_ready_err!(fut.poll()); |
145 | assert_eq!("oh no" , err); |
146 | } |
147 | |