1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (feature = "full" )] |
3 | |
4 | use futures::future::FutureExt; |
5 | use tokio::sync::oneshot; |
6 | use tokio::task::JoinSet; |
7 | use tokio::time::Duration; |
8 | |
9 | fn rt() -> tokio::runtime::Runtime { |
10 | tokio::runtime::Builder::new_current_thread() |
11 | .build() |
12 | .unwrap() |
13 | } |
14 | |
15 | #[tokio::test (start_paused = true)] |
16 | async fn test_with_sleep() { |
17 | let mut set = JoinSet::new(); |
18 | |
19 | for i in 0..10 { |
20 | set.spawn(async move { i }); |
21 | assert_eq!(set.len(), 1 + i); |
22 | } |
23 | set.detach_all(); |
24 | assert_eq!(set.len(), 0); |
25 | |
26 | assert!(matches!(set.join_next().await, None)); |
27 | |
28 | for i in 0..10 { |
29 | set.spawn(async move { |
30 | tokio::time::sleep(Duration::from_secs(i as u64)).await; |
31 | i |
32 | }); |
33 | assert_eq!(set.len(), 1 + i); |
34 | } |
35 | |
36 | let mut seen = [false; 10]; |
37 | while let Some(res) = set.join_next().await.transpose().unwrap() { |
38 | seen[res] = true; |
39 | } |
40 | |
41 | for was_seen in &seen { |
42 | assert!(was_seen); |
43 | } |
44 | assert!(matches!(set.join_next().await, None)); |
45 | |
46 | // Do it again. |
47 | for i in 0..10 { |
48 | set.spawn(async move { |
49 | tokio::time::sleep(Duration::from_secs(i as u64)).await; |
50 | i |
51 | }); |
52 | } |
53 | |
54 | let mut seen = [false; 10]; |
55 | while let Some(res) = set.join_next().await.transpose().unwrap() { |
56 | seen[res] = true; |
57 | } |
58 | |
59 | for was_seen in &seen { |
60 | assert!(was_seen); |
61 | } |
62 | assert!(matches!(set.join_next().await, None)); |
63 | } |
64 | |
65 | #[tokio::test ] |
66 | async fn test_abort_on_drop() { |
67 | let mut set = JoinSet::new(); |
68 | |
69 | let mut recvs = Vec::new(); |
70 | |
71 | for _ in 0..16 { |
72 | let (send, recv) = oneshot::channel::<()>(); |
73 | recvs.push(recv); |
74 | |
75 | set.spawn(async { |
76 | // This task will never complete on its own. |
77 | futures::future::pending::<()>().await; |
78 | drop(send); |
79 | }); |
80 | } |
81 | |
82 | drop(set); |
83 | |
84 | for recv in recvs { |
85 | // The task is aborted soon and we will receive an error. |
86 | assert!(recv.await.is_err()); |
87 | } |
88 | } |
89 | |
90 | #[tokio::test ] |
91 | async fn alternating() { |
92 | let mut set = JoinSet::new(); |
93 | |
94 | assert_eq!(set.len(), 0); |
95 | set.spawn(async {}); |
96 | assert_eq!(set.len(), 1); |
97 | set.spawn(async {}); |
98 | assert_eq!(set.len(), 2); |
99 | |
100 | for _ in 0..16 { |
101 | let () = set.join_next().await.unwrap().unwrap(); |
102 | assert_eq!(set.len(), 1); |
103 | set.spawn(async {}); |
104 | assert_eq!(set.len(), 2); |
105 | } |
106 | } |
107 | |
108 | #[tokio::test (start_paused = true)] |
109 | async fn abort_tasks() { |
110 | let mut set = JoinSet::new(); |
111 | let mut num_canceled = 0; |
112 | let mut num_completed = 0; |
113 | for i in 0..16 { |
114 | let abort = set.spawn(async move { |
115 | tokio::time::sleep(Duration::from_secs(i as u64)).await; |
116 | i |
117 | }); |
118 | |
119 | if i % 2 != 0 { |
120 | // abort odd-numbered tasks. |
121 | abort.abort(); |
122 | } |
123 | } |
124 | loop { |
125 | match set.join_next().await { |
126 | Some(Ok(res)) => { |
127 | num_completed += 1; |
128 | assert_eq!(res % 2, 0); |
129 | } |
130 | Some(Err(e)) => { |
131 | assert!(e.is_cancelled()); |
132 | num_canceled += 1; |
133 | } |
134 | None => break, |
135 | } |
136 | } |
137 | |
138 | assert_eq!(num_canceled, 8); |
139 | assert_eq!(num_completed, 8); |
140 | } |
141 | |
// A task spawned onto a runtime that is dropped before the task is joined
// must surface as a cancellation error when joined from another runtime.
#[test]
fn runtime_gone() {
    let mut set = JoinSet::new();

    // Spawn onto a short-lived runtime and tear it down right away.
    let doomed = rt();
    set.spawn_on(async { 1 }, doomed.handle());
    drop(doomed);

    // Join from a second, unrelated runtime.
    let joined = rt().block_on(set.join_next());
    let err = joined.unwrap().unwrap_err();
    assert!(err.is_cancelled());
}
157 | |
158 | #[tokio::test (start_paused = true)] |
159 | async fn abort_all() { |
160 | let mut set: JoinSet<()> = JoinSet::new(); |
161 | |
162 | for _ in 0..5 { |
163 | set.spawn(futures::future::pending()); |
164 | } |
165 | for _ in 0..5 { |
166 | set.spawn(async { |
167 | tokio::time::sleep(Duration::from_secs(1)).await; |
168 | }); |
169 | } |
170 | |
171 | // The join set will now have 5 pending tasks and 5 ready tasks. |
172 | tokio::time::sleep(Duration::from_secs(2)).await; |
173 | |
174 | set.abort_all(); |
175 | assert_eq!(set.len(), 10); |
176 | |
177 | let mut count = 0; |
178 | while let Some(res) = set.join_next().await { |
179 | if let Err(err) = res { |
180 | assert!(err.is_cancelled()); |
181 | } |
182 | count += 1; |
183 | } |
184 | assert_eq!(count, 10); |
185 | assert_eq!(set.len(), 0); |
186 | } |
187 | |
188 | // This ensures that `join_next` works correctly when the coop budget is |
189 | // exhausted. |
190 | #[tokio::test (flavor = "current_thread" )] |
191 | async fn join_set_coop() { |
192 | // Large enough to trigger coop. |
193 | const TASK_NUM: u32 = 1000; |
194 | |
195 | static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0); |
196 | |
197 | let mut set = JoinSet::new(); |
198 | |
199 | for _ in 0..TASK_NUM { |
200 | set.spawn(async { |
201 | SEM.add_permits(1); |
202 | }); |
203 | } |
204 | |
205 | // Wait for all tasks to complete. |
206 | // |
207 | // Since this is a `current_thread` runtime, there's no race condition |
208 | // between the last permit being added and the task completing. |
209 | let _ = SEM.acquire_many(TASK_NUM).await.unwrap(); |
210 | |
211 | let mut count = 0; |
212 | let mut coop_count = 0; |
213 | loop { |
214 | match set.join_next().now_or_never() { |
215 | Some(Some(Ok(()))) => {} |
216 | Some(Some(Err(err))) => panic!("failed: {}" , err), |
217 | None => { |
218 | coop_count += 1; |
219 | tokio::task::yield_now().await; |
220 | continue; |
221 | } |
222 | Some(None) => break, |
223 | } |
224 | |
225 | count += 1; |
226 | } |
227 | assert!(coop_count >= 1); |
228 | assert_eq!(count, TASK_NUM); |
229 | } |
230 | |