1use async_stream::stream;
2
3use futures_core::stream::{FusedStream, Stream};
4use futures_util::pin_mut;
5use futures_util::stream::StreamExt;
6use tokio::sync::mpsc;
7use tokio_test::assert_ok;
8
9#[tokio::test]
10async fn noop_stream() {
11 let s = stream! {};
12 pin_mut!(s);
13
14 while s.next().await.is_some() {
15 unreachable!();
16 }
17}
18
19#[tokio::test]
20async fn empty_stream() {
21 let mut ran = false;
22
23 {
24 let r = &mut ran;
25 let s = stream! {
26 *r = true;
27 println!("hello world!");
28 };
29 pin_mut!(s);
30
31 while s.next().await.is_some() {
32 unreachable!();
33 }
34 }
35
36 assert!(ran);
37}
38
39#[tokio::test]
40async fn yield_single_value() {
41 let s = stream! {
42 yield "hello";
43 };
44
45 let values: Vec<_> = s.collect().await;
46
47 assert_eq!(1, values.len());
48 assert_eq!("hello", values[0]);
49}
50
51#[tokio::test]
52async fn fused() {
53 let s = stream! {
54 yield "hello";
55 };
56 pin_mut!(s);
57
58 assert!(!s.is_terminated());
59 assert_eq!(s.next().await, Some("hello"));
60 assert_eq!(s.next().await, None);
61
62 assert!(s.is_terminated());
63 // This should return None from now on
64 assert_eq!(s.next().await, None);
65}
66
67#[tokio::test]
68async fn yield_multi_value() {
69 let s = stream! {
70 yield "hello";
71 yield "world";
72 yield "dizzy";
73 };
74
75 let values: Vec<_> = s.collect().await;
76
77 assert_eq!(3, values.len());
78 assert_eq!("hello", values[0]);
79 assert_eq!("world", values[1]);
80 assert_eq!("dizzy", values[2]);
81}
82
83#[tokio::test]
84async fn unit_yield_in_select() {
85 use tokio::select;
86
87 async fn do_stuff_async() {}
88
89 let s = stream! {
90 select! {
91 _ = do_stuff_async() => yield,
92 else => yield,
93 }
94 };
95
96 let values: Vec<_> = s.collect().await;
97 assert_eq!(values.len(), 1);
98}
99
100#[tokio::test]
101async fn yield_with_select() {
102 use tokio::select;
103
104 async fn do_stuff_async() {}
105 async fn more_async_work() {}
106
107 let s = stream! {
108 select! {
109 _ = do_stuff_async() => yield "hey",
110 _ = more_async_work() => yield "hey",
111 else => yield "hey",
112 }
113 };
114
115 let values: Vec<_> = s.collect().await;
116 assert_eq!(values, vec!["hey"]);
117}
118
119#[tokio::test]
120async fn return_stream() {
121 fn build_stream() -> impl Stream<Item = u32> {
122 stream! {
123 yield 1;
124 yield 2;
125 yield 3;
126 }
127 }
128
129 let s = build_stream();
130
131 let values: Vec<_> = s.collect().await;
132 assert_eq!(3, values.len());
133 assert_eq!(1, values[0]);
134 assert_eq!(2, values[1]);
135 assert_eq!(3, values[2]);
136}
137
138#[tokio::test]
139async fn consume_channel() {
140 let (tx, mut rx) = mpsc::channel(10);
141
142 let s = stream! {
143 while let Some(v) = rx.recv().await {
144 yield v;
145 }
146 };
147
148 pin_mut!(s);
149
150 for i in 0..3 {
151 assert_ok!(tx.send(i).await);
152 assert_eq!(Some(i), s.next().await);
153 }
154
155 drop(tx);
156 assert_eq!(None, s.next().await);
157}
158
159#[tokio::test]
160async fn borrow_self() {
161 struct Data(String);
162
163 impl Data {
164 fn stream<'a>(&'a self) -> impl Stream<Item = &str> + 'a {
165 stream! {
166 yield &self.0[..];
167 }
168 }
169 }
170
171 let data = Data("hello".to_string());
172 let s = data.stream();
173 pin_mut!(s);
174
175 assert_eq!(Some("hello"), s.next().await);
176}
177
178#[tokio::test]
179async fn stream_in_stream() {
180 let s = stream! {
181 let s = stream! {
182 for i in 0..3 {
183 yield i;
184 }
185 };
186
187 pin_mut!(s);
188 while let Some(v) = s.next().await {
189 yield v;
190 }
191 };
192
193 let values: Vec<_> = s.collect().await;
194 assert_eq!(3, values.len());
195}
196
197#[tokio::test]
198async fn yield_non_unpin_value() {
199 let s: Vec<_> = stream! {
200 for i in 0..3 {
201 yield async move { i };
202 }
203 }
204 .buffered(1)
205 .collect()
206 .await;
207
208 assert_eq!(s, vec![0, 1, 2]);
209}
210
// Builds a `stream!` whose `select!` arm creates and polls a `try_stream!`.
// This is a plain synchronous `#[test]` and the outer stream is discarded
// without being polled, so presumably only successful compilation is checked.
#[test]
fn inner_try_stream() {
    use async_stream::try_stream;
    use tokio::select;

    async fn do_stuff_async() {}

    let _ = stream! {
        select! {
            _ = do_stuff_async() => {
                let nested = try_stream! {
                    yield;
                };
                // The annotation fixes the nested stream's item type to `Result<(), ()>`.
                let _: Result<(), ()> = Box::pin(nested).next().await.unwrap();
            },
            else => {},
        }
        yield
    };
}
231
232#[rustversion::attr(not(stable), ignore)]
233#[test]
234fn test() {
235 let t = trybuild::TestCases::new();
236 t.compile_fail("tests/ui/*.rs");
237}
238