| 1 | //! A scheduler is initialized with a fixed number of workers. Each worker is |
| 2 | //! driven by a thread. Each worker has a "core" which contains data such as the |
| 3 | //! run queue and other state. When `block_in_place` is called, the worker's |
| 4 | //! "core" is handed off to a new thread allowing the scheduler to continue to |
| 5 | //! make progress while the originating thread blocks. |
| 6 | //! |
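//! As a rough usage sketch (from the caller's point of view, inside a task
//! running on the multi-threaded runtime), the closure runs on the current
//! thread while the handed-off core keeps driving other tasks on a freshly
//! spawned thread:
//!
//! ```ignore
//! tokio::task::block_in_place(|| {
//!     // Blocking work is fine here; the worker's core has been handed off,
//!     // so the scheduler keeps making progress on other tasks.
//!     std::thread::sleep(std::time::Duration::from_millis(50));
//! });
//! ```
//!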
| 7 | //! # Shutdown |
| 8 | //! |
| 9 | //! Shutting down the runtime involves the following steps: |
| 10 | //! |
//! 1. The `Shared::close` method is called. This closes the inject queue and
| 12 | //! `OwnedTasks` instance and wakes up all worker threads. |
| 13 | //! |
| 14 | //! 2. Each worker thread observes the close signal next time it runs |
//! `Core::maintenance` by checking whether the inject queue is closed.
| 16 | //! The `Core::is_shutdown` flag is set to true. |
| 17 | //! |
//! 3. Each worker thread calls `pre_shutdown` in parallel. Here, the worker
| 19 | //! will keep removing tasks from `OwnedTasks` until it is empty. No new |
| 20 | //! tasks can be pushed to the `OwnedTasks` during or after this step as it |
| 21 | //! was closed in step 1. |
| 22 | //! |
//! 4. The workers call `Shared::shutdown` to enter the single-threaded phase of
| 24 | //! shutdown. These calls will push their core to `Shared::shutdown_cores`, |
| 25 | //! and the last thread to push its core will finish the shutdown procedure. |
| 26 | //! |
//! 5. The local run queue of each core is emptied, then the inject queue is
| 28 | //! emptied. |
| 29 | //! |
| 30 | //! At this point, shutdown has completed. It is not possible for any of the |
| 31 | //! collections to contain any tasks at this point, as each collection was |
| 32 | //! closed first, then emptied afterwards. |
| 33 | //! |
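//! As a minimal sketch of how this sequence is triggered from user code
//! (dropping the runtime, or calling `shutdown_timeout`, initiates step 1):
//!
//! ```ignore
//! let rt = tokio::runtime::Builder::new_multi_thread()
//!     .worker_threads(2)
//!     .build()
//!     .unwrap();
//!
//! rt.spawn(async { /* ... */ });
//!
//! // Closes the inject queue and `OwnedTasks`, wakes the workers, and gives
//! // them up to one second to run through the steps above.
//! rt.shutdown_timeout(std::time::Duration::from_secs(1));
//! ```
//!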
| 34 | //! ## Spawns during shutdown |
| 35 | //! |
| 36 | //! When spawning tasks during shutdown, there are two cases: |
| 37 | //! |
| 38 | //! * The spawner observes the `OwnedTasks` being open, and the inject queue is |
| 39 | //! closed. |
| 40 | //! * The spawner observes the `OwnedTasks` being closed and doesn't check the |
| 41 | //! inject queue. |
| 42 | //! |
| 43 | //! The first case can only happen if the `OwnedTasks::bind` call happens before |
| 44 | //! or during step 1 of shutdown. In this case, the runtime will clean up the |
| 45 | //! task in step 3 of shutdown. |
| 46 | //! |
| 47 | //! In the latter case, the task was not spawned and the task is immediately |
| 48 | //! cancelled by the spawner. |
| 49 | //! |
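//! For the second case, the observable effect from user code looks roughly
//! like the sketch below; the error surfaced through the `JoinHandle` is a
//! cancellation:
//!
//! ```ignore
//! let handle = rt.handle().clone();
//! drop(rt); // shut the runtime down
//!
//! // `OwnedTasks` is closed, so the task is cancelled immediately by the
//! // spawner; awaiting `join` reports a cancelled task instead of `42`.
//! let join = handle.spawn(async { 42 });
//! ```
//!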
| 50 | //! The correctness of shutdown requires both the inject queue and `OwnedTasks` |
| 51 | //! collection to have a closed bit. With a close bit on only the inject queue, |
//! spawning could run into a situation where a task is successfully bound long
| 53 | //! after the runtime has shut down. With a close bit on only the `OwnedTasks`, |
| 54 | //! the first spawning situation could result in the notification being pushed |
//! to the inject queue after step 5 of shutdown, which would leave a task in
| 56 | //! the inject queue indefinitely. This would be a ref-count cycle and a memory |
| 57 | //! leak. |
| 58 | |
| 59 | use crate::loom::sync::{Arc, Mutex}; |
| 60 | use crate::runtime; |
| 61 | use crate::runtime::scheduler::multi_thread::{ |
| 62 | idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker, |
| 63 | }; |
| 64 | use crate::runtime::scheduler::{inject, Defer, Lock}; |
| 65 | use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks}; |
| 66 | use crate::runtime::{blocking, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics}; |
| 67 | use crate::runtime::{context, TaskHooks}; |
| 68 | use crate::task::coop; |
| 69 | use crate::util::atomic_cell::AtomicCell; |
| 70 | use crate::util::rand::{FastRand, RngSeedGenerator}; |
| 71 | |
| 72 | use std::cell::RefCell; |
| 73 | use std::task::Waker; |
| 74 | use std::thread; |
| 75 | use std::time::Duration; |
| 76 | |
| 77 | mod metrics; |
| 78 | |
| 79 | cfg_taskdump! { |
| 80 | mod taskdump; |
| 81 | } |
| 82 | |
| 83 | cfg_not_taskdump! { |
| 84 | mod taskdump_mock; |
| 85 | } |
| 86 | |
| 87 | /// A scheduler worker |
| 88 | pub(super) struct Worker { |
| 89 | /// Reference to scheduler's handle |
| 90 | handle: Arc<Handle>, |
| 91 | |
| 92 | /// Index holding this worker's remote state |
| 93 | index: usize, |
| 94 | |
| 95 | /// Used to hand-off a worker's core to another thread. |
| 96 | core: AtomicCell<Core>, |
| 97 | } |
| 98 | |
| 99 | /// Core data |
| 100 | struct Core { |
| 101 | /// Used to schedule bookkeeping tasks every so often. |
| 102 | tick: u32, |
| 103 | |
| 104 | /// When a task is scheduled from a worker, it is stored in this slot. The |
| 105 | /// worker will check this slot for a task **before** checking the run |
/// queue. This effectively results in the **last** scheduled task being run
/// next (LIFO). This is an optimization for improving locality, which
| 108 | /// benefits message passing patterns and helps to reduce latency. |
| 109 | lifo_slot: Option<Notified>, |
| 110 | |
| 111 | /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`, |
| 112 | /// they go to the back of the `run_queue`. |
| 113 | lifo_enabled: bool, |
| 114 | |
| 115 | /// The worker-local run queue. |
| 116 | run_queue: queue::Local<Arc<Handle>>, |
| 117 | |
| 118 | /// True if the worker is currently searching for more work. Searching |
| 119 | /// involves attempting to steal from other workers. |
| 120 | is_searching: bool, |
| 121 | |
| 122 | /// True if the scheduler is being shutdown |
| 123 | is_shutdown: bool, |
| 124 | |
| 125 | /// True if the scheduler is being traced |
| 126 | is_traced: bool, |
| 127 | |
| 128 | /// Parker |
| 129 | /// |
| 130 | /// Stored in an `Option` as the parker is added / removed to make the |
| 131 | /// borrow checker happy. |
| 132 | park: Option<Parker>, |
| 133 | |
| 134 | /// Per-worker runtime stats |
| 135 | stats: Stats, |
| 136 | |
| 137 | /// How often to check the global queue |
| 138 | global_queue_interval: u32, |
| 139 | |
| 140 | /// Fast random number generator. |
| 141 | rand: FastRand, |
| 142 | } |
| 143 | |
| 144 | /// State shared across all workers |
| 145 | pub(crate) struct Shared { |
/// Per-worker remote state. All other workers have access to this, and it
/// is how workers communicate with one another.
| 148 | remotes: Box<[Remote]>, |
| 149 | |
/// Global task queue used to:
/// 1. Submit work to the scheduler while **not** currently on a worker thread.
/// 2. Submit work to the scheduler when a worker's run queue is saturated.
| 153 | pub(super) inject: inject::Shared<Arc<Handle>>, |
| 154 | |
| 155 | /// Coordinates idle workers |
| 156 | idle: Idle, |
| 157 | |
| 158 | /// Collection of all active tasks spawned onto this executor. |
| 159 | pub(crate) owned: OwnedTasks<Arc<Handle>>, |
| 160 | |
| 161 | /// Data synchronized by the scheduler mutex |
| 162 | pub(super) synced: Mutex<Synced>, |
| 163 | |
| 164 | /// Cores that have observed the shutdown signal |
| 165 | /// |
/// The core is **not** placed back in the worker to prevent it from being
/// stolen by a thread that was spawned as part of `block_in_place`.
#[allow(clippy::vec_box)] // we're moving an already-boxed value
| 169 | shutdown_cores: Mutex<Vec<Box<Core>>>, |
| 170 | |
| 171 | /// The number of cores that have observed the trace signal. |
| 172 | pub(super) trace_status: TraceStatus, |
| 173 | |
| 174 | /// Scheduler configuration options |
| 175 | config: Config, |
| 176 | |
| 177 | /// Collects metrics from the runtime. |
| 178 | pub(super) scheduler_metrics: SchedulerMetrics, |
| 179 | |
| 180 | pub(super) worker_metrics: Box<[WorkerMetrics]>, |
| 181 | |
| 182 | /// Only held to trigger some code on drop. This is used to get internal |
| 183 | /// runtime metrics that can be useful when doing performance |
| 184 | /// investigations. This does nothing (empty struct, no drop impl) unless |
| 185 | /// the `tokio_internal_mt_counters` `cfg` flag is set. |
| 186 | _counters: Counters, |
| 187 | } |
| 188 | |
| 189 | /// Data synchronized by the scheduler mutex |
| 190 | pub(crate) struct Synced { |
| 191 | /// Synchronized state for `Idle`. |
| 192 | pub(super) idle: idle::Synced, |
| 193 | |
| 194 | /// Synchronized state for `Inject`. |
| 195 | pub(crate) inject: inject::Synced, |
| 196 | } |
| 197 | |
| 198 | /// Used to communicate with a worker from other threads. |
| 199 | struct Remote { |
| 200 | /// Steals tasks from this worker. |
| 201 | pub(super) steal: queue::Steal<Arc<Handle>>, |
| 202 | |
| 203 | /// Unparks the associated worker thread |
| 204 | unpark: Unparker, |
| 205 | } |
| 206 | |
| 207 | /// Thread-local context |
| 208 | pub(crate) struct Context { |
| 209 | /// Worker |
| 210 | worker: Arc<Worker>, |
| 211 | |
| 212 | /// Core data |
| 213 | core: RefCell<Option<Box<Core>>>, |
| 214 | |
| 215 | /// Tasks to wake after resource drivers are polled. This is mostly to |
| 216 | /// handle yielded tasks. |
| 217 | pub(crate) defer: Defer, |
| 218 | } |
| 219 | |
| 220 | /// Starts the workers |
| 221 | pub(crate) struct Launch(Vec<Arc<Worker>>); |
| 222 | |
/// Running a task may consume the core. If the core is still available when
/// the task finishes running, it is returned. Otherwise, the worker will need
| 225 | /// to stop processing. |
| 226 | type RunResult = Result<Box<Core>, ()>; |
| 227 | |
| 228 | /// A task handle |
| 229 | type Task = task::Task<Arc<Handle>>; |
| 230 | |
| 231 | /// A notified task handle |
| 232 | type Notified = task::Notified<Arc<Handle>>; |
| 233 | |
/// Value picked out of thin air. Running the LIFO slot a handful of times
/// seems sufficient to benefit from locality. More than 3 times is probably
/// overdoing it. The value can be tuned in the future with data that shows
| 237 | /// improvements. |
| 238 | const MAX_LIFO_POLLS_PER_TICK: usize = 3; |
| 239 | |
| 240 | pub(super) fn create( |
| 241 | size: usize, |
| 242 | park: Parker, |
| 243 | driver_handle: driver::Handle, |
| 244 | blocking_spawner: blocking::Spawner, |
| 245 | seed_generator: RngSeedGenerator, |
| 246 | config: Config, |
| 247 | ) -> (Arc<Handle>, Launch) { |
| 248 | let mut cores = Vec::with_capacity(size); |
| 249 | let mut remotes = Vec::with_capacity(size); |
| 250 | let mut worker_metrics = Vec::with_capacity(size); |
| 251 | |
| 252 | // Create the local queues |
| 253 | for _ in 0..size { |
| 254 | let (steal, run_queue) = queue::local(); |
| 255 | |
| 256 | let park = park.clone(); |
| 257 | let unpark = park.unpark(); |
| 258 | let metrics = WorkerMetrics::from_config(&config); |
| 259 | let stats = Stats::new(&metrics); |
| 260 | |
| 261 | cores.push(Box::new(Core { |
| 262 | tick: 0, |
| 263 | lifo_slot: None, |
| 264 | lifo_enabled: !config.disable_lifo_slot, |
| 265 | run_queue, |
| 266 | is_searching: false, |
| 267 | is_shutdown: false, |
| 268 | is_traced: false, |
| 269 | park: Some(park), |
| 270 | global_queue_interval: stats.tuned_global_queue_interval(&config), |
| 271 | stats, |
| 272 | rand: FastRand::from_seed(config.seed_generator.next_seed()), |
| 273 | })); |
| 274 | |
| 275 | remotes.push(Remote { steal, unpark }); |
| 276 | worker_metrics.push(metrics); |
| 277 | } |
| 278 | |
| 279 | let (idle, idle_synced) = Idle::new(size); |
| 280 | let (inject, inject_synced) = inject::Shared::new(); |
| 281 | |
| 282 | let remotes_len = remotes.len(); |
| 283 | let handle = Arc::new(Handle { |
| 284 | task_hooks: TaskHooks::from_config(&config), |
| 285 | shared: Shared { |
| 286 | remotes: remotes.into_boxed_slice(), |
| 287 | inject, |
| 288 | idle, |
| 289 | owned: OwnedTasks::new(size), |
| 290 | synced: Mutex::new(Synced { |
| 291 | idle: idle_synced, |
| 292 | inject: inject_synced, |
| 293 | }), |
| 294 | shutdown_cores: Mutex::new(vec![]), |
| 295 | trace_status: TraceStatus::new(remotes_len), |
| 296 | config, |
| 297 | scheduler_metrics: SchedulerMetrics::new(), |
| 298 | worker_metrics: worker_metrics.into_boxed_slice(), |
| 299 | _counters: Counters, |
| 300 | }, |
| 301 | driver: driver_handle, |
| 302 | blocking_spawner, |
| 303 | seed_generator, |
| 304 | }); |
| 305 | |
| 306 | let mut launch = Launch(vec![]); |
| 307 | |
| 308 | for (index, core) in cores.drain(..).enumerate() { |
| 309 | launch.0.push(Arc::new(Worker { |
| 310 | handle: handle.clone(), |
| 311 | index, |
| 312 | core: AtomicCell::new(Some(core)), |
| 313 | })); |
| 314 | } |
| 315 | |
| 316 | (handle, launch) |
| 317 | } |
| 318 | |
#[track_caller]
| 320 | pub(crate) fn block_in_place<F, R>(f: F) -> R |
| 321 | where |
| 322 | F: FnOnce() -> R, |
| 323 | { |
| 324 | // Try to steal the worker core back |
| 325 | struct Reset { |
| 326 | take_core: bool, |
| 327 | budget: coop::Budget, |
| 328 | } |
| 329 | |
| 330 | impl Drop for Reset { |
| 331 | fn drop(&mut self) { |
| 332 | with_current(|maybe_cx| { |
| 333 | if let Some(cx) = maybe_cx { |
| 334 | if self.take_core { |
| 335 | let core = cx.worker.core.take(); |
| 336 | |
| 337 | if core.is_some() { |
| 338 | cx.worker.handle.shared.worker_metrics[cx.worker.index] |
| 339 | .set_thread_id(thread::current().id()); |
| 340 | } |
| 341 | |
| 342 | let mut cx_core = cx.core.borrow_mut(); |
| 343 | assert!(cx_core.is_none()); |
| 344 | *cx_core = core; |
| 345 | } |
| 346 | |
| 347 | // Reset the task budget as we are re-entering the |
| 348 | // runtime. |
| 349 | coop::set(self.budget); |
| 350 | } |
| 351 | }); |
| 352 | } |
| 353 | } |
| 354 | |
| 355 | let mut had_entered = false; |
| 356 | let mut take_core = false; |
| 357 | |
| 358 | let setup_result = with_current(|maybe_cx| { |
| 359 | match ( |
| 360 | crate::runtime::context::current_enter_context(), |
| 361 | maybe_cx.is_some(), |
| 362 | ) { |
| 363 | (context::EnterRuntime::Entered { .. }, true) => { |
| 364 | // We are on a thread pool runtime thread, so we just need to |
| 365 | // set up blocking. |
| 366 | had_entered = true; |
| 367 | } |
| 368 | ( |
| 369 | context::EnterRuntime::Entered { |
| 370 | allow_block_in_place, |
| 371 | }, |
| 372 | false, |
| 373 | ) => { |
| 374 | // We are on an executor, but _not_ on the thread pool. That is |
| 375 | // _only_ okay if we are in a thread pool runtime's block_on |
| 376 | // method: |
| 377 | if allow_block_in_place { |
| 378 | had_entered = true; |
| 379 | return Ok(()); |
| 380 | } else { |
| 381 | // This probably means we are on the current_thread runtime or in a |
| 382 | // LocalSet, where it is _not_ okay to block. |
| 383 | return Err( |
| 384 | "can call blocking only when running on the multi-threaded runtime" , |
| 385 | ); |
| 386 | } |
| 387 | } |
| 388 | (context::EnterRuntime::NotEntered, true) => { |
| 389 | // This is a nested call to block_in_place (we already exited). |
| 390 | // All the necessary setup has already been done. |
| 391 | return Ok(()); |
| 392 | } |
| 393 | (context::EnterRuntime::NotEntered, false) => { |
| 394 | // We are outside of the tokio runtime, so blocking is fine. |
| 395 | // We can also skip all of the thread pool blocking setup steps. |
| 396 | return Ok(()); |
| 397 | } |
| 398 | } |
| 399 | |
let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
| 401 | |
| 402 | // Get the worker core. If none is set, then blocking is fine! |
| 403 | let mut core = match cx.core.borrow_mut().take() { |
| 404 | Some(core) => core, |
| 405 | None => return Ok(()), |
| 406 | }; |
| 407 | |
| 408 | // If we heavily call `spawn_blocking`, there might be no available thread to |
| 409 | // run this core. Except for the task in the lifo_slot, all tasks can be |
| 410 | // stolen, so we move the task out of the lifo_slot to the run_queue. |
| 411 | if let Some(task) = core.lifo_slot.take() { |
| 412 | core.run_queue |
| 413 | .push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats); |
| 414 | } |
| 415 | |
| 416 | // We are taking the core from the context and sending it to another |
| 417 | // thread. |
| 418 | take_core = true; |
| 419 | |
| 420 | // The parker should be set here |
| 421 | assert!(core.park.is_some()); |
| 422 | |
| 423 | // In order to block, the core must be sent to another thread for |
| 424 | // execution. |
| 425 | // |
| 426 | // First, move the core back into the worker's shared core slot. |
| 427 | cx.worker.core.set(core); |
| 428 | |
| 429 | // Next, clone the worker handle and send it to a new thread for |
| 430 | // processing. |
| 431 | // |
| 432 | // Once the blocking task is done executing, we will attempt to |
| 433 | // steal the core back. |
| 434 | let worker = cx.worker.clone(); |
| 435 | runtime::spawn_blocking(move || run(worker)); |
| 436 | Ok(()) |
| 437 | }); |
| 438 | |
| 439 | if let Err(panic_message) = setup_result { |
| 440 | panic!("{}" , panic_message); |
| 441 | } |
| 442 | |
| 443 | if had_entered { |
| 444 | // Unset the current task's budget. Blocking sections are not |
| 445 | // constrained by task budgets. |
| 446 | let _reset = Reset { |
| 447 | take_core, |
| 448 | budget: coop::stop(), |
| 449 | }; |
| 450 | |
| 451 | crate::runtime::context::exit_runtime(f) |
| 452 | } else { |
| 453 | f() |
| 454 | } |
| 455 | } |
| 456 | |
| 457 | impl Launch { |
| 458 | pub(crate) fn launch(mut self) { |
for worker in self.0.drain(..) {
runtime::spawn_blocking(move || run(worker));
| 461 | } |
| 462 | } |
| 463 | } |
| 464 | |
| 465 | fn run(worker: Arc<Worker>) { |
#[allow(dead_code)]
| 467 | struct AbortOnPanic; |
| 468 | |
| 469 | impl Drop for AbortOnPanic { |
| 470 | fn drop(&mut self) { |
| 471 | if std::thread::panicking() { |
| 472 | eprintln!("worker thread panicking; aborting process" ); |
| 473 | std::process::abort(); |
| 474 | } |
| 475 | } |
| 476 | } |
| 477 | |
| 478 | // Catching panics on worker threads in tests is quite tricky. Instead, when |
| 479 | // debug assertions are enabled, we just abort the process. |
#[cfg(debug_assertions)]
| 481 | let _abort_on_panic = AbortOnPanic; |
| 482 | |
| 483 | // Acquire a core. If this fails, then another thread is running this |
| 484 | // worker and there is nothing further to do. |
| 485 | let core = match worker.core.take() { |
| 486 | Some(core) => core, |
| 487 | None => return, |
| 488 | }; |
| 489 | |
| 490 | worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id()); |
| 491 | |
| 492 | let handle = scheduler::Handle::MultiThread(worker.handle.clone()); |
| 493 | |
| 494 | crate::runtime::context::enter_runtime(&handle, true, |_| { |
| 495 | // Set the worker context. |
| 496 | let cx = scheduler::Context::MultiThread(Context { |
| 497 | worker, |
| 498 | core: RefCell::new(None), |
| 499 | defer: Defer::new(), |
| 500 | }); |
| 501 | |
| 502 | context::set_scheduler(&cx, || { |
| 503 | let cx = cx.expect_multi_thread(); |
| 504 | |
| 505 | // This should always be an error. It only returns a `Result` to support |
| 506 | // using `?` to short circuit. |
| 507 | assert!(cx.run(core).is_err()); |
| 508 | |
| 509 | // Check if there are any deferred tasks to notify. This can happen when |
| 510 | // the worker core is lost due to `block_in_place()` being called from |
| 511 | // within the task. |
| 512 | cx.defer.wake(); |
| 513 | }); |
| 514 | }); |
| 515 | } |
| 516 | |
| 517 | impl Context { |
| 518 | fn run(&self, mut core: Box<Core>) -> RunResult { |
| 519 | // Reset `lifo_enabled` here in case the core was previously stolen from |
| 520 | // a task that had the LIFO slot disabled. |
| 521 | self.reset_lifo_enabled(&mut core); |
| 522 | |
| 523 | // Start as "processing" tasks as polling tasks from the local queue |
| 524 | // will be one of the first things we do. |
| 525 | core.stats.start_processing_scheduled_tasks(); |
| 526 | |
| 527 | while !core.is_shutdown { |
| 528 | self.assert_lifo_enabled_is_correct(&core); |
| 529 | |
| 530 | if core.is_traced { |
| 531 | core = self.worker.handle.trace_core(core); |
| 532 | } |
| 533 | |
| 534 | // Increment the tick |
| 535 | core.tick(); |
| 536 | |
| 537 | // Run maintenance, if needed |
| 538 | core = self.maintenance(core); |
| 539 | |
| 540 | // First, check work available to the current worker. |
| 541 | if let Some(task) = core.next_task(&self.worker) { |
| 542 | core = self.run_task(task, core)?; |
| 543 | continue; |
| 544 | } |
| 545 | |
| 546 | // We consumed all work in the queues and will start searching for work. |
| 547 | core.stats.end_processing_scheduled_tasks(); |
| 548 | |
| 549 | // There is no more **local** work to process, try to steal work |
| 550 | // from other workers. |
| 551 | if let Some(task) = core.steal_work(&self.worker) { |
| 552 | // Found work, switch back to processing |
| 553 | core.stats.start_processing_scheduled_tasks(); |
| 554 | core = self.run_task(task, core)?; |
| 555 | } else { |
| 556 | // Wait for work |
| 557 | core = if !self.defer.is_empty() { |
| 558 | self.park_timeout(core, Some(Duration::from_millis(0))) |
| 559 | } else { |
| 560 | self.park(core) |
| 561 | }; |
| 562 | core.stats.start_processing_scheduled_tasks(); |
| 563 | } |
| 564 | } |
| 565 | |
| 566 | core.pre_shutdown(&self.worker); |
| 567 | // Signal shutdown |
| 568 | self.worker.handle.shutdown_core(core); |
| 569 | Err(()) |
| 570 | } |
| 571 | |
| 572 | fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult { |
#[cfg(tokio_unstable)]
| 574 | let task_id = task.task_id(); |
| 575 | |
| 576 | let task = self.worker.handle.shared.owned.assert_owner(task); |
| 577 | |
| 578 | // Make sure the worker is not in the **searching** state. This enables |
| 579 | // another idle worker to try to steal work. |
| 580 | core.transition_from_searching(&self.worker); |
| 581 | |
| 582 | self.assert_lifo_enabled_is_correct(&core); |
| 583 | |
| 584 | // Measure the poll start time. Note that we may end up polling other |
| 585 | // tasks under this measurement. In this case, the tasks came from the |
| 586 | // LIFO slot and are considered part of the current task for scheduling |
// purposes. These tasks inherit the "parent"'s limits.
| 588 | core.stats.start_poll(); |
| 589 | |
| 590 | // Make the core available to the runtime context |
| 591 | *self.core.borrow_mut() = Some(core); |
| 592 | |
| 593 | // Run the task |
| 594 | coop::budget(|| { |
// Unlike the poll time above, the poll start callback is attached to the
// task id, so it is tightly associated with the actual poll invocation.
#[cfg(tokio_unstable)]
| 598 | self.worker.handle.task_hooks.poll_start_callback(task_id); |
| 599 | |
| 600 | task.run(); |
| 601 | |
#[cfg(tokio_unstable)]
| 603 | self.worker.handle.task_hooks.poll_stop_callback(task_id); |
| 604 | |
| 605 | let mut lifo_polls = 0; |
| 606 | |
| 607 | // As long as there is budget remaining and a task exists in the |
| 608 | // `lifo_slot`, then keep running. |
| 609 | loop { |
| 610 | // Check if we still have the core. If not, the core was stolen |
| 611 | // by another worker. |
| 612 | let mut core = match self.core.borrow_mut().take() { |
| 613 | Some(core) => core, |
| 614 | None => { |
| 615 | // In this case, we cannot call `reset_lifo_enabled()` |
| 616 | // because the core was stolen. The stealer will handle |
| 617 | // that at the top of `Context::run` |
| 618 | return Err(()); |
| 619 | } |
| 620 | }; |
| 621 | |
| 622 | // Check for a task in the LIFO slot |
| 623 | let task = match core.lifo_slot.take() { |
| 624 | Some(task) => task, |
| 625 | None => { |
| 626 | self.reset_lifo_enabled(&mut core); |
| 627 | core.stats.end_poll(); |
| 628 | return Ok(core); |
| 629 | } |
| 630 | }; |
| 631 | |
| 632 | if !coop::has_budget_remaining() { |
| 633 | core.stats.end_poll(); |
| 634 | |
| 635 | // Not enough budget left to run the LIFO task, push it to |
| 636 | // the back of the queue and return. |
| 637 | core.run_queue.push_back_or_overflow( |
| 638 | task, |
| 639 | &*self.worker.handle, |
| 640 | &mut core.stats, |
| 641 | ); |
| 642 | // If we hit this point, the LIFO slot should be enabled. |
| 643 | // There is no need to reset it. |
| 644 | debug_assert!(core.lifo_enabled); |
| 645 | return Ok(core); |
| 646 | } |
| 647 | |
| 648 | // Track that we are about to run a task from the LIFO slot. |
| 649 | lifo_polls += 1; |
| 650 | super::counters::inc_lifo_schedules(); |
| 651 | |
| 652 | // Disable the LIFO slot if we reach our limit |
| 653 | // |
// In ping-pong style workloads where task A notifies task B,
| 655 | // which notifies task A again, continuously prioritizing the |
| 656 | // LIFO slot can cause starvation as these two tasks will |
| 657 | // repeatedly schedule the other. To mitigate this, we limit the |
| 658 | // number of times the LIFO slot is prioritized. |
| 659 | if lifo_polls >= MAX_LIFO_POLLS_PER_TICK { |
| 660 | core.lifo_enabled = false; |
| 661 | super::counters::inc_lifo_capped(); |
| 662 | } |
| 663 | |
| 664 | // Run the LIFO task, then loop |
| 665 | *self.core.borrow_mut() = Some(core); |
| 666 | let task = self.worker.handle.shared.owned.assert_owner(task); |
| 667 | |
#[cfg(tokio_unstable)]
| 669 | let task_id = task.task_id(); |
| 670 | |
#[cfg(tokio_unstable)]
| 672 | self.worker.handle.task_hooks.poll_start_callback(task_id); |
| 673 | |
| 674 | task.run(); |
| 675 | |
#[cfg(tokio_unstable)]
| 677 | self.worker.handle.task_hooks.poll_stop_callback(task_id); |
| 678 | } |
| 679 | }) |
| 680 | } |
| 681 | |
| 682 | fn reset_lifo_enabled(&self, core: &mut Core) { |
| 683 | core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot; |
| 684 | } |
| 685 | |
| 686 | fn assert_lifo_enabled_is_correct(&self, core: &Core) { |
| 687 | debug_assert_eq!( |
| 688 | core.lifo_enabled, |
| 689 | !self.worker.handle.shared.config.disable_lifo_slot |
| 690 | ); |
| 691 | } |
| 692 | |
| 693 | fn maintenance(&self, mut core: Box<Core>) -> Box<Core> { |
| 694 | if core.tick % self.worker.handle.shared.config.event_interval == 0 { |
| 695 | super::counters::inc_num_maintenance(); |
| 696 | |
| 697 | core.stats.end_processing_scheduled_tasks(); |
| 698 | |
| 699 | // Call `park` with a 0 timeout. This enables the I/O driver, timer, ... |
| 700 | // to run without actually putting the thread to sleep. |
| 701 | core = self.park_timeout(core, Some(Duration::from_millis(0))); |
| 702 | |
| 703 | // Run regularly scheduled maintenance |
| 704 | core.maintenance(&self.worker); |
| 705 | |
| 706 | core.stats.start_processing_scheduled_tasks(); |
| 707 | } |
| 708 | |
| 709 | core |
| 710 | } |
| 711 | |
| 712 | /// Parks the worker thread while waiting for tasks to execute. |
| 713 | /// |
/// This function checks that there is indeed no more work left to be done before
/// parking. Note that before parking, the worker thread will try to take ownership
/// of the driver (I/O / time) and dispatch any events that might have fired.
/// Whenever a worker thread executes the driver loop, all woken tasks are scheduled
/// onto its own local queue until the queue saturates (ntasks > `LOCAL_QUEUE_CAPACITY`).
/// When the local queue is saturated, the overflow tasks are added to the injection
/// queue, from where other workers can pick them up. Beyond that, we rely on the
/// work-stealing algorithm to spread the tasks amongst the workers once all the
/// I/O events have been dispatched.
| 723 | fn park(&self, mut core: Box<Core>) -> Box<Core> { |
| 724 | if let Some(f) = &self.worker.handle.shared.config.before_park { |
| 725 | f(); |
| 726 | } |
| 727 | |
| 728 | if core.transition_to_parked(&self.worker) { |
| 729 | while !core.is_shutdown && !core.is_traced { |
| 730 | core.stats.about_to_park(); |
| 731 | core.stats |
| 732 | .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]); |
| 733 | |
| 734 | core = self.park_timeout(core, None); |
| 735 | |
| 736 | core.stats.unparked(); |
| 737 | |
| 738 | // Run regularly scheduled maintenance |
| 739 | core.maintenance(&self.worker); |
| 740 | |
| 741 | if core.transition_from_parked(&self.worker) { |
| 742 | break; |
| 743 | } |
| 744 | } |
| 745 | } |
| 746 | |
| 747 | if let Some(f) = &self.worker.handle.shared.config.after_unpark { |
| 748 | f(); |
| 749 | } |
| 750 | core |
| 751 | } |
| 752 | |
| 753 | fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> { |
| 754 | self.assert_lifo_enabled_is_correct(&core); |
| 755 | |
| 756 | // Take the parker out of core |
let mut park = core.park.take().expect("park missing");
| 758 | |
| 759 | // Store `core` in context |
| 760 | *self.core.borrow_mut() = Some(core); |
| 761 | |
| 762 | // Park thread |
| 763 | if let Some(timeout) = duration { |
| 764 | park.park_timeout(&self.worker.handle.driver, timeout); |
| 765 | } else { |
| 766 | park.park(&self.worker.handle.driver); |
| 767 | } |
| 768 | |
| 769 | self.defer.wake(); |
| 770 | |
| 771 | // Remove `core` from context |
core = self.core.borrow_mut().take().expect("core missing");
| 773 | |
| 774 | // Place `park` back in `core` |
| 775 | core.park = Some(park); |
| 776 | |
| 777 | if core.should_notify_others() { |
| 778 | self.worker.handle.notify_parked_local(); |
| 779 | } |
| 780 | |
| 781 | core |
| 782 | } |
| 783 | |
| 784 | pub(crate) fn defer(&self, waker: &Waker) { |
| 785 | if self.core.borrow().is_none() { |
| 786 | // If there is no core, then the worker is currently in a block_in_place. In this case, |
| 787 | // we cannot use the defer queue as we aren't really in the current runtime. |
| 788 | waker.wake_by_ref(); |
| 789 | } else { |
| 790 | self.defer.defer(waker); |
| 791 | } |
| 792 | } |
| 793 | |
#[allow(dead_code)]
| 795 | pub(crate) fn get_worker_index(&self) -> usize { |
| 796 | self.worker.index |
| 797 | } |
| 798 | } |
| 799 | |
| 800 | impl Core { |
| 801 | /// Increment the tick |
| 802 | fn tick(&mut self) { |
| 803 | self.tick = self.tick.wrapping_add(1); |
| 804 | } |
| 805 | |
| 806 | /// Return the next notified task available to this worker. |
| 807 | fn next_task(&mut self, worker: &Worker) -> Option<Notified> { |
| 808 | if self.tick % self.global_queue_interval == 0 { |
| 809 | // Update the global queue interval, if needed |
| 810 | self.tune_global_queue_interval(worker); |
| 811 | |
| 812 | worker |
| 813 | .handle |
| 814 | .next_remote_task() |
| 815 | .or_else(|| self.next_local_task()) |
| 816 | } else { |
| 817 | let maybe_task = self.next_local_task(); |
| 818 | |
| 819 | if maybe_task.is_some() { |
| 820 | return maybe_task; |
| 821 | } |
| 822 | |
| 823 | if worker.inject().is_empty() { |
| 824 | return None; |
| 825 | } |
| 826 | |
| 827 | // Other threads can only **remove** tasks from the current worker's |
| 828 | // `run_queue`. So, we can be confident that by the time we call |
| 829 | // `run_queue.push_back` below, there will be *at least* `cap` |
| 830 | // available slots in the queue. |
| 831 | let cap = usize::min( |
| 832 | self.run_queue.remaining_slots(), |
| 833 | self.run_queue.max_capacity() / 2, |
| 834 | ); |
| 835 | |
| 836 | // The worker is currently idle, pull a batch of work from the |
| 837 | // injection queue. We don't want to pull *all* the work so other |
| 838 | // workers can also get some. |
| 839 | let n = usize::min( |
| 840 | worker.inject().len() / worker.handle.shared.remotes.len() + 1, |
| 841 | cap, |
| 842 | ); |
| 843 | |
| 844 | // Take at least one task since the first task is returned directly |
| 845 | // and not pushed onto the local queue. |
| 846 | let n = usize::max(1, n); |
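// For example: with 4 workers and 10 tasks in the inject queue (and ample
// local capacity), n = min(10 / 4 + 1, cap) = 3. The first of those tasks is
// returned directly below; the remaining two are pushed onto the local queue.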
| 847 | |
| 848 | let mut synced = worker.handle.shared.synced.lock(); |
| 849 | // safety: passing in the correct `inject::Synced`. |
| 850 | let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) }; |
| 851 | |
| 852 | // Pop the first task to return immediately |
| 853 | let ret = tasks.next(); |
| 854 | |
// Push the rest of the tasks onto the run queue
| 856 | self.run_queue.push_back(tasks); |
| 857 | |
| 858 | ret |
| 859 | } |
| 860 | } |
| 861 | |
| 862 | fn next_local_task(&mut self) -> Option<Notified> { |
| 863 | self.lifo_slot.take().or_else(|| self.run_queue.pop()) |
| 864 | } |
| 865 | |
/// Function responsible for stealing tasks from another worker.
///
/// Note: a worker will only attempt to steal if fewer than half of the
/// workers are already searching for tasks to steal. The idea is to make
/// sure not all workers are trying to steal at the same time.
| 871 | fn steal_work(&mut self, worker: &Worker) -> Option<Notified> { |
| 872 | if !self.transition_to_searching(worker) { |
| 873 | return None; |
| 874 | } |
| 875 | |
| 876 | let num = worker.handle.shared.remotes.len(); |
| 877 | // Start from a random worker |
| 878 | let start = self.rand.fastrand_n(num as u32) as usize; |
| 879 | |
| 880 | for i in 0..num { |
| 881 | let i = (start + i) % num; |
| 882 | |
| 883 | // Don't steal from ourself! We know we don't have work. |
| 884 | if i == worker.index { |
| 885 | continue; |
| 886 | } |
| 887 | |
| 888 | let target = &worker.handle.shared.remotes[i]; |
| 889 | if let Some(task) = target |
| 890 | .steal |
| 891 | .steal_into(&mut self.run_queue, &mut self.stats) |
| 892 | { |
| 893 | return Some(task); |
| 894 | } |
| 895 | } |
| 896 | |
| 897 | // Fallback on checking the global queue |
| 898 | worker.handle.next_remote_task() |
| 899 | } |
| 900 | |
| 901 | fn transition_to_searching(&mut self, worker: &Worker) -> bool { |
| 902 | if !self.is_searching { |
| 903 | self.is_searching = worker.handle.shared.idle.transition_worker_to_searching(); |
| 904 | } |
| 905 | |
| 906 | self.is_searching |
| 907 | } |
| 908 | |
| 909 | fn transition_from_searching(&mut self, worker: &Worker) { |
| 910 | if !self.is_searching { |
| 911 | return; |
| 912 | } |
| 913 | |
| 914 | self.is_searching = false; |
| 915 | worker.handle.transition_worker_from_searching(); |
| 916 | } |
| 917 | |
| 918 | fn has_tasks(&self) -> bool { |
| 919 | self.lifo_slot.is_some() || self.run_queue.has_tasks() |
| 920 | } |
| 921 | |
| 922 | fn should_notify_others(&self) -> bool { |
| 923 | // If there are tasks available to steal, but this worker is not |
| 924 | // looking for tasks to steal, notify another worker. |
| 925 | if self.is_searching { |
| 926 | return false; |
| 927 | } |
| 928 | self.lifo_slot.is_some() as usize + self.run_queue.len() > 1 |
| 929 | } |
| 930 | |
| 931 | /// Prepares the worker state for parking. |
| 932 | /// |
| 933 | /// Returns true if the transition happened, false if there is work to do first. |
| 934 | fn transition_to_parked(&mut self, worker: &Worker) -> bool { |
| 935 | // Workers should not park if they have work to do |
| 936 | if self.has_tasks() || self.is_traced { |
| 937 | return false; |
| 938 | } |
| 939 | |
| 940 | // When the final worker transitions **out** of searching to parked, it |
| 941 | // must check all the queues one last time in case work materialized |
| 942 | // between the last work scan and transitioning out of searching. |
| 943 | let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked( |
| 944 | &worker.handle.shared, |
| 945 | worker.index, |
| 946 | self.is_searching, |
| 947 | ); |
| 948 | |
// The worker is no longer searching. This only updates the local cached
// state.
| 951 | self.is_searching = false; |
| 952 | |
| 953 | if is_last_searcher { |
| 954 | worker.handle.notify_if_work_pending(); |
| 955 | } |
| 956 | |
| 957 | true |
| 958 | } |
| 959 | |
| 960 | /// Returns `true` if the transition happened. |
| 961 | fn transition_from_parked(&mut self, worker: &Worker) -> bool { |
// If a task is in the lifo slot or run queue, then we must unpark regardless
// of being notified.
| 964 | if self.has_tasks() { |
| 965 | // When a worker wakes, it should only transition to the "searching" |
| 966 | // state when the wake originates from another worker *or* a new task |
| 967 | // is pushed. We do *not* want the worker to transition to "searching" |
| 968 | // when it wakes when the I/O driver receives new events. |
| 969 | self.is_searching = !worker |
| 970 | .handle |
| 971 | .shared |
| 972 | .idle |
| 973 | .unpark_worker_by_id(&worker.handle.shared, worker.index); |
| 974 | return true; |
| 975 | } |
| 976 | |
| 977 | if worker |
| 978 | .handle |
| 979 | .shared |
| 980 | .idle |
| 981 | .is_parked(&worker.handle.shared, worker.index) |
| 982 | { |
| 983 | return false; |
| 984 | } |
| 985 | |
| 986 | // When unparked, the worker is in the searching state. |
| 987 | self.is_searching = true; |
| 988 | true |
| 989 | } |
| 990 | |
| 991 | /// Runs maintenance work such as checking the pool's state. |
| 992 | fn maintenance(&mut self, worker: &Worker) { |
| 993 | self.stats |
| 994 | .submit(&worker.handle.shared.worker_metrics[worker.index]); |
| 995 | |
| 996 | if !self.is_shutdown { |
| 997 | // Check if the scheduler has been shutdown |
| 998 | let synced = worker.handle.shared.synced.lock(); |
| 999 | self.is_shutdown = worker.inject().is_closed(&synced.inject); |
| 1000 | } |
| 1001 | |
| 1002 | if !self.is_traced { |
| 1003 | // Check if the worker should be tracing. |
| 1004 | self.is_traced = worker.handle.shared.trace_status.trace_requested(); |
| 1005 | } |
| 1006 | } |
| 1007 | |
| 1008 | /// Signals all tasks to shut down, and waits for them to complete. Must run |
| 1009 | /// before we enter the single-threaded phase of shutdown processing. |
| 1010 | fn pre_shutdown(&mut self, worker: &Worker) { |
| 1011 | // Start from a random inner list |
| 1012 | let start = self |
| 1013 | .rand |
| 1014 | .fastrand_n(worker.handle.shared.owned.get_shard_size() as u32); |
| 1015 | // Signal to all tasks to shut down. |
| 1016 | worker |
| 1017 | .handle |
| 1018 | .shared |
| 1019 | .owned |
| 1020 | .close_and_shutdown_all(start as usize); |
| 1021 | |
| 1022 | self.stats |
| 1023 | .submit(&worker.handle.shared.worker_metrics[worker.index]); |
| 1024 | } |
| 1025 | |
| 1026 | /// Shuts down the core. |
| 1027 | fn shutdown(&mut self, handle: &Handle) { |
// Take the parker
let mut park = self.park.take().expect("park missing");
| 1030 | |
| 1031 | // Drain the queue |
| 1032 | while self.next_local_task().is_some() {} |
| 1033 | |
| 1034 | park.shutdown(&handle.driver); |
| 1035 | } |
| 1036 | |
| 1037 | fn tune_global_queue_interval(&mut self, worker: &Worker) { |
| 1038 | let next = self |
| 1039 | .stats |
| 1040 | .tuned_global_queue_interval(&worker.handle.shared.config); |
| 1041 | |
| 1042 | // Smooth out jitter |
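// (e.g. stay at an interval of 31 when the newly tuned value is 32, but
// adopt the new value when it differs by more than 2)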
| 1043 | if u32::abs_diff(self.global_queue_interval, next) > 2 { |
| 1044 | self.global_queue_interval = next; |
| 1045 | } |
| 1046 | } |
| 1047 | } |
| 1048 | |
| 1049 | impl Worker { |
| 1050 | /// Returns a reference to the scheduler's injection queue. |
| 1051 | fn inject(&self) -> &inject::Shared<Arc<Handle>> { |
| 1052 | &self.handle.shared.inject |
| 1053 | } |
| 1054 | } |
| 1055 | |
| 1056 | // TODO: Move `Handle` impls into handle.rs |
| 1057 | impl task::Schedule for Arc<Handle> { |
| 1058 | fn release(&self, task: &Task) -> Option<Task> { |
| 1059 | self.shared.owned.remove(task) |
| 1060 | } |
| 1061 | |
| 1062 | fn schedule(&self, task: Notified) { |
self.schedule_task(task, false);
| 1064 | } |
| 1065 | |
| 1066 | fn hooks(&self) -> TaskHarnessScheduleHooks { |
| 1067 | TaskHarnessScheduleHooks { |
| 1068 | task_terminate_callback: self.task_hooks.task_terminate_callback.clone(), |
| 1069 | } |
| 1070 | } |
| 1071 | |
| 1072 | fn yield_now(&self, task: Notified) { |
self.schedule_task(task, true);
| 1074 | } |
| 1075 | } |
| 1076 | |
| 1077 | impl Handle { |
| 1078 | pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { |
| 1079 | with_current(|maybe_cx| { |
| 1080 | if let Some(cx) = maybe_cx { |
| 1081 | // Make sure the task is part of the **current** scheduler. |
| 1082 | if self.ptr_eq(&cx.worker.handle) { |
| 1083 | // And the current thread still holds a core |
| 1084 | if let Some(core) = cx.core.borrow_mut().as_mut() { |
| 1085 | self.schedule_local(core, task, is_yield); |
| 1086 | return; |
| 1087 | } |
| 1088 | } |
| 1089 | } |
| 1090 | |
| 1091 | // Otherwise, use the inject queue. |
| 1092 | self.push_remote_task(task); |
| 1093 | self.notify_parked_remote(); |
| 1094 | }); |
| 1095 | } |
| 1096 | |
| 1097 | pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) { |
| 1098 | if let Some(task) = task { |
| 1099 | self.schedule_task(task, false); |
| 1100 | } |
| 1101 | } |
| 1102 | |
| 1103 | fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) { |
| 1104 | core.stats.inc_local_schedule_count(); |
| 1105 | |
| 1106 | // Spawning from the worker thread. If scheduling a "yield" then the |
| 1107 | // task must always be pushed to the back of the queue, enabling other |
| 1108 | // tasks to be executed. If **not** a yield, then there is more |
| 1109 | // flexibility and the task may go to the front of the queue. |
| 1110 | let should_notify = if is_yield || !core.lifo_enabled { |
| 1111 | core.run_queue |
| 1112 | .push_back_or_overflow(task, self, &mut core.stats); |
| 1113 | true |
| 1114 | } else { |
| 1115 | // Push to the LIFO slot |
| 1116 | let prev = core.lifo_slot.take(); |
| 1117 | let ret = prev.is_some(); |
| 1118 | |
| 1119 | if let Some(prev) = prev { |
| 1120 | core.run_queue |
| 1121 | .push_back_or_overflow(prev, self, &mut core.stats); |
| 1122 | } |
| 1123 | |
| 1124 | core.lifo_slot = Some(task); |
| 1125 | |
| 1126 | ret |
| 1127 | }; |
| 1128 | |
| 1129 | // Only notify if not currently parked. If `park` is `None`, then the |
| 1130 | // scheduling is from a resource driver. As notifications often come in |
| 1131 | // batches, the notification is delayed until the park is complete. |
| 1132 | if should_notify && core.park.is_some() { |
| 1133 | self.notify_parked_local(); |
| 1134 | } |
| 1135 | } |
| 1136 | |
| 1137 | fn next_remote_task(&self) -> Option<Notified> { |
| 1138 | if self.shared.inject.is_empty() { |
| 1139 | return None; |
| 1140 | } |
| 1141 | |
| 1142 | let mut synced = self.shared.synced.lock(); |
// safety: passing in correct `inject::Synced`
| 1144 | unsafe { self.shared.inject.pop(&mut synced.inject) } |
| 1145 | } |
| 1146 | |
| 1147 | fn push_remote_task(&self, task: Notified) { |
| 1148 | self.shared.scheduler_metrics.inc_remote_schedule_count(); |
| 1149 | |
| 1150 | let mut synced = self.shared.synced.lock(); |
// safety: passing in correct `inject::Synced`
| 1152 | unsafe { |
| 1153 | self.shared.inject.push(&mut synced.inject, task); |
| 1154 | } |
| 1155 | } |
| 1156 | |
| 1157 | pub(super) fn close(&self) { |
| 1158 | if self |
| 1159 | .shared |
| 1160 | .inject |
| 1161 | .close(&mut self.shared.synced.lock().inject) |
| 1162 | { |
| 1163 | self.notify_all(); |
| 1164 | } |
| 1165 | } |
| 1166 | |
| 1167 | fn notify_parked_local(&self) { |
| 1168 | super::counters::inc_num_inc_notify_local(); |
| 1169 | |
| 1170 | if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) { |
| 1171 | super::counters::inc_num_unparks_local(); |
| 1172 | self.shared.remotes[index].unpark.unpark(&self.driver); |
| 1173 | } |
| 1174 | } |
| 1175 | |
| 1176 | fn notify_parked_remote(&self) { |
| 1177 | if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) { |
| 1178 | self.shared.remotes[index].unpark.unpark(&self.driver); |
| 1179 | } |
| 1180 | } |
| 1181 | |
| 1182 | pub(super) fn notify_all(&self) { |
| 1183 | for remote in &self.shared.remotes[..] { |
| 1184 | remote.unpark.unpark(&self.driver); |
| 1185 | } |
| 1186 | } |
| 1187 | |
| 1188 | fn notify_if_work_pending(&self) { |
| 1189 | for remote in &self.shared.remotes[..] { |
| 1190 | if !remote.steal.is_empty() { |
| 1191 | self.notify_parked_local(); |
| 1192 | return; |
| 1193 | } |
| 1194 | } |
| 1195 | |
| 1196 | if !self.shared.inject.is_empty() { |
| 1197 | self.notify_parked_local(); |
| 1198 | } |
| 1199 | } |
| 1200 | |
| 1201 | fn transition_worker_from_searching(&self) { |
| 1202 | if self.shared.idle.transition_worker_from_searching() { |
| 1203 | // We are the final searching worker. Because work was found, we |
| 1204 | // need to notify another worker. |
| 1205 | self.notify_parked_local(); |
| 1206 | } |
| 1207 | } |
| 1208 | |
| 1209 | /// Signals that a worker has observed the shutdown signal and has replaced |
| 1210 | /// its core back into its handle. |
| 1211 | /// |
| 1212 | /// If all workers have reached this point, the final cleanup is performed. |
| 1213 | fn shutdown_core(&self, core: Box<Core>) { |
| 1214 | let mut cores = self.shared.shutdown_cores.lock(); |
| 1215 | cores.push(core); |
| 1216 | |
| 1217 | if cores.len() != self.shared.remotes.len() { |
| 1218 | return; |
| 1219 | } |
| 1220 | |
| 1221 | debug_assert!(self.shared.owned.is_empty()); |
| 1222 | |
| 1223 | for mut core in cores.drain(..) { |
| 1224 | core.shutdown(self); |
| 1225 | } |
| 1226 | |
| 1227 | // Drain the injection queue |
| 1228 | // |
| 1229 | // We already shut down every task, so we can simply drop the tasks. |
| 1230 | while let Some(task) = self.next_remote_task() { |
| 1231 | drop(task); |
| 1232 | } |
| 1233 | } |
| 1234 | |
| 1235 | fn ptr_eq(&self, other: &Handle) -> bool { |
| 1236 | std::ptr::eq(self, other) |
| 1237 | } |
| 1238 | } |
| 1239 | |
| 1240 | impl Overflow<Arc<Handle>> for Handle { |
| 1241 | fn push(&self, task: task::Notified<Arc<Handle>>) { |
| 1242 | self.push_remote_task(task); |
| 1243 | } |
| 1244 | |
| 1245 | fn push_batch<I>(&self, iter: I) |
| 1246 | where |
| 1247 | I: Iterator<Item = task::Notified<Arc<Handle>>>, |
| 1248 | { |
| 1249 | unsafe { |
| 1250 | self.shared.inject.push_batch(self, iter); |
| 1251 | } |
| 1252 | } |
| 1253 | } |
| 1254 | |
| 1255 | pub(crate) struct InjectGuard<'a> { |
| 1256 | lock: crate::loom::sync::MutexGuard<'a, Synced>, |
| 1257 | } |
| 1258 | |
| 1259 | impl<'a> AsMut<inject::Synced> for InjectGuard<'a> { |
| 1260 | fn as_mut(&mut self) -> &mut inject::Synced { |
| 1261 | &mut self.lock.inject |
| 1262 | } |
| 1263 | } |
| 1264 | |
| 1265 | impl<'a> Lock<inject::Synced> for &'a Handle { |
| 1266 | type Handle = InjectGuard<'a>; |
| 1267 | |
| 1268 | fn lock(self) -> Self::Handle { |
| 1269 | InjectGuard { |
| 1270 | lock: self.shared.synced.lock(), |
| 1271 | } |
| 1272 | } |
| 1273 | } |
| 1274 | |
#[track_caller]
| 1276 | fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R { |
| 1277 | use scheduler::Context::MultiThread; |
| 1278 | |
context::with_scheduler(|ctx| match ctx {
Some(MultiThread(ctx)) => f(Some(ctx)),
| 1281 | _ => f(None), |
| 1282 | }) |
| 1283 | } |
| 1284 | |