use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Arc;
use crate::runtime::driver::{self, Driver};
use crate::runtime::scheduler::{self, Defer, Inject};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::{blocking, context, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
use std::time::Duration;

/// Executes tasks on the current thread
pub(crate) struct CurrentThread {
    /// Core scheduler data is acquired by a thread entering `block_on`.
    core: AtomicCell<Core>,

    /// Notifier for waking up other threads to steal the
    /// driver.
    notify: Notify,
}
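
// NOTE: a minimal sketch of how this scheduler is typically reached through
// the public API; the builder calls below are the public `tokio::runtime`
// surface, not part of this module:
//
//     let rt = tokio::runtime::Builder::new_current_thread()
//         .enable_all()
//         .build()
//         .unwrap();
//     rt.block_on(async { /* polled by `CurrentThread::block_on` */ });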

/// Handle to the current thread scheduler
pub(crate) struct Handle {
    /// Scheduler state shared across threads
    shared: Shared,

    /// Resource driver handles
    pub(crate) driver: driver::Handle,

    /// Blocking pool spawner
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Current random number generator seed
    pub(crate) seed_generator: RngSeedGenerator,
}

/// Data required for executing the scheduler. The struct is passed around to
/// a function that will perform the scheduling work and acts as a capability token.
struct Core {
    /// Scheduler run queue
    tasks: VecDeque<Notified>,

    /// Current tick
    tick: u32,

    /// Runtime driver
    ///
    /// The driver is removed before starting to park the thread
    driver: Option<Driver>,

    /// Metrics batch
    metrics: MetricsBatch,

    /// How often to check the global queue
    global_queue_interval: u32,

    /// True if a task panicked without being handled and the runtime is
    /// configured to shut down on unhandled panic.
    unhandled_panic: bool,
}

/// Scheduler state shared between threads.
struct Shared {
    /// Remote run queue
    inject: Inject<Arc<Handle>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Handle>>,

    /// Indicates whether the blocked on thread was woken.
    woken: AtomicBool,

    /// Scheduler configuration options
    config: Config,

    /// Keeps track of various runtime metrics.
    scheduler_metrics: SchedulerMetrics,

    /// This scheduler only has one worker.
    worker_metrics: WorkerMetrics,
}

/// Thread-local context.
///
/// pub(crate) to store in `runtime::context`.
pub(crate) struct Context {
    /// Scheduler handle
    handle: Arc<Handle>,

    /// Scheduler core, enabling the holder of `Context` to execute the
    /// scheduler.
    core: RefCell<Option<Box<Core>>>,

    /// Deferred tasks, usually ones that called `task::yield_now()`.
    pub(crate) defer: Defer,
}

type Notified = task::Notified<Arc<Handle>>;

/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Used if none is specified. This is a temporary constant and will be removed
/// as we unify tuning logic between the multi-thread and current-thread
/// schedulers.
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;

impl CurrentThread {
    pub(crate) fn new(
        driver: Driver,
        driver_handle: driver::Handle,
        blocking_spawner: blocking::Spawner,
        seed_generator: RngSeedGenerator,
        config: Config,
    ) -> (CurrentThread, Arc<Handle>) {
        let worker_metrics = WorkerMetrics::from_config(&config);

        // Get the configured global queue interval, or use the default.
        let global_queue_interval = config
            .global_queue_interval
            .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);

        let handle = Arc::new(Handle {
            shared: Shared {
                inject: Inject::new(),
                owned: OwnedTasks::new(1),
                woken: AtomicBool::new(false),
                config,
                scheduler_metrics: SchedulerMetrics::new(),
                worker_metrics,
            },
            driver: driver_handle,
            blocking_spawner,
            seed_generator,
        });

        let core = AtomicCell::new(Some(Box::new(Core {
            tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
            tick: 0,
            driver: Some(driver),
            metrics: MetricsBatch::new(&handle.shared.worker_metrics),
            global_queue_interval,
            unhandled_panic: false,
        })));

        let scheduler = CurrentThread {
            core,
            notify: Notify::new(),
        };

        (scheduler, handle)
    }

    #[track_caller]
    pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
        pin!(future);

        crate::runtime::context::enter_runtime(handle, false, |blocking| {
            let handle = handle.as_current_thread();

            // Attempt to steal the scheduler core and block_on the future if
            // we can; otherwise, select on a notification that the core is
            // available or on the future completing.
            loop {
                if let Some(core) = self.take_core(handle) {
                    return core.block_on(future);
                } else {
                    let notified = self.notify.notified();
                    pin!(notified);

                    if let Some(out) = blocking
                        .block_on(poll_fn(|cx| {
                            if notified.as_mut().poll(cx).is_ready() {
                                return Ready(None);
                            }

                            if let Ready(out) = future.as_mut().poll(cx) {
                                return Ready(Some(out));
                            }

                            Pending
                        }))
                        .expect("Failed to `Enter::block_on`")
                    {
                        return out;
                    }
                }
            }
        })
    }

    fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
        let core = self.core.take()?;

        Some(CoreGuard {
            context: scheduler::Context::CurrentThread(Context {
                handle: handle.clone(),
                core: RefCell::new(Some(core)),
                defer: Defer::new(),
            }),
            scheduler: self,
        })
    }

    pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
        let handle = handle.as_current_thread();

        // Avoid a double panic if we are currently panicking and
        // the lock may be poisoned.

        let core = match self.take_core(handle) {
            Some(core) => core,
            None if std::thread::panicking() => return,
            None => panic!("Oh no! We never placed the Core back, this is a bug!"),
        };

        // Check that the thread-local is not being destroyed
        let tls_available = context::with_current(|_| ()).is_ok();

        if tls_available {
            core.enter(|core, _context| {
                let core = shutdown2(core, handle);
                (core, ())
            });
        } else {
            // Shutdown without setting the context. `tokio::spawn` calls will
            // fail, but those will fail either way because the thread-local is
            // not available anymore.
            let context = core.context.expect_current_thread();
            let core = context.core.borrow_mut().take().unwrap();

            let core = shutdown2(core, handle);
            *context.core.borrow_mut() = Some(core);
        }
    }
}

fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
    // Drain the OwnedTasks collection. This call also closes the
    // collection, ensuring that no tasks are ever pushed after this
    // call returns.
    handle.shared.owned.close_and_shutdown_all(0);

    // Drain local queue
    // We already shut down every task, so we just need to drop the task.
    while let Some(task) = core.next_local_task(handle) {
        drop(task);
    }

    // Close the injection queue
    handle.shared.inject.close();

    // Drain remote queue
    while let Some(task) = handle.shared.inject.pop() {
        drop(task);
    }

    assert!(handle.shared.owned.is_empty());

    // Submit metrics
    core.submit_metrics(handle);

    // Shutdown the resource drivers
    if let Some(driver) = core.driver.as_mut() {
        driver.shutdown(&handle.driver);
    }

    core
}

impl fmt::Debug for CurrentThread {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("CurrentThread").finish()
    }
}

// ===== impl Core =====

impl Core {
    /// Increments the current tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

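    // Check the remote (inject) queue first once every `global_queue_interval`
    // ticks; on all other ticks prefer the local queue. This keeps tasks
    // scheduled from outside the runtime from being starved by a busy local
    // queue.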
    fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
        if self.tick % self.global_queue_interval == 0 {
            handle
                .next_remote_task()
                .or_else(|| self.next_local_task(handle))
        } else {
            self.next_local_task(handle)
                .or_else(|| handle.next_remote_task())
        }
    }

    fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
        let ret = self.tasks.pop_front();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
        ret
    }

    fn push_task(&mut self, handle: &Handle, task: Notified) {
        self.tasks.push_back(task);
        self.metrics.inc_local_schedule_count();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
    }

    fn submit_metrics(&mut self, handle: &Handle) {
        self.metrics.submit(&handle.shared.worker_metrics, 0);
    }
}

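/// Wakes every deferred waker and frees the backing allocation of the
/// deferred-task list (only used by the taskdump path below).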
#[cfg(tokio_taskdump)]
fn wake_deferred_tasks_and_free(context: &Context) {
    let wakers = context.defer.take_deferred();
    for waker in wakers {
        waker.wake();
    }
}

// ===== impl Context =====

impl Context {
    /// Execute the closure with the given scheduler core stored in the
    /// thread-local context.
    fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        core.metrics.start_poll();
        let mut ret = self.enter(core, || crate::runtime::coop::budget(f));
        ret.0.metrics.end_poll();
        ret
    }

    /// Blocks the current thread until an event is received by the driver,
    /// including I/O events, timer events, ...
    fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        if let Some(f) = &handle.shared.config.before_park {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        // This check will fail if `before_park` spawns a task for us to run
        // instead of parking the thread
        if core.tasks.is_empty() {
            // Park until the thread is signaled
            core.metrics.about_to_park();
            core.submit_metrics(handle);

            let (c, ()) = self.enter(core, || {
                driver.park(&handle.driver);
                self.defer.wake();
            });

            core = c;
        }

        if let Some(f) = &handle.shared.config.after_unpark {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        core.driver = Some(driver);
        core
    }

    /// Checks the driver for new events without blocking the thread.
    fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        core.submit_metrics(handle);

        let (mut core, ()) = self.enter(core, || {
            driver.park_timeout(&handle.driver, Duration::from_millis(0));
            self.defer.wake();
        });

        core.driver = Some(driver);
        core
    }

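    // The core is stashed in `self.core` (which lives in the thread-local
    // scheduler context) while `f` runs, so that `Schedule::schedule` calls
    // made from inside `f` can push directly onto the local run queue.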
    fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        // Store the scheduler core in the thread-local context
        //
        // A drop-guard is employed at a higher level.
        *self.core.borrow_mut() = Some(core);

        // Execute the closure while tracking the execution budget
        let ret = f();

        // Take the scheduler core back
        let core = self.core.borrow_mut().take().expect("core missing");
        (core, ret)
    }

    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }
}

// ===== impl Handle =====

impl Handle {
    /// Spawns a future onto the `CurrentThread` scheduler
    pub(crate) fn spawn<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);

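        // `notified` is `None` when the owned-task collection has already
        // been closed (i.e. the runtime is shutting down), in which case no
        // notification is handed back and the task is not scheduled here.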
        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Capture a snapshot of this runtime's state.
    #[cfg(all(
        tokio_unstable,
        tokio_taskdump,
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(crate) fn dump(&self) -> crate::runtime::Dump {
        use crate::runtime::dump;
        use task::trace::trace_current_thread;

        let mut traces = vec![];

        // todo: how to make this work outside of a runtime context?
        context::with_scheduler(|maybe_context| {
            // drain the local queue
            let context = if let Some(context) = maybe_context {
                context.expect_current_thread()
            } else {
                return;
            };
            let mut maybe_core = context.core.borrow_mut();
            let core = if let Some(core) = maybe_core.as_mut() {
                core
            } else {
                return;
            };
            let local = &mut core.tasks;

            if self.shared.inject.is_closed() {
                return;
            }

            traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
                .into_iter()
                .map(dump::Task::new)
                .collect();

            // Avoid double borrow panic
            drop(maybe_core);

            // Taking a taskdump could wake every task, but we probably don't
            // want the `yield_now` vector to be that large under normal
            // circumstances. Therefore, we free its allocation.
            wake_deferred_tasks_and_free(context);
        });

        dump::Dump::new(traces)
    }

    fn next_remote_task(&self) -> Option<Notified> {
        self.shared.inject.pop()
    }

    fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
        // Set `woken` to `true` when entering `block_on` so that the outer
        // future is polled on the first iteration of the scheduling loop.
        me.shared.woken.store(true, Release);
        waker_ref(me)
    }

    // Reset `woken` to `false` and return the previous value.
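    //
    // The `AcqRel` swap pairs with the `Release` store in `wake_by_ref`, so a
    // wakeup observed here happens-before the subsequent poll of the future.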
    pub(crate) fn reset_woken(&self) -> bool {
        self.shared.woken.swap(false, AcqRel)
    }
}

cfg_metrics! {
    impl Handle {
        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
            &self.shared.scheduler_metrics
        }

        pub(crate) fn injection_queue_depth(&self) -> usize {
            self.shared.inject.len()
        }

        pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
            assert_eq!(0, worker);
            &self.shared.worker_metrics
        }

        pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
            self.worker_metrics(worker).queue_depth()
        }

        pub(crate) fn num_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_threads()
        }

        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_idle_threads()
        }

        pub(crate) fn blocking_queue_depth(&self) -> usize {
            self.blocking_spawner.queue_depth()
        }

        pub(crate) fn active_tasks_count(&self) -> usize {
            self.shared.owned.active_tasks_count()
        }
    }
}

cfg_unstable! {
    use std::num::NonZeroU64;

    impl Handle {
        pub(crate) fn owned_id(&self) -> NonZeroU64 {
            self.shared.owned.id
        }
    }
}

impl fmt::Debug for Handle {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("current_thread::Handle { ... }").finish()
    }
}

// ===== impl Shared =====

impl Schedule for Arc<Handle> {
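    // Called by the task harness when a task completes; remove it from the
    // collection of owned tasks.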
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        self.shared.owned.remove(task)
    }

    fn schedule(&self, task: task::Notified<Self>) {
        use scheduler::Context::CurrentThread;

        context::with_scheduler(|maybe_cx| match maybe_cx {
            Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                let mut core = cx.core.borrow_mut();

                // If `None`, the runtime is shutting down, so there is no need
                // to schedule the task.
                if let Some(core) = core.as_mut() {
                    core.push_task(self, task);
                }
            }
            _ => {
                // Track that a task was scheduled from **outside** of the runtime.
                self.shared.scheduler_metrics.inc_remote_schedule_count();

                // Schedule the task
                self.shared.inject.push(task);
                self.driver.unpark();
            }
        });
    }

    cfg_unstable! {
        fn unhandled_panic(&self) {
            use crate::runtime::UnhandledPanic;

            match self.shared.config.unhandled_panic {
                UnhandledPanic::Ignore => {
                    // Do nothing
                }
                UnhandledPanic::ShutdownRuntime => {
                    use scheduler::Context::CurrentThread;

                    // This hook is only called from within the runtime, so
                    // `context::with_scheduler` should match with `&self`, i.e.
                    // there is no opportunity for a nested scheduler to be
                    // called.
                    context::with_scheduler(|maybe_cx| match maybe_cx {
                        Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                            let mut core = cx.core.borrow_mut();

                            // If `None`, the runtime is shutting down, so there is no need to signal shutdown
                            if let Some(core) = core.as_mut() {
                                core.unhandled_panic = true;
                                self.shared.owned.close_and_shutdown_all(0);
                            }
                        }
                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
                    })
                }
            }
        }
    }
}

impl Wake for Handle {
    fn wake(arc_self: Arc<Self>) {
        Wake::wake_by_ref(&arc_self);
    }

    /// Wake by reference
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.shared.woken.store(true, Release);
        arc_self.driver.unpark();
    }
}

// ===== CoreGuard =====

/// Used to ensure we always place the `Core` value back into its slot in
/// `CurrentThread`, even if the future panics.
struct CoreGuard<'a> {
    context: scheduler::Context,
    scheduler: &'a CurrentThread,
}

impl CoreGuard<'_> {
    #[track_caller]
    fn block_on<F: Future>(self, future: F) -> F::Output {
        let ret = self.enter(|mut core, context| {
            let waker = Handle::waker_ref(&context.handle);
            let mut cx = std::task::Context::from_waker(&waker);

            pin!(future);

            core.metrics.start_processing_scheduled_tasks();

            'outer: loop {
                let handle = &context.handle;

                if handle.reset_woken() {
                    let (c, res) = context.enter(core, || {
                        crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
                    });

                    core = c;

                    if let Ready(v) = res {
                        return (core, Some(v));
                    }
                }

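                // Run at most `event_interval` tasks from the run queues
                // before yielding back to the driver below, so that timers
                // and I/O are serviced regularly even under constant load.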
                for _ in 0..handle.shared.config.event_interval {
                    // Make sure we didn't hit an unhandled_panic
                    if core.unhandled_panic {
                        return (core, None);
                    }

                    core.tick();

                    let entry = core.next_task(handle);

                    let task = match entry {
                        Some(entry) => entry,
                        None => {
                            core.metrics.end_processing_scheduled_tasks();

                            core = if !context.defer.is_empty() {
                                context.park_yield(core, handle)
                            } else {
                                context.park(core, handle)
                            };

                            core.metrics.start_processing_scheduled_tasks();

                            // Try polling the `block_on` future next
                            continue 'outer;
                        }
                    };

                    let task = context.handle.shared.owned.assert_owner(task);

                    let (c, ()) = context.run_task(core, || {
                        task.run();
                    });

                    core = c;
                }

                core.metrics.end_processing_scheduled_tasks();

                // Yield to the driver: this drives the timer and pulls any
                // pending I/O events.
                core = context.park_yield(core, handle);

                core.metrics.start_processing_scheduled_tasks();
            }
        });

        match ret {
            Some(ret) => ret,
            None => {
                // A spawned task panicked and the runtime is configured to
                // shut down on unhandled panic, so the scheduler loop exited
                // before the `block_on` future completed.
                panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
            }
        }
    }

    /// Enters the scheduler context. This sets the queue and other necessary
    /// scheduler state in the thread-local.
    fn enter<F, R>(self, f: F) -> R
    where
        F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
    {
        let context = self.context.expect_current_thread();

        // Remove `core` from `context` to pass into the closure.
        let core = context.core.borrow_mut().take().expect("core missing");

        // Call the closure and place `core` back
        let (core, ret) = context::set_scheduler(&self.context, || f(core, context));

        *context.core.borrow_mut() = Some(core);

        ret
    }
}

impl Drop for CoreGuard<'_> {
    fn drop(&mut self) {
        let context = self.context.expect_current_thread();

        if let Some(core) = context.core.borrow_mut().take() {
            // Replace old scheduler back into the state to allow
            // other threads to pick it up and drive it.
            self.scheduler.core.set(core);

            // Wake up other possible threads that could steal the driver.
            self.scheduler.notify.notify_one();
        }
    }
}