1 | //! A collection of tasks spawned on a Tokio runtime. |
2 | //! |
3 | //! This module provides the [`JoinSet`] type, a collection which stores a set |
4 | //! of spawned tasks and allows asynchronously awaiting the output of those |
5 | //! tasks as they complete. See the documentation for the [`JoinSet`] type for |
6 | //! details. |
7 | use std::future::Future; |
8 | use std::pin::Pin; |
9 | use std::task::{Context, Poll}; |
10 | use std::{fmt, panic}; |
11 | |
12 | use crate::runtime::Handle; |
13 | use crate::task::Id; |
14 | use crate::task::{unconstrained, AbortHandle, JoinError, JoinHandle, LocalSet}; |
15 | use crate::util::IdleNotifiedSet; |
16 | |
17 | /// A collection of tasks spawned on a Tokio runtime. |
18 | /// |
19 | /// A `JoinSet` can be used to await the completion of some or all of the tasks |
20 | /// in the set. The set is not ordered, and the tasks will be returned in the |
21 | /// order they complete. |
22 | /// |
23 | /// All of the tasks must have the same return type `T`. |
24 | /// |
25 | /// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted. |
26 | /// |
27 | /// # Examples |
28 | /// |
29 | /// Spawn multiple tasks and wait for them. |
30 | /// |
31 | /// ``` |
32 | /// use tokio::task::JoinSet; |
33 | /// |
34 | /// #[tokio::main] |
35 | /// async fn main() { |
36 | /// let mut set = JoinSet::new(); |
37 | /// |
38 | /// for i in 0..10 { |
39 | /// set.spawn(async move { i }); |
40 | /// } |
41 | /// |
42 | /// let mut seen = [false; 10]; |
43 | /// while let Some(res) = set.join_next().await { |
44 | /// let idx = res.unwrap(); |
45 | /// seen[idx] = true; |
46 | /// } |
47 | /// |
48 | /// for i in 0..10 { |
49 | /// assert!(seen[i]); |
50 | /// } |
51 | /// } |
52 | /// ``` |
53 | #[cfg_attr (docsrs, doc(cfg(feature = "rt" )))] |
54 | pub struct JoinSet<T> { |
55 | inner: IdleNotifiedSet<JoinHandle<T>>, |
56 | } |
57 | |
/// A flavor of [`task::Builder`] whose spawned tasks are stored in a
/// [`JoinSet`] instead of being spawned on the current default runtime.
///
/// [`task::Builder`]: crate::task::Builder
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
#[must_use = "builders do nothing unless used to spawn a task"]
pub struct Builder<'a, T> {
    // The set that receives the task once it has been spawned.
    joinset: &'a mut JoinSet<T>,
    // The underlying task builder carrying the configured settings.
    builder: super::Builder<'a>,
}
69 | |
70 | impl<T> JoinSet<T> { |
71 | /// Create a new `JoinSet`. |
72 | pub fn new() -> Self { |
73 | Self { |
74 | inner: IdleNotifiedSet::new(), |
75 | } |
76 | } |
77 | |
78 | /// Returns the number of tasks currently in the `JoinSet`. |
79 | pub fn len(&self) -> usize { |
80 | self.inner.len() |
81 | } |
82 | |
83 | /// Returns whether the `JoinSet` is empty. |
84 | pub fn is_empty(&self) -> bool { |
85 | self.inner.is_empty() |
86 | } |
87 | } |
88 | |
89 | impl<T: 'static> JoinSet<T> { |
/// Returns a [`Builder`] for configuring a task before it is spawned on
/// this `JoinSet`.
///
/// # Examples
///
/// ```
/// use tokio::task::JoinSet;
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
///     let mut set = JoinSet::new();
///
///     // Use the builder to configure a task's name before spawning it.
///     set.build_task()
///         .name("my_task")
///         .spawn(async { /* ... */ })?;
///
///     Ok(())
/// }
/// ```
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
pub fn build_task(&mut self) -> Builder<'_, T> {
    // Start from a fresh task builder; the returned value mutably borrows
    // this set so the spawned task can be inserted into it.
    let builder = super::Builder::new();
    Builder {
        joinset: self,
        builder,
    }
}
118 | |
119 | /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`] |
120 | /// that can be used to remotely cancel the task. |
121 | /// |
122 | /// The provided future will start running in the background immediately |
123 | /// when this method is called, even if you don't await anything on this |
124 | /// `JoinSet`. |
125 | /// |
126 | /// # Panics |
127 | /// |
128 | /// This method panics if called outside of a Tokio runtime. |
129 | /// |
130 | /// [`AbortHandle`]: crate::task::AbortHandle |
131 | #[track_caller ] |
132 | pub fn spawn<F>(&mut self, task: F) -> AbortHandle |
133 | where |
134 | F: Future<Output = T>, |
135 | F: Send + 'static, |
136 | T: Send, |
137 | { |
138 | self.insert(crate::spawn(task)) |
139 | } |
140 | |
141 | /// Spawn the provided task on the provided runtime and store it in this |
142 | /// `JoinSet` returning an [`AbortHandle`] that can be used to remotely |
143 | /// cancel the task. |
144 | /// |
145 | /// The provided future will start running in the background immediately |
146 | /// when this method is called, even if you don't await anything on this |
147 | /// `JoinSet`. |
148 | /// |
149 | /// [`AbortHandle`]: crate::task::AbortHandle |
150 | #[track_caller ] |
151 | pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle |
152 | where |
153 | F: Future<Output = T>, |
154 | F: Send + 'static, |
155 | T: Send, |
156 | { |
157 | self.insert(handle.spawn(task)) |
158 | } |
159 | |
160 | /// Spawn the provided task on the current [`LocalSet`] and store it in this |
161 | /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely |
162 | /// cancel the task. |
163 | /// |
164 | /// The provided future will start running in the background immediately |
165 | /// when this method is called, even if you don't await anything on this |
166 | /// `JoinSet`. |
167 | /// |
168 | /// # Panics |
169 | /// |
170 | /// This method panics if it is called outside of a `LocalSet`. |
171 | /// |
172 | /// [`LocalSet`]: crate::task::LocalSet |
173 | /// [`AbortHandle`]: crate::task::AbortHandle |
174 | #[track_caller ] |
175 | pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle |
176 | where |
177 | F: Future<Output = T>, |
178 | F: 'static, |
179 | { |
180 | self.insert(crate::task::spawn_local(task)) |
181 | } |
182 | |
183 | /// Spawn the provided task on the provided [`LocalSet`] and store it in |
184 | /// this `JoinSet`, returning an [`AbortHandle`] that can be used to |
185 | /// remotely cancel the task. |
186 | /// |
187 | /// Unlike the [`spawn_local`] method, this method may be used to spawn local |
188 | /// tasks on a `LocalSet` that is _not_ currently running. The provided |
189 | /// future will start running whenever the `LocalSet` is next started. |
190 | /// |
191 | /// [`LocalSet`]: crate::task::LocalSet |
192 | /// [`AbortHandle`]: crate::task::AbortHandle |
193 | /// [`spawn_local`]: Self::spawn_local |
194 | #[track_caller ] |
195 | pub fn spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle |
196 | where |
197 | F: Future<Output = T>, |
198 | F: 'static, |
199 | { |
200 | self.insert(local_set.spawn_local(task)) |
201 | } |
202 | |
203 | /// Spawn the blocking code on the blocking threadpool and store |
204 | /// it in this `JoinSet`, returning an [`AbortHandle`] that can be |
205 | /// used to remotely cancel the task. |
206 | /// |
207 | /// # Examples |
208 | /// |
209 | /// Spawn multiple blocking tasks and wait for them. |
210 | /// |
211 | /// ``` |
212 | /// use tokio::task::JoinSet; |
213 | /// |
214 | /// #[tokio::main] |
215 | /// async fn main() { |
216 | /// let mut set = JoinSet::new(); |
217 | /// |
218 | /// for i in 0..10 { |
219 | /// set.spawn_blocking(move || { i }); |
220 | /// } |
221 | /// |
222 | /// let mut seen = [false; 10]; |
223 | /// while let Some(res) = set.join_next().await { |
224 | /// let idx = res.unwrap(); |
225 | /// seen[idx] = true; |
226 | /// } |
227 | /// |
228 | /// for i in 0..10 { |
229 | /// assert!(seen[i]); |
230 | /// } |
231 | /// } |
232 | /// ``` |
233 | /// |
234 | /// # Panics |
235 | /// |
236 | /// This method panics if called outside of a Tokio runtime. |
237 | /// |
238 | /// [`AbortHandle`]: crate::task::AbortHandle |
239 | #[track_caller ] |
240 | pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle |
241 | where |
242 | F: FnOnce() -> T, |
243 | F: Send + 'static, |
244 | T: Send, |
245 | { |
246 | self.insert(crate::runtime::spawn_blocking(f)) |
247 | } |
248 | |
249 | /// Spawn the blocking code on the blocking threadpool of the |
250 | /// provided runtime and store it in this `JoinSet`, returning an |
251 | /// [`AbortHandle`] that can be used to remotely cancel the task. |
252 | /// |
253 | /// [`AbortHandle`]: crate::task::AbortHandle |
254 | #[track_caller ] |
255 | pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle |
256 | where |
257 | F: FnOnce() -> T, |
258 | F: Send + 'static, |
259 | T: Send, |
260 | { |
261 | self.insert(handle.spawn_blocking(f)) |
262 | } |
263 | |
264 | fn insert(&mut self, jh: JoinHandle<T>) -> AbortHandle { |
265 | let abort = jh.abort_handle(); |
266 | let mut entry = self.inner.insert_idle(jh); |
267 | |
268 | // Set the waker that is notified when the task completes. |
269 | entry.with_value_and_context(|jh, ctx| jh.set_join_waker(ctx.waker())); |
270 | abort |
271 | } |
272 | |
273 | /// Waits until one of the tasks in the set completes and returns its output. |
274 | /// |
275 | /// Returns `None` if the set is empty. |
276 | /// |
277 | /// # Cancel Safety |
278 | /// |
279 | /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!` |
280 | /// statement and some other branch completes first, it is guaranteed that no tasks were |
281 | /// removed from this `JoinSet`. |
282 | pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> { |
283 | std::future::poll_fn(|cx| self.poll_join_next(cx)).await |
284 | } |
285 | |
286 | /// Waits until one of the tasks in the set completes and returns its |
287 | /// output, along with the [task ID] of the completed task. |
288 | /// |
289 | /// Returns `None` if the set is empty. |
290 | /// |
291 | /// When this method returns an error, then the id of the task that failed can be accessed |
292 | /// using the [`JoinError::id`] method. |
293 | /// |
294 | /// # Cancel Safety |
295 | /// |
296 | /// This method is cancel safe. If `join_next_with_id` is used as the event in a `tokio::select!` |
297 | /// statement and some other branch completes first, it is guaranteed that no tasks were |
298 | /// removed from this `JoinSet`. |
299 | /// |
300 | /// [task ID]: crate::task::Id |
301 | /// [`JoinError::id`]: fn@crate::task::JoinError::id |
302 | pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> { |
303 | std::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await |
304 | } |
305 | |
306 | /// Tries to join one of the tasks in the set that has completed and return its output. |
307 | /// |
308 | /// Returns `None` if there are no completed tasks, or if the set is empty. |
309 | pub fn try_join_next(&mut self) -> Option<Result<T, JoinError>> { |
310 | // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left. |
311 | loop { |
312 | let mut entry = self.inner.try_pop_notified()?; |
313 | |
314 | let res = entry.with_value_and_context(|jh, ctx| { |
315 | // Since this function is not async and cannot be forced to yield, we should |
316 | // disable budgeting when we want to check for the `JoinHandle` readiness. |
317 | Pin::new(&mut unconstrained(jh)).poll(ctx) |
318 | }); |
319 | |
320 | if let Poll::Ready(res) = res { |
321 | let _entry = entry.remove(); |
322 | |
323 | return Some(res); |
324 | } |
325 | } |
326 | } |
327 | |
328 | /// Tries to join one of the tasks in the set that has completed and return its output, |
329 | /// along with the [task ID] of the completed task. |
330 | /// |
331 | /// Returns `None` if there are no completed tasks, or if the set is empty. |
332 | /// |
333 | /// When this method returns an error, then the id of the task that failed can be accessed |
334 | /// using the [`JoinError::id`] method. |
335 | /// |
336 | /// [task ID]: crate::task::Id |
337 | /// [`JoinError::id`]: fn@crate::task::JoinError::id |
338 | pub fn try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> { |
339 | // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left. |
340 | loop { |
341 | let mut entry = self.inner.try_pop_notified()?; |
342 | |
343 | let res = entry.with_value_and_context(|jh, ctx| { |
344 | // Since this function is not async and cannot be forced to yield, we should |
345 | // disable budgeting when we want to check for the `JoinHandle` readiness. |
346 | Pin::new(&mut unconstrained(jh)).poll(ctx) |
347 | }); |
348 | |
349 | if let Poll::Ready(res) = res { |
350 | let entry = entry.remove(); |
351 | |
352 | return Some(res.map(|output| (entry.id(), output))); |
353 | } |
354 | } |
355 | } |
356 | |
357 | /// Aborts all tasks and waits for them to finish shutting down. |
358 | /// |
359 | /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in |
360 | /// a loop until it returns `None`. |
361 | /// |
362 | /// This method ignores any panics in the tasks shutting down. When this call returns, the |
363 | /// `JoinSet` will be empty. |
364 | /// |
365 | /// [`abort_all`]: fn@Self::abort_all |
366 | /// [`join_next`]: fn@Self::join_next |
367 | pub async fn shutdown(&mut self) { |
368 | self.abort_all(); |
369 | while self.join_next().await.is_some() {} |
370 | } |
371 | |
372 | /// Awaits the completion of all tasks in this `JoinSet`, returning a vector of their results. |
373 | /// |
374 | /// The results will be stored in the order they completed not the order they were spawned. |
375 | /// This is a convenience method that is equivalent to calling [`join_next`] in |
376 | /// a loop. If any tasks on the `JoinSet` fail with an [`JoinError`], then this call |
377 | /// to `join_all` will panic and all remaining tasks on the `JoinSet` are |
378 | /// cancelled. To handle errors in any other way, manually call [`join_next`] |
379 | /// in a loop. |
380 | /// |
381 | /// # Examples |
382 | /// |
383 | /// Spawn multiple tasks and `join_all` them. |
384 | /// |
385 | /// ``` |
386 | /// use tokio::task::JoinSet; |
387 | /// use std::time::Duration; |
388 | /// |
389 | /// #[tokio::main] |
390 | /// async fn main() { |
391 | /// let mut set = JoinSet::new(); |
392 | /// |
393 | /// for i in 0..3 { |
394 | /// set.spawn(async move { |
395 | /// tokio::time::sleep(Duration::from_secs(3 - i)).await; |
396 | /// i |
397 | /// }); |
398 | /// } |
399 | /// |
400 | /// let output = set.join_all().await; |
401 | /// assert_eq!(output, vec![2, 1, 0]); |
402 | /// } |
403 | /// ``` |
404 | /// |
405 | /// Equivalent implementation of `join_all`, using [`join_next`] and loop. |
406 | /// |
407 | /// ``` |
408 | /// use tokio::task::JoinSet; |
409 | /// use std::panic; |
410 | /// |
411 | /// #[tokio::main] |
412 | /// async fn main() { |
413 | /// let mut set = JoinSet::new(); |
414 | /// |
415 | /// for i in 0..3 { |
416 | /// set.spawn(async move {i}); |
417 | /// } |
418 | /// |
419 | /// let mut output = Vec::new(); |
420 | /// while let Some(res) = set.join_next().await{ |
421 | /// match res { |
422 | /// Ok(t) => output.push(t), |
423 | /// Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()), |
424 | /// Err(err) => panic!("{err}" ), |
425 | /// } |
426 | /// } |
427 | /// assert_eq!(output.len(),3); |
428 | /// } |
429 | /// ``` |
430 | /// [`join_next`]: fn@Self::join_next |
431 | /// [`JoinError::id`]: fn@crate::task::JoinError::id |
432 | pub async fn join_all(mut self) -> Vec<T> { |
433 | let mut output = Vec::with_capacity(self.len()); |
434 | |
435 | while let Some(res) = self.join_next().await { |
436 | match res { |
437 | Ok(t) => output.push(t), |
438 | Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()), |
439 | Err(err) => panic!(" {err}" ), |
440 | } |
441 | } |
442 | output |
443 | } |
444 | |
445 | /// Aborts all tasks on this `JoinSet`. |
446 | /// |
447 | /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete |
448 | /// cancellation, you should call `join_next` in a loop until the `JoinSet` is empty. |
449 | pub fn abort_all(&mut self) { |
450 | self.inner.for_each(|jh| jh.abort()); |
451 | } |
452 | |
453 | /// Removes all tasks from this `JoinSet` without aborting them. |
454 | /// |
455 | /// The tasks removed by this call will continue to run in the background even if the `JoinSet` |
456 | /// is dropped. |
457 | pub fn detach_all(&mut self) { |
458 | self.inner.drain(drop); |
459 | } |
460 | |
461 | /// Polls for one of the tasks in the set to complete. |
462 | /// |
463 | /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set. |
464 | /// |
465 | /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled |
466 | /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to |
467 | /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is |
468 | /// scheduled to receive a wakeup. |
469 | /// |
470 | /// # Returns |
471 | /// |
472 | /// This function returns: |
473 | /// |
474 | /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is |
475 | /// available right now. |
476 | /// * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed. |
477 | /// The `value` is the return value of one of the tasks that completed. |
478 | /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been |
479 | /// aborted. The `err` is the `JoinError` from the panicked/aborted task. |
480 | /// * `Poll::Ready(None)` if the `JoinSet` is empty. |
481 | /// |
482 | /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. |
483 | /// This can happen if the [coop budget] is reached. |
484 | /// |
485 | /// [coop budget]: crate::task::coop#cooperative-scheduling |
486 | pub fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>> { |
487 | // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to |
488 | // the `notified` list if the waker is notified in the `poll` call below. |
489 | let mut entry = match self.inner.pop_notified(cx.waker()) { |
490 | Some(entry) => entry, |
491 | None => { |
492 | if self.is_empty() { |
493 | return Poll::Ready(None); |
494 | } else { |
495 | // The waker was set by `pop_notified`. |
496 | return Poll::Pending; |
497 | } |
498 | } |
499 | }; |
500 | |
501 | let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx)); |
502 | |
503 | if let Poll::Ready(res) = res { |
504 | let _entry = entry.remove(); |
505 | Poll::Ready(Some(res)) |
506 | } else { |
507 | // A JoinHandle generally won't emit a wakeup without being ready unless |
508 | // the coop limit has been reached. We yield to the executor in this |
509 | // case. |
510 | cx.waker().wake_by_ref(); |
511 | Poll::Pending |
512 | } |
513 | } |
514 | |
515 | /// Polls for one of the tasks in the set to complete. |
516 | /// |
517 | /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set. |
518 | /// |
519 | /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled |
520 | /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to |
521 | /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is |
522 | /// scheduled to receive a wakeup. |
523 | /// |
524 | /// # Returns |
525 | /// |
526 | /// This function returns: |
527 | /// |
528 | /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is |
529 | /// available right now. |
530 | /// * `Poll::Ready(Some(Ok((id, value))))` if one of the tasks in this `JoinSet` has completed. |
531 | /// The `value` is the return value of one of the tasks that completed, and |
532 | /// `id` is the [task ID] of that task. |
533 | /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been |
534 | /// aborted. The `err` is the `JoinError` from the panicked/aborted task. |
535 | /// * `Poll::Ready(None)` if the `JoinSet` is empty. |
536 | /// |
537 | /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. |
538 | /// This can happen if the [coop budget] is reached. |
539 | /// |
540 | /// [coop budget]: crate::task::coop#cooperative-scheduling |
541 | /// [task ID]: crate::task::Id |
542 | pub fn poll_join_next_with_id( |
543 | &mut self, |
544 | cx: &mut Context<'_>, |
545 | ) -> Poll<Option<Result<(Id, T), JoinError>>> { |
546 | // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to |
547 | // the `notified` list if the waker is notified in the `poll` call below. |
548 | let mut entry = match self.inner.pop_notified(cx.waker()) { |
549 | Some(entry) => entry, |
550 | None => { |
551 | if self.is_empty() { |
552 | return Poll::Ready(None); |
553 | } else { |
554 | // The waker was set by `pop_notified`. |
555 | return Poll::Pending; |
556 | } |
557 | } |
558 | }; |
559 | |
560 | let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx)); |
561 | |
562 | if let Poll::Ready(res) = res { |
563 | let entry = entry.remove(); |
564 | // If the task succeeded, add the task ID to the output. Otherwise, the |
565 | // `JoinError` will already have the task's ID. |
566 | Poll::Ready(Some(res.map(|output| (entry.id(), output)))) |
567 | } else { |
568 | // A JoinHandle generally won't emit a wakeup without being ready unless |
569 | // the coop limit has been reached. We yield to the executor in this |
570 | // case. |
571 | cx.waker().wake_by_ref(); |
572 | Poll::Pending |
573 | } |
574 | } |
575 | } |
576 | |
577 | impl<T> Drop for JoinSet<T> { |
578 | fn drop(&mut self) { |
579 | self.inner.drain(|join_handle: JoinHandle| join_handle.abort()); |
580 | } |
581 | } |
582 | |
583 | impl<T> fmt::Debug for JoinSet<T> { |
584 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
585 | f.debug_struct("JoinSet" ).field(name:"len" , &self.len()).finish() |
586 | } |
587 | } |
588 | |
589 | impl<T> Default for JoinSet<T> { |
590 | fn default() -> Self { |
591 | Self::new() |
592 | } |
593 | } |
594 | |
595 | /// Collect an iterator of futures into a [`JoinSet`]. |
596 | /// |
597 | /// This is equivalent to calling [`JoinSet::spawn`] on each element of the iterator. |
598 | /// |
599 | /// # Examples |
600 | /// |
601 | /// The main example from [`JoinSet`]'s documentation can also be written using [`collect`]: |
602 | /// |
603 | /// ``` |
604 | /// use tokio::task::JoinSet; |
605 | /// |
606 | /// #[tokio::main] |
607 | /// async fn main() { |
608 | /// let mut set: JoinSet<_> = (0..10).map(|i| async move { i }).collect(); |
609 | /// |
610 | /// let mut seen = [false; 10]; |
611 | /// while let Some(res) = set.join_next().await { |
612 | /// let idx = res.unwrap(); |
613 | /// seen[idx] = true; |
614 | /// } |
615 | /// |
616 | /// for i in 0..10 { |
617 | /// assert!(seen[i]); |
618 | /// } |
619 | /// } |
620 | /// ``` |
621 | /// |
622 | /// [`collect`]: std::iter::Iterator::collect |
623 | impl<T, F> std::iter::FromIterator<F> for JoinSet<T> |
624 | where |
625 | F: Future<Output = T>, |
626 | F: Send + 'static, |
627 | T: Send + 'static, |
628 | { |
629 | fn from_iter<I: IntoIterator<Item = F>>(iter: I) -> Self { |
630 | let mut set: JoinSet = Self::new(); |
631 | iter.into_iter().for_each(|task: F| { |
632 | set.spawn(task); |
633 | }); |
634 | set |
635 | } |
636 | } |
637 | |
638 | // === impl Builder === |
639 | |
640 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
641 | #[cfg_attr (docsrs, doc(cfg(all(tokio_unstable, feature = "tracing" ))))] |
642 | impl<'a, T: 'static> Builder<'a, T> { |
643 | /// Assigns a name to the task which will be spawned. |
644 | pub fn name(self, name: &'a str) -> Self { |
645 | let builder = self.builder.name(name); |
646 | Self { builder, ..self } |
647 | } |
648 | |
649 | /// Spawn the provided task with this builder's settings and store it in the |
650 | /// [`JoinSet`], returning an [`AbortHandle`] that can be used to remotely |
651 | /// cancel the task. |
652 | /// |
653 | /// # Returns |
654 | /// |
655 | /// An [`AbortHandle`] that can be used to remotely cancel the task. |
656 | /// |
657 | /// # Panics |
658 | /// |
659 | /// This method panics if called outside of a Tokio runtime. |
660 | /// |
661 | /// [`AbortHandle`]: crate::task::AbortHandle |
662 | #[track_caller ] |
663 | pub fn spawn<F>(self, future: F) -> std::io::Result<AbortHandle> |
664 | where |
665 | F: Future<Output = T>, |
666 | F: Send + 'static, |
667 | T: Send, |
668 | { |
669 | Ok(self.joinset.insert(self.builder.spawn(future)?)) |
670 | } |
671 | |
672 | /// Spawn the provided task on the provided [runtime handle] with this |
673 | /// builder's settings, and store it in the [`JoinSet`]. |
674 | /// |
675 | /// # Returns |
676 | /// |
677 | /// An [`AbortHandle`] that can be used to remotely cancel the task. |
678 | /// |
679 | /// |
680 | /// [`AbortHandle`]: crate::task::AbortHandle |
681 | /// [runtime handle]: crate::runtime::Handle |
682 | #[track_caller ] |
683 | pub fn spawn_on<F>(self, future: F, handle: &Handle) -> std::io::Result<AbortHandle> |
684 | where |
685 | F: Future<Output = T>, |
686 | F: Send + 'static, |
687 | T: Send, |
688 | { |
689 | Ok(self.joinset.insert(self.builder.spawn_on(future, handle)?)) |
690 | } |
691 | |
692 | /// Spawn the blocking code on the blocking threadpool with this builder's |
693 | /// settings, and store it in the [`JoinSet`]. |
694 | /// |
695 | /// # Returns |
696 | /// |
697 | /// An [`AbortHandle`] that can be used to remotely cancel the task. |
698 | /// |
699 | /// # Panics |
700 | /// |
701 | /// This method panics if called outside of a Tokio runtime. |
702 | /// |
703 | /// [`JoinSet`]: crate::task::JoinSet |
704 | /// [`AbortHandle`]: crate::task::AbortHandle |
705 | #[track_caller ] |
706 | pub fn spawn_blocking<F>(self, f: F) -> std::io::Result<AbortHandle> |
707 | where |
708 | F: FnOnce() -> T, |
709 | F: Send + 'static, |
710 | T: Send, |
711 | { |
712 | Ok(self.joinset.insert(self.builder.spawn_blocking(f)?)) |
713 | } |
714 | |
715 | /// Spawn the blocking code on the blocking threadpool of the provided |
716 | /// runtime handle with this builder's settings, and store it in the |
717 | /// [`JoinSet`]. |
718 | /// |
719 | /// # Returns |
720 | /// |
721 | /// An [`AbortHandle`] that can be used to remotely cancel the task. |
722 | /// |
723 | /// [`JoinSet`]: crate::task::JoinSet |
724 | /// [`AbortHandle`]: crate::task::AbortHandle |
725 | #[track_caller ] |
726 | pub fn spawn_blocking_on<F>(self, f: F, handle: &Handle) -> std::io::Result<AbortHandle> |
727 | where |
728 | F: FnOnce() -> T, |
729 | F: Send + 'static, |
730 | T: Send, |
731 | { |
732 | Ok(self |
733 | .joinset |
734 | .insert(self.builder.spawn_blocking_on(f, handle)?)) |
735 | } |
736 | |
737 | /// Spawn the provided task on the current [`LocalSet`] with this builder's |
738 | /// settings, and store it in the [`JoinSet`]. |
739 | /// |
740 | /// # Returns |
741 | /// |
742 | /// An [`AbortHandle`] that can be used to remotely cancel the task. |
743 | /// |
744 | /// # Panics |
745 | /// |
746 | /// This method panics if it is called outside of a `LocalSet`. |
747 | /// |
748 | /// [`LocalSet`]: crate::task::LocalSet |
749 | /// [`AbortHandle`]: crate::task::AbortHandle |
750 | #[track_caller ] |
751 | pub fn spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle> |
752 | where |
753 | F: Future<Output = T>, |
754 | F: 'static, |
755 | { |
756 | Ok(self.joinset.insert(self.builder.spawn_local(future)?)) |
757 | } |
758 | |
759 | /// Spawn the provided task on the provided [`LocalSet`] with this builder's |
760 | /// settings, and store it in the [`JoinSet`]. |
761 | /// |
762 | /// # Returns |
763 | /// |
764 | /// An [`AbortHandle`] that can be used to remotely cancel the task. |
765 | /// |
766 | /// [`LocalSet`]: crate::task::LocalSet |
767 | /// [`AbortHandle`]: crate::task::AbortHandle |
768 | #[track_caller ] |
769 | pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle> |
770 | where |
771 | F: Future<Output = T>, |
772 | F: 'static, |
773 | { |
774 | Ok(self |
775 | .joinset |
776 | .insert(self.builder.spawn_local_on(future, local_set)?)) |
777 | } |
778 | } |
779 | |
// `Debug` is implemented by hand so that `Builder` is `Debug` whether or not
// `T` is `Debug`.
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
impl<T> fmt::Debug for Builder<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("join_set::Builder");
        out.field("joinset", &self.joinset);
        out.field("builder", &self.builder);
        out.finish()
    }
}
792 | |