use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Arc;
use crate::runtime::driver::{self, Driver};
use crate::runtime::scheduler::{self, Defer, Inject};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::{blocking, context, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
use std::time::Duration;

/// Executes tasks on the current thread
pub(crate) struct CurrentThread {
    /// Core scheduler data is acquired by a thread entering `block_on`.
    core: AtomicCell<Core>,

    /// Notifier for waking up other threads to steal the
    /// driver.
    notify: Notify,
}

/// Handle to the current thread scheduler
pub(crate) struct Handle {
    /// Scheduler state shared across threads
    shared: Shared,

    /// Resource driver handles
    pub(crate) driver: driver::Handle,

    /// Blocking pool spawner
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Current random number generator seed
    pub(crate) seed_generator: RngSeedGenerator,
}

/// Data required for executing the scheduler. The struct is passed around to
/// a function that will perform the scheduling work and acts as a capability token.
struct Core {
    /// Scheduler run queue
    tasks: VecDeque<Notified>,

    /// Current tick
    tick: u32,

    /// Runtime driver
    ///
    /// The driver is removed before starting to park the thread
    driver: Option<Driver>,

    /// Metrics batch
    metrics: MetricsBatch,

    /// How often to check the global queue
    global_queue_interval: u32,

    /// True if a task panicked without being handled and the runtime is
    /// configured to shut down on unhandled panic.
    unhandled_panic: bool,
}

/// Scheduler state shared between threads.
struct Shared {
    /// Remote run queue
    inject: Inject<Arc<Handle>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Handle>>,

    /// Indicates whether the thread blocked on `block_on` was woken.
    woken: AtomicBool,

    /// Scheduler configuration options
    config: Config,

    /// Keeps track of various runtime metrics.
    scheduler_metrics: SchedulerMetrics,

    /// This scheduler only has one worker.
    worker_metrics: WorkerMetrics,
}

/// Thread-local context.
///
/// pub(crate) to store in `runtime::context`.
pub(crate) struct Context {
    /// Scheduler handle
    handle: Arc<Handle>,

    /// Scheduler core, enabling the holder of `Context` to execute the
    /// scheduler.
    core: RefCell<Option<Box<Core>>>,

    /// Deferred tasks, usually ones that called `task::yield_now()`.
    pub(crate) defer: Defer,
}

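/// A task that has been notified (made runnable) on this scheduler.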
type Notified = task::Notified<Arc<Handle>>;

/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Used if none is specified. This is a temporary constant and will be removed
/// as we unify tuning logic between the multi-thread and current-thread
/// schedulers.
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;

impl CurrentThread {
    pub(crate) fn new(
        driver: Driver,
        driver_handle: driver::Handle,
        blocking_spawner: blocking::Spawner,
        seed_generator: RngSeedGenerator,
        config: Config,
    ) -> (CurrentThread, Arc<Handle>) {
        let worker_metrics = WorkerMetrics::from_config(&config);

        // Get the configured global queue interval, or use the default.
        let global_queue_interval = config
            .global_queue_interval
            .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);

        let handle = Arc::new(Handle {
            shared: Shared {
                inject: Inject::new(),
                owned: OwnedTasks::new(1),
                woken: AtomicBool::new(false),
                config,
                scheduler_metrics: SchedulerMetrics::new(),
                worker_metrics,
            },
            driver: driver_handle,
            blocking_spawner,
            seed_generator,
        });

        let core = AtomicCell::new(Some(Box::new(Core {
            tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
            tick: 0,
            driver: Some(driver),
            metrics: MetricsBatch::new(&handle.shared.worker_metrics),
            global_queue_interval,
            unhandled_panic: false,
        })));

        let scheduler = CurrentThread {
            core,
            notify: Notify::new(),
        };

        (scheduler, handle)
    }

    #[track_caller]
    pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
        pin!(future);

        crate::runtime::context::enter_runtime(handle, false, |blocking| {
            let handle = handle.as_current_thread();

            // Attempt to steal the scheduler core and block on the future
            // there if we can; otherwise, select on a notification that the
            // core is available or on the future completing.
            loop {
                if let Some(core) = self.take_core(handle) {
                    return core.block_on(future);
                } else {
                    let notified = self.notify.notified();
                    pin!(notified);

                    if let Some(out) = blocking
                        .block_on(poll_fn(|cx| {
                            if notified.as_mut().poll(cx).is_ready() {
                                return Ready(None);
                            }

                            if let Ready(out) = future.as_mut().poll(cx) {
                                return Ready(Some(out));
                            }

                            Pending
                        }))
                        .expect("Failed to `Enter::block_on`")
                    {
                        return out;
                    }
                }
            }
        })
    }

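    /// Attempts to take ownership of the scheduler core. Returns `None` if the
    /// core is currently held elsewhere, typically by another thread that is
    /// inside `block_on`.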
    fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
        let core = self.core.take()?;

        Some(CoreGuard {
            context: scheduler::Context::CurrentThread(Context {
                handle: handle.clone(),
                core: RefCell::new(Some(core)),
                defer: Defer::new(),
            }),
            scheduler: self,
        })
    }

    pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
        let handle = handle.as_current_thread();

        // Avoid a double panic if we are currently panicking and
        // the lock may be poisoned.

        let core = match self.take_core(handle) {
            Some(core) => core,
            None if std::thread::panicking() => return,
            None => panic!("Oh no! We never placed the Core back, this is a bug!"),
        };

        // Check that the thread-local is not being destroyed
        let tls_available = context::with_current(|_| ()).is_ok();

        if tls_available {
            core.enter(|core, _context| {
                let core = shutdown2(core, handle);
                (core, ())
            });
        } else {
            // Shutdown without setting the context. `tokio::spawn` calls will
            // fail, but those will fail either way because the thread-local is
            // not available anymore.
            let context = core.context.expect_current_thread();
            let core = context.core.borrow_mut().take().unwrap();

            let core = shutdown2(core, handle);
            *context.core.borrow_mut() = Some(core);
        }
    }
}

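/// Finishes shutting down the scheduler: closes and drains the owned task
/// collection and both run queues, submits pending metrics, and shuts down the
/// resource drivers.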
fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
    // Drain the OwnedTasks collection. This call also closes the
    // collection, ensuring that no tasks are ever pushed after this
    // call returns.
    handle.shared.owned.close_and_shutdown_all(0);

    // Drain local queue
    // We already shut down every task, so we just need to drop them.
    while let Some(task) = core.next_local_task(handle) {
        drop(task);
    }

    // Close the injection queue
    handle.shared.inject.close();

    // Drain remote queue
    while let Some(task) = handle.shared.inject.pop() {
        drop(task);
    }

    assert!(handle.shared.owned.is_empty());

    // Submit metrics
    core.submit_metrics(handle);

    // Shut down the resource drivers
    if let Some(driver) = core.driver.as_mut() {
        driver.shutdown(&handle.driver);
    }

    core
}

impl fmt::Debug for CurrentThread {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("CurrentThread").finish()
    }
}

// ===== impl Core =====

impl Core {
    /// Increment the current tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

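    /// Returns the next task to run. Every `global_queue_interval` ticks the
    /// remote (injection) queue is checked first so that remotely spawned
    /// tasks are not starved by the local queue.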
    fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
        if self.tick % self.global_queue_interval == 0 {
            handle
                .next_remote_task()
                .or_else(|| self.next_local_task(handle))
        } else {
            self.next_local_task(handle)
                .or_else(|| handle.next_remote_task())
        }
    }

    fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
        let ret = self.tasks.pop_front();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
        ret
    }

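    /// Pushes a task onto the local run queue, recording the schedule in the
    /// metrics batch and updating the reported queue depth.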
    fn push_task(&mut self, handle: &Handle, task: Notified) {
        self.tasks.push_back(task);
        self.metrics.inc_local_schedule_count();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
    }

    fn submit_metrics(&mut self, handle: &Handle) {
        self.metrics.submit(&handle.shared.worker_metrics, 0);
    }
}

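/// Wakes every deferred task, dropping the buffer that held the wakers so its
/// allocation is released (used after capturing a taskdump).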
#[cfg(tokio_taskdump)]
fn wake_deferred_tasks_and_free(context: &Context) {
    let wakers = context.defer.take_deferred();
    for waker in wakers {
        waker.wake();
    }
}

// ===== impl Context =====

impl Context {
    /// Execute the closure with the given scheduler core stored in the
    /// thread-local context.
    fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        core.metrics.start_poll();
        let mut ret = self.enter(core, || crate::runtime::coop::budget(f));
        ret.0.metrics.end_poll();
        ret
    }

    /// Blocks the current thread until an event is received by the driver,
    /// including I/O events, timer events, ...
    fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        if let Some(f) = &handle.shared.config.before_park {
            // Incorrect lint, the closures are actually different types so `f`
            // cannot be passed as an argument to `enter`.
            #[allow(clippy::redundant_closure)]
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        // This check will fail if `before_park` spawns a task for us to run
        // instead of parking the thread
        if core.tasks.is_empty() {
            // Park until the thread is signaled
            core.metrics.about_to_park();
            core.submit_metrics(handle);

            let (c, ()) = self.enter(core, || {
                driver.park(&handle.driver);
                self.defer.wake();
            });

            core = c;
        }

        if let Some(f) = &handle.shared.config.after_unpark {
            // Incorrect lint, the closures are actually different types so `f`
            // cannot be passed as an argument to `enter`.
            #[allow(clippy::redundant_closure)]
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        core.driver = Some(driver);
        core
    }

    /// Checks the driver for new events without blocking the thread.
    fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        core.submit_metrics(handle);

        let (mut core, ()) = self.enter(core, || {
            driver.park_timeout(&handle.driver, Duration::from_millis(0));
            self.defer.wake();
        });

        core.driver = Some(driver);
        core
    }

    fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        // Store the scheduler core in the thread-local context
        //
        // A drop-guard is employed at a higher level.
        *self.core.borrow_mut() = Some(core);

        // Execute the closure while tracking the execution budget
        let ret = f();

        // Take the scheduler core back
        let core = self.core.borrow_mut().take().expect("core missing");
        (core, ret)
    }

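    /// Defers a waker to be woken the next time the scheduler yields to the
    /// driver (see `park` / `park_yield`).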
    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }
}

// ===== impl Handle =====

impl Handle {
    /// Spawns a future onto the `CurrentThread` scheduler
    pub(crate) fn spawn<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);

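        // `bind` returns `None` for the notification when the runtime is
        // shutting down and the task could not be registered; only schedule
        // the task if it was registered.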
        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Capture a snapshot of this runtime's state.
    #[cfg(all(
        tokio_unstable,
        tokio_taskdump,
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(crate) fn dump(&self) -> crate::runtime::Dump {
        use crate::runtime::dump;
        use task::trace::trace_current_thread;

        let mut traces = vec![];

        // todo: how to make this work outside of a runtime context?
        context::with_scheduler(|maybe_context| {
            // drain the local queue
            let context = if let Some(context) = maybe_context {
                context.expect_current_thread()
            } else {
                return;
            };
            let mut maybe_core = context.core.borrow_mut();
            let core = if let Some(core) = maybe_core.as_mut() {
                core
            } else {
                return;
            };
            let local = &mut core.tasks;

            if self.shared.inject.is_closed() {
                return;
            }

            traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
                .into_iter()
                .map(dump::Task::new)
                .collect();

            // Avoid double borrow panic
            drop(maybe_core);

            // Taking a taskdump could wake every task, but we probably don't
            // want the `yield_now` vector to be that large under normal
            // circumstances. Therefore, we free its allocation.
            wake_deferred_tasks_and_free(context);
        });

        dump::Dump::new(traces)
    }

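    /// Pops the next task from the remote (injection) queue, if any.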
    fn next_remote_task(&self) -> Option<Notified> {
        self.shared.inject.pop()
    }

    fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
        // Set woken to true when entering `block_on` so that the outer future
        // is polled for the first time when entering the loop.
        me.shared.woken.store(true, Release);
        waker_ref(me)
    }

    // Reset woken to false and return the original value.
    pub(crate) fn reset_woken(&self) -> bool {
        self.shared.woken.swap(false, AcqRel)
    }
}


cfg_metrics! {
    impl Handle {
        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
            &self.shared.scheduler_metrics
        }

        pub(crate) fn injection_queue_depth(&self) -> usize {
            self.shared.inject.len()
        }

        pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
            assert_eq!(0, worker);
            &self.shared.worker_metrics
        }

        pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
            self.worker_metrics(worker).queue_depth()
        }

        pub(crate) fn num_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_threads()
        }

        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_idle_threads()
        }

        pub(crate) fn blocking_queue_depth(&self) -> usize {
            self.blocking_spawner.queue_depth()
        }

        pub(crate) fn active_tasks_count(&self) -> usize {
            self.shared.owned.active_tasks_count()
        }
    }
}

cfg_unstable! {
    use std::num::NonZeroU64;

    impl Handle {
        pub(crate) fn owned_id(&self) -> NonZeroU64 {
            self.shared.owned.id
        }
    }
}

impl fmt::Debug for Handle {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("current_thread::Handle { ... }").finish()
    }
}

// ===== impl Shared =====

impl Schedule for Arc<Handle> {
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        self.shared.owned.remove(task)
    }

    fn schedule(&self, task: task::Notified<Self>) {
        use scheduler::Context::CurrentThread;

        context::with_scheduler(|maybe_cx| match maybe_cx {
            Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                let mut core = cx.core.borrow_mut();

                // If `None`, the runtime is shutting down, so there is no need
                // to schedule the task.
                if let Some(core) = core.as_mut() {
                    core.push_task(self, task);
                }
            }
            _ => {
                // Track that a task was scheduled from **outside** of the runtime.
                self.shared.scheduler_metrics.inc_remote_schedule_count();

                // Schedule the task
                self.shared.inject.push(task);
                self.driver.unpark();
            }
        });
    }

    cfg_unstable! {
        fn unhandled_panic(&self) {
            use crate::runtime::UnhandledPanic;

            match self.shared.config.unhandled_panic {
                UnhandledPanic::Ignore => {
                    // Do nothing
                }
                UnhandledPanic::ShutdownRuntime => {
                    use scheduler::Context::CurrentThread;

                    // This hook is only called from within the runtime, so
                    // `context::with_scheduler` should match with `&self`, i.e.
                    // there is no opportunity for a nested scheduler to be
                    // called.
                    context::with_scheduler(|maybe_cx| match maybe_cx {
                        Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                            let mut core = cx.core.borrow_mut();

                            // If `None`, the runtime is shutting down, so there is no need to signal shutdown
                            if let Some(core) = core.as_mut() {
                                core.unhandled_panic = true;
                                self.shared.owned.close_and_shutdown_all(0);
                            }
                        }
                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
                    })
                }
            }
        }
    }
}

impl Wake for Handle {
    fn wake(arc_self: Arc<Self>) {
        Wake::wake_by_ref(&arc_self);
    }

    /// Wake by reference
    fn wake_by_ref(arc_self: &Arc<Self>) {
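        // Mark the `block_on` future as woken so it is polled on the next
        // scheduler iteration, then unpark the driver in case the thread is
        // parked waiting for events.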
        arc_self.shared.woken.store(true, Release);
        arc_self.driver.unpark();
    }
}

// ===== CoreGuard =====

/// Used to ensure we always place the `Core` value back into its slot in
/// `CurrentThread`, even if the future panics.
struct CoreGuard<'a> {
    context: scheduler::Context,
    scheduler: &'a CurrentThread,
}

impl CoreGuard<'_> {
    #[track_caller]
    fn block_on<F: Future>(self, future: F) -> F::Output {
        let ret = self.enter(|mut core, context| {
            let waker = Handle::waker_ref(&context.handle);
            let mut cx = std::task::Context::from_waker(&waker);

            pin!(future);

            core.metrics.start_processing_scheduled_tasks();

            'outer: loop {
                let handle = &context.handle;

                if handle.reset_woken() {
                    let (c, res) = context.enter(core, || {
                        crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
                    });

                    core = c;

                    if let Ready(v) = res {
                        return (core, Some(v));
                    }
                }

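                // Poll at most `event_interval` scheduled tasks before
                // yielding back to the driver to process timers and I/O.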
                for _ in 0..handle.shared.config.event_interval {
                    // Make sure we didn't hit an unhandled_panic
                    if core.unhandled_panic {
                        return (core, None);
                    }

                    core.tick();

                    let entry = core.next_task(handle);

                    let task = match entry {
                        Some(entry) => entry,
                        None => {
                            core.metrics.end_processing_scheduled_tasks();

                            core = if !context.defer.is_empty() {
                                context.park_yield(core, handle)
                            } else {
                                context.park(core, handle)
                            };

                            core.metrics.start_processing_scheduled_tasks();

                            // Try polling the `block_on` future next
                            continue 'outer;
                        }
                    };

                    let task = context.handle.shared.owned.assert_owner(task);

                    let (c, ()) = context.run_task(core, || {
                        task.run();
                    });

                    core = c;
                }

                core.metrics.end_processing_scheduled_tasks();

                // Yield to the driver, which drives the timer and pulls any
                // pending I/O events.
                core = context.park_yield(core, handle);

                core.metrics.start_processing_scheduled_tasks();
            }
        });

        match ret {
            Some(ret) => ret,
            None => {
                // A spawned task panicked and the runtime was configured to
                // shut down on unhandled panics, so the scheduler exited
                // before `block_on` completed.
                panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
            }
        }
    }

    /// Enters the scheduler context. This sets the queue and other necessary
    /// scheduler state in the thread-local.
    fn enter<F, R>(self, f: F) -> R
    where
        F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
    {
        let context = self.context.expect_current_thread();

        // Remove `core` from `context` to pass into the closure.
        let core = context.core.borrow_mut().take().expect("core missing");

        // Call the closure and place `core` back
        let (core, ret) = context::set_scheduler(&self.context, || f(core, context));

        *context.core.borrow_mut() = Some(core);

        ret
    }
}

impl Drop for CoreGuard<'_> {
    fn drop(&mut self) {
        let context = self.context.expect_current_thread();

        if let Some(core) = context.core.borrow_mut().take() {
            // Replace old scheduler back into the state to allow
            // other threads to pick it up and drive it.
            self.scheduler.core.set(core);

            // Wake up other possible threads that could steal the driver.
            self.scheduler.notify.notify_one();
        }
    }
}