1 | //! A collection of tasks spawned on a Tokio runtime. |
2 | //! |
3 | //! This module provides the [`JoinSet`] type, a collection which stores a set |
4 | //! of spawned tasks and allows asynchronously awaiting the output of those |
5 | //! tasks as they complete. See the documentation for the [`JoinSet`] type for |
6 | //! details. |
7 | use std::fmt; |
8 | use std::future::Future; |
9 | use std::pin::Pin; |
10 | use std::task::{Context, Poll}; |
11 | |
12 | use crate::runtime::Handle; |
13 | #[cfg (tokio_unstable)] |
14 | use crate::task::Id; |
15 | use crate::task::{unconstrained, AbortHandle, JoinError, JoinHandle, LocalSet}; |
16 | use crate::util::IdleNotifiedSet; |
17 | |
18 | /// A collection of tasks spawned on a Tokio runtime. |
19 | /// |
20 | /// A `JoinSet` can be used to await the completion of some or all of the tasks |
21 | /// in the set. The set is not ordered, and the tasks will be returned in the |
22 | /// order they complete. |
23 | /// |
24 | /// All of the tasks must have the same return type `T`. |
25 | /// |
26 | /// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted. |
27 | /// |
28 | /// # Examples |
29 | /// |
30 | /// Spawn multiple tasks and wait for them. |
31 | /// |
32 | /// ``` |
33 | /// use tokio::task::JoinSet; |
34 | /// |
35 | /// #[tokio::main] |
36 | /// async fn main() { |
37 | /// let mut set = JoinSet::new(); |
38 | /// |
39 | /// for i in 0..10 { |
40 | /// set.spawn(async move { i }); |
41 | /// } |
42 | /// |
43 | /// let mut seen = [false; 10]; |
44 | /// while let Some(res) = set.join_next().await { |
45 | /// let idx = res.unwrap(); |
46 | /// seen[idx] = true; |
47 | /// } |
48 | /// |
49 | /// for i in 0..10 { |
50 | /// assert!(seen[i]); |
51 | /// } |
52 | /// } |
53 | /// ``` |
54 | #[cfg_attr (docsrs, doc(cfg(feature = "rt" )))] |
55 | pub struct JoinSet<T> { |
56 | inner: IdleNotifiedSet<JoinHandle<T>>, |
57 | } |
58 | |
/// A flavour of [`task::Builder`] whose spawned tasks are stored in a
/// [`JoinSet`] instead of being spawned on the current default runtime.
///
/// Instances are obtained from [`JoinSet::build_task`] and hold a mutable
/// borrow of the set for as long as they live.
///
/// [`task::Builder`]: crate::task::Builder
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
#[must_use = "builders do nothing unless used to spawn a task"]
pub struct Builder<'a, T> {
    // The set that receives the join handle of the spawned task.
    joinset: &'a mut JoinSet<T>,
    // The underlying task builder carrying the configured settings.
    builder: super::Builder<'a>,
}
70 | |
71 | impl<T> JoinSet<T> { |
72 | /// Create a new `JoinSet`. |
73 | pub fn new() -> Self { |
74 | Self { |
75 | inner: IdleNotifiedSet::new(), |
76 | } |
77 | } |
78 | |
79 | /// Returns the number of tasks currently in the `JoinSet`. |
80 | pub fn len(&self) -> usize { |
81 | self.inner.len() |
82 | } |
83 | |
84 | /// Returns whether the `JoinSet` is empty. |
85 | pub fn is_empty(&self) -> bool { |
86 | self.inner.is_empty() |
87 | } |
88 | } |
89 | |
90 | impl<T: 'static> JoinSet<T> { |
/// Creates a [`Builder`] for configuring a task before it is spawned onto
/// this `JoinSet`.
///
/// The returned builder mutably borrows the set for as long as it lives.
///
/// # Examples
///
/// ```
/// use tokio::task::JoinSet;
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
///     let mut set = JoinSet::new();
///
///     // Use the builder to configure a task's name before spawning it.
///     set.build_task()
///         .name("my_task")
///         .spawn(async { /* ... */ })?;
///
///     Ok(())
/// }
/// ```
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
pub fn build_task(&mut self) -> Builder<'_, T> {
    let builder = super::Builder::new();
    Builder {
        joinset: self,
        builder,
    }
}
119 | |
120 | /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`] |
121 | /// that can be used to remotely cancel the task. |
122 | /// |
123 | /// The provided future will start running in the background immediately |
124 | /// when this method is called, even if you don't await anything on this |
125 | /// `JoinSet`. |
126 | /// |
127 | /// # Panics |
128 | /// |
129 | /// This method panics if called outside of a Tokio runtime. |
130 | /// |
131 | /// [`AbortHandle`]: crate::task::AbortHandle |
132 | #[track_caller ] |
133 | pub fn spawn<F>(&mut self, task: F) -> AbortHandle |
134 | where |
135 | F: Future<Output = T>, |
136 | F: Send + 'static, |
137 | T: Send, |
138 | { |
139 | self.insert(crate::spawn(task)) |
140 | } |
141 | |
142 | /// Spawn the provided task on the provided runtime and store it in this |
143 | /// `JoinSet` returning an [`AbortHandle`] that can be used to remotely |
144 | /// cancel the task. |
145 | /// |
146 | /// The provided future will start running in the background immediately |
147 | /// when this method is called, even if you don't await anything on this |
148 | /// `JoinSet`. |
149 | /// |
150 | /// [`AbortHandle`]: crate::task::AbortHandle |
151 | #[track_caller ] |
152 | pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle |
153 | where |
154 | F: Future<Output = T>, |
155 | F: Send + 'static, |
156 | T: Send, |
157 | { |
158 | self.insert(handle.spawn(task)) |
159 | } |
160 | |
161 | /// Spawn the provided task on the current [`LocalSet`] and store it in this |
162 | /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely |
163 | /// cancel the task. |
164 | /// |
165 | /// The provided future will start running in the background immediately |
166 | /// when this method is called, even if you don't await anything on this |
167 | /// `JoinSet`. |
168 | /// |
169 | /// # Panics |
170 | /// |
171 | /// This method panics if it is called outside of a `LocalSet`. |
172 | /// |
173 | /// [`LocalSet`]: crate::task::LocalSet |
174 | /// [`AbortHandle`]: crate::task::AbortHandle |
175 | #[track_caller ] |
176 | pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle |
177 | where |
178 | F: Future<Output = T>, |
179 | F: 'static, |
180 | { |
181 | self.insert(crate::task::spawn_local(task)) |
182 | } |
183 | |
184 | /// Spawn the provided task on the provided [`LocalSet`] and store it in |
185 | /// this `JoinSet`, returning an [`AbortHandle`] that can be used to |
186 | /// remotely cancel the task. |
187 | /// |
188 | /// Unlike the [`spawn_local`] method, this method may be used to spawn local |
189 | /// tasks on a `LocalSet` that is _not_ currently running. The provided |
190 | /// future will start running whenever the `LocalSet` is next started. |
191 | /// |
192 | /// [`LocalSet`]: crate::task::LocalSet |
193 | /// [`AbortHandle`]: crate::task::AbortHandle |
194 | /// [`spawn_local`]: Self::spawn_local |
195 | #[track_caller ] |
196 | pub fn spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle |
197 | where |
198 | F: Future<Output = T>, |
199 | F: 'static, |
200 | { |
201 | self.insert(local_set.spawn_local(task)) |
202 | } |
203 | |
204 | /// Spawn the blocking code on the blocking threadpool and store |
205 | /// it in this `JoinSet`, returning an [`AbortHandle`] that can be |
206 | /// used to remotely cancel the task. |
207 | /// |
208 | /// # Examples |
209 | /// |
210 | /// Spawn multiple blocking tasks and wait for them. |
211 | /// |
212 | /// ``` |
213 | /// use tokio::task::JoinSet; |
214 | /// |
215 | /// #[tokio::main] |
216 | /// async fn main() { |
217 | /// let mut set = JoinSet::new(); |
218 | /// |
219 | /// for i in 0..10 { |
220 | /// set.spawn_blocking(move || { i }); |
221 | /// } |
222 | /// |
223 | /// let mut seen = [false; 10]; |
224 | /// while let Some(res) = set.join_next().await { |
225 | /// let idx = res.unwrap(); |
226 | /// seen[idx] = true; |
227 | /// } |
228 | /// |
229 | /// for i in 0..10 { |
230 | /// assert!(seen[i]); |
231 | /// } |
232 | /// } |
233 | /// ``` |
234 | /// |
235 | /// # Panics |
236 | /// |
237 | /// This method panics if called outside of a Tokio runtime. |
238 | /// |
239 | /// [`AbortHandle`]: crate::task::AbortHandle |
240 | #[track_caller ] |
241 | pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle |
242 | where |
243 | F: FnOnce() -> T, |
244 | F: Send + 'static, |
245 | T: Send, |
246 | { |
247 | self.insert(crate::runtime::spawn_blocking(f)) |
248 | } |
249 | |
250 | /// Spawn the blocking code on the blocking threadpool of the |
251 | /// provided runtime and store it in this `JoinSet`, returning an |
252 | /// [`AbortHandle`] that can be used to remotely cancel the task. |
253 | /// |
254 | /// [`AbortHandle`]: crate::task::AbortHandle |
255 | #[track_caller ] |
256 | pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle |
257 | where |
258 | F: FnOnce() -> T, |
259 | F: Send + 'static, |
260 | T: Send, |
261 | { |
262 | self.insert(handle.spawn_blocking(f)) |
263 | } |
264 | |
265 | fn insert(&mut self, jh: JoinHandle<T>) -> AbortHandle { |
266 | let abort = jh.abort_handle(); |
267 | let mut entry = self.inner.insert_idle(jh); |
268 | |
269 | // Set the waker that is notified when the task completes. |
270 | entry.with_value_and_context(|jh, ctx| jh.set_join_waker(ctx.waker())); |
271 | abort |
272 | } |
273 | |
274 | /// Waits until one of the tasks in the set completes and returns its output. |
275 | /// |
276 | /// Returns `None` if the set is empty. |
277 | /// |
278 | /// # Cancel Safety |
279 | /// |
280 | /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!` |
281 | /// statement and some other branch completes first, it is guaranteed that no tasks were |
282 | /// removed from this `JoinSet`. |
283 | pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> { |
284 | crate::future::poll_fn(|cx| self.poll_join_next(cx)).await |
285 | } |
286 | |
/// Waits for any task in the set to complete and yields its output paired
/// with the [task ID] of the completed task.
///
/// Returns `None` if the set is empty.
///
/// If the result is an error, the ID of the failed task is available through
/// the [`JoinError::id`] method.
///
/// # Cancel Safety
///
/// This method is cancel safe. If `join_next_with_id` is used as the event in a `tokio::select!`
/// statement and some other branch completes first, it is guaranteed that no tasks were
/// removed from this `JoinSet`.
///
/// [task ID]: crate::task::Id
/// [`JoinError::id`]: fn@crate::task::JoinError::id
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
    let next_completed = crate::future::poll_fn(|cx| self.poll_join_next_with_id(cx));
    next_completed.await
}
308 | |
309 | /// Tries to join one of the tasks in the set that has completed and return its output. |
310 | /// |
311 | /// Returns `None` if the set is empty. |
312 | pub fn try_join_next(&mut self) -> Option<Result<T, JoinError>> { |
313 | // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left. |
314 | loop { |
315 | let mut entry = self.inner.try_pop_notified()?; |
316 | |
317 | let res = entry.with_value_and_context(|jh, ctx| { |
318 | // Since this function is not async and cannot be forced to yield, we should |
319 | // disable budgeting when we want to check for the `JoinHandle` readiness. |
320 | Pin::new(&mut unconstrained(jh)).poll(ctx) |
321 | }); |
322 | |
323 | if let Poll::Ready(res) = res { |
324 | let _entry = entry.remove(); |
325 | |
326 | return Some(res); |
327 | } |
328 | } |
329 | } |
330 | |
/// Attempts, without waiting, to join a task in the set that has already
/// completed, and returns its output paired with the [task ID] of that task.
///
/// Returns `None` when no notified task is ready to be joined, which includes
/// the case where the set is empty.
///
/// If the result is an error, the ID of the failed task is available through
/// the [`JoinError::id`] method.
///
/// [task ID]: crate::task::Id
/// [`JoinError::id`]: fn@crate::task::JoinError::id
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
    // Walk the notified `JoinHandle`s until one is ready or none are left.
    while let Some(mut entry) = self.inner.try_pop_notified() {
        let polled = entry.with_value_and_context(|jh, ctx| {
            // This function is not async and so cannot be forced to yield;
            // budgeting is therefore disabled for the readiness check.
            Pin::new(&mut unconstrained(jh)).poll(ctx)
        });

        if let Poll::Ready(outcome) = polled {
            let handle = entry.remove();

            return Some(outcome.map(|output| (handle.id(), output)));
        }
    }

    None
}
361 | |
362 | /// Aborts all tasks and waits for them to finish shutting down. |
363 | /// |
364 | /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in |
365 | /// a loop until it returns `None`. |
366 | /// |
367 | /// This method ignores any panics in the tasks shutting down. When this call returns, the |
368 | /// `JoinSet` will be empty. |
369 | /// |
370 | /// [`abort_all`]: fn@Self::abort_all |
371 | /// [`join_next`]: fn@Self::join_next |
372 | pub async fn shutdown(&mut self) { |
373 | self.abort_all(); |
374 | while self.join_next().await.is_some() {} |
375 | } |
376 | |
377 | /// Aborts all tasks on this `JoinSet`. |
378 | /// |
379 | /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete |
380 | /// cancellation, you should call `join_next` in a loop until the `JoinSet` is empty. |
381 | pub fn abort_all(&mut self) { |
382 | self.inner.for_each(|jh| jh.abort()); |
383 | } |
384 | |
385 | /// Removes all tasks from this `JoinSet` without aborting them. |
386 | /// |
387 | /// The tasks removed by this call will continue to run in the background even if the `JoinSet` |
388 | /// is dropped. |
389 | pub fn detach_all(&mut self) { |
390 | self.inner.drain(drop); |
391 | } |
392 | |
393 | /// Polls for one of the tasks in the set to complete. |
394 | /// |
395 | /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set. |
396 | /// |
397 | /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled |
398 | /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to |
399 | /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is |
400 | /// scheduled to receive a wakeup. |
401 | /// |
402 | /// # Returns |
403 | /// |
404 | /// This function returns: |
405 | /// |
406 | /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is |
407 | /// available right now. |
408 | /// * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed. |
409 | /// The `value` is the return value of one of the tasks that completed. |
410 | /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been |
411 | /// aborted. The `err` is the `JoinError` from the panicked/aborted task. |
412 | /// * `Poll::Ready(None)` if the `JoinSet` is empty. |
413 | /// |
414 | /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. |
415 | /// This can happen if the [coop budget] is reached. |
416 | /// |
417 | /// [coop budget]: crate::task#cooperative-scheduling |
418 | pub fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>> { |
419 | // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to |
420 | // the `notified` list if the waker is notified in the `poll` call below. |
421 | let mut entry = match self.inner.pop_notified(cx.waker()) { |
422 | Some(entry) => entry, |
423 | None => { |
424 | if self.is_empty() { |
425 | return Poll::Ready(None); |
426 | } else { |
427 | // The waker was set by `pop_notified`. |
428 | return Poll::Pending; |
429 | } |
430 | } |
431 | }; |
432 | |
433 | let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx)); |
434 | |
435 | if let Poll::Ready(res) = res { |
436 | let _entry = entry.remove(); |
437 | Poll::Ready(Some(res)) |
438 | } else { |
439 | // A JoinHandle generally won't emit a wakeup without being ready unless |
440 | // the coop limit has been reached. We yield to the executor in this |
441 | // case. |
442 | cx.waker().wake_by_ref(); |
443 | Poll::Pending |
444 | } |
445 | } |
446 | |
/// Polls for the completion of one of the tasks in the set, reporting the [task ID] of a
/// task that completed successfully alongside its output.
///
/// A task whose result is returned as `Poll::Ready(Some(_))` is removed from the set.
///
/// On `Poll::Pending`, the `Waker` of the supplied `Context` is scheduled to be woken when a
/// task in the `JoinSet` completes. Across multiple calls, only the `Waker` from the
/// `Context` of the most recent call is scheduled to receive a wakeup.
///
/// # Returns
///
/// This function returns:
///
///  * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
///    available right now.
///  * `Poll::Ready(Some(Ok((id, value))))` if one of the tasks in this `JoinSet` has completed.
///    The `value` is the return value of one of the tasks that completed, and
///    `id` is the [task ID] of that task.
///  * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
///    aborted. The `err` is the `JoinError` from the panicked/aborted task.
///  * `Poll::Ready(None)` if the `JoinSet` is empty.
///
/// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
/// This can happen if the [coop budget] is reached.
///
/// [coop budget]: crate::task#cooperative-scheduling
/// [task ID]: crate::task::Id
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn poll_join_next_with_id(
    &mut self,
    cx: &mut Context<'_>,
) -> Poll<Option<Result<(Id, T), JoinError>>> {
    // `pop_notified` moves the entry to the `idle` list. The entry is moved back to the
    // `notified` list if its waker is notified during the `poll` call below.
    let mut entry = match self.inner.pop_notified(cx.waker()) {
        Some(notified) => notified,
        None => {
            // When the set is non-empty, `pop_notified` has already stored the waker.
            return if self.is_empty() {
                Poll::Ready(None)
            } else {
                Poll::Pending
            };
        }
    };

    let polled = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));

    match polled {
        Poll::Ready(outcome) => {
            let handle = entry.remove();
            // Only a successful output needs the ID attached; a `JoinError` already
            // carries the ID of its task.
            Poll::Ready(Some(outcome.map(|output| (handle.id(), output))))
        }
        Poll::Pending => {
            // A JoinHandle generally does not emit a wakeup without being ready unless the
            // coop limit has been reached, so yield to the executor here.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
509 | } |
510 | |
511 | impl<T> Drop for JoinSet<T> { |
512 | fn drop(&mut self) { |
513 | self.inner.drain(|join_handle: JoinHandle| join_handle.abort()); |
514 | } |
515 | } |
516 | |
517 | impl<T> fmt::Debug for JoinSet<T> { |
518 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
519 | f.debug_struct("JoinSet" ).field(name:"len" , &self.len()).finish() |
520 | } |
521 | } |
522 | |
523 | impl<T> Default for JoinSet<T> { |
524 | fn default() -> Self { |
525 | Self::new() |
526 | } |
527 | } |
528 | |
529 | // === impl Builder === |
530 | |
531 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
532 | #[cfg_attr (docsrs, doc(cfg(all(tokio_unstable, feature = "tracing" ))))] |
533 | impl<'a, T: 'static> Builder<'a, T> { |
534 | /// Assigns a name to the task which will be spawned. |
535 | pub fn name(self, name: &'a str) -> Self { |
536 | let builder = self.builder.name(name); |
537 | Self { builder, ..self } |
538 | } |
539 | |
540 | /// Spawn the provided task with this builder's settings and store it in the |
541 | /// [`JoinSet`], returning an [`AbortHandle`] that can be used to remotely |
542 | /// cancel the task. |
543 | /// |
544 | /// # Returns |
545 | /// |
546 | /// An [`AbortHandle`] that can be used to remotely cancel the task. |
547 | /// |
548 | /// # Panics |
549 | /// |
550 | /// This method panics if called outside of a Tokio runtime. |
551 | /// |
552 | /// [`AbortHandle`]: crate::task::AbortHandle |
553 | #[track_caller ] |
554 | pub fn spawn<F>(self, future: F) -> std::io::Result<AbortHandle> |
555 | where |
556 | F: Future<Output = T>, |
557 | F: Send + 'static, |
558 | T: Send, |
559 | { |
560 | Ok(self.joinset.insert(self.builder.spawn(future)?)) |
561 | } |
562 | |
563 | /// Spawn the provided task on the provided [runtime handle] with this |
564 | /// builder's settings, and store it in the [`JoinSet`]. |
565 | /// |
566 | /// # Returns |
567 | /// |
568 | /// An [`AbortHandle`] that can be used to remotely cancel the task. |
569 | /// |
570 | /// |
571 | /// [`AbortHandle`]: crate::task::AbortHandle |
572 | /// [runtime handle]: crate::runtime::Handle |
573 | #[track_caller ] |
574 | pub fn spawn_on<F>(self, future: F, handle: &Handle) -> std::io::Result<AbortHandle> |
575 | where |
576 | F: Future<Output = T>, |
577 | F: Send + 'static, |
578 | T: Send, |
579 | { |
580 | Ok(self.joinset.insert(self.builder.spawn_on(future, handle)?)) |
581 | } |
582 | |
583 | /// Spawn the provided task on the current [`LocalSet`] with this builder's |
584 | /// settings, and store it in the [`JoinSet`]. |
585 | /// |
586 | /// # Returns |
587 | /// |
588 | /// An [`AbortHandle`] that can be used to remotely cancel the task. |
589 | /// |
590 | /// # Panics |
591 | /// |
592 | /// This method panics if it is called outside of a `LocalSet`. |
593 | /// |
594 | /// [`LocalSet`]: crate::task::LocalSet |
595 | /// [`AbortHandle`]: crate::task::AbortHandle |
596 | #[track_caller ] |
597 | pub fn spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle> |
598 | where |
599 | F: Future<Output = T>, |
600 | F: 'static, |
601 | { |
602 | Ok(self.joinset.insert(self.builder.spawn_local(future)?)) |
603 | } |
604 | |
605 | /// Spawn the provided task on the provided [`LocalSet`] with this builder's |
606 | /// settings, and store it in the [`JoinSet`]. |
607 | /// |
608 | /// # Returns |
609 | /// |
610 | /// An [`AbortHandle`] that can be used to remotely cancel the task. |
611 | /// |
612 | /// [`LocalSet`]: crate::task::LocalSet |
613 | /// [`AbortHandle`]: crate::task::AbortHandle |
614 | #[track_caller ] |
615 | pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle> |
616 | where |
617 | F: Future<Output = T>, |
618 | F: 'static, |
619 | { |
620 | Ok(self |
621 | .joinset |
622 | .insert(self.builder.spawn_local_on(future, local_set)?)) |
623 | } |
624 | } |
625 | |
// `Debug` is implemented by hand so that `Builder` is `Debug` whether or not
// `T` is `Debug`.
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
impl<'a, T> fmt::Debug for Builder<'a, T> {
    /// Formats the builder as a `join_set::Builder` struct showing the target
    /// set and the underlying task builder.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("join_set::Builder");
        out.field("joinset", &self.joinset);
        out.field("builder", &self.builder);
        out.finish()
    }
}
638 | |