1//! Runs `!Send` futures on the current thread.
2use crate::loom::cell::UnsafeCell;
3use crate::loom::sync::{Arc, Mutex};
4use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task};
5use crate::runtime::{context, ThreadId};
6use crate::sync::AtomicWaker;
7use crate::util::RcCell;
8
9use std::cell::Cell;
10use std::collections::VecDeque;
11use std::fmt;
12use std::future::Future;
13use std::marker::PhantomData;
14use std::pin::Pin;
15use std::rc::Rc;
16use std::task::Poll;
17
18use pin_project_lite::pin_project;
19
20cfg_rt! {
21 /// A set of tasks which are executed on the same thread.
22 ///
23 /// In some cases, it is necessary to run one or more futures that do not
24 /// implement [`Send`] and thus are unsafe to send between threads. In these
25 /// cases, a [local task set] may be used to schedule one or more `!Send`
26 /// futures to run together on the same thread.
27 ///
28 /// For example, the following code will not compile:
29 ///
30 /// ```rust,compile_fail
31 /// use std::rc::Rc;
32 ///
33 /// #[tokio::main]
34 /// async fn main() {
35 /// // `Rc` does not implement `Send`, and thus may not be sent between
36 /// // threads safely.
37 /// let nonsend_data = Rc::new("my nonsend data...");
38 ///
39 /// let nonsend_data = nonsend_data.clone();
40 /// // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
41 /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this
42 /// // will not compile.
43 /// tokio::spawn(async move {
44 /// println!("{}", nonsend_data);
45 /// // ...
46 /// }).await.unwrap();
47 /// }
48 /// ```
49 ///
50 /// # Use with `run_until`
51 ///
52 /// To spawn `!Send` futures, we can use a local task set to schedule them
53 /// on the thread calling [`Runtime::block_on`]. When running inside of the
54 /// local task set, we can use [`task::spawn_local`], which can spawn
55 /// `!Send` futures. For example:
56 ///
57 /// ```rust
58 /// use std::rc::Rc;
59 /// use tokio::task;
60 ///
61 /// #[tokio::main]
62 /// async fn main() {
63 /// let nonsend_data = Rc::new("my nonsend data...");
64 ///
65 /// // Construct a local task set that can run `!Send` futures.
66 /// let local = task::LocalSet::new();
67 ///
68 /// // Run the local task set.
69 /// local.run_until(async move {
70 /// let nonsend_data = nonsend_data.clone();
71 /// // `spawn_local` ensures that the future is spawned on the local
72 /// // task set.
73 /// task::spawn_local(async move {
74 /// println!("{}", nonsend_data);
75 /// // ...
76 /// }).await.unwrap();
77 /// }).await;
78 /// }
79 /// ```
80 /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
81 /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
82 /// cannot be used inside a task spawned with `tokio::spawn`.
83 ///
84 /// ## Awaiting a `LocalSet`
85 ///
86 /// Additionally, a `LocalSet` itself implements `Future`, completing when
87 /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
88 /// several futures on a `LocalSet` and drive the whole set until they
89 /// complete. For example,
90 ///
91 /// ```rust
92 /// use tokio::{task, time};
93 /// use std::rc::Rc;
94 ///
95 /// #[tokio::main]
96 /// async fn main() {
97 /// let nonsend_data = Rc::new("world");
98 /// let local = task::LocalSet::new();
99 ///
100 /// let nonsend_data2 = nonsend_data.clone();
101 /// local.spawn_local(async move {
102 /// // ...
103 /// println!("hello {}", nonsend_data2)
104 /// });
105 ///
106 /// local.spawn_local(async move {
107 /// time::sleep(time::Duration::from_millis(100)).await;
108 /// println!("goodbye {}", nonsend_data)
109 /// });
110 ///
111 /// // ...
112 ///
113 /// local.await;
114 /// }
115 /// ```
116 /// **Note:** Awaiting a `LocalSet` can only be done inside
117 /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
118 /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
119 /// `tokio::spawn`.
120 ///
121 /// ## Use inside `tokio::spawn`
122 ///
123 /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
124 /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
125 /// something else. The solution is to create the `LocalSet` somewhere else,
126 /// and communicate with it using an [`mpsc`] channel.
127 ///
128 /// The following example puts the `LocalSet` inside a new thread.
129 /// ```
130 /// use tokio::runtime::Builder;
131 /// use tokio::sync::{mpsc, oneshot};
132 /// use tokio::task::LocalSet;
133 ///
134 /// // This struct describes the task you want to spawn. Here we include
135 /// // some simple examples. The oneshot channel allows sending a response
136 /// // to the spawner.
137 /// #[derive(Debug)]
138 /// enum Task {
139 /// PrintNumber(u32),
140 /// AddOne(u32, oneshot::Sender<u32>),
141 /// }
142 ///
143 /// #[derive(Clone)]
144 /// struct LocalSpawner {
145 /// send: mpsc::UnboundedSender<Task>,
146 /// }
147 ///
148 /// impl LocalSpawner {
149 /// pub fn new() -> Self {
150 /// let (send, mut recv) = mpsc::unbounded_channel();
151 ///
152 /// let rt = Builder::new_current_thread()
153 /// .enable_all()
154 /// .build()
155 /// .unwrap();
156 ///
157 /// std::thread::spawn(move || {
158 /// let local = LocalSet::new();
159 ///
160 /// local.spawn_local(async move {
161 /// while let Some(new_task) = recv.recv().await {
162 /// tokio::task::spawn_local(run_task(new_task));
163 /// }
164 /// // If the while loop returns, then all the LocalSpawner
165 /// // objects have been dropped.
166 /// });
167 ///
168 /// // This will return once all senders are dropped and all
169 /// // spawned tasks have returned.
170 /// rt.block_on(local);
171 /// });
172 ///
173 /// Self {
174 /// send,
175 /// }
176 /// }
177 ///
178 /// pub fn spawn(&self, task: Task) {
179 /// self.send.send(task).expect("Thread with LocalSet has shut down.");
180 /// }
181 /// }
182 ///
183 /// // This task may do !Send stuff. We use printing a number as an example,
184 /// // but it could be anything.
185 /// //
186 /// // The Task struct is an enum to support spawning many different kinds
187 /// // of operations.
188 /// async fn run_task(task: Task) {
189 /// match task {
190 /// Task::PrintNumber(n) => {
191 /// println!("{}", n);
192 /// },
193 /// Task::AddOne(n, response) => {
194 /// // We ignore failures to send the response.
195 /// let _ = response.send(n + 1);
196 /// },
197 /// }
198 /// }
199 ///
200 /// #[tokio::main]
201 /// async fn main() {
202 /// let spawner = LocalSpawner::new();
203 ///
204 /// let (send, response) = oneshot::channel();
205 /// spawner.spawn(Task::AddOne(10, send));
206 /// let eleven = response.await.unwrap();
207 /// assert_eq!(eleven, 11);
208 /// }
209 /// ```
210 ///
211 /// [`Send`]: trait@std::marker::Send
212 /// [local task set]: struct@LocalSet
213 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
214 /// [`task::spawn_local`]: fn@spawn_local
215 /// [`mpsc`]: mod@crate::sync::mpsc
216 pub struct LocalSet {
217 /// Current scheduler tick.
218 tick: Cell<u8>,
219
220 /// State available from thread-local.
221 context: Rc<Context>,
222
223 /// This type should not be Send.
224 _not_send: PhantomData<*const ()>,
225 }
226}
227
228/// State available from the thread-local.
229struct Context {
230 /// State shared between threads.
231 shared: Arc<Shared>,
232
233 /// True if a task panicked without being handled and the local set is
234 /// configured to shutdown on unhandled panic.
235 unhandled_panic: Cell<bool>,
236}
237
238/// LocalSet state shared between threads.
239struct Shared {
240 /// # Safety
241 ///
242 /// This field must *only* be accessed from the thread that owns the
243 /// `LocalSet` (i.e., `Thread::current().id() == owner`).
244 local_state: LocalState,
245
246 /// Remote run queue sender.
247 queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,
248
249 /// Wake the `LocalSet` task.
250 waker: AtomicWaker,
251
252 /// How to respond to unhandled task panics.
253 #[cfg(tokio_unstable)]
254 pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
255}
256
257/// Tracks the `LocalSet` state that must only be accessed from the thread that
258/// created the `LocalSet`.
259struct LocalState {
260 /// The `ThreadId` of the thread that owns the `LocalSet`.
261 owner: ThreadId,
262
263 /// Local run queue sender and receiver.
264 local_queue: UnsafeCell<VecDeque<task::Notified<Arc<Shared>>>>,
265
266 /// Collection of all active tasks spawned onto this executor.
267 owned: LocalOwnedTasks<Arc<Shared>>,
268}
269
270pin_project! {
271 #[derive(Debug)]
272 struct RunUntil<'a, F> {
273 local_set: &'a LocalSet,
274 #[pin]
275 future: F,
276 }
277}
278
279tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
280 ctx: RcCell::new(),
281} });
282
283struct LocalData {
284 ctx: RcCell<Context>,
285}
286
287cfg_rt! {
288 /// Spawns a `!Send` future on the current [`LocalSet`].
289 ///
290 /// The spawned future will run on the same thread that called `spawn_local`.
291 ///
292 /// The provided future will start running in the background immediately
293 /// when `spawn_local` is called, even if you don't await the returned
294 /// `JoinHandle`.
295 ///
296 /// # Panics
297 ///
298 /// This function panics if called outside of a [`LocalSet`].
299 ///
300 /// Note that if [`tokio::spawn`] is used from within a `LocalSet`, the
301 /// resulting new task will _not_ be inside the `LocalSet`, so you must use
302 /// `spawn_local` if you want to stay within the `LocalSet`.
303 ///
304 /// # Examples
305 ///
306 /// ```rust
307 /// use std::rc::Rc;
308 /// use tokio::task;
309 ///
310 /// #[tokio::main]
311 /// async fn main() {
312 /// let nonsend_data = Rc::new("my nonsend data...");
313 ///
314 /// let local = task::LocalSet::new();
315 ///
316 /// // Run the local task set.
317 /// local.run_until(async move {
318 /// let nonsend_data = nonsend_data.clone();
319 /// task::spawn_local(async move {
320 /// println!("{}", nonsend_data);
321 /// // ...
322 /// }).await.unwrap();
323 /// }).await;
324 /// }
325 /// ```
326 ///
327 /// [`LocalSet`]: struct@crate::task::LocalSet
328 /// [`tokio::spawn`]: fn@crate::task::spawn
329 #[track_caller]
330 pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
331 where
332 F: Future + 'static,
333 F::Output: 'static,
334 {
335 spawn_local_inner(future, None)
336 }
337
338
339 #[track_caller]
340 pub(super) fn spawn_local_inner<F>(future: F, name: Option<&str>) -> JoinHandle<F::Output>
341 where F: Future + 'static,
342 F::Output: 'static
343 {
344 match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
345 None => panic!("`spawn_local` called from outside of a `task::LocalSet`"),
346 Some(cx) => cx.spawn(future, name)
347 }
348 }
349}
350
351/// Initial queue capacity.
352const INITIAL_CAPACITY: usize = 64;
353
354/// Max number of tasks to poll per tick.
355const MAX_TASKS_PER_TICK: usize = 61;
356
357/// How often it check the remote queue first.
358const REMOTE_FIRST_INTERVAL: u8 = 31;
359
360/// Context guard for LocalSet
361pub struct LocalEnterGuard(Option<Rc<Context>>);
362
363impl Drop for LocalEnterGuard {
364 fn drop(&mut self) {
365 CURRENT.with(|LocalData { ctx: &RcCell, .. }| {
366 ctx.set(self.0.take());
367 })
368 }
369}
370
371impl fmt::Debug for LocalEnterGuard {
372 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
373 f.debug_struct(name:"LocalEnterGuard").finish()
374 }
375}
376
377impl LocalSet {
378 /// Returns a new local task set.
379 pub fn new() -> LocalSet {
380 let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown");
381
382 LocalSet {
383 tick: Cell::new(0),
384 context: Rc::new(Context {
385 shared: Arc::new(Shared {
386 local_state: LocalState {
387 owner,
388 owned: LocalOwnedTasks::new(),
389 local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
390 },
391 queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
392 waker: AtomicWaker::new(),
393 #[cfg(tokio_unstable)]
394 unhandled_panic: crate::runtime::UnhandledPanic::Ignore,
395 }),
396 unhandled_panic: Cell::new(false),
397 }),
398 _not_send: PhantomData,
399 }
400 }
401
402 /// Enters the context of this `LocalSet`.
403 ///
404 /// The [`spawn_local`] method will spawn tasks on the `LocalSet` whose
405 /// context you are inside.
406 ///
407 /// [`spawn_local`]: fn@crate::task::spawn_local
408 pub fn enter(&self) -> LocalEnterGuard {
409 CURRENT.with(|LocalData { ctx, .. }| {
410 let old = ctx.replace(Some(self.context.clone()));
411 LocalEnterGuard(old)
412 })
413 }
414
415 /// Spawns a `!Send` task onto the local task set.
416 ///
417 /// This task is guaranteed to be run on the current thread.
418 ///
419 /// Unlike the free function [`spawn_local`], this method may be used to
420 /// spawn local tasks when the `LocalSet` is _not_ running. The provided
421 /// future will start running once the `LocalSet` is next started, even if
422 /// you don't await the returned `JoinHandle`.
423 ///
424 /// # Examples
425 ///
426 /// ```rust
427 /// use tokio::task;
428 ///
429 /// #[tokio::main]
430 /// async fn main() {
431 /// let local = task::LocalSet::new();
432 ///
433 /// // Spawn a future on the local set. This future will be run when
434 /// // we call `run_until` to drive the task set.
435 /// local.spawn_local(async {
436 /// // ...
437 /// });
438 ///
439 /// // Run the local task set.
440 /// local.run_until(async move {
441 /// // ...
442 /// }).await;
443 ///
444 /// // When `run` finishes, we can spawn _more_ futures, which will
445 /// // run in subsequent calls to `run_until`.
446 /// local.spawn_local(async {
447 /// // ...
448 /// });
449 ///
450 /// local.run_until(async move {
451 /// // ...
452 /// }).await;
453 /// }
454 /// ```
455 /// [`spawn_local`]: fn@spawn_local
456 #[track_caller]
457 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
458 where
459 F: Future + 'static,
460 F::Output: 'static,
461 {
462 self.spawn_named(future, None)
463 }
464
465 /// Runs a future to completion on the provided runtime, driving any local
466 /// futures spawned on this task set on the current thread.
467 ///
468 /// This runs the given future on the runtime, blocking until it is
469 /// complete, and yielding its resolved result. Any tasks or timers which
470 /// the future spawns internally will be executed on the runtime. The future
471 /// may also call [`spawn_local`] to spawn_local additional local futures on the
472 /// current thread.
473 ///
474 /// This method should not be called from an asynchronous context.
475 ///
476 /// # Panics
477 ///
478 /// This function panics if the executor is at capacity, if the provided
479 /// future panics, or if called within an asynchronous execution context.
480 ///
481 /// # Notes
482 ///
483 /// Since this function internally calls [`Runtime::block_on`], and drives
484 /// futures in the local task set inside that call to `block_on`, the local
485 /// futures may not use [in-place blocking]. If a blocking call needs to be
486 /// issued from a local task, the [`spawn_blocking`] API may be used instead.
487 ///
488 /// For example, this will panic:
489 /// ```should_panic
490 /// use tokio::runtime::Runtime;
491 /// use tokio::task;
492 ///
493 /// let rt = Runtime::new().unwrap();
494 /// let local = task::LocalSet::new();
495 /// local.block_on(&rt, async {
496 /// let join = task::spawn_local(async {
497 /// let blocking_result = task::block_in_place(|| {
498 /// // ...
499 /// });
500 /// // ...
501 /// });
502 /// join.await.unwrap();
503 /// })
504 /// ```
505 /// This, however, will not panic:
506 /// ```
507 /// use tokio::runtime::Runtime;
508 /// use tokio::task;
509 ///
510 /// let rt = Runtime::new().unwrap();
511 /// let local = task::LocalSet::new();
512 /// local.block_on(&rt, async {
513 /// let join = task::spawn_local(async {
514 /// let blocking_result = task::spawn_blocking(|| {
515 /// // ...
516 /// }).await;
517 /// // ...
518 /// });
519 /// join.await.unwrap();
520 /// })
521 /// ```
522 ///
523 /// [`spawn_local`]: fn@spawn_local
524 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
525 /// [in-place blocking]: fn@crate::task::block_in_place
526 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
527 #[track_caller]
528 #[cfg(feature = "rt")]
529 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
530 pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
531 where
532 F: Future,
533 {
534 rt.block_on(self.run_until(future))
535 }
536
537 /// Runs a future to completion on the local set, returning its output.
538 ///
539 /// This returns a future that runs the given future with a local set,
540 /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
541 /// Any local futures spawned on the local set will be driven in the
542 /// background until the future passed to `run_until` completes. When the future
543 /// passed to `run` finishes, any local futures which have not completed
544 /// will remain on the local set, and will be driven on subsequent calls to
545 /// `run_until` or when [awaiting the local set] itself.
546 ///
547 /// # Examples
548 ///
549 /// ```rust
550 /// use tokio::task;
551 ///
552 /// #[tokio::main]
553 /// async fn main() {
554 /// task::LocalSet::new().run_until(async {
555 /// task::spawn_local(async move {
556 /// // ...
557 /// }).await.unwrap();
558 /// // ...
559 /// }).await;
560 /// }
561 /// ```
562 ///
563 /// [`spawn_local`]: fn@spawn_local
564 /// [awaiting the local set]: #awaiting-a-localset
565 pub async fn run_until<F>(&self, future: F) -> F::Output
566 where
567 F: Future,
568 {
569 let run_until = RunUntil {
570 future,
571 local_set: self,
572 };
573 run_until.await
574 }
575
576 pub(in crate::task) fn spawn_named<F>(
577 &self,
578 future: F,
579 name: Option<&str>,
580 ) -> JoinHandle<F::Output>
581 where
582 F: Future + 'static,
583 F::Output: 'static,
584 {
585 let handle = self.context.spawn(future, name);
586
587 // Because a task was spawned from *outside* the `LocalSet`, wake the
588 // `LocalSet` future to execute the new task, if it hasn't been woken.
589 //
590 // Spawning via the free fn `spawn` does not require this, as it can
591 // only be called from *within* a future executing on the `LocalSet` —
592 // in that case, the `LocalSet` must already be awake.
593 self.context.shared.waker.wake();
594 handle
595 }
596
597 /// Ticks the scheduler, returning whether the local future needs to be
598 /// notified again.
599 fn tick(&self) -> bool {
600 for _ in 0..MAX_TASKS_PER_TICK {
601 // Make sure we didn't hit an unhandled panic
602 if self.context.unhandled_panic.get() {
603 panic!("a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic");
604 }
605
606 match self.next_task() {
607 // Run the task
608 //
609 // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
610 // used. We are responsible for maintaining the invariant that
611 // `run_unchecked` is only called on threads that spawned the
612 // task initially. Because `LocalSet` itself is `!Send`, and
613 // `spawn_local` spawns into the `LocalSet` on the current
614 // thread, the invariant is maintained.
615 Some(task) => crate::runtime::coop::budget(|| task.run()),
616 // We have fully drained the queue of notified tasks, so the
617 // local future doesn't need to be notified again — it can wait
618 // until something else wakes a task in the local set.
619 None => return false,
620 }
621 }
622
623 true
624 }
625
626 fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> {
627 let tick = self.tick.get();
628 self.tick.set(tick.wrapping_add(1));
629
630 let task = if tick % REMOTE_FIRST_INTERVAL == 0 {
631 self.context
632 .shared
633 .queue
634 .lock()
635 .as_mut()
636 .and_then(|queue| queue.pop_front())
637 .or_else(|| self.pop_local())
638 } else {
639 self.pop_local().or_else(|| {
640 self.context
641 .shared
642 .queue
643 .lock()
644 .as_mut()
645 .and_then(|queue| queue.pop_front())
646 })
647 };
648
649 task.map(|task| unsafe {
650 // Safety: because the `LocalSet` itself is `!Send`, we know we are
651 // on the same thread if we have access to the `LocalSet`, and can
652 // therefore access the local run queue.
653 self.context.shared.local_state.assert_owner(task)
654 })
655 }
656
657 fn pop_local(&self) -> Option<task::Notified<Arc<Shared>>> {
658 unsafe {
659 // Safety: because the `LocalSet` itself is `!Send`, we know we are
660 // on the same thread if we have access to the `LocalSet`, and can
661 // therefore access the local run queue.
662 self.context.shared.local_state.task_pop_front()
663 }
664 }
665
666 fn with<T>(&self, f: impl FnOnce() -> T) -> T {
667 CURRENT.with(|LocalData { ctx, .. }| {
668 struct Reset<'a> {
669 ctx_ref: &'a RcCell<Context>,
670 val: Option<Rc<Context>>,
671 }
672 impl<'a> Drop for Reset<'a> {
673 fn drop(&mut self) {
674 self.ctx_ref.set(self.val.take());
675 }
676 }
677 let old = ctx.replace(Some(self.context.clone()));
678
679 let _reset = Reset {
680 ctx_ref: ctx,
681 val: old,
682 };
683
684 f()
685 })
686 }
687
688 /// This method is like `with`, but it just calls `f` without setting the thread-local if that
689 /// fails.
690 fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T {
691 let mut f = Some(f);
692
693 let res = CURRENT.try_with(|LocalData { ctx, .. }| {
694 struct Reset<'a> {
695 ctx_ref: &'a RcCell<Context>,
696 val: Option<Rc<Context>>,
697 }
698 impl<'a> Drop for Reset<'a> {
699 fn drop(&mut self) {
700 self.ctx_ref.replace(self.val.take());
701 }
702 }
703 let old = ctx.replace(Some(self.context.clone()));
704
705 let _reset = Reset {
706 ctx_ref: ctx,
707 val: old,
708 };
709
710 (f.take().unwrap())()
711 });
712
713 match res {
714 Ok(res) => res,
715 Err(_access_error) => (f.take().unwrap())(),
716 }
717 }
718}
719
720cfg_unstable! {
721 impl LocalSet {
722 /// Configure how the `LocalSet` responds to an unhandled panic on a
723 /// spawned task.
724 ///
725 /// By default, an unhandled panic (i.e. a panic not caught by
726 /// [`std::panic::catch_unwind`]) has no impact on the `LocalSet`'s
727 /// execution. The panic is error value is forwarded to the task's
728 /// [`JoinHandle`] and all other spawned tasks continue running.
729 ///
730 /// The `unhandled_panic` option enables configuring this behavior.
731 ///
732 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
733 /// spawned tasks have no impact on the `LocalSet`'s execution.
734 /// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to
735 /// shutdown immediately when a spawned task panics even if that
736 /// task's `JoinHandle` has not been dropped. All other spawned tasks
737 /// will immediately terminate and further calls to
738 /// [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic.
739 ///
740 /// # Panics
741 ///
742 /// This method panics if called after the `LocalSet` has started
743 /// running.
744 ///
745 /// # Unstable
746 ///
747 /// This option is currently unstable and its implementation is
748 /// incomplete. The API may change or be removed in the future. See
749 /// tokio-rs/tokio#4516 for more details.
750 ///
751 /// # Examples
752 ///
753 /// The following demonstrates a `LocalSet` configured to shutdown on
754 /// panic. The first spawned task panics and results in the `LocalSet`
755 /// shutting down. The second spawned task never has a chance to
756 /// execute. The call to `run_until` will panic due to the runtime being
757 /// forcibly shutdown.
758 ///
759 /// ```should_panic
760 /// use tokio::runtime::UnhandledPanic;
761 ///
762 /// # #[tokio::main]
763 /// # async fn main() {
764 /// tokio::task::LocalSet::new()
765 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
766 /// .run_until(async {
767 /// tokio::task::spawn_local(async { panic!("boom"); });
768 /// tokio::task::spawn_local(async {
769 /// // This task never completes
770 /// });
771 ///
772 /// // Do some work, but `run_until` will panic before it completes
773 /// # loop { tokio::task::yield_now().await; }
774 /// })
775 /// .await;
776 /// # }
777 /// ```
778 ///
779 /// [`JoinHandle`]: struct@crate::task::JoinHandle
780 pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self {
781 // TODO: This should be set as a builder
782 Rc::get_mut(&mut self.context)
783 .and_then(|ctx| Arc::get_mut(&mut ctx.shared))
784 .expect("Unhandled Panic behavior modified after starting LocalSet")
785 .unhandled_panic = behavior;
786 self
787 }
788 }
789}
790
791impl fmt::Debug for LocalSet {
792 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
793 fmt.debug_struct(name:"LocalSet").finish()
794 }
795}
796
797impl Future for LocalSet {
798 type Output = ();
799
800 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
801 // Register the waker before starting to work
802 self.context.shared.waker.register_by_ref(cx.waker());
803
804 if self.with(|| self.tick()) {
805 // If `tick` returns true, we need to notify the local future again:
806 // there are still tasks remaining in the run queue.
807 cx.waker().wake_by_ref();
808 Poll::Pending
809
810 // Safety: called from the thread that owns `LocalSet`. Because
811 // `LocalSet` is `!Send`, this is safe.
812 } else if unsafe { self.context.shared.local_state.owned_is_empty() } {
813 // If the scheduler has no remaining futures, we're done!
814 Poll::Ready(())
815 } else {
816 // There are still futures in the local set, but we've polled all the
817 // futures in the run queue. Therefore, we can just return Pending
818 // since the remaining futures will be woken from somewhere else.
819 Poll::Pending
820 }
821 }
822}
823
824impl Default for LocalSet {
825 fn default() -> LocalSet {
826 LocalSet::new()
827 }
828}
829
830impl Drop for LocalSet {
831 fn drop(&mut self) {
832 self.with_if_possible(|| {
833 // Shut down all tasks in the LocalOwnedTasks and close it to
834 // prevent new tasks from ever being added.
835 unsafe {
836 // Safety: called from the thread that owns `LocalSet`
837 self.context.shared.local_state.close_and_shutdown_all();
838 }
839
840 // We already called shutdown on all tasks above, so there is no
841 // need to call shutdown.
842
843 // Safety: note that this *intentionally* bypasses the unsafe
844 // `Shared::local_queue()` method. This is in order to avoid the
845 // debug assertion that we are on the thread that owns the
846 // `LocalSet`, because on some systems (e.g. at least some macOS
847 // versions), attempting to get the current thread ID can panic due
848 // to the thread's local data that stores the thread ID being
849 // dropped *before* the `LocalSet`.
850 //
851 // Despite avoiding the assertion here, it is safe for us to access
852 // the local queue in `Drop`, because the `LocalSet` itself is
853 // `!Send`, so we can reasonably guarantee that it will not be
854 // `Drop`ped from another thread.
855 let local_queue = unsafe {
856 // Safety: called from the thread that owns `LocalSet`
857 self.context.shared.local_state.take_local_queue()
858 };
859 for task in local_queue {
860 drop(task);
861 }
862
863 // Take the queue from the Shared object to prevent pushing
864 // notifications to it in the future.
865 let queue = self.context.shared.queue.lock().take().unwrap();
866 for task in queue {
867 drop(task);
868 }
869
870 // Safety: called from the thread that owns `LocalSet`
871 assert!(unsafe { self.context.shared.local_state.owned_is_empty() });
872 });
873 }
874}
875
876// === impl Context ===
877
878impl Context {
879 #[track_caller]
880 fn spawn<F>(&self, future: F, name: Option<&str>) -> JoinHandle<F::Output>
881 where
882 F: Future + 'static,
883 F::Output: 'static,
884 {
885 let id = crate::runtime::task::Id::next();
886 let future = crate::util::trace::task(future, "local", name, id.as_u64());
887
888 // Safety: called from the thread that owns the `LocalSet`
889 let (handle, notified) = {
890 self.shared.local_state.assert_called_from_owner_thread();
891 self.shared
892 .local_state
893 .owned
894 .bind(future, self.shared.clone(), id)
895 };
896
897 if let Some(notified) = notified {
898 self.shared.schedule(notified);
899 }
900
901 handle
902 }
903}
904
905// === impl LocalFuture ===
906
907impl<T: Future> Future for RunUntil<'_, T> {
908 type Output = T::Output;
909
910 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
911 let me = self.project();
912
913 me.local_set.with(|| {
914 me.local_set
915 .context
916 .shared
917 .waker
918 .register_by_ref(cx.waker());
919
920 let _no_blocking = crate::runtime::context::disallow_block_in_place();
921 let f = me.future;
922
923 if let Poll::Ready(output) = f.poll(cx) {
924 return Poll::Ready(output);
925 }
926
927 if me.local_set.tick() {
928 // If `tick` returns `true`, we need to notify the local future again:
929 // there are still tasks remaining in the run queue.
930 cx.waker().wake_by_ref();
931 }
932
933 Poll::Pending
934 })
935 }
936}
937
938impl Shared {
939 /// Schedule the provided task on the scheduler.
940 fn schedule(&self, task: task::Notified<Arc<Self>>) {
941 CURRENT.with(|localdata| {
942 match localdata.ctx.get() {
943 Some(cx) if cx.shared.ptr_eq(self) => unsafe {
944 // Safety: if the current `LocalSet` context points to this
945 // `LocalSet`, then we are on the thread that owns it.
946 cx.shared.local_state.task_push_back(task);
947 },
948
949 // We are on the thread that owns the `LocalSet`, so we can
950 // wake to the local queue.
951 _ if context::thread_id().ok() == Some(self.local_state.owner) => {
952 unsafe {
953 // Safety: we just checked that the thread ID matches
954 // the localset's owner, so this is safe.
955 self.local_state.task_push_back(task);
956 }
957 // We still have to wake the `LocalSet`, because it isn't
958 // currently being polled.
959 self.waker.wake();
960 }
961
962 // We are *not* on the thread that owns the `LocalSet`, so we
963 // have to wake to the remote queue.
964 _ => {
965 // First, check whether the queue is still there (if not, the
966 // LocalSet is dropped). Then push to it if so, and if not,
967 // do nothing.
968 let mut lock = self.queue.lock();
969
970 if let Some(queue) = lock.as_mut() {
971 queue.push_back(task);
972 drop(lock);
973 self.waker.wake();
974 }
975 }
976 }
977 });
978 }
979
980 fn ptr_eq(&self, other: &Shared) -> bool {
981 std::ptr::eq(self, other)
982 }
983}
984
985// This is safe because (and only because) we *pinky pwomise* to never touch the
986// local run queue except from the thread that owns the `LocalSet`.
987unsafe impl Sync for Shared {}
988
989impl task::Schedule for Arc<Shared> {
990 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
991 // Safety, this is always called from the thread that owns `LocalSet`
992 unsafe { self.local_state.task_remove(task) }
993 }
994
995 fn schedule(&self, task: task::Notified<Self>) {
996 Shared::schedule(self, task);
997 }
998
999 cfg_unstable! {
1000 fn unhandled_panic(&self) {
1001 use crate::runtime::UnhandledPanic;
1002
1003 match self.unhandled_panic {
1004 UnhandledPanic::Ignore => {
1005 // Do nothing
1006 }
1007 UnhandledPanic::ShutdownRuntime => {
1008 // This hook is only called from within the runtime, so
1009 // `CURRENT` should match with `&self`, i.e. there is no
1010 // opportunity for a nested scheduler to be called.
1011 CURRENT.with(|LocalData { ctx, .. }| match ctx.get() {
1012 Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
1013 cx.unhandled_panic.set(true);
1014 // Safety: this is always called from the thread that owns `LocalSet`
1015 unsafe { cx.shared.local_state.close_and_shutdown_all(); }
1016 }
1017 _ => unreachable!("runtime core not set in CURRENT thread-local"),
1018 })
1019 }
1020 }
1021 }
1022 }
1023}
1024
1025impl LocalState {
1026 unsafe fn task_pop_front(&self) -> Option<task::Notified<Arc<Shared>>> {
1027 // The caller ensures it is called from the same thread that owns
1028 // the LocalSet.
1029 self.assert_called_from_owner_thread();
1030
1031 self.local_queue.with_mut(|ptr| (*ptr).pop_front())
1032 }
1033
1034 unsafe fn task_push_back(&self, task: task::Notified<Arc<Shared>>) {
1035 // The caller ensures it is called from the same thread that owns
1036 // the LocalSet.
1037 self.assert_called_from_owner_thread();
1038
1039 self.local_queue.with_mut(|ptr| (*ptr).push_back(task))
1040 }
1041
1042 unsafe fn take_local_queue(&self) -> VecDeque<task::Notified<Arc<Shared>>> {
1043 // The caller ensures it is called from the same thread that owns
1044 // the LocalSet.
1045 self.assert_called_from_owner_thread();
1046
1047 self.local_queue.with_mut(|ptr| std::mem::take(&mut (*ptr)))
1048 }
1049
1050 unsafe fn task_remove(&self, task: &Task<Arc<Shared>>) -> Option<Task<Arc<Shared>>> {
1051 // The caller ensures it is called from the same thread that owns
1052 // the LocalSet.
1053 self.assert_called_from_owner_thread();
1054
1055 self.owned.remove(task)
1056 }
1057
1058 /// Returns true if the `LocalSet` does not have any spawned tasks
1059 unsafe fn owned_is_empty(&self) -> bool {
1060 // The caller ensures it is called from the same thread that owns
1061 // the LocalSet.
1062 self.assert_called_from_owner_thread();
1063
1064 self.owned.is_empty()
1065 }
1066
1067 unsafe fn assert_owner(
1068 &self,
1069 task: task::Notified<Arc<Shared>>,
1070 ) -> task::LocalNotified<Arc<Shared>> {
1071 // The caller ensures it is called from the same thread that owns
1072 // the LocalSet.
1073 self.assert_called_from_owner_thread();
1074
1075 self.owned.assert_owner(task)
1076 }
1077
1078 unsafe fn close_and_shutdown_all(&self) {
1079 // The caller ensures it is called from the same thread that owns
1080 // the LocalSet.
1081 self.assert_called_from_owner_thread();
1082
1083 self.owned.close_and_shutdown_all()
1084 }
1085
1086 #[track_caller]
1087 fn assert_called_from_owner_thread(&self) {
1088 // FreeBSD has some weirdness around thread-local destruction.
1089 // TODO: remove this hack when thread id is cleaned up
1090 #[cfg(not(any(target_os = "openbsd", target_os = "freebsd")))]
1091 debug_assert!(
1092 // if we couldn't get the thread ID because we're dropping the local
1093 // data, skip the assertion --- the `Drop` impl is not going to be
1094 // called from another thread, because `LocalSet` is `!Send`
1095 context::thread_id()
1096 .map(|id| id == self.owner)
1097 .unwrap_or(true),
1098 "`LocalSet`'s local run queue must not be accessed by another thread!"
1099 );
1100 }
1101}
1102
1103// This is `Send` because it is stored in `Shared`. It is up to the caller to
1104// ensure they are on the same thread that owns the `LocalSet`.
1105unsafe impl Send for LocalState {}
1106
1107#[cfg(all(test, not(loom)))]
1108mod tests {
1109 use super::*;
1110
1111 // Does a `LocalSet` running on a current-thread runtime...basically work?
1112 //
1113 // This duplicates a test in `tests/task_local_set.rs`, but because this is
1114 // a lib test, it wil run under Miri, so this is necessary to catch stacked
1115 // borrows violations in the `LocalSet` implementation.
1116 #[test]
1117 fn local_current_thread_scheduler() {
1118 let f = async {
1119 LocalSet::new()
1120 .run_until(async {
1121 spawn_local(async {}).await.unwrap();
1122 })
1123 .await;
1124 };
1125 crate::runtime::Builder::new_current_thread()
1126 .build()
1127 .expect("rt")
1128 .block_on(f)
1129 }
1130
1131 // Tests that when a task on a `LocalSet` is woken by an io driver on the
1132 // same thread, the task is woken to the localset's local queue rather than
1133 // its remote queue.
1134 //
1135 // This test has to be defined in the `local.rs` file as a lib test, rather
1136 // than in `tests/`, because it makes assertions about the local set's
1137 // internal state.
1138 #[test]
1139 fn wakes_to_local_queue() {
1140 use super::*;
1141 use crate::sync::Notify;
1142 let rt = crate::runtime::Builder::new_current_thread()
1143 .build()
1144 .expect("rt");
1145 rt.block_on(async {
1146 let local = LocalSet::new();
1147 let notify = Arc::new(Notify::new());
1148 let task = local.spawn_local({
1149 let notify = notify.clone();
1150 async move {
1151 notify.notified().await;
1152 }
1153 });
1154 let mut run_until = Box::pin(local.run_until(async move {
1155 task.await.unwrap();
1156 }));
1157
1158 // poll the run until future once
1159 crate::future::poll_fn(|cx| {
1160 let _ = run_until.as_mut().poll(cx);
1161 Poll::Ready(())
1162 })
1163 .await;
1164
1165 notify.notify_one();
1166 let task = unsafe { local.context.shared.local_state.task_pop_front() };
1167 // TODO(eliza): it would be nice to be able to assert that this is
1168 // the local task.
1169 assert!(
1170 task.is_some(),
1171 "task should have been notified to the LocalSet's local queue"
1172 );
1173 })
1174 }
1175}
1176