| 1 | use crate::loom::sync::atomic::AtomicBool; |
| 2 | use crate::loom::sync::Arc; |
| 3 | use crate::runtime::driver::{self, Driver}; |
| 4 | use crate::runtime::scheduler::{self, Defer, Inject}; |
| 5 | use crate::runtime::task::{ |
| 6 | self, JoinHandle, OwnedTasks, Schedule, Task, TaskHarnessScheduleHooks, |
| 7 | }; |
| 8 | use crate::runtime::{ |
| 9 | blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics, |
| 10 | }; |
| 11 | use crate::sync::notify::Notify; |
| 12 | use crate::util::atomic_cell::AtomicCell; |
| 13 | use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef}; |
| 14 | |
| 15 | use std::cell::RefCell; |
| 16 | use std::collections::VecDeque; |
| 17 | use std::future::{poll_fn, Future}; |
| 18 | use std::sync::atomic::Ordering::{AcqRel, Release}; |
| 19 | use std::task::Poll::{Pending, Ready}; |
| 20 | use std::task::Waker; |
| 21 | use std::thread::ThreadId; |
| 22 | use std::time::Duration; |
| 23 | use std::{fmt, thread}; |
| 24 | |
| 25 | /// Executes tasks on the current thread |
| 26 | pub(crate) struct CurrentThread { |
| 27 | /// Core scheduler data is acquired by a thread entering `block_on`. |
| 28 | core: AtomicCell<Core>, |
| 29 | |
| 30 | /// Notifier for waking up other threads to steal the |
| 31 | /// driver. |
| 32 | notify: Notify, |
| 33 | } |
| 34 | |
| 35 | /// Handle to the current thread scheduler |
| 36 | pub(crate) struct Handle { |
| 37 | /// Scheduler state shared across threads |
| 38 | shared: Shared, |
| 39 | |
| 40 | /// Resource driver handles |
| 41 | pub(crate) driver: driver::Handle, |
| 42 | |
| 43 | /// Blocking pool spawner |
| 44 | pub(crate) blocking_spawner: blocking::Spawner, |
| 45 | |
| 46 | /// Current random number generator seed |
| 47 | pub(crate) seed_generator: RngSeedGenerator, |
| 48 | |
| 49 | /// User-supplied hooks to invoke for things |
| 50 | pub(crate) task_hooks: TaskHooks, |
| 51 | |
| 52 | /// If this is a `LocalRuntime`, flags the owning thread ID. |
| 53 | pub(crate) local_tid: Option<ThreadId>, |
| 54 | } |
| 55 | |
| 56 | /// Data required for executing the scheduler. The struct is passed around to |
| 57 | /// a function that will perform the scheduling work and acts as a capability token. |
| 58 | struct Core { |
| 59 | /// Scheduler run queue |
| 60 | tasks: VecDeque<Notified>, |
| 61 | |
| 62 | /// Current tick |
| 63 | tick: u32, |
| 64 | |
| 65 | /// Runtime driver |
| 66 | /// |
| 67 | /// The driver is removed before starting to park the thread |
| 68 | driver: Option<Driver>, |
| 69 | |
| 70 | /// Metrics batch |
| 71 | metrics: MetricsBatch, |
| 72 | |
| 73 | /// How often to check the global queue |
| 74 | global_queue_interval: u32, |
| 75 | |
| 76 | /// True if a task panicked without being handled and the runtime is |
| 77 | /// configured to shutdown on unhandled panic. |
| 78 | unhandled_panic: bool, |
| 79 | } |
| 80 | |
| 81 | /// Scheduler state shared between threads. |
| 82 | struct Shared { |
| 83 | /// Remote run queue |
| 84 | inject: Inject<Arc<Handle>>, |
| 85 | |
| 86 | /// Collection of all active tasks spawned onto this executor. |
| 87 | owned: OwnedTasks<Arc<Handle>>, |
| 88 | |
| 89 | /// Indicates whether the blocked on thread was woken. |
| 90 | woken: AtomicBool, |
| 91 | |
| 92 | /// Scheduler configuration options |
| 93 | config: Config, |
| 94 | |
| 95 | /// Keeps track of various runtime metrics. |
| 96 | scheduler_metrics: SchedulerMetrics, |
| 97 | |
| 98 | /// This scheduler only has one worker. |
| 99 | worker_metrics: WorkerMetrics, |
| 100 | } |
| 101 | |
| 102 | /// Thread-local context. |
| 103 | /// |
| 104 | /// pub(crate) to store in `runtime::context`. |
| 105 | pub(crate) struct Context { |
| 106 | /// Scheduler handle |
| 107 | handle: Arc<Handle>, |
| 108 | |
| 109 | /// Scheduler core, enabling the holder of `Context` to execute the |
| 110 | /// scheduler. |
| 111 | core: RefCell<Option<Box<Core>>>, |
| 112 | |
| 113 | /// Deferred tasks, usually ones that called `task::yield_now()`. |
| 114 | pub(crate) defer: Defer, |
| 115 | } |
| 116 | |
| 117 | type Notified = task::Notified<Arc<Handle>>; |
| 118 | |
| 119 | /// Initial queue capacity. |
| 120 | const INITIAL_CAPACITY: usize = 64; |
| 121 | |
| 122 | /// Used if none is specified. This is a temporary constant and will be removed |
| 123 | /// as we unify tuning logic between the multi-thread and current-thread |
| 124 | /// schedulers. |
| 125 | const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31; |
| 126 | |
| 127 | impl CurrentThread { |
| 128 | pub(crate) fn new( |
| 129 | driver: Driver, |
| 130 | driver_handle: driver::Handle, |
| 131 | blocking_spawner: blocking::Spawner, |
| 132 | seed_generator: RngSeedGenerator, |
| 133 | config: Config, |
| 134 | local_tid: Option<ThreadId>, |
| 135 | ) -> (CurrentThread, Arc<Handle>) { |
| 136 | let worker_metrics = WorkerMetrics::from_config(&config); |
| 137 | worker_metrics.set_thread_id(thread::current().id()); |
| 138 | |
| 139 | // Get the configured global queue interval, or use the default. |
| 140 | let global_queue_interval = config |
| 141 | .global_queue_interval |
| 142 | .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL); |
| 143 | |
| 144 | let handle = Arc::new(Handle { |
| 145 | task_hooks: TaskHooks { |
| 146 | task_spawn_callback: config.before_spawn.clone(), |
| 147 | task_terminate_callback: config.after_termination.clone(), |
| 148 | #[cfg (tokio_unstable)] |
| 149 | before_poll_callback: config.before_poll.clone(), |
| 150 | #[cfg (tokio_unstable)] |
| 151 | after_poll_callback: config.after_poll.clone(), |
| 152 | }, |
| 153 | shared: Shared { |
| 154 | inject: Inject::new(), |
| 155 | owned: OwnedTasks::new(1), |
| 156 | woken: AtomicBool::new(false), |
| 157 | config, |
| 158 | scheduler_metrics: SchedulerMetrics::new(), |
| 159 | worker_metrics, |
| 160 | }, |
| 161 | driver: driver_handle, |
| 162 | blocking_spawner, |
| 163 | seed_generator, |
| 164 | local_tid, |
| 165 | }); |
| 166 | |
| 167 | let core = AtomicCell::new(Some(Box::new(Core { |
| 168 | tasks: VecDeque::with_capacity(INITIAL_CAPACITY), |
| 169 | tick: 0, |
| 170 | driver: Some(driver), |
| 171 | metrics: MetricsBatch::new(&handle.shared.worker_metrics), |
| 172 | global_queue_interval, |
| 173 | unhandled_panic: false, |
| 174 | }))); |
| 175 | |
| 176 | let scheduler = CurrentThread { |
| 177 | core, |
| 178 | notify: Notify::new(), |
| 179 | }; |
| 180 | |
| 181 | (scheduler, handle) |
| 182 | } |
| 183 | |
| 184 | #[track_caller ] |
| 185 | pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output { |
| 186 | pin!(future); |
| 187 | |
| 188 | crate::runtime::context::enter_runtime(handle, false, |blocking| { |
| 189 | let handle = handle.as_current_thread(); |
| 190 | |
| 191 | // Attempt to steal the scheduler core and block_on the future if we can |
| 192 | // there, otherwise, lets select on a notification that the core is |
| 193 | // available or the future is complete. |
| 194 | loop { |
| 195 | if let Some(core) = self.take_core(handle) { |
| 196 | handle |
| 197 | .shared |
| 198 | .worker_metrics |
| 199 | .set_thread_id(thread::current().id()); |
| 200 | return core.block_on(future); |
| 201 | } else { |
| 202 | let notified = self.notify.notified(); |
| 203 | pin!(notified); |
| 204 | |
| 205 | if let Some(out) = blocking |
| 206 | .block_on(poll_fn(|cx| { |
| 207 | if notified.as_mut().poll(cx).is_ready() { |
| 208 | return Ready(None); |
| 209 | } |
| 210 | |
| 211 | if let Ready(out) = future.as_mut().poll(cx) { |
| 212 | return Ready(Some(out)); |
| 213 | } |
| 214 | |
| 215 | Pending |
| 216 | })) |
| 217 | .expect("Failed to `Enter::block_on`" ) |
| 218 | { |
| 219 | return out; |
| 220 | } |
| 221 | } |
| 222 | } |
| 223 | }) |
| 224 | } |
| 225 | |
| 226 | fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> { |
| 227 | let core = self.core.take()?; |
| 228 | |
| 229 | Some(CoreGuard { |
| 230 | context: scheduler::Context::CurrentThread(Context { |
| 231 | handle: handle.clone(), |
| 232 | core: RefCell::new(Some(core)), |
| 233 | defer: Defer::new(), |
| 234 | }), |
| 235 | scheduler: self, |
| 236 | }) |
| 237 | } |
| 238 | |
| 239 | pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) { |
| 240 | let handle = handle.as_current_thread(); |
| 241 | |
| 242 | // Avoid a double panic if we are currently panicking and |
| 243 | // the lock may be poisoned. |
| 244 | |
| 245 | let core = match self.take_core(handle) { |
| 246 | Some(core) => core, |
| 247 | None if std::thread::panicking() => return, |
| 248 | None => panic!("Oh no! We never placed the Core back, this is a bug!" ), |
| 249 | }; |
| 250 | |
| 251 | // Check that the thread-local is not being destroyed |
| 252 | let tls_available = context::with_current(|_| ()).is_ok(); |
| 253 | |
| 254 | if tls_available { |
| 255 | core.enter(|core, _context| { |
| 256 | let core = shutdown2(core, handle); |
| 257 | (core, ()) |
| 258 | }); |
| 259 | } else { |
| 260 | // Shutdown without setting the context. `tokio::spawn` calls will |
| 261 | // fail, but those will fail either way because the thread-local is |
| 262 | // not available anymore. |
| 263 | let context = core.context.expect_current_thread(); |
| 264 | let core = context.core.borrow_mut().take().unwrap(); |
| 265 | |
| 266 | let core = shutdown2(core, handle); |
| 267 | *context.core.borrow_mut() = Some(core); |
| 268 | } |
| 269 | } |
| 270 | } |
| 271 | |
| 272 | fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> { |
| 273 | // Drain the OwnedTasks collection. This call also closes the |
| 274 | // collection, ensuring that no tasks are ever pushed after this |
| 275 | // call returns. |
| 276 | handle.shared.owned.close_and_shutdown_all(0); |
| 277 | |
| 278 | // Drain local queue |
| 279 | // We already shut down every task, so we just need to drop the task. |
| 280 | while let Some(task) = core.next_local_task(handle) { |
| 281 | drop(task); |
| 282 | } |
| 283 | |
| 284 | // Close the injection queue |
| 285 | handle.shared.inject.close(); |
| 286 | |
| 287 | // Drain remote queue |
| 288 | while let Some(task) = handle.shared.inject.pop() { |
| 289 | drop(task); |
| 290 | } |
| 291 | |
| 292 | assert!(handle.shared.owned.is_empty()); |
| 293 | |
| 294 | // Submit metrics |
| 295 | core.submit_metrics(handle); |
| 296 | |
| 297 | // Shutdown the resource drivers |
| 298 | if let Some(driver) = core.driver.as_mut() { |
| 299 | driver.shutdown(&handle.driver); |
| 300 | } |
| 301 | |
| 302 | core |
| 303 | } |
| 304 | |
| 305 | impl fmt::Debug for CurrentThread { |
| 306 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 307 | fmt.debug_struct(name:"CurrentThread" ).finish() |
| 308 | } |
| 309 | } |
| 310 | |
| 311 | // ===== impl Core ===== |
| 312 | |
| 313 | impl Core { |
| 314 | /// Get and increment the current tick |
| 315 | fn tick(&mut self) { |
| 316 | self.tick = self.tick.wrapping_add(1); |
| 317 | } |
| 318 | |
| 319 | fn next_task(&mut self, handle: &Handle) -> Option<Notified> { |
| 320 | if self.tick % self.global_queue_interval == 0 { |
| 321 | handle |
| 322 | .next_remote_task() |
| 323 | .or_else(|| self.next_local_task(handle)) |
| 324 | } else { |
| 325 | self.next_local_task(handle) |
| 326 | .or_else(|| handle.next_remote_task()) |
| 327 | } |
| 328 | } |
| 329 | |
| 330 | fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> { |
| 331 | let ret = self.tasks.pop_front(); |
| 332 | handle |
| 333 | .shared |
| 334 | .worker_metrics |
| 335 | .set_queue_depth(self.tasks.len()); |
| 336 | ret |
| 337 | } |
| 338 | |
| 339 | fn push_task(&mut self, handle: &Handle, task: Notified) { |
| 340 | self.tasks.push_back(task); |
| 341 | self.metrics.inc_local_schedule_count(); |
| 342 | handle |
| 343 | .shared |
| 344 | .worker_metrics |
| 345 | .set_queue_depth(self.tasks.len()); |
| 346 | } |
| 347 | |
| 348 | fn submit_metrics(&mut self, handle: &Handle) { |
| 349 | self.metrics.submit(&handle.shared.worker_metrics, 0); |
| 350 | } |
| 351 | } |
| 352 | |
| 353 | #[cfg (tokio_taskdump)] |
| 354 | fn wake_deferred_tasks_and_free(context: &Context) { |
| 355 | let wakers = context.defer.take_deferred(); |
| 356 | for waker in wakers { |
| 357 | waker.wake(); |
| 358 | } |
| 359 | } |
| 360 | |
| 361 | // ===== impl Context ===== |
| 362 | |
| 363 | impl Context { |
| 364 | /// Execute the closure with the given scheduler core stored in the |
| 365 | /// thread-local context. |
| 366 | fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) { |
| 367 | core.metrics.start_poll(); |
| 368 | let mut ret = self.enter(core, || crate::task::coop::budget(f)); |
| 369 | ret.0.metrics.end_poll(); |
| 370 | ret |
| 371 | } |
| 372 | |
| 373 | /// Blocks the current thread until an event is received by the driver, |
| 374 | /// including I/O events, timer events, ... |
| 375 | fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> { |
| 376 | let mut driver = core.driver.take().expect("driver missing" ); |
| 377 | |
| 378 | if let Some(f) = &handle.shared.config.before_park { |
| 379 | let (c, ()) = self.enter(core, || f()); |
| 380 | core = c; |
| 381 | } |
| 382 | |
| 383 | // This check will fail if `before_park` spawns a task for us to run |
| 384 | // instead of parking the thread |
| 385 | if core.tasks.is_empty() { |
| 386 | // Park until the thread is signaled |
| 387 | core.metrics.about_to_park(); |
| 388 | core.submit_metrics(handle); |
| 389 | |
| 390 | let (c, ()) = self.enter(core, || { |
| 391 | driver.park(&handle.driver); |
| 392 | self.defer.wake(); |
| 393 | }); |
| 394 | |
| 395 | core = c; |
| 396 | |
| 397 | core.metrics.unparked(); |
| 398 | core.submit_metrics(handle); |
| 399 | } |
| 400 | |
| 401 | if let Some(f) = &handle.shared.config.after_unpark { |
| 402 | let (c, ()) = self.enter(core, || f()); |
| 403 | core = c; |
| 404 | } |
| 405 | |
| 406 | core.driver = Some(driver); |
| 407 | core |
| 408 | } |
| 409 | |
| 410 | /// Checks the driver for new events without blocking the thread. |
| 411 | fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> { |
| 412 | let mut driver = core.driver.take().expect("driver missing" ); |
| 413 | |
| 414 | core.submit_metrics(handle); |
| 415 | |
| 416 | let (mut core, ()) = self.enter(core, || { |
| 417 | driver.park_timeout(&handle.driver, Duration::from_millis(0)); |
| 418 | self.defer.wake(); |
| 419 | }); |
| 420 | |
| 421 | core.driver = Some(driver); |
| 422 | core |
| 423 | } |
| 424 | |
| 425 | fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) { |
| 426 | // Store the scheduler core in the thread-local context |
| 427 | // |
| 428 | // A drop-guard is employed at a higher level. |
| 429 | *self.core.borrow_mut() = Some(core); |
| 430 | |
| 431 | // Execute the closure while tracking the execution budget |
| 432 | let ret = f(); |
| 433 | |
| 434 | // Take the scheduler core back |
| 435 | let core = self.core.borrow_mut().take().expect("core missing" ); |
| 436 | (core, ret) |
| 437 | } |
| 438 | |
| 439 | pub(crate) fn defer(&self, waker: &Waker) { |
| 440 | self.defer.defer(waker); |
| 441 | } |
| 442 | } |
| 443 | |
| 444 | // ===== impl Handle ===== |
| 445 | |
| 446 | impl Handle { |
| 447 | /// Spawns a future onto the `CurrentThread` scheduler |
| 448 | pub(crate) fn spawn<F>( |
| 449 | me: &Arc<Self>, |
| 450 | future: F, |
| 451 | id: crate::runtime::task::Id, |
| 452 | ) -> JoinHandle<F::Output> |
| 453 | where |
| 454 | F: crate::future::Future + Send + 'static, |
| 455 | F::Output: Send + 'static, |
| 456 | { |
| 457 | let (handle, notified) = me.shared.owned.bind(future, me.clone(), id); |
| 458 | |
| 459 | me.task_hooks.spawn(&TaskMeta { |
| 460 | id, |
| 461 | _phantom: Default::default(), |
| 462 | }); |
| 463 | |
| 464 | if let Some(notified) = notified { |
| 465 | me.schedule(notified); |
| 466 | } |
| 467 | |
| 468 | handle |
| 469 | } |
| 470 | |
| 471 | /// Spawn a task which isn't safe to send across thread boundaries onto the runtime. |
| 472 | /// |
| 473 | /// # Safety |
| 474 | /// This should only be used when this is a `LocalRuntime` or in another case where the runtime |
| 475 | /// provably cannot be driven from or moved to different threads from the one on which the task |
| 476 | /// is spawned. |
| 477 | pub(crate) unsafe fn spawn_local<F>( |
| 478 | me: &Arc<Self>, |
| 479 | future: F, |
| 480 | id: crate::runtime::task::Id, |
| 481 | ) -> JoinHandle<F::Output> |
| 482 | where |
| 483 | F: crate::future::Future + 'static, |
| 484 | F::Output: 'static, |
| 485 | { |
| 486 | let (handle, notified) = me.shared.owned.bind_local(future, me.clone(), id); |
| 487 | |
| 488 | me.task_hooks.spawn(&TaskMeta { |
| 489 | id, |
| 490 | _phantom: Default::default(), |
| 491 | }); |
| 492 | |
| 493 | if let Some(notified) = notified { |
| 494 | me.schedule(notified); |
| 495 | } |
| 496 | |
| 497 | handle |
| 498 | } |
| 499 | |
| 500 | /// Capture a snapshot of this runtime's state. |
| 501 | #[cfg (all( |
| 502 | tokio_unstable, |
| 503 | tokio_taskdump, |
| 504 | target_os = "linux" , |
| 505 | any(target_arch = "aarch64" , target_arch = "x86" , target_arch = "x86_64" ) |
| 506 | ))] |
| 507 | pub(crate) fn dump(&self) -> crate::runtime::Dump { |
| 508 | use crate::runtime::dump; |
| 509 | use task::trace::trace_current_thread; |
| 510 | |
| 511 | let mut traces = vec![]; |
| 512 | |
| 513 | // todo: how to make this work outside of a runtime context? |
| 514 | context::with_scheduler(|maybe_context| { |
| 515 | // drain the local queue |
| 516 | let context = if let Some(context) = maybe_context { |
| 517 | context.expect_current_thread() |
| 518 | } else { |
| 519 | return; |
| 520 | }; |
| 521 | let mut maybe_core = context.core.borrow_mut(); |
| 522 | let core = if let Some(core) = maybe_core.as_mut() { |
| 523 | core |
| 524 | } else { |
| 525 | return; |
| 526 | }; |
| 527 | let local = &mut core.tasks; |
| 528 | |
| 529 | if self.shared.inject.is_closed() { |
| 530 | return; |
| 531 | } |
| 532 | |
| 533 | traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject) |
| 534 | .into_iter() |
| 535 | .map(|(id, trace)| dump::Task::new(id, trace)) |
| 536 | .collect(); |
| 537 | |
| 538 | // Avoid double borrow panic |
| 539 | drop(maybe_core); |
| 540 | |
| 541 | // Taking a taskdump could wakes every task, but we probably don't want |
| 542 | // the `yield_now` vector to be that large under normal circumstances. |
| 543 | // Therefore, we free its allocation. |
| 544 | wake_deferred_tasks_and_free(context); |
| 545 | }); |
| 546 | |
| 547 | dump::Dump::new(traces) |
| 548 | } |
| 549 | |
| 550 | fn next_remote_task(&self) -> Option<Notified> { |
| 551 | self.shared.inject.pop() |
| 552 | } |
| 553 | |
| 554 | fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> { |
| 555 | // Set woken to true when enter block_on, ensure outer future |
| 556 | // be polled for the first time when enter loop |
| 557 | me.shared.woken.store(true, Release); |
| 558 | waker_ref(me) |
| 559 | } |
| 560 | |
| 561 | // reset woken to false and return original value |
| 562 | pub(crate) fn reset_woken(&self) -> bool { |
| 563 | self.shared.woken.swap(false, AcqRel) |
| 564 | } |
| 565 | |
| 566 | pub(crate) fn num_alive_tasks(&self) -> usize { |
| 567 | self.shared.owned.num_alive_tasks() |
| 568 | } |
| 569 | |
| 570 | pub(crate) fn injection_queue_depth(&self) -> usize { |
| 571 | self.shared.inject.len() |
| 572 | } |
| 573 | } |
| 574 | |
| 575 | cfg_unstable_metrics! { |
| 576 | impl Handle { |
| 577 | pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { |
| 578 | &self.shared.scheduler_metrics |
| 579 | } |
| 580 | |
| 581 | pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { |
| 582 | assert_eq!(0, worker); |
| 583 | &self.shared.worker_metrics |
| 584 | } |
| 585 | |
| 586 | pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { |
| 587 | self.worker_metrics(worker).queue_depth() |
| 588 | } |
| 589 | |
| 590 | pub(crate) fn num_blocking_threads(&self) -> usize { |
| 591 | self.blocking_spawner.num_threads() |
| 592 | } |
| 593 | |
| 594 | pub(crate) fn num_idle_blocking_threads(&self) -> usize { |
| 595 | self.blocking_spawner.num_idle_threads() |
| 596 | } |
| 597 | |
| 598 | pub(crate) fn blocking_queue_depth(&self) -> usize { |
| 599 | self.blocking_spawner.queue_depth() |
| 600 | } |
| 601 | |
| 602 | cfg_64bit_metrics! { |
| 603 | pub(crate) fn spawned_tasks_count(&self) -> u64 { |
| 604 | self.shared.owned.spawned_tasks_count() |
| 605 | } |
| 606 | } |
| 607 | } |
| 608 | } |
| 609 | |
| 610 | cfg_unstable! { |
| 611 | use std::num::NonZeroU64; |
| 612 | |
| 613 | impl Handle { |
| 614 | pub(crate) fn owned_id(&self) -> NonZeroU64 { |
| 615 | self.shared.owned.id |
| 616 | } |
| 617 | } |
| 618 | } |
| 619 | |
| 620 | impl fmt::Debug for Handle { |
| 621 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 622 | fmt.debug_struct(name:"current_thread::Handle { ... }" ).finish() |
| 623 | } |
| 624 | } |
| 625 | |
| 626 | // ===== impl Shared ===== |
| 627 | |
| 628 | impl Schedule for Arc<Handle> { |
| 629 | fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { |
| 630 | self.shared.owned.remove(task) |
| 631 | } |
| 632 | |
| 633 | fn schedule(&self, task: task::Notified<Self>) { |
| 634 | use scheduler::Context::CurrentThread; |
| 635 | |
| 636 | context::with_scheduler(|maybe_cx| match maybe_cx { |
| 637 | Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => { |
| 638 | let mut core = cx.core.borrow_mut(); |
| 639 | |
| 640 | // If `None`, the runtime is shutting down, so there is no need |
| 641 | // to schedule the task. |
| 642 | if let Some(core) = core.as_mut() { |
| 643 | core.push_task(self, task); |
| 644 | } |
| 645 | } |
| 646 | _ => { |
| 647 | // Track that a task was scheduled from **outside** of the runtime. |
| 648 | self.shared.scheduler_metrics.inc_remote_schedule_count(); |
| 649 | |
| 650 | // Schedule the task |
| 651 | self.shared.inject.push(task); |
| 652 | self.driver.unpark(); |
| 653 | } |
| 654 | }); |
| 655 | } |
| 656 | |
| 657 | fn hooks(&self) -> TaskHarnessScheduleHooks { |
| 658 | TaskHarnessScheduleHooks { |
| 659 | task_terminate_callback: self.task_hooks.task_terminate_callback.clone(), |
| 660 | } |
| 661 | } |
| 662 | |
| 663 | cfg_unstable! { |
| 664 | fn unhandled_panic(&self) { |
| 665 | use crate::runtime::UnhandledPanic; |
| 666 | |
| 667 | match self.shared.config.unhandled_panic { |
| 668 | UnhandledPanic::Ignore => { |
| 669 | // Do nothing |
| 670 | } |
| 671 | UnhandledPanic::ShutdownRuntime => { |
| 672 | use scheduler::Context::CurrentThread; |
| 673 | |
| 674 | // This hook is only called from within the runtime, so |
| 675 | // `context::with_scheduler` should match with `&self`, i.e. |
| 676 | // there is no opportunity for a nested scheduler to be |
| 677 | // called. |
| 678 | context::with_scheduler(|maybe_cx| match maybe_cx { |
| 679 | Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => { |
| 680 | let mut core = cx.core.borrow_mut(); |
| 681 | |
| 682 | // If `None`, the runtime is shutting down, so there is no need to signal shutdown |
| 683 | if let Some(core) = core.as_mut() { |
| 684 | core.unhandled_panic = true; |
| 685 | self.shared.owned.close_and_shutdown_all(0); |
| 686 | } |
| 687 | } |
| 688 | _ => unreachable!("runtime core not set in CURRENT thread-local" ), |
| 689 | }) |
| 690 | } |
| 691 | } |
| 692 | } |
| 693 | } |
| 694 | } |
| 695 | |
| 696 | impl Wake for Handle { |
| 697 | fn wake(arc_self: Arc<Self>) { |
| 698 | Wake::wake_by_ref(&arc_self); |
| 699 | } |
| 700 | |
| 701 | /// Wake by reference |
| 702 | fn wake_by_ref(arc_self: &Arc<Self>) { |
| 703 | arc_self.shared.woken.store(val:true, order:Release); |
| 704 | arc_self.driver.unpark(); |
| 705 | } |
| 706 | } |
| 707 | |
| 708 | // ===== CoreGuard ===== |
| 709 | |
| 710 | /// Used to ensure we always place the `Core` value back into its slot in |
| 711 | /// `CurrentThread`, even if the future panics. |
| 712 | struct CoreGuard<'a> { |
| 713 | context: scheduler::Context, |
| 714 | scheduler: &'a CurrentThread, |
| 715 | } |
| 716 | |
| 717 | impl CoreGuard<'_> { |
| 718 | #[track_caller ] |
| 719 | fn block_on<F: Future>(self, future: F) -> F::Output { |
| 720 | let ret = self.enter(|mut core, context| { |
| 721 | let waker = Handle::waker_ref(&context.handle); |
| 722 | let mut cx = std::task::Context::from_waker(&waker); |
| 723 | |
| 724 | pin!(future); |
| 725 | |
| 726 | core.metrics.start_processing_scheduled_tasks(); |
| 727 | |
| 728 | 'outer: loop { |
| 729 | let handle = &context.handle; |
| 730 | |
| 731 | if handle.reset_woken() { |
| 732 | let (c, res) = context.enter(core, || { |
| 733 | crate::task::coop::budget(|| future.as_mut().poll(&mut cx)) |
| 734 | }); |
| 735 | |
| 736 | core = c; |
| 737 | |
| 738 | if let Ready(v) = res { |
| 739 | return (core, Some(v)); |
| 740 | } |
| 741 | } |
| 742 | |
| 743 | for _ in 0..handle.shared.config.event_interval { |
| 744 | // Make sure we didn't hit an unhandled_panic |
| 745 | if core.unhandled_panic { |
| 746 | return (core, None); |
| 747 | } |
| 748 | |
| 749 | core.tick(); |
| 750 | |
| 751 | let entry = core.next_task(handle); |
| 752 | |
| 753 | let task = match entry { |
| 754 | Some(entry) => entry, |
| 755 | None => { |
| 756 | core.metrics.end_processing_scheduled_tasks(); |
| 757 | |
| 758 | core = if !context.defer.is_empty() { |
| 759 | context.park_yield(core, handle) |
| 760 | } else { |
| 761 | context.park(core, handle) |
| 762 | }; |
| 763 | |
| 764 | core.metrics.start_processing_scheduled_tasks(); |
| 765 | |
| 766 | // Try polling the `block_on` future next |
| 767 | continue 'outer; |
| 768 | } |
| 769 | }; |
| 770 | |
| 771 | let task = context.handle.shared.owned.assert_owner(task); |
| 772 | |
| 773 | #[cfg (tokio_unstable)] |
| 774 | let task_id = task.task_id(); |
| 775 | |
| 776 | let (c, ()) = context.run_task(core, || { |
| 777 | #[cfg (tokio_unstable)] |
| 778 | context.handle.task_hooks.poll_start_callback(task_id); |
| 779 | |
| 780 | task.run(); |
| 781 | |
| 782 | #[cfg (tokio_unstable)] |
| 783 | context.handle.task_hooks.poll_stop_callback(task_id); |
| 784 | }); |
| 785 | |
| 786 | core = c; |
| 787 | } |
| 788 | |
| 789 | core.metrics.end_processing_scheduled_tasks(); |
| 790 | |
| 791 | // Yield to the driver, this drives the timer and pulls any |
| 792 | // pending I/O events. |
| 793 | core = context.park_yield(core, handle); |
| 794 | |
| 795 | core.metrics.start_processing_scheduled_tasks(); |
| 796 | } |
| 797 | }); |
| 798 | |
| 799 | match ret { |
| 800 | Some(ret) => ret, |
| 801 | None => { |
| 802 | // `block_on` panicked. |
| 803 | panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic" ); |
| 804 | } |
| 805 | } |
| 806 | } |
| 807 | |
| 808 | /// Enters the scheduler context. This sets the queue and other necessary |
| 809 | /// scheduler state in the thread-local. |
| 810 | fn enter<F, R>(self, f: F) -> R |
| 811 | where |
| 812 | F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R), |
| 813 | { |
| 814 | let context = self.context.expect_current_thread(); |
| 815 | |
| 816 | // Remove `core` from `context` to pass into the closure. |
| 817 | let core = context.core.borrow_mut().take().expect("core missing" ); |
| 818 | |
| 819 | // Call the closure and place `core` back |
| 820 | let (core, ret) = context::set_scheduler(&self.context, || f(core, context)); |
| 821 | |
| 822 | *context.core.borrow_mut() = Some(core); |
| 823 | |
| 824 | ret |
| 825 | } |
| 826 | } |
| 827 | |
| 828 | impl Drop for CoreGuard<'_> { |
| 829 | fn drop(&mut self) { |
| 830 | let context: &Context = self.context.expect_current_thread(); |
| 831 | |
| 832 | if let Some(core: Box) = context.core.borrow_mut().take() { |
| 833 | // Replace old scheduler back into the state to allow |
| 834 | // other threads to pick it up and drive it. |
| 835 | self.scheduler.core.set(val:core); |
| 836 | |
| 837 | // Wake up other possible threads that could steal the driver. |
| 838 | self.scheduler.notify.notify_one(); |
| 839 | } |
| 840 | } |
| 841 | } |
| 842 | |