1 | //! A scheduler is initialized with a fixed number of workers. Each worker is |
2 | //! driven by a thread. Each worker has a "core" which contains data such as the |
3 | //! run queue and other state. When `block_in_place` is called, the worker's |
4 | //! "core" is handed off to a new thread allowing the scheduler to continue to |
5 | //! make progress while the originating thread blocks. |
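//!
//! For example (illustrative user code; `tokio::task::block_in_place` is the
//! public entry point that eventually reaches the `block_in_place` function in
//! this module):
//!
//! ```ignore
//! tokio::task::block_in_place(|| {
//!     // Blocking work runs here on the current thread, while this worker's
//!     // core is handed off to a freshly spawned blocking thread.
//!     std::thread::sleep(std::time::Duration::from_millis(50));
//! });
//! ```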
6 | //! |
7 | //! # Shutdown |
8 | //! |
9 | //! Shutting down the runtime involves the following steps: |
10 | //! |
11 | //! 1. The Shared::close method is called. This closes the inject queue and |
12 | //! `OwnedTasks` instance and wakes up all worker threads. |
13 | //! |
14 | //! 2. Each worker thread observes the close signal next time it runs |
15 | //! Core::maintenance by checking whether the inject queue is closed. |
16 | //! The `Core::is_shutdown` flag is set to true. |
17 | //! |
18 | //! 3. The worker thread calls `pre_shutdown` in parallel. Here, the worker |
19 | //! will keep removing tasks from `OwnedTasks` until it is empty. No new |
20 | //! tasks can be pushed to the `OwnedTasks` during or after this step as it |
21 | //! was closed in step 1. |
22 | //! |
//! 4. The workers call Shared::shutdown to enter the single-threaded phase of
//! shutdown. These calls will push their core to `Shared::shutdown_cores`,
//! and the last thread to push its core will finish the shutdown procedure.
//!
//! 5. The local run queue of each core is emptied, then the inject queue is
//! emptied.
29 | //! |
30 | //! At this point, shutdown has completed. It is not possible for any of the |
31 | //! collections to contain any tasks at this point, as each collection was |
32 | //! closed first, then emptied afterwards. |
33 | //! |
34 | //! ## Spawns during shutdown |
35 | //! |
36 | //! When spawning tasks during shutdown, there are two cases: |
37 | //! |
38 | //! * The spawner observes the `OwnedTasks` being open, and the inject queue is |
39 | //! closed. |
40 | //! * The spawner observes the `OwnedTasks` being closed and doesn't check the |
41 | //! inject queue. |
42 | //! |
43 | //! The first case can only happen if the `OwnedTasks::bind` call happens before |
44 | //! or during step 1 of shutdown. In this case, the runtime will clean up the |
45 | //! task in step 3 of shutdown. |
46 | //! |
47 | //! In the latter case, the task was not spawned and the task is immediately |
48 | //! cancelled by the spawner. |
49 | //! |
//! The correctness of shutdown requires both the inject queue and `OwnedTasks`
//! collection to have a closed bit. With a close bit on only the inject queue,
//! spawning could run into a situation where a task is successfully bound long
//! after the runtime has shut down. With a close bit on only the `OwnedTasks`,
//! the first spawning situation could result in the notification being pushed
//! to the inject queue after step 5 of shutdown, which would leave a task in
//! the inject queue indefinitely. This would be a ref-count cycle and a memory
//! leak.
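//!
//! As an illustrative sketch of the two cases (not the exact code paths;
//! the pseudocode names are made up for exposition):
//!
//! ```text
//! spawn(task):
//!     bind task to OwnedTasks
//!     if OwnedTasks was already closed:
//!         # second case: the task was never spawned and is
//!         # immediately cancelled by the spawner
//!     else:
//!         # first case: OwnedTasks accepted the task
//!         push the notification onto the inject queue
//!         if the inject queue was already closed:
//!             # the notification is dropped; the task remains in
//!             # OwnedTasks and is cleaned up in step 3 of shutdown
//! ```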
58 | |
59 | use crate::loom::sync::{Arc, Mutex}; |
60 | use crate::runtime; |
61 | use crate::runtime::context; |
62 | use crate::runtime::scheduler::multi_thread::{ |
63 | idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker, |
64 | }; |
65 | use crate::runtime::scheduler::{inject, Defer, Lock}; |
66 | use crate::runtime::task::OwnedTasks; |
67 | use crate::runtime::{ |
68 | blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics, |
69 | }; |
70 | use crate::util::atomic_cell::AtomicCell; |
71 | use crate::util::rand::{FastRand, RngSeedGenerator}; |
72 | |
73 | use std::cell::RefCell; |
74 | use std::task::Waker; |
75 | use std::time::Duration; |
76 | |
77 | cfg_metrics! { |
78 | mod metrics; |
79 | } |
80 | |
81 | cfg_taskdump! { |
82 | mod taskdump; |
83 | } |
84 | |
85 | cfg_not_taskdump! { |
86 | mod taskdump_mock; |
87 | } |
88 | |
89 | /// A scheduler worker |
90 | pub(super) struct Worker { |
91 | /// Reference to scheduler's handle |
92 | handle: Arc<Handle>, |
93 | |
94 | /// Index holding this worker's remote state |
95 | index: usize, |
96 | |
97 | /// Used to hand-off a worker's core to another thread. |
98 | core: AtomicCell<Core>, |
99 | } |
100 | |
101 | /// Core data |
102 | struct Core { |
103 | /// Used to schedule bookkeeping tasks every so often. |
104 | tick: u32, |
105 | |
106 | /// When a task is scheduled from a worker, it is stored in this slot. The |
107 | /// worker will check this slot for a task **before** checking the run |
/// queue. This effectively results in the **last** scheduled task being run
/// next (LIFO). This is an optimization for improving locality, which
/// benefits message passing patterns and helps to reduce latency.
111 | lifo_slot: Option<Notified>, |
112 | |
113 | /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`, |
114 | /// they go to the back of the `run_queue`. |
115 | lifo_enabled: bool, |
116 | |
117 | /// The worker-local run queue. |
118 | run_queue: queue::Local<Arc<Handle>>, |
119 | |
120 | /// True if the worker is currently searching for more work. Searching |
121 | /// involves attempting to steal from other workers. |
122 | is_searching: bool, |
123 | |
124 | /// True if the scheduler is being shutdown |
125 | is_shutdown: bool, |
126 | |
127 | /// True if the scheduler is being traced |
128 | is_traced: bool, |
129 | |
130 | /// Parker |
131 | /// |
132 | /// Stored in an `Option` as the parker is added / removed to make the |
133 | /// borrow checker happy. |
134 | park: Option<Parker>, |
135 | |
136 | /// Per-worker runtime stats |
137 | stats: Stats, |
138 | |
139 | /// How often to check the global queue |
140 | global_queue_interval: u32, |
141 | |
142 | /// Fast random number generator. |
143 | rand: FastRand, |
144 | } |
145 | |
146 | /// State shared across all workers |
147 | pub(crate) struct Shared { |
/// Per-worker remote state. All other workers have access to this, which is
/// how they communicate with each other.
150 | remotes: Box<[Remote]>, |
151 | |
152 | /// Global task queue used for: |
153 | /// 1. Submit work to the scheduler while **not** currently on a worker thread. |
154 | /// 2. Submit work to the scheduler when a worker run queue is saturated |
155 | pub(super) inject: inject::Shared<Arc<Handle>>, |
156 | |
157 | /// Coordinates idle workers |
158 | idle: Idle, |
159 | |
160 | /// Collection of all active tasks spawned onto this executor. |
161 | pub(crate) owned: OwnedTasks<Arc<Handle>>, |
162 | |
163 | /// Data synchronized by the scheduler mutex |
164 | pub(super) synced: Mutex<Synced>, |
165 | |
166 | /// Cores that have observed the shutdown signal |
167 | /// |
/// The core is **not** placed back in the worker to prevent it from being
169 | /// stolen by a thread that was spawned as part of `block_in_place`. |
#[allow(clippy::vec_box)] // we're moving an already-boxed value
171 | shutdown_cores: Mutex<Vec<Box<Core>>>, |
172 | |
173 | /// The number of cores that have observed the trace signal. |
174 | pub(super) trace_status: TraceStatus, |
175 | |
176 | /// Scheduler configuration options |
177 | config: Config, |
178 | |
179 | /// Collects metrics from the runtime. |
180 | pub(super) scheduler_metrics: SchedulerMetrics, |
181 | |
182 | pub(super) worker_metrics: Box<[WorkerMetrics]>, |
183 | |
184 | /// Only held to trigger some code on drop. This is used to get internal |
185 | /// runtime metrics that can be useful when doing performance |
186 | /// investigations. This does nothing (empty struct, no drop impl) unless |
187 | /// the `tokio_internal_mt_counters` `cfg` flag is set. |
188 | _counters: Counters, |
189 | } |
190 | |
191 | /// Data synchronized by the scheduler mutex |
192 | pub(crate) struct Synced { |
193 | /// Synchronized state for `Idle`. |
194 | pub(super) idle: idle::Synced, |
195 | |
196 | /// Synchronized state for `Inject`. |
197 | pub(crate) inject: inject::Synced, |
198 | } |
199 | |
200 | /// Used to communicate with a worker from other threads. |
201 | struct Remote { |
202 | /// Steals tasks from this worker. |
203 | pub(super) steal: queue::Steal<Arc<Handle>>, |
204 | |
205 | /// Unparks the associated worker thread |
206 | unpark: Unparker, |
207 | } |
208 | |
209 | /// Thread-local context |
210 | pub(crate) struct Context { |
211 | /// Worker |
212 | worker: Arc<Worker>, |
213 | |
214 | /// Core data |
215 | core: RefCell<Option<Box<Core>>>, |
216 | |
217 | /// Tasks to wake after resource drivers are polled. This is mostly to |
218 | /// handle yielded tasks. |
219 | pub(crate) defer: Defer, |
220 | } |
221 | |
222 | /// Starts the workers |
223 | pub(crate) struct Launch(Vec<Arc<Worker>>); |
224 | |
225 | /// Running a task may consume the core. If the core is still available when |
226 | /// running the task completes, it is returned. Otherwise, the worker will need |
227 | /// to stop processing. |
228 | type RunResult = Result<Box<Core>, ()>; |
229 | |
230 | /// A task handle |
231 | type Task = task::Task<Arc<Handle>>; |
232 | |
233 | /// A notified task handle |
234 | type Notified = task::Notified<Arc<Handle>>; |
235 | |
/// Value picked out of thin-air. Running the LIFO slot a handful of times
/// seems sufficient to benefit from locality. More than 3 times is probably
/// overdoing it. The value can be tuned in the future with data that shows
/// improvements.
240 | const MAX_LIFO_POLLS_PER_TICK: usize = 3; |
241 | |
242 | pub(super) fn create( |
243 | size: usize, |
244 | park: Parker, |
245 | driver_handle: driver::Handle, |
246 | blocking_spawner: blocking::Spawner, |
247 | seed_generator: RngSeedGenerator, |
248 | config: Config, |
249 | ) -> (Arc<Handle>, Launch) { |
250 | let mut cores = Vec::with_capacity(size); |
251 | let mut remotes = Vec::with_capacity(size); |
252 | let mut worker_metrics = Vec::with_capacity(size); |
253 | |
254 | // Create the local queues |
255 | for _ in 0..size { |
256 | let (steal, run_queue) = queue::local(); |
257 | |
258 | let park = park.clone(); |
259 | let unpark = park.unpark(); |
260 | let metrics = WorkerMetrics::from_config(&config); |
261 | let stats = Stats::new(&metrics); |
262 | |
263 | cores.push(Box::new(Core { |
264 | tick: 0, |
265 | lifo_slot: None, |
266 | lifo_enabled: !config.disable_lifo_slot, |
267 | run_queue, |
268 | is_searching: false, |
269 | is_shutdown: false, |
270 | is_traced: false, |
271 | park: Some(park), |
272 | global_queue_interval: stats.tuned_global_queue_interval(&config), |
273 | stats, |
274 | rand: FastRand::from_seed(config.seed_generator.next_seed()), |
275 | })); |
276 | |
277 | remotes.push(Remote { steal, unpark }); |
278 | worker_metrics.push(metrics); |
279 | } |
280 | |
281 | let (idle, idle_synced) = Idle::new(size); |
282 | let (inject, inject_synced) = inject::Shared::new(); |
283 | |
284 | let remotes_len = remotes.len(); |
285 | let handle = Arc::new(Handle { |
286 | shared: Shared { |
287 | remotes: remotes.into_boxed_slice(), |
288 | inject, |
289 | idle, |
290 | owned: OwnedTasks::new(size), |
291 | synced: Mutex::new(Synced { |
292 | idle: idle_synced, |
293 | inject: inject_synced, |
294 | }), |
295 | shutdown_cores: Mutex::new(vec![]), |
296 | trace_status: TraceStatus::new(remotes_len), |
297 | config, |
298 | scheduler_metrics: SchedulerMetrics::new(), |
299 | worker_metrics: worker_metrics.into_boxed_slice(), |
300 | _counters: Counters, |
301 | }, |
302 | driver: driver_handle, |
303 | blocking_spawner, |
304 | seed_generator, |
305 | }); |
306 | |
307 | let mut launch = Launch(vec![]); |
308 | |
309 | for (index, core) in cores.drain(..).enumerate() { |
310 | launch.0.push(Arc::new(Worker { |
311 | handle: handle.clone(), |
312 | index, |
313 | core: AtomicCell::new(Some(core)), |
314 | })); |
315 | } |
316 | |
317 | (handle, launch) |
318 | } |
319 | |
#[track_caller]
321 | pub(crate) fn block_in_place<F, R>(f: F) -> R |
322 | where |
323 | F: FnOnce() -> R, |
324 | { |
325 | // Try to steal the worker core back |
326 | struct Reset { |
327 | take_core: bool, |
328 | budget: coop::Budget, |
329 | } |
330 | |
331 | impl Drop for Reset { |
332 | fn drop(&mut self) { |
333 | with_current(|maybe_cx| { |
334 | if let Some(cx) = maybe_cx { |
335 | if self.take_core { |
336 | let core = cx.worker.core.take(); |
337 | let mut cx_core = cx.core.borrow_mut(); |
338 | assert!(cx_core.is_none()); |
339 | *cx_core = core; |
340 | } |
341 | |
342 | // Reset the task budget as we are re-entering the |
343 | // runtime. |
344 | coop::set(self.budget); |
345 | } |
346 | }); |
347 | } |
348 | } |
349 | |
350 | let mut had_entered = false; |
351 | let mut take_core = false; |
352 | |
353 | let setup_result = with_current(|maybe_cx| { |
354 | match ( |
355 | crate::runtime::context::current_enter_context(), |
356 | maybe_cx.is_some(), |
357 | ) { |
358 | (context::EnterRuntime::Entered { .. }, true) => { |
359 | // We are on a thread pool runtime thread, so we just need to |
360 | // set up blocking. |
361 | had_entered = true; |
362 | } |
363 | ( |
364 | context::EnterRuntime::Entered { |
365 | allow_block_in_place, |
366 | }, |
367 | false, |
368 | ) => { |
369 | // We are on an executor, but _not_ on the thread pool. That is |
370 | // _only_ okay if we are in a thread pool runtime's block_on |
371 | // method: |
372 | if allow_block_in_place { |
373 | had_entered = true; |
374 | return Ok(()); |
375 | } else { |
376 | // This probably means we are on the current_thread runtime or in a |
377 | // LocalSet, where it is _not_ okay to block. |
378 | return Err( |
"can call blocking only when running on the multi-threaded runtime",
380 | ); |
381 | } |
382 | } |
383 | (context::EnterRuntime::NotEntered, true) => { |
384 | // This is a nested call to block_in_place (we already exited). |
385 | // All the necessary setup has already been done. |
386 | return Ok(()); |
387 | } |
388 | (context::EnterRuntime::NotEntered, false) => { |
389 | // We are outside of the tokio runtime, so blocking is fine. |
390 | // We can also skip all of the thread pool blocking setup steps. |
391 | return Ok(()); |
392 | } |
393 | } |
394 | |
let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
396 | |
397 | // Get the worker core. If none is set, then blocking is fine! |
398 | let core = match cx.core.borrow_mut().take() { |
399 | Some(core) => core, |
400 | None => return Ok(()), |
401 | }; |
402 | |
403 | // We are taking the core from the context and sending it to another |
404 | // thread. |
405 | take_core = true; |
406 | |
407 | // The parker should be set here |
408 | assert!(core.park.is_some()); |
409 | |
410 | // In order to block, the core must be sent to another thread for |
411 | // execution. |
412 | // |
413 | // First, move the core back into the worker's shared core slot. |
414 | cx.worker.core.set(core); |
415 | |
416 | // Next, clone the worker handle and send it to a new thread for |
417 | // processing. |
418 | // |
419 | // Once the blocking task is done executing, we will attempt to |
420 | // steal the core back. |
421 | let worker = cx.worker.clone(); |
422 | runtime::spawn_blocking(move || run(worker)); |
423 | Ok(()) |
424 | }); |
425 | |
426 | if let Err(panic_message) = setup_result { |
panic!("{}", panic_message);
428 | } |
429 | |
430 | if had_entered { |
431 | // Unset the current task's budget. Blocking sections are not |
432 | // constrained by task budgets. |
433 | let _reset = Reset { |
434 | take_core, |
435 | budget: coop::stop(), |
436 | }; |
437 | |
438 | crate::runtime::context::exit_runtime(f) |
439 | } else { |
440 | f() |
441 | } |
442 | } |
443 | |
444 | impl Launch { |
445 | pub(crate) fn launch(mut self) { |
for worker in self.0.drain(..) {
runtime::spawn_blocking(move || run(worker));
448 | } |
449 | } |
450 | } |
451 | |
452 | fn run(worker: Arc<Worker>) { |
453 | struct AbortOnPanic; |
454 | |
455 | impl Drop for AbortOnPanic { |
456 | fn drop(&mut self) { |
457 | if std::thread::panicking() { |
eprintln!("worker thread panicking; aborting process");
459 | std::process::abort(); |
460 | } |
461 | } |
462 | } |
463 | |
464 | // Catching panics on worker threads in tests is quite tricky. Instead, when |
465 | // debug assertions are enabled, we just abort the process. |
#[cfg(debug_assertions)]
467 | let _abort_on_panic = AbortOnPanic; |
468 | |
469 | // Acquire a core. If this fails, then another thread is running this |
470 | // worker and there is nothing further to do. |
471 | let core = match worker.core.take() { |
472 | Some(core) => core, |
473 | None => return, |
474 | }; |
475 | |
476 | let handle = scheduler::Handle::MultiThread(worker.handle.clone()); |
477 | |
478 | crate::runtime::context::enter_runtime(&handle, true, |_| { |
479 | // Set the worker context. |
480 | let cx = scheduler::Context::MultiThread(Context { |
481 | worker, |
482 | core: RefCell::new(None), |
483 | defer: Defer::new(), |
484 | }); |
485 | |
486 | context::set_scheduler(&cx, || { |
487 | let cx = cx.expect_multi_thread(); |
488 | |
489 | // This should always be an error. It only returns a `Result` to support |
490 | // using `?` to short circuit. |
491 | assert!(cx.run(core).is_err()); |
492 | |
493 | // Check if there are any deferred tasks to notify. This can happen when |
494 | // the worker core is lost due to `block_in_place()` being called from |
495 | // within the task. |
496 | cx.defer.wake(); |
497 | }); |
498 | }); |
499 | } |
500 | |
501 | impl Context { |
502 | fn run(&self, mut core: Box<Core>) -> RunResult { |
503 | // Reset `lifo_enabled` here in case the core was previously stolen from |
504 | // a task that had the LIFO slot disabled. |
505 | self.reset_lifo_enabled(&mut core); |
506 | |
// Start in the "processing scheduled tasks" state, as polling tasks from
// the local queue will be one of the first things we do.
509 | core.stats.start_processing_scheduled_tasks(); |
510 | |
511 | while !core.is_shutdown { |
512 | self.assert_lifo_enabled_is_correct(&core); |
513 | |
514 | if core.is_traced { |
515 | core = self.worker.handle.trace_core(core); |
516 | } |
517 | |
518 | // Increment the tick |
519 | core.tick(); |
520 | |
521 | // Run maintenance, if needed |
522 | core = self.maintenance(core); |
523 | |
524 | // First, check work available to the current worker. |
525 | if let Some(task) = core.next_task(&self.worker) { |
526 | core = self.run_task(task, core)?; |
527 | continue; |
528 | } |
529 | |
530 | // We consumed all work in the queues and will start searching for work. |
531 | core.stats.end_processing_scheduled_tasks(); |
532 | |
533 | // There is no more **local** work to process, try to steal work |
534 | // from other workers. |
535 | if let Some(task) = core.steal_work(&self.worker) { |
536 | // Found work, switch back to processing |
537 | core.stats.start_processing_scheduled_tasks(); |
538 | core = self.run_task(task, core)?; |
539 | } else { |
540 | // Wait for work |
541 | core = if !self.defer.is_empty() { |
542 | self.park_timeout(core, Some(Duration::from_millis(0))) |
543 | } else { |
544 | self.park(core) |
545 | }; |
546 | core.stats.start_processing_scheduled_tasks(); |
547 | } |
548 | } |
549 | |
550 | core.pre_shutdown(&self.worker); |
551 | // Signal shutdown |
552 | self.worker.handle.shutdown_core(core); |
553 | Err(()) |
554 | } |
555 | |
556 | fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult { |
557 | let task = self.worker.handle.shared.owned.assert_owner(task); |
558 | |
559 | // Make sure the worker is not in the **searching** state. This enables |
560 | // another idle worker to try to steal work. |
561 | core.transition_from_searching(&self.worker); |
562 | |
563 | self.assert_lifo_enabled_is_correct(&core); |
564 | |
565 | // Measure the poll start time. Note that we may end up polling other |
566 | // tasks under this measurement. In this case, the tasks came from the |
567 | // LIFO slot and are considered part of the current task for scheduling |
// purposes. These tasks inherit the "parent"'s limits.
569 | core.stats.start_poll(); |
570 | |
571 | // Make the core available to the runtime context |
572 | *self.core.borrow_mut() = Some(core); |
573 | |
574 | // Run the task |
575 | coop::budget(|| { |
576 | task.run(); |
577 | let mut lifo_polls = 0; |
578 | |
579 | // As long as there is budget remaining and a task exists in the |
580 | // `lifo_slot`, then keep running. |
581 | loop { |
582 | // Check if we still have the core. If not, the core was stolen |
583 | // by another worker. |
584 | let mut core = match self.core.borrow_mut().take() { |
585 | Some(core) => core, |
586 | None => { |
587 | // In this case, we cannot call `reset_lifo_enabled()` |
588 | // because the core was stolen. The stealer will handle |
589 | // that at the top of `Context::run` |
590 | return Err(()); |
591 | } |
592 | }; |
593 | |
594 | // Check for a task in the LIFO slot |
595 | let task = match core.lifo_slot.take() { |
596 | Some(task) => task, |
597 | None => { |
598 | self.reset_lifo_enabled(&mut core); |
599 | core.stats.end_poll(); |
600 | return Ok(core); |
601 | } |
602 | }; |
603 | |
604 | if !coop::has_budget_remaining() { |
605 | core.stats.end_poll(); |
606 | |
607 | // Not enough budget left to run the LIFO task, push it to |
608 | // the back of the queue and return. |
609 | core.run_queue.push_back_or_overflow( |
610 | task, |
611 | &*self.worker.handle, |
612 | &mut core.stats, |
613 | ); |
614 | // If we hit this point, the LIFO slot should be enabled. |
615 | // There is no need to reset it. |
616 | debug_assert!(core.lifo_enabled); |
617 | return Ok(core); |
618 | } |
619 | |
620 | // Track that we are about to run a task from the LIFO slot. |
621 | lifo_polls += 1; |
622 | super::counters::inc_lifo_schedules(); |
623 | |
624 | // Disable the LIFO slot if we reach our limit |
625 | // |
// In ping-pong style workloads where task A notifies task B,
627 | // which notifies task A again, continuously prioritizing the |
628 | // LIFO slot can cause starvation as these two tasks will |
629 | // repeatedly schedule the other. To mitigate this, we limit the |
630 | // number of times the LIFO slot is prioritized. |
631 | if lifo_polls >= MAX_LIFO_POLLS_PER_TICK { |
632 | core.lifo_enabled = false; |
633 | super::counters::inc_lifo_capped(); |
634 | } |
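// A sketch of such a ping-pong workload (hypothetical user code; the
// channel names are made up): two tasks forwarding messages to each
// other, where each send wakes the peer task straight into the LIFO slot:
//
//     tokio::spawn(async move { while let Some(v) = rx_a.recv().await { let _ = tx_b.send(v).await; } });
//     tokio::spawn(async move { while let Some(v) = rx_b.recv().await { let _ = tx_a.send(v).await; } });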
635 | |
636 | // Run the LIFO task, then loop |
637 | *self.core.borrow_mut() = Some(core); |
638 | let task = self.worker.handle.shared.owned.assert_owner(task); |
639 | task.run(); |
640 | } |
641 | }) |
642 | } |
643 | |
644 | fn reset_lifo_enabled(&self, core: &mut Core) { |
645 | core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot; |
646 | } |
647 | |
648 | fn assert_lifo_enabled_is_correct(&self, core: &Core) { |
649 | debug_assert_eq!( |
650 | core.lifo_enabled, |
651 | !self.worker.handle.shared.config.disable_lifo_slot |
652 | ); |
653 | } |
654 | |
655 | fn maintenance(&self, mut core: Box<Core>) -> Box<Core> { |
656 | if core.tick % self.worker.handle.shared.config.event_interval == 0 { |
657 | super::counters::inc_num_maintenance(); |
658 | |
659 | core.stats.end_processing_scheduled_tasks(); |
660 | |
661 | // Call `park` with a 0 timeout. This enables the I/O driver, timer, ... |
662 | // to run without actually putting the thread to sleep. |
663 | core = self.park_timeout(core, Some(Duration::from_millis(0))); |
664 | |
665 | // Run regularly scheduled maintenance |
666 | core.maintenance(&self.worker); |
667 | |
668 | core.stats.start_processing_scheduled_tasks(); |
669 | } |
670 | |
671 | core |
672 | } |
673 | |
674 | /// Parks the worker thread while waiting for tasks to execute. |
675 | /// |
/// This function checks that there is indeed no more work left to be done before parking.
/// Also note that, before parking, the worker thread will try to take ownership of the
/// Driver (IO/Time) and dispatch any events that might have fired.
/// Whenever a worker thread executes the Driver loop, all woken tasks are scheduled
/// in its own local queue until the queue saturates (ntasks > `LOCAL_QUEUE_CAPACITY`).
/// When the local queue is saturated, the overflow tasks are added to the injection queue
/// from where other workers can pick them up.
/// We also rely on the workstealing algorithm to spread the tasks amongst workers
/// after all the IOs get dispatched.
685 | fn park(&self, mut core: Box<Core>) -> Box<Core> { |
686 | if let Some(f) = &self.worker.handle.shared.config.before_park { |
687 | f(); |
688 | } |
689 | |
690 | if core.transition_to_parked(&self.worker) { |
691 | while !core.is_shutdown && !core.is_traced { |
692 | core.stats.about_to_park(); |
693 | core = self.park_timeout(core, None); |
694 | |
695 | // Run regularly scheduled maintenance |
696 | core.maintenance(&self.worker); |
697 | |
698 | if core.transition_from_parked(&self.worker) { |
699 | break; |
700 | } |
701 | } |
702 | } |
703 | |
704 | if let Some(f) = &self.worker.handle.shared.config.after_unpark { |
705 | f(); |
706 | } |
707 | core |
708 | } |
709 | |
710 | fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> { |
711 | self.assert_lifo_enabled_is_correct(&core); |
712 | |
713 | // Take the parker out of core |
let mut park = core.park.take().expect("park missing");
715 | |
716 | // Store `core` in context |
717 | *self.core.borrow_mut() = Some(core); |
718 | |
719 | // Park thread |
720 | if let Some(timeout) = duration { |
721 | park.park_timeout(&self.worker.handle.driver, timeout); |
722 | } else { |
723 | park.park(&self.worker.handle.driver); |
724 | } |
725 | |
726 | self.defer.wake(); |
727 | |
728 | // Remove `core` from context |
core = self.core.borrow_mut().take().expect("core missing");
730 | |
731 | // Place `park` back in `core` |
732 | core.park = Some(park); |
733 | |
734 | if core.should_notify_others() { |
735 | self.worker.handle.notify_parked_local(); |
736 | } |
737 | |
738 | core |
739 | } |
740 | |
741 | pub(crate) fn defer(&self, waker: &Waker) { |
742 | self.defer.defer(waker); |
743 | } |
744 | } |
745 | |
746 | impl Core { |
747 | /// Increment the tick |
748 | fn tick(&mut self) { |
749 | self.tick = self.tick.wrapping_add(1); |
750 | } |
751 | |
752 | /// Return the next notified task available to this worker. |
753 | fn next_task(&mut self, worker: &Worker) -> Option<Notified> { |
754 | if self.tick % self.global_queue_interval == 0 { |
755 | // Update the global queue interval, if needed |
756 | self.tune_global_queue_interval(worker); |
757 | |
758 | worker |
759 | .handle |
760 | .next_remote_task() |
761 | .or_else(|| self.next_local_task()) |
762 | } else { |
763 | let maybe_task = self.next_local_task(); |
764 | |
765 | if maybe_task.is_some() { |
766 | return maybe_task; |
767 | } |
768 | |
769 | if worker.inject().is_empty() { |
770 | return None; |
771 | } |
772 | |
773 | // Other threads can only **remove** tasks from the current worker's |
774 | // `run_queue`. So, we can be confident that by the time we call |
775 | // `run_queue.push_back` below, there will be *at least* `cap` |
776 | // available slots in the queue. |
777 | let cap = usize::min( |
778 | self.run_queue.remaining_slots(), |
779 | self.run_queue.max_capacity() / 2, |
780 | ); |
781 | |
782 | // The worker is currently idle, pull a batch of work from the |
783 | // injection queue. We don't want to pull *all* the work so other |
784 | // workers can also get some. |
785 | let n = usize::min( |
786 | worker.inject().len() / worker.handle.shared.remotes.len() + 1, |
787 | cap, |
788 | ); |
789 | |
790 | // Take at least one task since the first task is returned directly |
791 | // and not pushed onto the local queue. |
792 | let n = usize::max(1, n); |
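// As an illustrative example: with 8 workers and 33 tasks in the inject
// queue, this pulls `min(33 / 8 + 1, cap) = min(5, cap)` tasks for this
// worker, leaving the rest for other workers to pick up.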
793 | |
794 | let mut synced = worker.handle.shared.synced.lock(); |
795 | // safety: passing in the correct `inject::Synced`. |
796 | let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) }; |
797 | |
798 | // Pop the first task to return immediately |
799 | let ret = tasks.next(); |
800 | |
// Push the rest of the tasks onto the run queue
802 | self.run_queue.push_back(tasks); |
803 | |
804 | ret |
805 | } |
806 | } |
807 | |
808 | fn next_local_task(&mut self) -> Option<Notified> { |
809 | self.lifo_slot.take().or_else(|| self.run_queue.pop()) |
810 | } |
811 | |
812 | /// Function responsible for stealing tasks from another worker |
813 | /// |
/// Note: a worker will only actually try to steal if fewer than half of the
/// workers are already searching for tasks to steal. The idea is to make sure
/// not all workers will be trying to steal at the same time.
817 | fn steal_work(&mut self, worker: &Worker) -> Option<Notified> { |
818 | if !self.transition_to_searching(worker) { |
819 | return None; |
820 | } |
821 | |
822 | let num = worker.handle.shared.remotes.len(); |
823 | // Start from a random worker |
824 | let start = self.rand.fastrand_n(num as u32) as usize; |
825 | |
826 | for i in 0..num { |
827 | let i = (start + i) % num; |
828 | |
829 | // Don't steal from ourself! We know we don't have work. |
830 | if i == worker.index { |
831 | continue; |
832 | } |
833 | |
834 | let target = &worker.handle.shared.remotes[i]; |
835 | if let Some(task) = target |
836 | .steal |
837 | .steal_into(&mut self.run_queue, &mut self.stats) |
838 | { |
839 | return Some(task); |
840 | } |
841 | } |
842 | |
843 | // Fallback on checking the global queue |
844 | worker.handle.next_remote_task() |
845 | } |
846 | |
847 | fn transition_to_searching(&mut self, worker: &Worker) -> bool { |
848 | if !self.is_searching { |
849 | self.is_searching = worker.handle.shared.idle.transition_worker_to_searching(); |
850 | } |
851 | |
852 | self.is_searching |
853 | } |
854 | |
855 | fn transition_from_searching(&mut self, worker: &Worker) { |
856 | if !self.is_searching { |
857 | return; |
858 | } |
859 | |
860 | self.is_searching = false; |
861 | worker.handle.transition_worker_from_searching(); |
862 | } |
863 | |
864 | fn has_tasks(&self) -> bool { |
865 | self.lifo_slot.is_some() || self.run_queue.has_tasks() |
866 | } |
867 | |
868 | fn should_notify_others(&self) -> bool { |
869 | // If there are tasks available to steal, but this worker is not |
870 | // looking for tasks to steal, notify another worker. |
871 | if self.is_searching { |
872 | return false; |
873 | } |
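// For example: one task in the LIFO slot plus one task in the run queue
// (1 + 1 > 1) notifies another worker, while a single runnable task is
// left for this worker to run itself.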
874 | self.lifo_slot.is_some() as usize + self.run_queue.len() > 1 |
875 | } |
876 | |
877 | /// Prepares the worker state for parking. |
878 | /// |
879 | /// Returns true if the transition happened, false if there is work to do first. |
880 | fn transition_to_parked(&mut self, worker: &Worker) -> bool { |
881 | // Workers should not park if they have work to do |
882 | if self.has_tasks() || self.is_traced { |
883 | return false; |
884 | } |
885 | |
886 | // When the final worker transitions **out** of searching to parked, it |
887 | // must check all the queues one last time in case work materialized |
888 | // between the last work scan and transitioning out of searching. |
889 | let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked( |
890 | &worker.handle.shared, |
891 | worker.index, |
892 | self.is_searching, |
893 | ); |
894 | |
// The worker is no longer searching. Setting this only updates the local
// cache.
897 | self.is_searching = false; |
898 | |
899 | if is_last_searcher { |
900 | worker.handle.notify_if_work_pending(); |
901 | } |
902 | |
903 | true |
904 | } |
905 | |
906 | /// Returns `true` if the transition happened. |
907 | fn transition_from_parked(&mut self, worker: &Worker) -> bool { |
908 | // If a task is in the lifo slot/run queue, then we must unpark regardless of |
909 | // being notified |
910 | if self.has_tasks() { |
911 | // When a worker wakes, it should only transition to the "searching" |
912 | // state when the wake originates from another worker *or* a new task |
913 | // is pushed. We do *not* want the worker to transition to "searching" |
914 | // when it wakes when the I/O driver receives new events. |
915 | self.is_searching = !worker |
916 | .handle |
917 | .shared |
918 | .idle |
919 | .unpark_worker_by_id(&worker.handle.shared, worker.index); |
920 | return true; |
921 | } |
922 | |
923 | if worker |
924 | .handle |
925 | .shared |
926 | .idle |
927 | .is_parked(&worker.handle.shared, worker.index) |
928 | { |
929 | return false; |
930 | } |
931 | |
932 | // When unparked, the worker is in the searching state. |
933 | self.is_searching = true; |
934 | true |
935 | } |
936 | |
937 | /// Runs maintenance work such as checking the pool's state. |
938 | fn maintenance(&mut self, worker: &Worker) { |
939 | self.stats |
940 | .submit(&worker.handle.shared.worker_metrics[worker.index]); |
941 | |
942 | if !self.is_shutdown { |
943 | // Check if the scheduler has been shutdown |
944 | let synced = worker.handle.shared.synced.lock(); |
945 | self.is_shutdown = worker.inject().is_closed(&synced.inject); |
946 | } |
947 | |
948 | if !self.is_traced { |
949 | // Check if the worker should be tracing. |
950 | self.is_traced = worker.handle.shared.trace_status.trace_requested(); |
951 | } |
952 | } |
953 | |
954 | /// Signals all tasks to shut down, and waits for them to complete. Must run |
955 | /// before we enter the single-threaded phase of shutdown processing. |
956 | fn pre_shutdown(&mut self, worker: &Worker) { |
957 | // Start from a random inner list |
958 | let start = self |
959 | .rand |
960 | .fastrand_n(worker.handle.shared.owned.get_shard_size() as u32); |
961 | // Signal to all tasks to shut down. |
962 | worker |
963 | .handle |
964 | .shared |
965 | .owned |
966 | .close_and_shutdown_all(start as usize); |
967 | |
968 | self.stats |
969 | .submit(&worker.handle.shared.worker_metrics[worker.index]); |
970 | } |
971 | |
972 | /// Shuts down the core. |
973 | fn shutdown(&mut self, handle: &Handle) { |
974 | // Take the core |
let mut park = self.park.take().expect("park missing");
976 | |
977 | // Drain the queue |
978 | while self.next_local_task().is_some() {} |
979 | |
980 | park.shutdown(&handle.driver); |
981 | } |
982 | |
983 | fn tune_global_queue_interval(&mut self, worker: &Worker) { |
984 | let next = self |
985 | .stats |
986 | .tuned_global_queue_interval(&worker.handle.shared.config); |
987 | |
988 | debug_assert!(next > 1); |
989 | |
990 | // Smooth out jitter |
991 | if abs_diff(self.global_queue_interval, next) > 2 { |
992 | self.global_queue_interval = next; |
993 | } |
994 | } |
995 | } |
996 | |
997 | impl Worker { |
998 | /// Returns a reference to the scheduler's injection queue. |
999 | fn inject(&self) -> &inject::Shared<Arc<Handle>> { |
1000 | &self.handle.shared.inject |
1001 | } |
1002 | } |
1003 | |
1004 | // TODO: Move `Handle` impls into handle.rs |
1005 | impl task::Schedule for Arc<Handle> { |
1006 | fn release(&self, task: &Task) -> Option<Task> { |
1007 | self.shared.owned.remove(task) |
1008 | } |
1009 | |
1010 | fn schedule(&self, task: Notified) { |
self.schedule_task(task, false);
1012 | } |
1013 | |
1014 | fn yield_now(&self, task: Notified) { |
self.schedule_task(task, true);
1016 | } |
1017 | } |
1018 | |
1019 | impl Handle { |
1020 | pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { |
1021 | with_current(|maybe_cx| { |
1022 | if let Some(cx) = maybe_cx { |
1023 | // Make sure the task is part of the **current** scheduler. |
1024 | if self.ptr_eq(&cx.worker.handle) { |
1025 | // And the current thread still holds a core |
1026 | if let Some(core) = cx.core.borrow_mut().as_mut() { |
1027 | self.schedule_local(core, task, is_yield); |
1028 | return; |
1029 | } |
1030 | } |
1031 | } |
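// For example (illustrative): a task spawned from a worker thread of this
// scheduler that still holds its core takes the fast local path above,
// while a spawn from an unrelated thread (or from a worker whose core was
// handed off by `block_in_place`) falls through to the inject queue below.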
1032 | |
1033 | // Otherwise, use the inject queue. |
1034 | self.push_remote_task(task); |
1035 | self.notify_parked_remote(); |
1036 | }); |
1037 | } |
1038 | |
1039 | pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) { |
1040 | if let Some(task) = task { |
1041 | self.schedule_task(task, false); |
1042 | } |
1043 | } |
1044 | |
1045 | fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) { |
1046 | core.stats.inc_local_schedule_count(); |
1047 | |
1048 | // Spawning from the worker thread. If scheduling a "yield" then the |
1049 | // task must always be pushed to the back of the queue, enabling other |
1050 | // tasks to be executed. If **not** a yield, then there is more |
1051 | // flexibility and the task may go to the front of the queue. |
1052 | let should_notify = if is_yield || !core.lifo_enabled { |
1053 | core.run_queue |
1054 | .push_back_or_overflow(task, self, &mut core.stats); |
1055 | true |
1056 | } else { |
1057 | // Push to the LIFO slot |
1058 | let prev = core.lifo_slot.take(); |
1059 | let ret = prev.is_some(); |
1060 | |
1061 | if let Some(prev) = prev { |
1062 | core.run_queue |
1063 | .push_back_or_overflow(prev, self, &mut core.stats); |
1064 | } |
1065 | |
1066 | core.lifo_slot = Some(task); |
1067 | |
1068 | ret |
1069 | }; |
1070 | |
1071 | // Only notify if not currently parked. If `park` is `None`, then the |
1072 | // scheduling is from a resource driver. As notifications often come in |
1073 | // batches, the notification is delayed until the park is complete. |
1074 | if should_notify && core.park.is_some() { |
1075 | self.notify_parked_local(); |
1076 | } |
1077 | } |
1078 | |
1079 | fn next_remote_task(&self) -> Option<Notified> { |
1080 | if self.shared.inject.is_empty() { |
1081 | return None; |
1082 | } |
1083 | |
1084 | let mut synced = self.shared.synced.lock(); |
// safety: passing in correct `inject::Synced`
1086 | unsafe { self.shared.inject.pop(&mut synced.inject) } |
1087 | } |
1088 | |
1089 | fn push_remote_task(&self, task: Notified) { |
1090 | self.shared.scheduler_metrics.inc_remote_schedule_count(); |
1091 | |
1092 | let mut synced = self.shared.synced.lock(); |
// safety: passing in correct `inject::Synced`
1094 | unsafe { |
1095 | self.shared.inject.push(&mut synced.inject, task); |
1096 | } |
1097 | } |
1098 | |
1099 | pub(super) fn close(&self) { |
1100 | if self |
1101 | .shared |
1102 | .inject |
1103 | .close(&mut self.shared.synced.lock().inject) |
1104 | { |
1105 | self.notify_all(); |
1106 | } |
1107 | } |
1108 | |
1109 | fn notify_parked_local(&self) { |
1110 | super::counters::inc_num_inc_notify_local(); |
1111 | |
1112 | if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) { |
1113 | super::counters::inc_num_unparks_local(); |
1114 | self.shared.remotes[index].unpark.unpark(&self.driver); |
1115 | } |
1116 | } |
1117 | |
1118 | fn notify_parked_remote(&self) { |
1119 | if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) { |
1120 | self.shared.remotes[index].unpark.unpark(&self.driver); |
1121 | } |
1122 | } |
1123 | |
1124 | pub(super) fn notify_all(&self) { |
1125 | for remote in &self.shared.remotes[..] { |
1126 | remote.unpark.unpark(&self.driver); |
1127 | } |
1128 | } |
1129 | |
1130 | fn notify_if_work_pending(&self) { |
1131 | for remote in &self.shared.remotes[..] { |
1132 | if !remote.steal.is_empty() { |
1133 | self.notify_parked_local(); |
1134 | return; |
1135 | } |
1136 | } |
1137 | |
1138 | if !self.shared.inject.is_empty() { |
1139 | self.notify_parked_local(); |
1140 | } |
1141 | } |
1142 | |
1143 | fn transition_worker_from_searching(&self) { |
1144 | if self.shared.idle.transition_worker_from_searching() { |
1145 | // We are the final searching worker. Because work was found, we |
1146 | // need to notify another worker. |
1147 | self.notify_parked_local(); |
1148 | } |
1149 | } |
1150 | |
1151 | /// Signals that a worker has observed the shutdown signal and has replaced |
1152 | /// its core back into its handle. |
1153 | /// |
1154 | /// If all workers have reached this point, the final cleanup is performed. |
1155 | fn shutdown_core(&self, core: Box<Core>) { |
1156 | let mut cores = self.shared.shutdown_cores.lock(); |
1157 | cores.push(core); |
1158 | |
1159 | if cores.len() != self.shared.remotes.len() { |
1160 | return; |
1161 | } |
1162 | |
1163 | debug_assert!(self.shared.owned.is_empty()); |
1164 | |
1165 | for mut core in cores.drain(..) { |
1166 | core.shutdown(self); |
1167 | } |
1168 | |
1169 | // Drain the injection queue |
1170 | // |
1171 | // We already shut down every task, so we can simply drop the tasks. |
1172 | while let Some(task) = self.next_remote_task() { |
1173 | drop(task); |
1174 | } |
1175 | } |
1176 | |
1177 | fn ptr_eq(&self, other: &Handle) -> bool { |
1178 | std::ptr::eq(self, other) |
1179 | } |
1180 | } |
1181 | |
1182 | impl Overflow<Arc<Handle>> for Handle { |
1183 | fn push(&self, task: task::Notified<Arc<Handle>>) { |
1184 | self.push_remote_task(task); |
1185 | } |
1186 | |
1187 | fn push_batch<I>(&self, iter: I) |
1188 | where |
1189 | I: Iterator<Item = task::Notified<Arc<Handle>>>, |
1190 | { |
1191 | unsafe { |
1192 | self.shared.inject.push_batch(self, iter); |
1193 | } |
1194 | } |
1195 | } |
1196 | |
1197 | pub(crate) struct InjectGuard<'a> { |
1198 | lock: crate::loom::sync::MutexGuard<'a, Synced>, |
1199 | } |
1200 | |
1201 | impl<'a> AsMut<inject::Synced> for InjectGuard<'a> { |
1202 | fn as_mut(&mut self) -> &mut inject::Synced { |
1203 | &mut self.lock.inject |
1204 | } |
1205 | } |
1206 | |
1207 | impl<'a> Lock<inject::Synced> for &'a Handle { |
1208 | type Handle = InjectGuard<'a>; |
1209 | |
1210 | fn lock(self) -> Self::Handle { |
1211 | InjectGuard { |
1212 | lock: self.shared.synced.lock(), |
1213 | } |
1214 | } |
1215 | } |
1216 | |
#[track_caller]
1218 | fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R { |
1219 | use scheduler::Context::MultiThread; |
1220 | |
context::with_scheduler(|ctx| match ctx {
Some(MultiThread(ctx)) => f(Some(ctx)),
1223 | _ => f(None), |
1224 | }) |
1225 | } |
1226 | |
1227 | // `u32::abs_diff` is not available on Tokio's MSRV. |
1228 | fn abs_diff(a: u32, b: u32) -> u32 { |
1229 | if a > b { |
1230 | a - b |
1231 | } else { |
1232 | b - a |
1233 | } |
1234 | } |
1235 | |