1//! Runs `!Send` futures on the current thread.
2use crate::loom::cell::UnsafeCell;
3use crate::loom::sync::{Arc, Mutex};
4#[cfg(tokio_unstable)]
5use crate::runtime;
6use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task};
7use crate::runtime::{context, ThreadId};
8use crate::sync::AtomicWaker;
9use crate::util::RcCell;
10
11use std::cell::Cell;
12use std::collections::VecDeque;
13use std::fmt;
14use std::future::Future;
15use std::marker::PhantomData;
16use std::pin::Pin;
17use std::rc::Rc;
18use std::task::Poll;
19
20use pin_project_lite::pin_project;
21
22cfg_rt! {
23 /// A set of tasks which are executed on the same thread.
24 ///
25 /// In some cases, it is necessary to run one or more futures that do not
26 /// implement [`Send`] and thus are unsafe to send between threads. In these
27 /// cases, a [local task set] may be used to schedule one or more `!Send`
28 /// futures to run together on the same thread.
29 ///
30 /// For example, the following code will not compile:
31 ///
32 /// ```rust,compile_fail
33 /// use std::rc::Rc;
34 ///
35 /// #[tokio::main]
36 /// async fn main() {
37 /// // `Rc` does not implement `Send`, and thus may not be sent between
38 /// // threads safely.
39 /// let nonsend_data = Rc::new("my nonsend data...");
40 ///
41 /// let nonsend_data = nonsend_data.clone();
42 /// // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
43 /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this
44 /// // will not compile.
45 /// tokio::spawn(async move {
46 /// println!("{}", nonsend_data);
47 /// // ...
48 /// }).await.unwrap();
49 /// }
50 /// ```
51 ///
52 /// # Use with `run_until`
53 ///
54 /// To spawn `!Send` futures, we can use a local task set to schedule them
55 /// on the thread calling [`Runtime::block_on`]. When running inside of the
56 /// local task set, we can use [`task::spawn_local`], which can spawn
57 /// `!Send` futures. For example:
58 ///
59 /// ```rust
60 /// use std::rc::Rc;
61 /// use tokio::task;
62 ///
63 /// #[tokio::main]
64 /// async fn main() {
65 /// let nonsend_data = Rc::new("my nonsend data...");
66 ///
67 /// // Construct a local task set that can run `!Send` futures.
68 /// let local = task::LocalSet::new();
69 ///
70 /// // Run the local task set.
71 /// local.run_until(async move {
72 /// let nonsend_data = nonsend_data.clone();
73 /// // `spawn_local` ensures that the future is spawned on the local
74 /// // task set.
75 /// task::spawn_local(async move {
76 /// println!("{}", nonsend_data);
77 /// // ...
78 /// }).await.unwrap();
79 /// }).await;
80 /// }
81 /// ```
82 /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
83 /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
84 /// cannot be used inside a task spawned with `tokio::spawn`.
85 ///
86 /// ## Awaiting a `LocalSet`
87 ///
88 /// Additionally, a `LocalSet` itself implements `Future`, completing when
89 /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
90 /// several futures on a `LocalSet` and drive the whole set until they
91 /// complete. For example,
92 ///
93 /// ```rust
94 /// use tokio::{task, time};
95 /// use std::rc::Rc;
96 ///
97 /// #[tokio::main]
98 /// async fn main() {
99 /// let nonsend_data = Rc::new("world");
100 /// let local = task::LocalSet::new();
101 ///
102 /// let nonsend_data2 = nonsend_data.clone();
103 /// local.spawn_local(async move {
104 /// // ...
105 /// println!("hello {}", nonsend_data2)
106 /// });
107 ///
108 /// local.spawn_local(async move {
109 /// time::sleep(time::Duration::from_millis(100)).await;
110 /// println!("goodbye {}", nonsend_data)
111 /// });
112 ///
113 /// // ...
114 ///
115 /// local.await;
116 /// }
117 /// ```
118 /// **Note:** Awaiting a `LocalSet` can only be done inside
119 /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
120 /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
121 /// `tokio::spawn`.
122 ///
123 /// ## Use inside `tokio::spawn`
124 ///
125 /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
126 /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
127 /// something else. The solution is to create the `LocalSet` somewhere else,
128 /// and communicate with it using an [`mpsc`] channel.
129 ///
130 /// The following example puts the `LocalSet` inside a new thread.
131 /// ```
132 /// use tokio::runtime::Builder;
133 /// use tokio::sync::{mpsc, oneshot};
134 /// use tokio::task::LocalSet;
135 ///
136 /// // This struct describes the task you want to spawn. Here we include
137 /// // some simple examples. The oneshot channel allows sending a response
138 /// // to the spawner.
139 /// #[derive(Debug)]
140 /// enum Task {
141 /// PrintNumber(u32),
142 /// AddOne(u32, oneshot::Sender<u32>),
143 /// }
144 ///
145 /// #[derive(Clone)]
146 /// struct LocalSpawner {
147 /// send: mpsc::UnboundedSender<Task>,
148 /// }
149 ///
150 /// impl LocalSpawner {
151 /// pub fn new() -> Self {
152 /// let (send, mut recv) = mpsc::unbounded_channel();
153 ///
154 /// let rt = Builder::new_current_thread()
155 /// .enable_all()
156 /// .build()
157 /// .unwrap();
158 ///
159 /// std::thread::spawn(move || {
160 /// let local = LocalSet::new();
161 ///
162 /// local.spawn_local(async move {
163 /// while let Some(new_task) = recv.recv().await {
164 /// tokio::task::spawn_local(run_task(new_task));
165 /// }
166 /// // If the while loop returns, then all the LocalSpawner
167 /// // objects have been dropped.
168 /// });
169 ///
170 /// // This will return once all senders are dropped and all
171 /// // spawned tasks have returned.
172 /// rt.block_on(local);
173 /// });
174 ///
175 /// Self {
176 /// send,
177 /// }
178 /// }
179 ///
180 /// pub fn spawn(&self, task: Task) {
181 /// self.send.send(task).expect("Thread with LocalSet has shut down.");
182 /// }
183 /// }
184 ///
185 /// // This task may do !Send stuff. We use printing a number as an example,
186 /// // but it could be anything.
187 /// //
188 /// // The Task struct is an enum to support spawning many different kinds
189 /// // of operations.
190 /// async fn run_task(task: Task) {
191 /// match task {
192 /// Task::PrintNumber(n) => {
193 /// println!("{}", n);
194 /// },
195 /// Task::AddOne(n, response) => {
196 /// // We ignore failures to send the response.
197 /// let _ = response.send(n + 1);
198 /// },
199 /// }
200 /// }
201 ///
202 /// #[tokio::main]
203 /// async fn main() {
204 /// let spawner = LocalSpawner::new();
205 ///
206 /// let (send, response) = oneshot::channel();
207 /// spawner.spawn(Task::AddOne(10, send));
208 /// let eleven = response.await.unwrap();
209 /// assert_eq!(eleven, 11);
210 /// }
211 /// ```
212 ///
213 /// [`Send`]: trait@std::marker::Send
214 /// [local task set]: struct@LocalSet
215 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
216 /// [`task::spawn_local`]: fn@spawn_local
217 /// [`mpsc`]: mod@crate::sync::mpsc
 pub struct LocalSet {
     /// Current scheduler tick.
     ///
     /// Advanced (wrapping on overflow) every time the next task is picked,
     /// and compared against `REMOTE_FIRST_INTERVAL` to decide whether the
     /// remote queue is consulted before the local one.
     tick: Cell<u8>,

     /// State available from thread-local.
     ///
     /// A clone of this handle is installed into the `CURRENT` thread-local
     /// while the set is entered or polled.
     context: Rc<Context>,

     /// This type should not be Send.
     ///
     /// A `PhantomData` over a raw pointer keeps the struct `!Send`
     /// independently of its other fields.
     _not_send: PhantomData<*const ()>,
 }
228}
229
/// State available from the thread-local.
///
/// A handle to this value is stored in the `CURRENT` thread-local while a
/// `LocalSet` is entered or being polled; the free function `spawn_local`
/// reads it from there to find the set to spawn onto.
struct Context {
    /// State shared between threads.
    shared: Arc<Shared>,

    /// True if a task panicked without being handled and the local set is
    /// configured to shutdown on unhandled panic.
    ///
    /// Checked at the start of every scheduler iteration in `LocalSet::tick`.
    unhandled_panic: Cell<bool>,
}
239
/// `LocalSet` state shared between threads.
///
/// Tasks are scheduled through an `Arc<Shared>` (see the `task::Schedule`
/// impl for `Arc<Shared>`), and `Shared::schedule` has a dedicated path for
/// callers that are not on the thread owning the `LocalSet`.
struct Shared {
    /// # Safety
    ///
    /// This field must *only* be accessed from the thread that owns the
    /// `LocalSet` (i.e., `Thread::current().id() == owner`).
    local_state: LocalState,

    /// Remote run queue.
    ///
    /// Tasks scheduled from a thread that does not own the `LocalSet` are
    /// pushed here. The `Option` is taken (left as `None`) when the
    /// `LocalSet` is dropped; a task scheduled after that point is not
    /// queued.
    queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,

    /// Wake the `LocalSet` task.
    ///
    /// Registered with the waker of whichever future is currently driving the
    /// set (the `LocalSet` itself or a `run_until` future).
    waker: AtomicWaker,

    /// How to respond to unhandled task panics.
    #[cfg(tokio_unstable)]
    pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
}
258
/// Tracks the `LocalSet` state that must only be accessed from the thread that
/// created the `LocalSet`.
struct LocalState {
    /// The `ThreadId` of the thread that owns the `LocalSet`.
    ///
    /// Captured once in `LocalSet::new` and compared against the current
    /// thread's ID when a task is scheduled.
    owner: ThreadId,

    /// Local run queue.
    ///
    /// Held in an `UnsafeCell` because it is mutated through a shared
    /// reference; callers uphold the owner-thread-only contract themselves.
    local_queue: UnsafeCell<VecDeque<task::Notified<Arc<Shared>>>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: LocalOwnedTasks<Arc<Shared>>,
}
271
pin_project! {
    // Future created by `LocalSet::run_until`: it polls `future` and, while
    // that future is pending, ticks the tasks of `local_set`.
    #[derive(Debug)]
    struct RunUntil<'a, F> {
        // The set whose context is installed and whose tasks are ticked on
        // every poll.
        local_set: &'a LocalSet,
        // The caller's future; marked `#[pin]` so it can be polled in place
        // through the projection.
        #[pin]
        future: F,
    }
}
280
// Per-thread record of the `LocalSet` context that is currently entered or
// being polled on this thread. `LocalSet::enter` and `LocalData::enter` swap
// values in and their guards swap the previous values back on drop.
tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
    ctx: RcCell::new(),
    wake_on_schedule: Cell::new(false),
} });
285
/// Contents of the `CURRENT` thread-local.
struct LocalData {
    /// Context of the `LocalSet` that is currently entered or being polled on
    /// this thread, if any.
    ctx: RcCell<Context>,
    /// Set to `true` by `LocalSet::enter` and to `false` by `LocalData::enter`
    /// (the polling path). When `true`, scheduling a task onto the current
    /// set does not take the "already being polled" shortcut in
    /// `Shared::schedule`.
    wake_on_schedule: Cell<bool>,
}
290
291impl LocalData {
292 /// Should be called except when we call `LocalSet::enter`.
293 /// Especially when we poll a `LocalSet`.
294 #[must_use = "dropping this guard will reset the entered state"]
295 fn enter(&self, ctx: Rc<Context>) -> LocalDataEnterGuard<'_> {
296 let ctx: Option> = self.ctx.replace(val:Some(ctx));
297 let wake_on_schedule: bool = self.wake_on_schedule.replace(val:false);
298 LocalDataEnterGuard {
299 local_data_ref: self,
300 ctx,
301 wake_on_schedule,
302 }
303 }
304}
305
/// A guard for `LocalData::enter()`
///
/// Holds the values that were in the thread-local before entering and writes
/// them back when dropped.
struct LocalDataEnterGuard<'a> {
    /// The thread-local data to restore into.
    local_data_ref: &'a LocalData,
    /// The context that was current before `enter` was called.
    ctx: Option<Rc<Context>>,
    /// The `wake_on_schedule` flag as it was before `enter` was called.
    wake_on_schedule: bool,
}
312
313impl<'a> Drop for LocalDataEnterGuard<'a> {
314 fn drop(&mut self) {
315 self.local_data_ref.ctx.set(self.ctx.take());
316 self.local_data_ref
317 .wake_on_schedule
318 .set(self.wake_on_schedule)
319 }
320}
321
322cfg_rt! {
323 /// Spawns a `!Send` future on the current [`LocalSet`].
324 ///
325 /// The spawned future will run on the same thread that called `spawn_local`.
326 ///
327 /// The provided future will start running in the background immediately
328 /// when `spawn_local` is called, even if you don't await the returned
329 /// `JoinHandle`.
330 ///
331 /// # Panics
332 ///
333 /// This function panics if called outside of a [`LocalSet`].
334 ///
335 /// Note that if [`tokio::spawn`] is used from within a `LocalSet`, the
336 /// resulting new task will _not_ be inside the `LocalSet`, so you must use
337 /// `spawn_local` if you want to stay within the `LocalSet`.
338 ///
339 /// # Examples
340 ///
341 /// ```rust
342 /// use std::rc::Rc;
343 /// use tokio::task;
344 ///
345 /// #[tokio::main]
346 /// async fn main() {
347 /// let nonsend_data = Rc::new("my nonsend data...");
348 ///
349 /// let local = task::LocalSet::new();
350 ///
351 /// // Run the local task set.
352 /// local.run_until(async move {
353 /// let nonsend_data = nonsend_data.clone();
354 /// task::spawn_local(async move {
355 /// println!("{}", nonsend_data);
356 /// // ...
357 /// }).await.unwrap();
358 /// }).await;
359 /// }
360 /// ```
361 ///
362 /// [`LocalSet`]: struct@crate::task::LocalSet
363 /// [`tokio::spawn`]: fn@crate::task::spawn
364 #[track_caller]
365 pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
366 where
367 F: Future + 'static,
368 F::Output: 'static,
369 {
370 spawn_local_inner(future, None)
371 }
372
373
374 #[track_caller]
375 pub(super) fn spawn_local_inner<F>(future: F, name: Option<&str>) -> JoinHandle<F::Output>
376 where F: Future + 'static,
377 F::Output: 'static
378 {
379 match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
380 None => panic!("`spawn_local` called from outside of a `task::LocalSet`"),
381 Some(cx) => cx.spawn(future, name)
382 }
383 }
384}
385
/// Initial capacity of both the local and the remote run queue.
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;

/// How often to check the remote queue first.
///
/// Whenever the scheduler tick is a multiple of this value, the remote queue
/// is consulted before the local queue; otherwise the local queue goes first.
const REMOTE_FIRST_INTERVAL: u8 = 31;
394
/// Context guard for `LocalSet`
///
/// Returned by `LocalSet::enter`. Dropping it restores the thread-local
/// state that was in place before the call.
pub struct LocalEnterGuard {
    /// The context that was current before entering.
    ctx: Option<Rc<Context>>,

    /// Distinguishes whether the context was entered or is being polled.
    /// When we enter it, the thread-local `wake_on_schedule` is set. In this
    /// case `spawn_local` refers to the context even though it is not being
    /// polled right now. This field keeps the previous flag value so it can
    /// be restored on drop.
    wake_on_schedule: bool,
}
404
405impl Drop for LocalEnterGuard {
406 fn drop(&mut self) {
407 CURRENT.with(
408 |LocalData {
409 ctx: &RcCell,
410 wake_on_schedule: &Cell,
411 }| {
412 ctx.set(self.ctx.take());
413 wake_on_schedule.set(self.wake_on_schedule);
414 },
415 );
416 }
417}
418
419impl fmt::Debug for LocalEnterGuard {
420 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
421 f.debug_struct(name:"LocalEnterGuard").finish()
422 }
423}
424
425impl LocalSet {
426 /// Returns a new local task set.
427 pub fn new() -> LocalSet {
428 let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown");
429
430 LocalSet {
431 tick: Cell::new(0),
432 context: Rc::new(Context {
433 shared: Arc::new(Shared {
434 local_state: LocalState {
435 owner,
436 owned: LocalOwnedTasks::new(),
437 local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
438 },
439 queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
440 waker: AtomicWaker::new(),
441 #[cfg(tokio_unstable)]
442 unhandled_panic: crate::runtime::UnhandledPanic::Ignore,
443 }),
444 unhandled_panic: Cell::new(false),
445 }),
446 _not_send: PhantomData,
447 }
448 }
449
450 /// Enters the context of this `LocalSet`.
451 ///
452 /// The [`spawn_local`] method will spawn tasks on the `LocalSet` whose
453 /// context you are inside.
454 ///
455 /// [`spawn_local`]: fn@crate::task::spawn_local
456 pub fn enter(&self) -> LocalEnterGuard {
457 CURRENT.with(
458 |LocalData {
459 ctx,
460 wake_on_schedule,
461 ..
462 }| {
463 let ctx = ctx.replace(Some(self.context.clone()));
464 let wake_on_schedule = wake_on_schedule.replace(true);
465 LocalEnterGuard {
466 ctx,
467 wake_on_schedule,
468 }
469 },
470 )
471 }
472
473 /// Spawns a `!Send` task onto the local task set.
474 ///
475 /// This task is guaranteed to be run on the current thread.
476 ///
477 /// Unlike the free function [`spawn_local`], this method may be used to
478 /// spawn local tasks when the `LocalSet` is _not_ running. The provided
479 /// future will start running once the `LocalSet` is next started, even if
480 /// you don't await the returned `JoinHandle`.
481 ///
482 /// # Examples
483 ///
484 /// ```rust
485 /// use tokio::task;
486 ///
487 /// #[tokio::main]
488 /// async fn main() {
489 /// let local = task::LocalSet::new();
490 ///
491 /// // Spawn a future on the local set. This future will be run when
492 /// // we call `run_until` to drive the task set.
493 /// local.spawn_local(async {
494 /// // ...
495 /// });
496 ///
497 /// // Run the local task set.
498 /// local.run_until(async move {
499 /// // ...
500 /// }).await;
501 ///
 /// // When `run_until` finishes, we can spawn _more_ futures, which will
503 /// // run in subsequent calls to `run_until`.
504 /// local.spawn_local(async {
505 /// // ...
506 /// });
507 ///
508 /// local.run_until(async move {
509 /// // ...
510 /// }).await;
511 /// }
512 /// ```
513 /// [`spawn_local`]: fn@spawn_local
514 #[track_caller]
515 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
516 where
517 F: Future + 'static,
518 F::Output: 'static,
519 {
520 self.spawn_named(future, None)
521 }
522
523 /// Runs a future to completion on the provided runtime, driving any local
524 /// futures spawned on this task set on the current thread.
525 ///
526 /// This runs the given future on the runtime, blocking until it is
527 /// complete, and yielding its resolved result. Any tasks or timers which
528 /// the future spawns internally will be executed on the runtime. The future
 /// may also call [`spawn_local`] to spawn additional local futures on the
530 /// current thread.
531 ///
532 /// This method should not be called from an asynchronous context.
533 ///
534 /// # Panics
535 ///
536 /// This function panics if the executor is at capacity, if the provided
537 /// future panics, or if called within an asynchronous execution context.
538 ///
539 /// # Notes
540 ///
541 /// Since this function internally calls [`Runtime::block_on`], and drives
542 /// futures in the local task set inside that call to `block_on`, the local
543 /// futures may not use [in-place blocking]. If a blocking call needs to be
544 /// issued from a local task, the [`spawn_blocking`] API may be used instead.
545 ///
546 /// For example, this will panic:
547 /// ```should_panic
548 /// use tokio::runtime::Runtime;
549 /// use tokio::task;
550 ///
551 /// let rt = Runtime::new().unwrap();
552 /// let local = task::LocalSet::new();
553 /// local.block_on(&rt, async {
554 /// let join = task::spawn_local(async {
555 /// let blocking_result = task::block_in_place(|| {
556 /// // ...
557 /// });
558 /// // ...
559 /// });
560 /// join.await.unwrap();
561 /// })
562 /// ```
563 /// This, however, will not panic:
564 /// ```
565 /// use tokio::runtime::Runtime;
566 /// use tokio::task;
567 ///
568 /// let rt = Runtime::new().unwrap();
569 /// let local = task::LocalSet::new();
570 /// local.block_on(&rt, async {
571 /// let join = task::spawn_local(async {
572 /// let blocking_result = task::spawn_blocking(|| {
573 /// // ...
574 /// }).await;
575 /// // ...
576 /// });
577 /// join.await.unwrap();
578 /// })
579 /// ```
580 ///
581 /// [`spawn_local`]: fn@spawn_local
582 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
583 /// [in-place blocking]: fn@crate::task::block_in_place
584 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
585 #[track_caller]
586 #[cfg(feature = "rt")]
587 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
588 pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
589 where
590 F: Future,
591 {
592 rt.block_on(self.run_until(future))
593 }
594
595 /// Runs a future to completion on the local set, returning its output.
596 ///
597 /// This returns a future that runs the given future with a local set,
598 /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
599 /// Any local futures spawned on the local set will be driven in the
600 /// background until the future passed to `run_until` completes. When the future
 /// passed to `run_until` finishes, any local futures which have not completed
602 /// will remain on the local set, and will be driven on subsequent calls to
603 /// `run_until` or when [awaiting the local set] itself.
604 ///
605 /// # Cancel safety
606 ///
607 /// This method is cancel safe when `future` is cancel safe.
608 ///
609 /// # Examples
610 ///
611 /// ```rust
612 /// use tokio::task;
613 ///
614 /// #[tokio::main]
615 /// async fn main() {
616 /// task::LocalSet::new().run_until(async {
617 /// task::spawn_local(async move {
618 /// // ...
619 /// }).await.unwrap();
620 /// // ...
621 /// }).await;
622 /// }
623 /// ```
624 ///
625 /// [`spawn_local`]: fn@spawn_local
626 /// [awaiting the local set]: #awaiting-a-localset
627 pub async fn run_until<F>(&self, future: F) -> F::Output
628 where
629 F: Future,
630 {
631 let run_until = RunUntil {
632 future,
633 local_set: self,
634 };
635 run_until.await
636 }
637
638 #[track_caller]
639 pub(in crate::task) fn spawn_named<F>(
640 &self,
641 future: F,
642 name: Option<&str>,
643 ) -> JoinHandle<F::Output>
644 where
645 F: Future + 'static,
646 F::Output: 'static,
647 {
648 let handle = self.context.spawn(future, name);
649
650 // Because a task was spawned from *outside* the `LocalSet`, wake the
651 // `LocalSet` future to execute the new task, if it hasn't been woken.
652 //
653 // Spawning via the free fn `spawn` does not require this, as it can
654 // only be called from *within* a future executing on the `LocalSet` —
655 // in that case, the `LocalSet` must already be awake.
656 self.context.shared.waker.wake();
657 handle
658 }
659
660 /// Ticks the scheduler, returning whether the local future needs to be
661 /// notified again.
662 fn tick(&self) -> bool {
663 for _ in 0..MAX_TASKS_PER_TICK {
664 // Make sure we didn't hit an unhandled panic
665 assert!(!self.context.unhandled_panic.get(), "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic");
666
667 match self.next_task() {
668 // Run the task
669 //
670 // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
671 // used. We are responsible for maintaining the invariant that
672 // `run_unchecked` is only called on threads that spawned the
673 // task initially. Because `LocalSet` itself is `!Send`, and
674 // `spawn_local` spawns into the `LocalSet` on the current
675 // thread, the invariant is maintained.
676 Some(task) => crate::runtime::coop::budget(|| task.run()),
677 // We have fully drained the queue of notified tasks, so the
678 // local future doesn't need to be notified again — it can wait
679 // until something else wakes a task in the local set.
680 None => return false,
681 }
682 }
683
684 true
685 }
686
687 fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> {
688 let tick = self.tick.get();
689 self.tick.set(tick.wrapping_add(1));
690
691 let task = if tick % REMOTE_FIRST_INTERVAL == 0 {
692 self.context
693 .shared
694 .queue
695 .lock()
696 .as_mut()
697 .and_then(|queue| queue.pop_front())
698 .or_else(|| self.pop_local())
699 } else {
700 self.pop_local().or_else(|| {
701 self.context
702 .shared
703 .queue
704 .lock()
705 .as_mut()
706 .and_then(VecDeque::pop_front)
707 })
708 };
709
710 task.map(|task| unsafe {
711 // Safety: because the `LocalSet` itself is `!Send`, we know we are
712 // on the same thread if we have access to the `LocalSet`, and can
713 // therefore access the local run queue.
714 self.context.shared.local_state.assert_owner(task)
715 })
716 }
717
718 fn pop_local(&self) -> Option<task::Notified<Arc<Shared>>> {
719 unsafe {
720 // Safety: because the `LocalSet` itself is `!Send`, we know we are
721 // on the same thread if we have access to the `LocalSet`, and can
722 // therefore access the local run queue.
723 self.context.shared.local_state.task_pop_front()
724 }
725 }
726
727 fn with<T>(&self, f: impl FnOnce() -> T) -> T {
728 CURRENT.with(|local_data| {
729 let _guard = local_data.enter(self.context.clone());
730 f()
731 })
732 }
733
734 /// This method is like `with`, but it just calls `f` without setting the thread-local if that
735 /// fails.
736 fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T {
737 let mut f = Some(f);
738
739 let res = CURRENT.try_with(|local_data| {
740 let _guard = local_data.enter(self.context.clone());
741 (f.take().unwrap())()
742 });
743
744 match res {
745 Ok(res) => res,
746 Err(_access_error) => (f.take().unwrap())(),
747 }
748 }
749}
750
751cfg_unstable! {
752 impl LocalSet {
753 /// Configure how the `LocalSet` responds to an unhandled panic on a
754 /// spawned task.
755 ///
756 /// By default, an unhandled panic (i.e. a panic not caught by
757 /// [`std::panic::catch_unwind`]) has no impact on the `LocalSet`'s
 /// execution. The panic's error value is forwarded to the task's
759 /// [`JoinHandle`] and all other spawned tasks continue running.
760 ///
761 /// The `unhandled_panic` option enables configuring this behavior.
762 ///
763 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
764 /// spawned tasks have no impact on the `LocalSet`'s execution.
765 /// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to
766 /// shutdown immediately when a spawned task panics even if that
767 /// task's `JoinHandle` has not been dropped. All other spawned tasks
768 /// will immediately terminate and further calls to
769 /// [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic.
770 ///
771 /// # Panics
772 ///
773 /// This method panics if called after the `LocalSet` has started
774 /// running.
775 ///
776 /// # Unstable
777 ///
778 /// This option is currently unstable and its implementation is
779 /// incomplete. The API may change or be removed in the future. See
780 /// tokio-rs/tokio#4516 for more details.
781 ///
782 /// # Examples
783 ///
784 /// The following demonstrates a `LocalSet` configured to shutdown on
785 /// panic. The first spawned task panics and results in the `LocalSet`
786 /// shutting down. The second spawned task never has a chance to
787 /// execute. The call to `run_until` will panic due to the runtime being
788 /// forcibly shutdown.
789 ///
790 /// ```should_panic
791 /// use tokio::runtime::UnhandledPanic;
792 ///
793 /// # #[tokio::main]
794 /// # async fn main() {
795 /// tokio::task::LocalSet::new()
796 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
797 /// .run_until(async {
798 /// tokio::task::spawn_local(async { panic!("boom"); });
799 /// tokio::task::spawn_local(async {
800 /// // This task never completes
801 /// });
802 ///
803 /// // Do some work, but `run_until` will panic before it completes
804 /// # loop { tokio::task::yield_now().await; }
805 /// })
806 /// .await;
807 /// # }
808 /// ```
809 ///
810 /// [`JoinHandle`]: struct@crate::task::JoinHandle
811 pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self {
812 // TODO: This should be set as a builder
813 Rc::get_mut(&mut self.context)
814 .and_then(|ctx| Arc::get_mut(&mut ctx.shared))
815 .expect("Unhandled Panic behavior modified after starting LocalSet")
816 .unhandled_panic = behavior;
817 self
818 }
819
820 /// Returns the [`Id`] of the current `LocalSet` runtime.
821 ///
822 /// # Examples
823 ///
824 /// ```rust
825 /// use tokio::task;
826 ///
827 /// #[tokio::main]
828 /// async fn main() {
829 /// let local_set = task::LocalSet::new();
830 /// println!("Local set id: {}", local_set.id());
831 /// }
832 /// ```
833 ///
834 /// **Note**: This is an [unstable API][unstable]. The public API of this type
835 /// may break in 1.x releases. See [the documentation on unstable
836 /// features][unstable] for details.
837 ///
838 /// [unstable]: crate#unstable-features
839 /// [`Id`]: struct@crate::runtime::Id
840 pub fn id(&self) -> runtime::Id {
841 self.context.shared.local_state.owned.id.into()
842 }
843 }
844}
845
846impl fmt::Debug for LocalSet {
847 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
848 fmt.debug_struct(name:"LocalSet").finish()
849 }
850}
851
852impl Future for LocalSet {
853 type Output = ();
854
855 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
856 // Register the waker before starting to work
857 self.context.shared.waker.register_by_ref(cx.waker());
858
859 if self.with(|| self.tick()) {
860 // If `tick` returns true, we need to notify the local future again:
861 // there are still tasks remaining in the run queue.
862 cx.waker().wake_by_ref();
863 Poll::Pending
864
865 // Safety: called from the thread that owns `LocalSet`. Because
866 // `LocalSet` is `!Send`, this is safe.
867 } else if unsafe { self.context.shared.local_state.owned_is_empty() } {
868 // If the scheduler has no remaining futures, we're done!
869 Poll::Ready(())
870 } else {
871 // There are still futures in the local set, but we've polled all the
872 // futures in the run queue. Therefore, we can just return Pending
873 // since the remaining futures will be woken from somewhere else.
874 Poll::Pending
875 }
876 }
877}
878
879impl Default for LocalSet {
880 fn default() -> LocalSet {
881 LocalSet::new()
882 }
883}
884
885impl Drop for LocalSet {
886 fn drop(&mut self) {
887 self.with_if_possible(|| {
888 // Shut down all tasks in the LocalOwnedTasks and close it to
889 // prevent new tasks from ever being added.
890 unsafe {
891 // Safety: called from the thread that owns `LocalSet`
892 self.context.shared.local_state.close_and_shutdown_all();
893 }
894
895 // We already called shutdown on all tasks above, so there is no
896 // need to call shutdown.
897
898 // Safety: note that this *intentionally* bypasses the unsafe
899 // `Shared::local_queue()` method. This is in order to avoid the
900 // debug assertion that we are on the thread that owns the
901 // `LocalSet`, because on some systems (e.g. at least some macOS
902 // versions), attempting to get the current thread ID can panic due
903 // to the thread's local data that stores the thread ID being
904 // dropped *before* the `LocalSet`.
905 //
906 // Despite avoiding the assertion here, it is safe for us to access
907 // the local queue in `Drop`, because the `LocalSet` itself is
908 // `!Send`, so we can reasonably guarantee that it will not be
909 // `Drop`ped from another thread.
910 let local_queue = unsafe {
911 // Safety: called from the thread that owns `LocalSet`
912 self.context.shared.local_state.take_local_queue()
913 };
914 for task in local_queue {
915 drop(task);
916 }
917
918 // Take the queue from the Shared object to prevent pushing
919 // notifications to it in the future.
920 let queue = self.context.shared.queue.lock().take().unwrap();
921 for task in queue {
922 drop(task);
923 }
924
925 // Safety: called from the thread that owns `LocalSet`
926 assert!(unsafe { self.context.shared.local_state.owned_is_empty() });
927 });
928 }
929}
930
931// === impl Context ===
932
933impl Context {
934 #[track_caller]
935 fn spawn<F>(&self, future: F, name: Option<&str>) -> JoinHandle<F::Output>
936 where
937 F: Future + 'static,
938 F::Output: 'static,
939 {
940 let id = crate::runtime::task::Id::next();
941 let future = crate::util::trace::task(future, "local", name, id.as_u64());
942
943 // Safety: called from the thread that owns the `LocalSet`
944 let (handle, notified) = {
945 self.shared.local_state.assert_called_from_owner_thread();
946 self.shared
947 .local_state
948 .owned
949 .bind(future, self.shared.clone(), id)
950 };
951
952 if let Some(notified) = notified {
953 self.shared.schedule(notified);
954 }
955
956 handle
957 }
958}
959
// === impl RunUntil ===
961
962impl<T: Future> Future for RunUntil<'_, T> {
963 type Output = T::Output;
964
965 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
966 let me = self.project();
967
968 me.local_set.with(|| {
969 me.local_set
970 .context
971 .shared
972 .waker
973 .register_by_ref(cx.waker());
974
975 let _no_blocking = crate::runtime::context::disallow_block_in_place();
976 let f = me.future;
977
978 if let Poll::Ready(output) = f.poll(cx) {
979 return Poll::Ready(output);
980 }
981
982 if me.local_set.tick() {
983 // If `tick` returns `true`, we need to notify the local future again:
984 // there are still tasks remaining in the run queue.
985 cx.waker().wake_by_ref();
986 }
987
988 Poll::Pending
989 })
990 }
991}
992
993impl Shared {
994 /// Schedule the provided task on the scheduler.
995 fn schedule(&self, task: task::Notified<Arc<Self>>) {
996 CURRENT.with(|localdata| {
997 match localdata.ctx.get() {
998 // If the current `LocalSet` is being polled, we don't need to wake it.
999 // When we `enter` it, then the value `wake_on_schedule` is set to be true.
1000 // In this case it is not being polled, so we need to wake it.
1001 Some(cx) if cx.shared.ptr_eq(self) && !localdata.wake_on_schedule.get() => unsafe {
1002 // Safety: if the current `LocalSet` context points to this
1003 // `LocalSet`, then we are on the thread that owns it.
1004 cx.shared.local_state.task_push_back(task);
1005 },
1006
1007 // We are on the thread that owns the `LocalSet`, so we can
1008 // wake to the local queue.
1009 _ if context::thread_id().ok() == Some(self.local_state.owner) => {
1010 unsafe {
1011 // Safety: we just checked that the thread ID matches
1012 // the localset's owner, so this is safe.
1013 self.local_state.task_push_back(task);
1014 }
1015 // We still have to wake the `LocalSet`, because it isn't
1016 // currently being polled.
1017 self.waker.wake();
1018 }
1019
1020 // We are *not* on the thread that owns the `LocalSet`, so we
1021 // have to wake to the remote queue.
1022 _ => {
1023 // First, check whether the queue is still there (if not, the
1024 // LocalSet is dropped). Then push to it if so, and if not,
1025 // do nothing.
1026 let mut lock = self.queue.lock();
1027
1028 if let Some(queue) = lock.as_mut() {
1029 queue.push_back(task);
1030 drop(lock);
1031 self.waker.wake();
1032 }
1033 }
1034 }
1035 });
1036 }
1037
1038 fn ptr_eq(&self, other: &Shared) -> bool {
1039 std::ptr::eq(self, other)
1040 }
1041}
1042
1043// This is safe because (and only because) we *pinky pwomise* to never touch the
1044// local run queue except from the thread that owns the `LocalSet`.
1045unsafe impl Sync for Shared {}
1046
1047impl task::Schedule for Arc<Shared> {
1048 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
1049 // Safety, this is always called from the thread that owns `LocalSet`
1050 unsafe { self.local_state.task_remove(task) }
1051 }
1052
1053 fn schedule(&self, task: task::Notified<Self>) {
1054 Shared::schedule(self, task);
1055 }
1056
1057 cfg_unstable! {
1058 fn unhandled_panic(&self) {
1059 use crate::runtime::UnhandledPanic;
1060
1061 match self.unhandled_panic {
1062 UnhandledPanic::Ignore => {
1063 // Do nothing
1064 }
1065 UnhandledPanic::ShutdownRuntime => {
1066 // This hook is only called from within the runtime, so
1067 // `CURRENT` should match with `&self`, i.e. there is no
1068 // opportunity for a nested scheduler to be called.
1069 CURRENT.with(|LocalData { ctx, .. }| match ctx.get() {
1070 Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
1071 cx.unhandled_panic.set(true);
1072 // Safety: this is always called from the thread that owns `LocalSet`
1073 unsafe { cx.shared.local_state.close_and_shutdown_all(); }
1074 }
1075 _ => unreachable!("runtime core not set in CURRENT thread-local"),
1076 })
1077 }
1078 }
1079 }
1080 }
1081}
1082
1083impl LocalState {
1084 unsafe fn task_pop_front(&self) -> Option<task::Notified<Arc<Shared>>> {
1085 // The caller ensures it is called from the same thread that owns
1086 // the LocalSet.
1087 self.assert_called_from_owner_thread();
1088
1089 self.local_queue.with_mut(|ptr| (*ptr).pop_front())
1090 }
1091
1092 unsafe fn task_push_back(&self, task: task::Notified<Arc<Shared>>) {
1093 // The caller ensures it is called from the same thread that owns
1094 // the LocalSet.
1095 self.assert_called_from_owner_thread();
1096
1097 self.local_queue.with_mut(|ptr| (*ptr).push_back(task));
1098 }
1099
1100 unsafe fn take_local_queue(&self) -> VecDeque<task::Notified<Arc<Shared>>> {
1101 // The caller ensures it is called from the same thread that owns
1102 // the LocalSet.
1103 self.assert_called_from_owner_thread();
1104
1105 self.local_queue.with_mut(|ptr| std::mem::take(&mut (*ptr)))
1106 }
1107
1108 unsafe fn task_remove(&self, task: &Task<Arc<Shared>>) -> Option<Task<Arc<Shared>>> {
1109 // The caller ensures it is called from the same thread that owns
1110 // the LocalSet.
1111 self.assert_called_from_owner_thread();
1112
1113 self.owned.remove(task)
1114 }
1115
1116 /// Returns true if the `LocalSet` does not have any spawned tasks
1117 unsafe fn owned_is_empty(&self) -> bool {
1118 // The caller ensures it is called from the same thread that owns
1119 // the LocalSet.
1120 self.assert_called_from_owner_thread();
1121
1122 self.owned.is_empty()
1123 }
1124
1125 unsafe fn assert_owner(
1126 &self,
1127 task: task::Notified<Arc<Shared>>,
1128 ) -> task::LocalNotified<Arc<Shared>> {
1129 // The caller ensures it is called from the same thread that owns
1130 // the LocalSet.
1131 self.assert_called_from_owner_thread();
1132
1133 self.owned.assert_owner(task)
1134 }
1135
1136 unsafe fn close_and_shutdown_all(&self) {
1137 // The caller ensures it is called from the same thread that owns
1138 // the LocalSet.
1139 self.assert_called_from_owner_thread();
1140
1141 self.owned.close_and_shutdown_all();
1142 }
1143
1144 #[track_caller]
1145 fn assert_called_from_owner_thread(&self) {
1146 // FreeBSD has some weirdness around thread-local destruction.
1147 // TODO: remove this hack when thread id is cleaned up
1148 #[cfg(not(any(target_os = "openbsd", target_os = "freebsd")))]
1149 debug_assert!(
1150 // if we couldn't get the thread ID because we're dropping the local
1151 // data, skip the assertion --- the `Drop` impl is not going to be
1152 // called from another thread, because `LocalSet` is `!Send`
1153 context::thread_id()
1154 .map(|id| id == self.owner)
1155 .unwrap_or(true),
1156 "`LocalSet`'s local run queue must not be accessed by another thread!"
1157 );
1158 }
1159}
1160
1161// This is `Send` because it is stored in `Shared`. It is up to the caller to
1162// ensure they are on the same thread that owns the `LocalSet`.
1163unsafe impl Send for LocalState {}
1164
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;

    // Smoke test: does a `LocalSet` running on a current-thread runtime
    // basically work?
    //
    // The same scenario is covered in `tests/task_local_set.rs`, but this copy
    // is a lib test and therefore runs under Miri, which is needed to catch
    // stacked borrows violations in the `LocalSet` implementation.
    #[test]
    fn local_current_thread_scheduler() {
        let rt = crate::runtime::Builder::new_current_thread()
            .build()
            .expect("rt");

        rt.block_on(async {
            LocalSet::new()
                .run_until(async {
                    spawn_local(async {}).await.unwrap();
                })
                .await;
        })
    }

    // Tests that when a task on a `LocalSet` is woken by an io driver on the
    // same thread, the task is woken to the localset's local queue rather than
    // its remote queue. (The wake-up here is produced with a `Notify`.)
    //
    // This test has to be defined in the `local.rs` file as a lib test, rather
    // than in `tests/`, because it makes assertions about the local set's
    // internal state.
    #[test]
    fn wakes_to_local_queue() {
        use crate::sync::Notify;

        let rt = crate::runtime::Builder::new_current_thread()
            .build()
            .expect("rt");

        rt.block_on(async {
            let local = LocalSet::new();
            let notify = Arc::new(Notify::new());

            let waiter = {
                let notify = notify.clone();
                local.spawn_local(async move {
                    notify.notified().await;
                })
            };

            let mut driver = Box::pin(local.run_until(async move {
                waiter.await.unwrap();
            }));

            // Poll the `run_until` future exactly once, discarding the result.
            crate::future::poll_fn(|cx| {
                let _ = driver.as_mut().poll(cx);
                Poll::Ready(())
            })
            .await;

            notify.notify_one();

            // Safety: `local` was created above inside this same future, which
            // `block_on` is driving on the current thread.
            let woken = unsafe { local.context.shared.local_state.task_pop_front() };

            // TODO(eliza): it would be nice to be able to assert that this is
            // the local task.
            assert!(
                woken.is_some(),
                "task should have been notified to the LocalSet's local queue"
            );
        })
    }
}
1234