use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Arc;
use crate::runtime::driver::{self, Driver};
use crate::runtime::scheduler::{self, Defer, Inject};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::{blocking, context, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
use std::time::Duration;

/// Executes tasks on the current thread
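///
/// This is the scheduler behind the single-threaded runtime created by the
/// public `runtime::Builder::new_current_thread()` API. A minimal usage
/// sketch of that public entry point (illustrative only, not a doctest):
///
/// ```ignore
/// let rt = tokio::runtime::Builder::new_current_thread()
///     .enable_all()
///     .build()
///     .unwrap();
///
/// // `block_on` takes the scheduler core and drives both the given future
/// // and any spawned tasks on the calling thread.
/// rt.block_on(async {
///     let handle = tokio::spawn(async { "hello" });
///     assert_eq!(handle.await.unwrap(), "hello");
/// });
/// ```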
pub(crate) struct CurrentThread {
    /// Core scheduler data is acquired by a thread entering `block_on`.
    core: AtomicCell<Core>,

    /// Notifier for waking up other threads to steal the
    /// driver.
    notify: Notify,
}

/// Handle to the current thread scheduler
pub(crate) struct Handle {
    /// Scheduler state shared across threads
    shared: Shared,

    /// Resource driver handles
    pub(crate) driver: driver::Handle,

    /// Blocking pool spawner
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Current random number generator seed
    pub(crate) seed_generator: RngSeedGenerator,
}

/// Data required for executing the scheduler. The struct is passed around to
/// a function that will perform the scheduling work and acts as a capability token.
struct Core {
    /// Scheduler run queue
    tasks: VecDeque<Notified>,

    /// Current tick
    tick: u32,

    /// Runtime driver
    ///
    /// The driver is removed before starting to park the thread
    driver: Option<Driver>,

    /// Metrics batch
    metrics: MetricsBatch,

    /// How often to check the global queue
    global_queue_interval: u32,

    /// True if a task panicked without being handled and the runtime is
    /// configured to shutdown on unhandled panic.
    unhandled_panic: bool,
}

/// Scheduler state shared between threads.
struct Shared {
    /// Remote run queue
    inject: Inject<Arc<Handle>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Handle>>,

    /// Indicates whether the blocked on thread was woken.
    woken: AtomicBool,

    /// Scheduler configuration options
    config: Config,

    /// Keeps track of various runtime metrics.
    scheduler_metrics: SchedulerMetrics,

    /// This scheduler only has one worker.
    worker_metrics: WorkerMetrics,
}

/// Thread-local context.
///
/// pub(crate) to store in `runtime::context`.
pub(crate) struct Context {
    /// Scheduler handle
    handle: Arc<Handle>,

    /// Scheduler core, enabling the holder of `Context` to execute the
    /// scheduler.
    core: RefCell<Option<Box<Core>>>,

    /// Deferred tasks, usually ones that called `task::yield_now()`.
    pub(crate) defer: Defer,
}

type Notified = task::Notified<Arc<Handle>>;

/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Used if none is specified. This is a temporary constant and will be removed
/// as we unify tuning logic between the multi-thread and current-thread
/// schedulers.
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;

impl CurrentThread {
    pub(crate) fn new(
        driver: Driver,
        driver_handle: driver::Handle,
        blocking_spawner: blocking::Spawner,
        seed_generator: RngSeedGenerator,
        config: Config,
    ) -> (CurrentThread, Arc<Handle>) {
        let worker_metrics = WorkerMetrics::from_config(&config);

        // Get the configured global queue interval, or use the default.
        let global_queue_interval = config
            .global_queue_interval
            .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);

        let handle = Arc::new(Handle {
            shared: Shared {
                inject: Inject::new(),
                owned: OwnedTasks::new(),
                woken: AtomicBool::new(false),
                config,
                scheduler_metrics: SchedulerMetrics::new(),
                worker_metrics,
            },
            driver: driver_handle,
            blocking_spawner,
            seed_generator,
        });

        let core = AtomicCell::new(Some(Box::new(Core {
            tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
            tick: 0,
            driver: Some(driver),
            metrics: MetricsBatch::new(&handle.shared.worker_metrics),
            global_queue_interval,
            unhandled_panic: false,
        })));

        let scheduler = CurrentThread {
            core,
            notify: Notify::new(),
        };

        (scheduler, handle)
    }

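    /// Runs `future` to completion on the current thread.
    ///
    /// Only one thread may hold the scheduler `Core` at a time. If another
    /// thread currently owns it, this call parks on `notify` and keeps
    /// polling the future with the shared waker until the future completes
    /// or the core becomes available to steal.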
    #[track_caller]
    pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
        pin!(future);

        crate::runtime::context::enter_runtime(handle, false, |blocking| {
            let handle = handle.as_current_thread();

            // Attempt to steal the scheduler core and block_on the future if
            // we can; otherwise, select on a notification that the core is
            // available or on the future completing.
            loop {
                if let Some(core) = self.take_core(handle) {
                    return core.block_on(future);
                } else {
                    let notified = self.notify.notified();
                    pin!(notified);

                    if let Some(out) = blocking
                        .block_on(poll_fn(|cx| {
                            if notified.as_mut().poll(cx).is_ready() {
                                return Ready(None);
                            }

                            if let Ready(out) = future.as_mut().poll(cx) {
                                return Ready(Some(out));
                            }

                            Pending
                        }))
                        .expect("Failed to `Enter::block_on`")
                    {
                        return out;
                    }
                }
            }
        })
    }

    fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
        let core = self.core.take()?;

        Some(CoreGuard {
            context: scheduler::Context::CurrentThread(Context {
                handle: handle.clone(),
                core: RefCell::new(Some(core)),
                defer: Defer::new(),
            }),
            scheduler: self,
        })
    }

    pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
        let handle = handle.as_current_thread();

        // Avoid a double panic if we are currently panicking and
        // the lock may be poisoned.

        let core = match self.take_core(handle) {
            Some(core) => core,
            None if std::thread::panicking() => return,
            None => panic!("Oh no! We never placed the Core back, this is a bug!"),
        };

        // Check that the thread-local is not being destroyed
        let tls_available = context::with_current(|_| ()).is_ok();

        if tls_available {
            core.enter(|core, _context| {
                let core = shutdown2(core, handle);
                (core, ())
            });
        } else {
            // Shutdown without setting the context. `tokio::spawn` calls will
            // fail, but those will fail either way because the thread-local is
            // not available anymore.
            let context = core.context.expect_current_thread();
            let core = context.core.borrow_mut().take().unwrap();

            let core = shutdown2(core, handle);
            *context.core.borrow_mut() = Some(core);
        }
    }
}

fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
    // Drain the OwnedTasks collection. This call also closes the
    // collection, ensuring that no tasks are ever pushed after this
    // call returns.
    handle.shared.owned.close_and_shutdown_all();

    // Drain local queue
    // We already shut down every task, so we just need to drop the task.
    while let Some(task) = core.next_local_task(handle) {
        drop(task);
    }

    // Close the injection queue
    handle.shared.inject.close();

    // Drain remote queue
    while let Some(task) = handle.shared.inject.pop() {
        drop(task);
    }

    assert!(handle.shared.owned.is_empty());

    // Submit metrics
    core.submit_metrics(handle);

    // Shutdown the resource drivers
    if let Some(driver) = core.driver.as_mut() {
        driver.shutdown(&handle.driver);
    }

    core
}

impl fmt::Debug for CurrentThread {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("CurrentThread").finish()
    }
}

// ===== impl Core =====

impl Core {
    /// Increments the current tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

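    /// Picks the next task to run.
    ///
    /// Every `global_queue_interval`-th tick the remote (inject) queue is
    /// checked first so remotely spawned tasks are not starved by a busy
    /// local queue; on all other ticks the local queue takes priority. A
    /// rough sketch of the policy (illustrative only):
    ///
    /// ```text
    /// tick % global_queue_interval == 0  =>  inject queue, then local queue
    /// otherwise                          =>  local queue,  then inject queue
    /// ```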
    fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
        if self.tick % self.global_queue_interval == 0 {
            handle
                .next_remote_task()
                .or_else(|| self.next_local_task(handle))
        } else {
            self.next_local_task(handle)
                .or_else(|| handle.next_remote_task())
        }
    }

    fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
        let ret = self.tasks.pop_front();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
        ret
    }

    fn push_task(&mut self, handle: &Handle, task: Notified) {
        self.tasks.push_back(task);
        self.metrics.inc_local_schedule_count();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
    }

    fn submit_metrics(&mut self, handle: &Handle) {
        self.metrics.submit(&handle.shared.worker_metrics);
    }
}

#[cfg(tokio_taskdump)]
fn wake_deferred_tasks_and_free(context: &Context) {
    let wakers = context.defer.take_deferred();
    for waker in wakers {
        waker.wake();
    }
}

// ===== impl Context =====

impl Context {
    /// Execute the closure with the given scheduler core stored in the
    /// thread-local context.
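    ///
    /// The closure runs under a fresh cooperative-scheduling budget
    /// (`coop::budget`), so a single task polling in a tight loop yields
    /// back to the scheduler instead of monopolizing the thread.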
    fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        core.metrics.start_poll();
        let mut ret = self.enter(core, || crate::runtime::coop::budget(f));
        ret.0.metrics.end_poll();
        ret
    }

    /// Blocks the current thread until an event is received by the driver,
    /// including I/O events, timer events, ...
    fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        if let Some(f) = &handle.shared.config.before_park {
            // Incorrect lint, the closures are actually different types so `f`
            // cannot be passed as an argument to `enter`.
            #[allow(clippy::redundant_closure)]
            let (c, _) = self.enter(core, || f());
            core = c;
        }

        // This check will fail if `before_park` spawns a task for us to run
        // instead of parking the thread
        if core.tasks.is_empty() {
            // Park until the thread is signaled
            core.metrics.about_to_park();
            core.submit_metrics(handle);

            let (c, _) = self.enter(core, || {
                driver.park(&handle.driver);
                self.defer.wake();
            });

            core = c;
        }

        if let Some(f) = &handle.shared.config.after_unpark {
            // Incorrect lint, the closures are actually different types so `f`
            // cannot be passed as an argument to `enter`.
            #[allow(clippy::redundant_closure)]
            let (c, _) = self.enter(core, || f());
            core = c;
        }

        core.driver = Some(driver);
        core
    }

    /// Checks the driver for new events without blocking the thread.
    fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        core.submit_metrics(handle);

        let (mut core, _) = self.enter(core, || {
            driver.park_timeout(&handle.driver, Duration::from_millis(0));
            self.defer.wake();
        });

        core.driver = Some(driver);
        core
    }

    fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        // Store the scheduler core in the thread-local context
        //
        // A drop-guard is employed at a higher level.
        *self.core.borrow_mut() = Some(core);

        // Execute the closure while tracking the execution budget
        let ret = f();

        // Take the scheduler core back
        let core = self.core.borrow_mut().take().expect("core missing");
        (core, ret)
    }

    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }
}

// ===== impl Handle =====

impl Handle {
    /// Spawns a future onto the `CurrentThread` scheduler
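    ///
    /// The future is bound to the shared `OwnedTasks` collection and, unless
    /// the runtime is already shutting down, its initial notification is
    /// pushed onto a run queue. A hedged sketch of a public call path that
    /// reaches this method (assumes the standard `Runtime::spawn` API):
    ///
    /// ```ignore
    /// // Spawning from outside the runtime goes through the remote path:
    /// // the task lands in the inject queue and the driver is unparked.
    /// let join = rt.spawn(async { 40 + 2 });
    /// assert_eq!(rt.block_on(join).unwrap(), 42);
    /// ```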
    pub(crate) fn spawn<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Capture a snapshot of this runtime's state.
    #[cfg(all(
        tokio_unstable,
        tokio_taskdump,
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(crate) fn dump(&self) -> crate::runtime::Dump {
        use crate::runtime::dump;
        use task::trace::trace_current_thread;

        let mut traces = vec![];

        // todo: how to make this work outside of a runtime context?
        context::with_scheduler(|maybe_context| {
            // drain the local queue
            let context = if let Some(context) = maybe_context {
                context.expect_current_thread()
            } else {
                return;
            };
            let mut maybe_core = context.core.borrow_mut();
            let core = if let Some(core) = maybe_core.as_mut() {
                core
            } else {
                return;
            };
            let local = &mut core.tasks;

            if self.shared.inject.is_closed() {
                return;
            }

            traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
                .into_iter()
                .map(dump::Task::new)
                .collect();

            // Avoid double borrow panic
            drop(maybe_core);

            // Taking a taskdump could wake every task, but we probably don't
            // want the `yield_now` vector to stay that large under normal
            // circumstances. Therefore, we free its allocation.
            wake_deferred_tasks_and_free(context);
        });

        dump::Dump::new(traces)
    }

    fn next_remote_task(&self) -> Option<Notified> {
        self.shared.inject.pop()
    }

    fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
        // Set `woken` to true when entering `block_on`, ensuring the outer
        // future is polled on the first iteration of the scheduling loop.
        me.shared.woken.store(true, Release);
        waker_ref(me)
    }

    // Reset `woken` to false and return the original value.
    pub(crate) fn reset_woken(&self) -> bool {
        self.shared.woken.swap(false, AcqRel)
    }
}

cfg_metrics! {
    impl Handle {
        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
            &self.shared.scheduler_metrics
        }

        pub(crate) fn injection_queue_depth(&self) -> usize {
            self.shared.inject.len()
        }

        pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
            assert_eq!(0, worker);
            &self.shared.worker_metrics
        }

        pub(crate) fn num_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_threads()
        }

        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_idle_threads()
        }

        pub(crate) fn blocking_queue_depth(&self) -> usize {
            self.blocking_spawner.queue_depth()
        }

        pub(crate) fn active_tasks_count(&self) -> usize {
            self.shared.owned.active_tasks_count()
        }
    }
}

impl fmt::Debug for Handle {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("current_thread::Handle { ... }").finish()
    }
}

// ===== impl Shared =====

impl Schedule for Arc<Handle> {
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        self.shared.owned.remove(task)
    }

    fn schedule(&self, task: task::Notified<Self>) {
        use scheduler::Context::CurrentThread;

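        // Fast path: when called from the thread that currently owns this
        // scheduler, push straight onto the local run queue. Otherwise fall
        // back to the thread-safe inject queue and unpark the driver.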
        context::with_scheduler(|maybe_cx| match maybe_cx {
            Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                let mut core = cx.core.borrow_mut();

                // If `None`, the runtime is shutting down, so there is no need
                // to schedule the task.
                if let Some(core) = core.as_mut() {
                    core.push_task(self, task);
                }
            }
            _ => {
                // Track that a task was scheduled from **outside** of the runtime.
                self.shared.scheduler_metrics.inc_remote_schedule_count();

                // Schedule the task
                self.shared.inject.push(task);
                self.driver.unpark();
            }
        });
    }

    cfg_unstable! {
        fn unhandled_panic(&self) {
            use crate::runtime::UnhandledPanic;

            match self.shared.config.unhandled_panic {
                UnhandledPanic::Ignore => {
                    // Do nothing
                }
                UnhandledPanic::ShutdownRuntime => {
                    use scheduler::Context::CurrentThread;

                    // This hook is only called from within the runtime, so
                    // `context::with_scheduler` should match with `&self`, i.e.
                    // there is no opportunity for a nested scheduler to be
                    // called.
                    context::with_scheduler(|maybe_cx| match maybe_cx {
                        Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                            let mut core = cx.core.borrow_mut();

                            // If `None`, the runtime is shutting down, so there is no need to signal shutdown
                            if let Some(core) = core.as_mut() {
                                core.unhandled_panic = true;
                                self.shared.owned.close_and_shutdown_all();
                            }
                        }
                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
                    })
                }
            }
        }
    }
}

impl Wake for Handle {
    fn wake(arc_self: Arc<Self>) {
        Wake::wake_by_ref(&arc_self)
    }

    /// Wake by reference
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.shared.woken.store(true, Release);
        arc_self.driver.unpark();
    }
}

// ===== CoreGuard =====

/// Used to ensure we always place the `Core` value back into its slot in
/// `CurrentThread`, even if the future panics.
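///
/// On drop, a still-held core is returned to `CurrentThread::core` and one
/// waiter parked in `CurrentThread::block_on` is notified so it can steal
/// the core (and with it, the driver). A rough lifecycle sketch
/// (illustrative only):
///
/// ```text
/// take_core() -> CoreGuard::block_on(future) -> Drop -> core.set(..) + notify_one()
/// ```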
struct CoreGuard<'a> {
    context: scheduler::Context,
    scheduler: &'a CurrentThread,
}

impl CoreGuard<'_> {
    #[track_caller]
    fn block_on<F: Future>(self, future: F) -> F::Output {
        let ret = self.enter(|mut core, context| {
            let waker = Handle::waker_ref(&context.handle);
            let mut cx = std::task::Context::from_waker(&waker);

            pin!(future);

            core.metrics.start_processing_scheduled_tasks();

            'outer: loop {
                let handle = &context.handle;

                if handle.reset_woken() {
                    let (c, res) = context.enter(core, || {
                        crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
                    });

                    core = c;

                    if let Ready(v) = res {
                        return (core, Some(v));
                    }
                }

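                // Run up to `event_interval` scheduled tasks before yielding
                // back to the driver, so timers and I/O keep getting serviced
                // while the run queues stay busy.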
                for _ in 0..handle.shared.config.event_interval {
                    // Make sure we didn't hit an unhandled_panic
                    if core.unhandled_panic {
                        return (core, None);
                    }

                    core.tick();

                    let entry = core.next_task(handle);

                    let task = match entry {
                        Some(entry) => entry,
                        None => {
                            core.metrics.end_processing_scheduled_tasks();

                            core = if !context.defer.is_empty() {
                                context.park_yield(core, handle)
                            } else {
                                context.park(core, handle)
                            };

                            core.metrics.start_processing_scheduled_tasks();

                            // Try polling the `block_on` future next
                            continue 'outer;
                        }
                    };

                    let task = context.handle.shared.owned.assert_owner(task);

                    let (c, _) = context.run_task(core, || {
                        task.run();
                    });

                    core = c;
                }

                core.metrics.end_processing_scheduled_tasks();

                // Yield to the driver, this drives the timer and pulls any
                // pending I/O events.
                core = context.park_yield(core, handle);

                core.metrics.start_processing_scheduled_tasks();
            }
        });

        match ret {
            Some(ret) => ret,
            None => {
                // A spawned task panicked and the runtime was configured to
                // shut down on unhandled panics.
                panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
            }
        }
    }

    /// Enters the scheduler context. This sets the queue and other necessary
    /// scheduler state in the thread-local.
    fn enter<F, R>(self, f: F) -> R
    where
        F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
    {
        let context = self.context.expect_current_thread();

        // Remove `core` from `context` to pass into the closure.
        let core = context.core.borrow_mut().take().expect("core missing");

        // Call the closure and place `core` back
        let (core, ret) = context::set_scheduler(&self.context, || f(core, context));

        *context.core.borrow_mut() = Some(core);

        ret
    }
}

impl Drop for CoreGuard<'_> {
    fn drop(&mut self) {
        let context = self.context.expect_current_thread();

        if let Some(core) = context.core.borrow_mut().take() {
            // Place the scheduler core back into its slot so other
            // threads can pick it up and drive it.
            self.scheduler.core.set(core);

            // Wake up other possible threads that could steal the driver.
            self.scheduler.notify.notify_one()
        }
    }
}