1use tokio_stream::{self as stream, StreamExt};
2use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
3
4mod support {
5 pub(crate) mod mpsc;
6}
7
8use support::mpsc;
9
10#[allow(clippy::let_unit_value)]
11#[tokio::test]
12async fn empty_unit() {
13 // Drains the stream.
14 let mut iter = vec![(), (), ()].into_iter();
15 let _: () = stream::iter(&mut iter).collect().await;
16 assert!(iter.next().is_none());
17}
18
19#[tokio::test]
20async fn empty_vec() {
21 let coll: Vec<u32> = stream::empty().collect().await;
22 assert!(coll.is_empty());
23}
24
25#[tokio::test]
26async fn empty_box_slice() {
27 let coll: Box<[u32]> = stream::empty().collect().await;
28 assert!(coll.is_empty());
29}
30
31#[tokio::test]
32async fn empty_string() {
33 let coll: String = stream::empty::<&str>().collect().await;
34 assert!(coll.is_empty());
35}
36
37#[tokio::test]
38async fn empty_result() {
39 let coll: Result<Vec<u32>, &str> = stream::empty().collect().await;
40 assert_eq!(Ok(vec![]), coll);
41}
42
43#[tokio::test]
44async fn collect_vec_items() {
45 let (tx, rx) = mpsc::unbounded_channel_stream();
46 let mut fut = task::spawn(rx.collect::<Vec<i32>>());
47
48 assert_pending!(fut.poll());
49
50 tx.send(1).unwrap();
51 assert!(fut.is_woken());
52 assert_pending!(fut.poll());
53
54 tx.send(2).unwrap();
55 assert!(fut.is_woken());
56 assert_pending!(fut.poll());
57
58 drop(tx);
59 assert!(fut.is_woken());
60 let coll = assert_ready!(fut.poll());
61 assert_eq!(vec![1, 2], coll);
62}
63
64#[tokio::test]
65async fn collect_string_items() {
66 let (tx, rx) = mpsc::unbounded_channel_stream();
67
68 let mut fut = task::spawn(rx.collect::<String>());
69
70 assert_pending!(fut.poll());
71
72 tx.send("hello ".to_string()).unwrap();
73 assert!(fut.is_woken());
74 assert_pending!(fut.poll());
75
76 tx.send("world".to_string()).unwrap();
77 assert!(fut.is_woken());
78 assert_pending!(fut.poll());
79
80 drop(tx);
81 assert!(fut.is_woken());
82 let coll = assert_ready!(fut.poll());
83 assert_eq!("hello world", coll);
84}
85
86#[tokio::test]
87async fn collect_str_items() {
88 let (tx, rx) = mpsc::unbounded_channel_stream();
89
90 let mut fut = task::spawn(rx.collect::<String>());
91
92 assert_pending!(fut.poll());
93
94 tx.send("hello ").unwrap();
95 assert!(fut.is_woken());
96 assert_pending!(fut.poll());
97
98 tx.send("world").unwrap();
99 assert!(fut.is_woken());
100 assert_pending!(fut.poll());
101
102 drop(tx);
103 assert!(fut.is_woken());
104 let coll = assert_ready!(fut.poll());
105 assert_eq!("hello world", coll);
106}
107
108#[tokio::test]
109async fn collect_results_ok() {
110 let (tx, rx) = mpsc::unbounded_channel_stream();
111
112 let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
113
114 assert_pending!(fut.poll());
115
116 tx.send(Ok("hello ")).unwrap();
117 assert!(fut.is_woken());
118 assert_pending!(fut.poll());
119
120 tx.send(Ok("world")).unwrap();
121 assert!(fut.is_woken());
122 assert_pending!(fut.poll());
123
124 drop(tx);
125 assert!(fut.is_woken());
126 let coll = assert_ready_ok!(fut.poll());
127 assert_eq!("hello world", coll);
128}
129
130#[tokio::test]
131async fn collect_results_err() {
132 let (tx, rx) = mpsc::unbounded_channel_stream();
133
134 let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
135
136 assert_pending!(fut.poll());
137
138 tx.send(Ok("hello ")).unwrap();
139 assert!(fut.is_woken());
140 assert_pending!(fut.poll());
141
142 tx.send(Err("oh no")).unwrap();
143 assert!(fut.is_woken());
144 let err = assert_ready_err!(fut.poll());
145 assert_eq!("oh no", err);
146}
147