1#![warn(rust_2018_idioms)]
2#![cfg(feature = "full")]
3
4use futures::future::FutureExt;
5use tokio::sync::oneshot;
6use tokio::task::JoinSet;
7use tokio::time::Duration;
8
9fn rt() -> tokio::runtime::Runtime {
10 tokio::runtime::Builder::new_current_thread()
11 .build()
12 .unwrap()
13}
14
15#[tokio::test(start_paused = true)]
16async fn test_with_sleep() {
17 let mut set = JoinSet::new();
18
19 for i in 0..10 {
20 set.spawn(async move { i });
21 assert_eq!(set.len(), 1 + i);
22 }
23 set.detach_all();
24 assert_eq!(set.len(), 0);
25
26 assert!(matches!(set.join_next().await, None));
27
28 for i in 0..10 {
29 set.spawn(async move {
30 tokio::time::sleep(Duration::from_secs(i as u64)).await;
31 i
32 });
33 assert_eq!(set.len(), 1 + i);
34 }
35
36 let mut seen = [false; 10];
37 while let Some(res) = set.join_next().await.transpose().unwrap() {
38 seen[res] = true;
39 }
40
41 for was_seen in &seen {
42 assert!(was_seen);
43 }
44 assert!(matches!(set.join_next().await, None));
45
46 // Do it again.
47 for i in 0..10 {
48 set.spawn(async move {
49 tokio::time::sleep(Duration::from_secs(i as u64)).await;
50 i
51 });
52 }
53
54 let mut seen = [false; 10];
55 while let Some(res) = set.join_next().await.transpose().unwrap() {
56 seen[res] = true;
57 }
58
59 for was_seen in &seen {
60 assert!(was_seen);
61 }
62 assert!(matches!(set.join_next().await, None));
63}
64
65#[tokio::test]
66async fn test_abort_on_drop() {
67 let mut set = JoinSet::new();
68
69 let mut recvs = Vec::new();
70
71 for _ in 0..16 {
72 let (send, recv) = oneshot::channel::<()>();
73 recvs.push(recv);
74
75 set.spawn(async {
76 // This task will never complete on its own.
77 futures::future::pending::<()>().await;
78 drop(send);
79 });
80 }
81
82 drop(set);
83
84 for recv in recvs {
85 // The task is aborted soon and we will receive an error.
86 assert!(recv.await.is_err());
87 }
88}
89
90#[tokio::test]
91async fn alternating() {
92 let mut set = JoinSet::new();
93
94 assert_eq!(set.len(), 0);
95 set.spawn(async {});
96 assert_eq!(set.len(), 1);
97 set.spawn(async {});
98 assert_eq!(set.len(), 2);
99
100 for _ in 0..16 {
101 let () = set.join_next().await.unwrap().unwrap();
102 assert_eq!(set.len(), 1);
103 set.spawn(async {});
104 assert_eq!(set.len(), 2);
105 }
106}
107
108#[tokio::test(start_paused = true)]
109async fn abort_tasks() {
110 let mut set = JoinSet::new();
111 let mut num_canceled = 0;
112 let mut num_completed = 0;
113 for i in 0..16 {
114 let abort = set.spawn(async move {
115 tokio::time::sleep(Duration::from_secs(i as u64)).await;
116 i
117 });
118
119 if i % 2 != 0 {
120 // abort odd-numbered tasks.
121 abort.abort();
122 }
123 }
124 loop {
125 match set.join_next().await {
126 Some(Ok(res)) => {
127 num_completed += 1;
128 assert_eq!(res % 2, 0);
129 }
130 Some(Err(e)) => {
131 assert!(e.is_cancelled());
132 num_canceled += 1;
133 }
134 None => break,
135 }
136 }
137
138 assert_eq!(num_canceled, 8);
139 assert_eq!(num_completed, 8);
140}
141
// Spawns a task onto a runtime that is dropped before the task is joined, and
// checks that joining it from a different runtime reports cancellation.
#[test]
fn runtime_gone() {
    let mut set = JoinSet::new();

    // The owning runtime is dropped right after the spawn, without ever being
    // driven by this test.
    let doomed = rt();
    set.spawn_on(async { 1 }, doomed.handle());
    drop(doomed);

    // Join from a second, unrelated runtime.
    let outcome = rt().block_on(set.join_next());
    let err = outcome.unwrap().unwrap_err();
    assert!(err.is_cancelled());
}
157
158#[tokio::test(start_paused = true)]
159async fn abort_all() {
160 let mut set: JoinSet<()> = JoinSet::new();
161
162 for _ in 0..5 {
163 set.spawn(futures::future::pending());
164 }
165 for _ in 0..5 {
166 set.spawn(async {
167 tokio::time::sleep(Duration::from_secs(1)).await;
168 });
169 }
170
171 // The join set will now have 5 pending tasks and 5 ready tasks.
172 tokio::time::sleep(Duration::from_secs(2)).await;
173
174 set.abort_all();
175 assert_eq!(set.len(), 10);
176
177 let mut count = 0;
178 while let Some(res) = set.join_next().await {
179 if let Err(err) = res {
180 assert!(err.is_cancelled());
181 }
182 count += 1;
183 }
184 assert_eq!(count, 10);
185 assert_eq!(set.len(), 0);
186}
187
188// This ensures that `join_next` works correctly when the coop budget is
189// exhausted.
190#[tokio::test(flavor = "current_thread")]
191async fn join_set_coop() {
192 // Large enough to trigger coop.
193 const TASK_NUM: u32 = 1000;
194
195 static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0);
196
197 let mut set = JoinSet::new();
198
199 for _ in 0..TASK_NUM {
200 set.spawn(async {
201 SEM.add_permits(1);
202 });
203 }
204
205 // Wait for all tasks to complete.
206 //
207 // Since this is a `current_thread` runtime, there's no race condition
208 // between the last permit being added and the task completing.
209 let _ = SEM.acquire_many(TASK_NUM).await.unwrap();
210
211 let mut count = 0;
212 let mut coop_count = 0;
213 loop {
214 match set.join_next().now_or_never() {
215 Some(Some(Ok(()))) => {}
216 Some(Some(Err(err))) => panic!("failed: {}", err),
217 None => {
218 coop_count += 1;
219 tokio::task::yield_now().await;
220 continue;
221 }
222 Some(None) => break,
223 }
224
225 count += 1;
226 }
227 assert!(coop_count >= 1);
228 assert_eq!(count, TASK_NUM);
229}
230