1 | //! A scheduler is initialized with a fixed number of workers. Each worker is |
2 | //! driven by a thread. Each worker has a "core" which contains data such as the |
3 | //! run queue and other state. When `block_in_place` is called, the worker's |
4 | //! "core" is handed off to a new thread allowing the scheduler to continue to |
5 | //! make progress while the originating thread blocks. |
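//!
//! For reference, this hand-off is what backs the public
//! `tokio::task::block_in_place` API. A minimal caller-side sketch (assumes a
//! multi-threaded runtime; illustrative only, not compiled as a doctest):
//!
//! ```ignore
//! #[tokio::main(flavor = "multi_thread")]
//! async fn main() {
//!     let len = tokio::task::block_in_place(|| {
//!         // This closure may block the thread. The worker's core has been
//!         // handed off, so tasks previously queued on this worker continue
//!         // to run on another thread in the meantime.
//!         std::fs::read("Cargo.toml").map(|bytes| bytes.len())
//!     });
//!     let _ = len;
//! }
//! ```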
6 | //! |
7 | //! # Shutdown |
8 | //! |
9 | //! Shutting down the runtime involves the following steps: |
10 | //! |
//! 1. The `Shared::close` method is called. This closes the inject queue and
//!    the `OwnedTasks` instance and wakes up all worker threads.
//!
//! 2. Each worker thread observes the close signal the next time it runs
//!    `Core::maintenance` by checking whether the inject queue is closed.
//!    The `Core::is_shutdown` flag is set to `true`.
//!
//! 3. The worker threads call `pre_shutdown` in parallel. Here, each worker
//!    keeps removing tasks from `OwnedTasks` until it is empty. No new tasks
//!    can be pushed to the `OwnedTasks` during or after this step, as it was
//!    closed in step 1.
//!
//! 4. The workers call `Shared::shutdown` to enter the single-threaded phase
//!    of shutdown. These calls push their core to `Shared::shutdown_cores`,
//!    and the last thread to push its core finishes the shutdown procedure.
//!
//! 5. The local run queue of each core is emptied, then the inject queue is
//!    emptied.
29 | //! |
30 | //! At this point, shutdown has completed. It is not possible for any of the |
31 | //! collections to contain any tasks at this point, as each collection was |
32 | //! closed first, then emptied afterwards. |
33 | //! |
34 | //! ## Spawns during shutdown |
35 | //! |
36 | //! When spawning tasks during shutdown, there are two cases: |
37 | //! |
38 | //! * The spawner observes the `OwnedTasks` being open, and the inject queue is |
39 | //! closed. |
40 | //! * The spawner observes the `OwnedTasks` being closed and doesn't check the |
41 | //! inject queue. |
42 | //! |
43 | //! The first case can only happen if the `OwnedTasks::bind` call happens before |
44 | //! or during step 1 of shutdown. In this case, the runtime will clean up the |
45 | //! task in step 3 of shutdown. |
46 | //! |
//! In the second case, the task was never spawned and is immediately cancelled
//! by the spawner.
49 | //! |
//! The correctness of shutdown requires both the inject queue and `OwnedTasks`
//! collection to have a closed bit. With a close bit on only the inject queue,
//! spawning could run into a situation where a task is successfully bound long
//! after the runtime has shut down. With a close bit on only the `OwnedTasks`,
//! the first spawning situation could result in the notification being pushed
//! to the inject queue after step 5 of shutdown, which would leave a task in
//! the inject queue indefinitely. This would be a ref-count cycle and a memory
//! leak.
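//!
//! As a rough sketch of the ordering the spawn path relies on (the names below
//! are illustrative only and do not mirror the internal API exactly):
//!
//! ```ignore
//! // 1. Try to bind the new task into `OwnedTasks`. A closed collection means
//! //    the task was never spawned; the spawner cancels it immediately (the
//! //    second case above).
//! let notified = match owned.bind(task) {
//!     Bind::Closed(task) => return task.shutdown(),
//!     Bind::Bound(notified) => notified,
//! };
//!
//! // 2. `OwnedTasks` was observed open. Scheduling the notification may find
//! //    the inject queue closed (the first case above); the notification is
//! //    then discarded and the task is cleaned up in step 3 of shutdown.
//! schedule(notified);
//! ```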
58 | |
59 | use crate::loom::sync::{Arc, Mutex}; |
60 | use crate::runtime; |
61 | use crate::runtime::scheduler::multi_thread::{ |
62 | idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker, |
63 | }; |
64 | use crate::runtime::scheduler::{inject, Defer, Lock}; |
65 | use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks}; |
66 | use crate::runtime::{blocking, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics}; |
67 | use crate::runtime::{context, TaskHooks}; |
68 | use crate::task::coop; |
69 | use crate::util::atomic_cell::AtomicCell; |
70 | use crate::util::rand::{FastRand, RngSeedGenerator}; |
71 | |
72 | use std::cell::RefCell; |
73 | use std::task::Waker; |
74 | use std::thread; |
75 | use std::time::Duration; |
76 | |
77 | mod metrics; |
78 | |
79 | cfg_taskdump! { |
80 | mod taskdump; |
81 | } |
82 | |
83 | cfg_not_taskdump! { |
84 | mod taskdump_mock; |
85 | } |
86 | |
87 | /// A scheduler worker |
88 | pub(super) struct Worker { |
89 | /// Reference to scheduler's handle |
90 | handle: Arc<Handle>, |
91 | |
92 | /// Index holding this worker's remote state |
93 | index: usize, |
94 | |
95 | /// Used to hand-off a worker's core to another thread. |
96 | core: AtomicCell<Core>, |
97 | } |
98 | |
99 | /// Core data |
100 | struct Core { |
101 | /// Used to schedule bookkeeping tasks every so often. |
102 | tick: u32, |
103 | |
    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task being
    /// run next (LIFO). This is an optimization for locality, which benefits
    /// message passing patterns and helps to reduce latency.
109 | lifo_slot: Option<Notified>, |
110 | |
111 | /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`, |
112 | /// they go to the back of the `run_queue`. |
113 | lifo_enabled: bool, |
114 | |
115 | /// The worker-local run queue. |
116 | run_queue: queue::Local<Arc<Handle>>, |
117 | |
118 | /// True if the worker is currently searching for more work. Searching |
119 | /// involves attempting to steal from other workers. |
120 | is_searching: bool, |
121 | |
    /// True if the scheduler is being shut down
123 | is_shutdown: bool, |
124 | |
125 | /// True if the scheduler is being traced |
126 | is_traced: bool, |
127 | |
128 | /// Parker |
129 | /// |
130 | /// Stored in an `Option` as the parker is added / removed to make the |
131 | /// borrow checker happy. |
132 | park: Option<Parker>, |
133 | |
134 | /// Per-worker runtime stats |
135 | stats: Stats, |
136 | |
137 | /// How often to check the global queue |
138 | global_queue_interval: u32, |
139 | |
140 | /// Fast random number generator. |
141 | rand: FastRand, |
142 | } |
143 | |
144 | /// State shared across all workers |
145 | pub(crate) struct Shared { |
    /// Per-worker remote state. All other workers have access to this, which
    /// is how they communicate with each other.
148 | remotes: Box<[Remote]>, |
149 | |
    /// Global task queue used for:
    /// 1. Submitting work to the scheduler while **not** currently on a worker thread.
    /// 2. Submitting work to the scheduler when a worker run queue is saturated.
153 | pub(super) inject: inject::Shared<Arc<Handle>>, |
154 | |
155 | /// Coordinates idle workers |
156 | idle: Idle, |
157 | |
158 | /// Collection of all active tasks spawned onto this executor. |
159 | pub(crate) owned: OwnedTasks<Arc<Handle>>, |
160 | |
161 | /// Data synchronized by the scheduler mutex |
162 | pub(super) synced: Mutex<Synced>, |
163 | |
164 | /// Cores that have observed the shutdown signal |
165 | /// |
    /// The core is **not** placed back in the worker to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    #[allow(clippy::vec_box)] // we're moving an already-boxed value
169 | shutdown_cores: Mutex<Vec<Box<Core>>>, |
170 | |
171 | /// The number of cores that have observed the trace signal. |
172 | pub(super) trace_status: TraceStatus, |
173 | |
174 | /// Scheduler configuration options |
175 | config: Config, |
176 | |
177 | /// Collects metrics from the runtime. |
178 | pub(super) scheduler_metrics: SchedulerMetrics, |
179 | |
180 | pub(super) worker_metrics: Box<[WorkerMetrics]>, |
181 | |
182 | /// Only held to trigger some code on drop. This is used to get internal |
183 | /// runtime metrics that can be useful when doing performance |
184 | /// investigations. This does nothing (empty struct, no drop impl) unless |
185 | /// the `tokio_internal_mt_counters` `cfg` flag is set. |
186 | _counters: Counters, |
187 | } |
188 | |
189 | /// Data synchronized by the scheduler mutex |
190 | pub(crate) struct Synced { |
191 | /// Synchronized state for `Idle`. |
192 | pub(super) idle: idle::Synced, |
193 | |
194 | /// Synchronized state for `Inject`. |
195 | pub(crate) inject: inject::Synced, |
196 | } |
197 | |
198 | /// Used to communicate with a worker from other threads. |
199 | struct Remote { |
200 | /// Steals tasks from this worker. |
201 | pub(super) steal: queue::Steal<Arc<Handle>>, |
202 | |
203 | /// Unparks the associated worker thread |
204 | unpark: Unparker, |
205 | } |
206 | |
207 | /// Thread-local context |
208 | pub(crate) struct Context { |
209 | /// Worker |
210 | worker: Arc<Worker>, |
211 | |
212 | /// Core data |
213 | core: RefCell<Option<Box<Core>>>, |
214 | |
215 | /// Tasks to wake after resource drivers are polled. This is mostly to |
216 | /// handle yielded tasks. |
217 | pub(crate) defer: Defer, |
218 | } |
219 | |
220 | /// Starts the workers |
221 | pub(crate) struct Launch(Vec<Arc<Worker>>); |
222 | |
223 | /// Running a task may consume the core. If the core is still available when |
224 | /// running the task completes, it is returned. Otherwise, the worker will need |
225 | /// to stop processing. |
226 | type RunResult = Result<Box<Core>, ()>; |
227 | |
228 | /// A task handle |
229 | type Task = task::Task<Arc<Handle>>; |
230 | |
231 | /// A notified task handle |
232 | type Notified = task::Notified<Arc<Handle>>; |
233 | |
/// Value picked out of thin-air. Running the LIFO slot a handful of times
/// seems sufficient to benefit from locality. More than 3 times is probably
/// overdoing it. The value can be tuned in the future with data that shows
/// improvements.
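///
/// As an illustration of the ping-pong pattern this cap guards against (a
/// hypothetical workload sketch, not compiled as a doctest):
///
/// ```ignore
/// let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<u32>(1);
/// let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<u32>(1);
///
/// // Task A: every message from B immediately wakes B again.
/// tokio::spawn(async move {
///     while let Some(n) = rx_a.recv().await {
///         let _ = tx_b.send(n + 1).await;
///     }
/// });
///
/// // Task B: mirrors task A. Each send places the peer in the current
/// // worker's LIFO slot, so without a cap the pair could starve the rest of
/// // that worker's run queue.
/// tokio::spawn(async move {
///     let _ = tx_a.send(0).await;
///     while let Some(n) = rx_b.recv().await {
///         let _ = tx_a.send(n + 1).await;
///     }
/// });
/// ```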
238 | const MAX_LIFO_POLLS_PER_TICK: usize = 3; |
239 | |
240 | pub(super) fn create( |
241 | size: usize, |
242 | park: Parker, |
243 | driver_handle: driver::Handle, |
244 | blocking_spawner: blocking::Spawner, |
245 | seed_generator: RngSeedGenerator, |
246 | config: Config, |
247 | ) -> (Arc<Handle>, Launch) { |
248 | let mut cores = Vec::with_capacity(size); |
249 | let mut remotes = Vec::with_capacity(size); |
250 | let mut worker_metrics = Vec::with_capacity(size); |
251 | |
252 | // Create the local queues |
253 | for _ in 0..size { |
254 | let (steal, run_queue) = queue::local(); |
255 | |
256 | let park = park.clone(); |
257 | let unpark = park.unpark(); |
258 | let metrics = WorkerMetrics::from_config(&config); |
259 | let stats = Stats::new(&metrics); |
260 | |
261 | cores.push(Box::new(Core { |
262 | tick: 0, |
263 | lifo_slot: None, |
264 | lifo_enabled: !config.disable_lifo_slot, |
265 | run_queue, |
266 | is_searching: false, |
267 | is_shutdown: false, |
268 | is_traced: false, |
269 | park: Some(park), |
270 | global_queue_interval: stats.tuned_global_queue_interval(&config), |
271 | stats, |
272 | rand: FastRand::from_seed(config.seed_generator.next_seed()), |
273 | })); |
274 | |
275 | remotes.push(Remote { steal, unpark }); |
276 | worker_metrics.push(metrics); |
277 | } |
278 | |
279 | let (idle, idle_synced) = Idle::new(size); |
280 | let (inject, inject_synced) = inject::Shared::new(); |
281 | |
282 | let remotes_len = remotes.len(); |
283 | let handle = Arc::new(Handle { |
284 | task_hooks: TaskHooks::from_config(&config), |
285 | shared: Shared { |
286 | remotes: remotes.into_boxed_slice(), |
287 | inject, |
288 | idle, |
289 | owned: OwnedTasks::new(size), |
290 | synced: Mutex::new(Synced { |
291 | idle: idle_synced, |
292 | inject: inject_synced, |
293 | }), |
294 | shutdown_cores: Mutex::new(vec![]), |
295 | trace_status: TraceStatus::new(remotes_len), |
296 | config, |
297 | scheduler_metrics: SchedulerMetrics::new(), |
298 | worker_metrics: worker_metrics.into_boxed_slice(), |
299 | _counters: Counters, |
300 | }, |
301 | driver: driver_handle, |
302 | blocking_spawner, |
303 | seed_generator, |
304 | }); |
305 | |
306 | let mut launch = Launch(vec![]); |
307 | |
308 | for (index, core) in cores.drain(..).enumerate() { |
309 | launch.0.push(Arc::new(Worker { |
310 | handle: handle.clone(), |
311 | index, |
312 | core: AtomicCell::new(Some(core)), |
313 | })); |
314 | } |
315 | |
316 | (handle, launch) |
317 | } |
318 | |
#[track_caller]
320 | pub(crate) fn block_in_place<F, R>(f: F) -> R |
321 | where |
322 | F: FnOnce() -> R, |
323 | { |
324 | // Try to steal the worker core back |
325 | struct Reset { |
326 | take_core: bool, |
327 | budget: coop::Budget, |
328 | } |
329 | |
330 | impl Drop for Reset { |
331 | fn drop(&mut self) { |
332 | with_current(|maybe_cx| { |
333 | if let Some(cx) = maybe_cx { |
334 | if self.take_core { |
335 | let core = cx.worker.core.take(); |
336 | |
337 | if core.is_some() { |
338 | cx.worker.handle.shared.worker_metrics[cx.worker.index] |
339 | .set_thread_id(thread::current().id()); |
340 | } |
341 | |
342 | let mut cx_core = cx.core.borrow_mut(); |
343 | assert!(cx_core.is_none()); |
344 | *cx_core = core; |
345 | } |
346 | |
347 | // Reset the task budget as we are re-entering the |
348 | // runtime. |
349 | coop::set(self.budget); |
350 | } |
351 | }); |
352 | } |
353 | } |
354 | |
355 | let mut had_entered = false; |
356 | let mut take_core = false; |
357 | |
358 | let setup_result = with_current(|maybe_cx| { |
359 | match ( |
360 | crate::runtime::context::current_enter_context(), |
361 | maybe_cx.is_some(), |
362 | ) { |
363 | (context::EnterRuntime::Entered { .. }, true) => { |
364 | // We are on a thread pool runtime thread, so we just need to |
365 | // set up blocking. |
366 | had_entered = true; |
367 | } |
368 | ( |
369 | context::EnterRuntime::Entered { |
370 | allow_block_in_place, |
371 | }, |
372 | false, |
373 | ) => { |
374 | // We are on an executor, but _not_ on the thread pool. That is |
375 | // _only_ okay if we are in a thread pool runtime's block_on |
376 | // method: |
377 | if allow_block_in_place { |
378 | had_entered = true; |
379 | return Ok(()); |
380 | } else { |
381 | // This probably means we are on the current_thread runtime or in a |
382 | // LocalSet, where it is _not_ okay to block. |
383 | return Err( |
384 | "can call blocking only when running on the multi-threaded runtime" , |
385 | ); |
386 | } |
387 | } |
388 | (context::EnterRuntime::NotEntered, true) => { |
389 | // This is a nested call to block_in_place (we already exited). |
390 | // All the necessary setup has already been done. |
391 | return Ok(()); |
392 | } |
393 | (context::EnterRuntime::NotEntered, false) => { |
394 | // We are outside of the tokio runtime, so blocking is fine. |
395 | // We can also skip all of the thread pool blocking setup steps. |
396 | return Ok(()); |
397 | } |
398 | } |
399 | |
        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
401 | |
402 | // Get the worker core. If none is set, then blocking is fine! |
403 | let mut core = match cx.core.borrow_mut().take() { |
404 | Some(core) => core, |
405 | None => return Ok(()), |
406 | }; |
407 | |
        // If `spawn_blocking` is called heavily, there might be no available
        // thread to run this core. All tasks except the one in the `lifo_slot`
        // can be stolen, so we move that task out of the `lifo_slot` and into
        // the `run_queue`, where it can be stolen as well.
411 | if let Some(task) = core.lifo_slot.take() { |
412 | core.run_queue |
413 | .push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats); |
414 | } |
415 | |
416 | // We are taking the core from the context and sending it to another |
417 | // thread. |
418 | take_core = true; |
419 | |
420 | // The parker should be set here |
421 | assert!(core.park.is_some()); |
422 | |
423 | // In order to block, the core must be sent to another thread for |
424 | // execution. |
425 | // |
426 | // First, move the core back into the worker's shared core slot. |
427 | cx.worker.core.set(core); |
428 | |
429 | // Next, clone the worker handle and send it to a new thread for |
430 | // processing. |
431 | // |
432 | // Once the blocking task is done executing, we will attempt to |
433 | // steal the core back. |
434 | let worker = cx.worker.clone(); |
435 | runtime::spawn_blocking(move || run(worker)); |
436 | Ok(()) |
437 | }); |
438 | |
439 | if let Err(panic_message) = setup_result { |
440 | panic!("{}" , panic_message); |
441 | } |
442 | |
443 | if had_entered { |
444 | // Unset the current task's budget. Blocking sections are not |
445 | // constrained by task budgets. |
446 | let _reset = Reset { |
447 | take_core, |
448 | budget: coop::stop(), |
449 | }; |
450 | |
451 | crate::runtime::context::exit_runtime(f) |
452 | } else { |
453 | f() |
454 | } |
455 | } |
456 | |
457 | impl Launch { |
458 | pub(crate) fn launch(mut self) { |
        for worker in self.0.drain(..) {
            runtime::spawn_blocking(move || run(worker));
        }
462 | } |
463 | } |
464 | |
465 | fn run(worker: Arc<Worker>) { |
    #[allow(dead_code)]
467 | struct AbortOnPanic; |
468 | |
469 | impl Drop for AbortOnPanic { |
470 | fn drop(&mut self) { |
471 | if std::thread::panicking() { |
472 | eprintln!("worker thread panicking; aborting process" ); |
473 | std::process::abort(); |
474 | } |
475 | } |
476 | } |
477 | |
478 | // Catching panics on worker threads in tests is quite tricky. Instead, when |
479 | // debug assertions are enabled, we just abort the process. |
    #[cfg(debug_assertions)]
481 | let _abort_on_panic = AbortOnPanic; |
482 | |
483 | // Acquire a core. If this fails, then another thread is running this |
484 | // worker and there is nothing further to do. |
485 | let core = match worker.core.take() { |
486 | Some(core) => core, |
487 | None => return, |
488 | }; |
489 | |
490 | worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id()); |
491 | |
492 | let handle = scheduler::Handle::MultiThread(worker.handle.clone()); |
493 | |
494 | crate::runtime::context::enter_runtime(&handle, true, |_| { |
495 | // Set the worker context. |
496 | let cx = scheduler::Context::MultiThread(Context { |
497 | worker, |
498 | core: RefCell::new(None), |
499 | defer: Defer::new(), |
500 | }); |
501 | |
502 | context::set_scheduler(&cx, || { |
503 | let cx = cx.expect_multi_thread(); |
504 | |
505 | // This should always be an error. It only returns a `Result` to support |
506 | // using `?` to short circuit. |
507 | assert!(cx.run(core).is_err()); |
508 | |
509 | // Check if there are any deferred tasks to notify. This can happen when |
510 | // the worker core is lost due to `block_in_place()` being called from |
511 | // within the task. |
512 | cx.defer.wake(); |
513 | }); |
514 | }); |
515 | } |
516 | |
517 | impl Context { |
518 | fn run(&self, mut core: Box<Core>) -> RunResult { |
519 | // Reset `lifo_enabled` here in case the core was previously stolen from |
520 | // a task that had the LIFO slot disabled. |
521 | self.reset_lifo_enabled(&mut core); |
522 | |
        // Start in the "processing tasks" state, as polling tasks from the
        // local queue will be one of the first things we do.
525 | core.stats.start_processing_scheduled_tasks(); |
526 | |
527 | while !core.is_shutdown { |
528 | self.assert_lifo_enabled_is_correct(&core); |
529 | |
530 | if core.is_traced { |
531 | core = self.worker.handle.trace_core(core); |
532 | } |
533 | |
534 | // Increment the tick |
535 | core.tick(); |
536 | |
537 | // Run maintenance, if needed |
538 | core = self.maintenance(core); |
539 | |
540 | // First, check work available to the current worker. |
541 | if let Some(task) = core.next_task(&self.worker) { |
542 | core = self.run_task(task, core)?; |
543 | continue; |
544 | } |
545 | |
546 | // We consumed all work in the queues and will start searching for work. |
547 | core.stats.end_processing_scheduled_tasks(); |
548 | |
549 | // There is no more **local** work to process, try to steal work |
550 | // from other workers. |
551 | if let Some(task) = core.steal_work(&self.worker) { |
552 | // Found work, switch back to processing |
553 | core.stats.start_processing_scheduled_tasks(); |
554 | core = self.run_task(task, core)?; |
555 | } else { |
556 | // Wait for work |
557 | core = if !self.defer.is_empty() { |
558 | self.park_timeout(core, Some(Duration::from_millis(0))) |
559 | } else { |
560 | self.park(core) |
561 | }; |
562 | core.stats.start_processing_scheduled_tasks(); |
563 | } |
564 | } |
565 | |
566 | core.pre_shutdown(&self.worker); |
567 | // Signal shutdown |
568 | self.worker.handle.shutdown_core(core); |
569 | Err(()) |
570 | } |
571 | |
572 | fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult { |
        #[cfg(tokio_unstable)]
574 | let task_id = task.task_id(); |
575 | |
576 | let task = self.worker.handle.shared.owned.assert_owner(task); |
577 | |
578 | // Make sure the worker is not in the **searching** state. This enables |
579 | // another idle worker to try to steal work. |
580 | core.transition_from_searching(&self.worker); |
581 | |
582 | self.assert_lifo_enabled_is_correct(&core); |
583 | |
584 | // Measure the poll start time. Note that we may end up polling other |
585 | // tasks under this measurement. In this case, the tasks came from the |
586 | // LIFO slot and are considered part of the current task for scheduling |
        // purposes. These tasks inherit the "parent"'s limits.
588 | core.stats.start_poll(); |
589 | |
590 | // Make the core available to the runtime context |
591 | *self.core.borrow_mut() = Some(core); |
592 | |
593 | // Run the task |
594 | coop::budget(|| { |
            // Unlike the poll time above, the poll start callback is attached
            // to the task id, so it is tightly associated with the actual poll
            // invocation.
            #[cfg(tokio_unstable)]
598 | self.worker.handle.task_hooks.poll_start_callback(task_id); |
599 | |
600 | task.run(); |
601 | |
            #[cfg(tokio_unstable)]
603 | self.worker.handle.task_hooks.poll_stop_callback(task_id); |
604 | |
605 | let mut lifo_polls = 0; |
606 | |
607 | // As long as there is budget remaining and a task exists in the |
608 | // `lifo_slot`, then keep running. |
609 | loop { |
610 | // Check if we still have the core. If not, the core was stolen |
611 | // by another worker. |
612 | let mut core = match self.core.borrow_mut().take() { |
613 | Some(core) => core, |
614 | None => { |
615 | // In this case, we cannot call `reset_lifo_enabled()` |
616 | // because the core was stolen. The stealer will handle |
617 | // that at the top of `Context::run` |
618 | return Err(()); |
619 | } |
620 | }; |
621 | |
622 | // Check for a task in the LIFO slot |
623 | let task = match core.lifo_slot.take() { |
624 | Some(task) => task, |
625 | None => { |
626 | self.reset_lifo_enabled(&mut core); |
627 | core.stats.end_poll(); |
628 | return Ok(core); |
629 | } |
630 | }; |
631 | |
632 | if !coop::has_budget_remaining() { |
633 | core.stats.end_poll(); |
634 | |
635 | // Not enough budget left to run the LIFO task, push it to |
636 | // the back of the queue and return. |
637 | core.run_queue.push_back_or_overflow( |
638 | task, |
639 | &*self.worker.handle, |
640 | &mut core.stats, |
641 | ); |
642 | // If we hit this point, the LIFO slot should be enabled. |
643 | // There is no need to reset it. |
644 | debug_assert!(core.lifo_enabled); |
645 | return Ok(core); |
646 | } |
647 | |
648 | // Track that we are about to run a task from the LIFO slot. |
649 | lifo_polls += 1; |
650 | super::counters::inc_lifo_schedules(); |
651 | |
652 | // Disable the LIFO slot if we reach our limit |
653 | // |
654 | // In ping-ping style workloads where task A notifies task B, |
655 | // which notifies task A again, continuously prioritizing the |
656 | // LIFO slot can cause starvation as these two tasks will |
657 | // repeatedly schedule the other. To mitigate this, we limit the |
658 | // number of times the LIFO slot is prioritized. |
659 | if lifo_polls >= MAX_LIFO_POLLS_PER_TICK { |
660 | core.lifo_enabled = false; |
661 | super::counters::inc_lifo_capped(); |
662 | } |
663 | |
664 | // Run the LIFO task, then loop |
665 | *self.core.borrow_mut() = Some(core); |
666 | let task = self.worker.handle.shared.owned.assert_owner(task); |
667 | |
                #[cfg(tokio_unstable)]
669 | let task_id = task.task_id(); |
670 | |
                #[cfg(tokio_unstable)]
672 | self.worker.handle.task_hooks.poll_start_callback(task_id); |
673 | |
674 | task.run(); |
675 | |
                #[cfg(tokio_unstable)]
677 | self.worker.handle.task_hooks.poll_stop_callback(task_id); |
678 | } |
679 | }) |
680 | } |
681 | |
682 | fn reset_lifo_enabled(&self, core: &mut Core) { |
683 | core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot; |
684 | } |
685 | |
686 | fn assert_lifo_enabled_is_correct(&self, core: &Core) { |
687 | debug_assert_eq!( |
688 | core.lifo_enabled, |
689 | !self.worker.handle.shared.config.disable_lifo_slot |
690 | ); |
691 | } |
692 | |
693 | fn maintenance(&self, mut core: Box<Core>) -> Box<Core> { |
694 | if core.tick % self.worker.handle.shared.config.event_interval == 0 { |
695 | super::counters::inc_num_maintenance(); |
696 | |
697 | core.stats.end_processing_scheduled_tasks(); |
698 | |
699 | // Call `park` with a 0 timeout. This enables the I/O driver, timer, ... |
700 | // to run without actually putting the thread to sleep. |
701 | core = self.park_timeout(core, Some(Duration::from_millis(0))); |
702 | |
703 | // Run regularly scheduled maintenance |
704 | core.maintenance(&self.worker); |
705 | |
706 | core.stats.start_processing_scheduled_tasks(); |
707 | } |
708 | |
709 | core |
710 | } |
711 | |
    /// Parks the worker thread while waiting for tasks to execute.
    ///
    /// This function checks that there is indeed no more work left to be done before parking.
    /// Note that, before parking, the worker thread will try to take ownership of the
    /// Driver (IO/Time) and dispatch any events that might have fired. Whenever a worker
    /// thread executes the Driver loop, all woken tasks are scheduled in its own local
    /// queue until the queue saturates (ntasks > `LOCAL_QUEUE_CAPACITY`). When the local
    /// queue is saturated, the overflow tasks are added to the injection queue, from
    /// where other workers can pick them up. We also rely on the work-stealing algorithm
    /// to spread the tasks amongst workers after all the IOs get dispatched.
723 | fn park(&self, mut core: Box<Core>) -> Box<Core> { |
724 | if let Some(f) = &self.worker.handle.shared.config.before_park { |
725 | f(); |
726 | } |
727 | |
728 | if core.transition_to_parked(&self.worker) { |
729 | while !core.is_shutdown && !core.is_traced { |
730 | core.stats.about_to_park(); |
731 | core.stats |
732 | .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]); |
733 | |
734 | core = self.park_timeout(core, None); |
735 | |
736 | core.stats.unparked(); |
737 | |
738 | // Run regularly scheduled maintenance |
739 | core.maintenance(&self.worker); |
740 | |
741 | if core.transition_from_parked(&self.worker) { |
742 | break; |
743 | } |
744 | } |
745 | } |
746 | |
747 | if let Some(f) = &self.worker.handle.shared.config.after_unpark { |
748 | f(); |
749 | } |
750 | core |
751 | } |
752 | |
753 | fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> { |
754 | self.assert_lifo_enabled_is_correct(&core); |
755 | |
756 | // Take the parker out of core |
        let mut park = core.park.take().expect("park missing");
758 | |
759 | // Store `core` in context |
760 | *self.core.borrow_mut() = Some(core); |
761 | |
762 | // Park thread |
763 | if let Some(timeout) = duration { |
764 | park.park_timeout(&self.worker.handle.driver, timeout); |
765 | } else { |
766 | park.park(&self.worker.handle.driver); |
767 | } |
768 | |
769 | self.defer.wake(); |
770 | |
771 | // Remove `core` from context |
        core = self.core.borrow_mut().take().expect("core missing");
773 | |
774 | // Place `park` back in `core` |
775 | core.park = Some(park); |
776 | |
777 | if core.should_notify_others() { |
778 | self.worker.handle.notify_parked_local(); |
779 | } |
780 | |
781 | core |
782 | } |
783 | |
784 | pub(crate) fn defer(&self, waker: &Waker) { |
785 | if self.core.borrow().is_none() { |
786 | // If there is no core, then the worker is currently in a block_in_place. In this case, |
787 | // we cannot use the defer queue as we aren't really in the current runtime. |
788 | waker.wake_by_ref(); |
789 | } else { |
790 | self.defer.defer(waker); |
791 | } |
792 | } |
793 | |
    #[allow(dead_code)]
795 | pub(crate) fn get_worker_index(&self) -> usize { |
796 | self.worker.index |
797 | } |
798 | } |
799 | |
800 | impl Core { |
801 | /// Increment the tick |
802 | fn tick(&mut self) { |
803 | self.tick = self.tick.wrapping_add(1); |
804 | } |
805 | |
806 | /// Return the next notified task available to this worker. |
807 | fn next_task(&mut self, worker: &Worker) -> Option<Notified> { |
808 | if self.tick % self.global_queue_interval == 0 { |
809 | // Update the global queue interval, if needed |
810 | self.tune_global_queue_interval(worker); |
811 | |
812 | worker |
813 | .handle |
814 | .next_remote_task() |
815 | .or_else(|| self.next_local_task()) |
816 | } else { |
817 | let maybe_task = self.next_local_task(); |
818 | |
819 | if maybe_task.is_some() { |
820 | return maybe_task; |
821 | } |
822 | |
823 | if worker.inject().is_empty() { |
824 | return None; |
825 | } |
826 | |
827 | // Other threads can only **remove** tasks from the current worker's |
828 | // `run_queue`. So, we can be confident that by the time we call |
829 | // `run_queue.push_back` below, there will be *at least* `cap` |
830 | // available slots in the queue. |
831 | let cap = usize::min( |
832 | self.run_queue.remaining_slots(), |
833 | self.run_queue.max_capacity() / 2, |
834 | ); |
835 | |
836 | // The worker is currently idle, pull a batch of work from the |
837 | // injection queue. We don't want to pull *all* the work so other |
838 | // workers can also get some. |
839 | let n = usize::min( |
840 | worker.inject().len() / worker.handle.shared.remotes.len() + 1, |
841 | cap, |
842 | ); |
843 | |
844 | // Take at least one task since the first task is returned directly |
845 | // and not pushed onto the local queue. |
846 | let n = usize::max(1, n); |
847 | |
848 | let mut synced = worker.handle.shared.synced.lock(); |
849 | // safety: passing in the correct `inject::Synced`. |
850 | let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) }; |
851 | |
852 | // Pop the first task to return immediately |
853 | let ret = tasks.next(); |
854 | |
            // Push the rest of the tasks onto the run queue
856 | self.run_queue.push_back(tasks); |
857 | |
858 | ret |
859 | } |
860 | } |
861 | |
862 | fn next_local_task(&mut self) -> Option<Notified> { |
863 | self.lifo_slot.take().or_else(|| self.run_queue.pop()) |
864 | } |
865 | |
    /// Function responsible for stealing tasks from another worker.
    ///
    /// Note: a worker will only try to steal if fewer than half of the
    /// workers are already searching for tasks. The idea is to make sure not
    /// all workers will be trying to steal at the same time.
871 | fn steal_work(&mut self, worker: &Worker) -> Option<Notified> { |
872 | if !self.transition_to_searching(worker) { |
873 | return None; |
874 | } |
875 | |
876 | let num = worker.handle.shared.remotes.len(); |
877 | // Start from a random worker |
878 | let start = self.rand.fastrand_n(num as u32) as usize; |
879 | |
880 | for i in 0..num { |
881 | let i = (start + i) % num; |
882 | |
883 | // Don't steal from ourself! We know we don't have work. |
884 | if i == worker.index { |
885 | continue; |
886 | } |
887 | |
888 | let target = &worker.handle.shared.remotes[i]; |
889 | if let Some(task) = target |
890 | .steal |
891 | .steal_into(&mut self.run_queue, &mut self.stats) |
892 | { |
893 | return Some(task); |
894 | } |
895 | } |
896 | |
897 | // Fallback on checking the global queue |
898 | worker.handle.next_remote_task() |
899 | } |
900 | |
901 | fn transition_to_searching(&mut self, worker: &Worker) -> bool { |
902 | if !self.is_searching { |
903 | self.is_searching = worker.handle.shared.idle.transition_worker_to_searching(); |
904 | } |
905 | |
906 | self.is_searching |
907 | } |
908 | |
909 | fn transition_from_searching(&mut self, worker: &Worker) { |
910 | if !self.is_searching { |
911 | return; |
912 | } |
913 | |
914 | self.is_searching = false; |
915 | worker.handle.transition_worker_from_searching(); |
916 | } |
917 | |
918 | fn has_tasks(&self) -> bool { |
919 | self.lifo_slot.is_some() || self.run_queue.has_tasks() |
920 | } |
921 | |
922 | fn should_notify_others(&self) -> bool { |
923 | // If there are tasks available to steal, but this worker is not |
924 | // looking for tasks to steal, notify another worker. |
925 | if self.is_searching { |
926 | return false; |
927 | } |
928 | self.lifo_slot.is_some() as usize + self.run_queue.len() > 1 |
929 | } |
930 | |
931 | /// Prepares the worker state for parking. |
932 | /// |
933 | /// Returns true if the transition happened, false if there is work to do first. |
934 | fn transition_to_parked(&mut self, worker: &Worker) -> bool { |
935 | // Workers should not park if they have work to do |
936 | if self.has_tasks() || self.is_traced { |
937 | return false; |
938 | } |
939 | |
940 | // When the final worker transitions **out** of searching to parked, it |
941 | // must check all the queues one last time in case work materialized |
942 | // between the last work scan and transitioning out of searching. |
943 | let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked( |
944 | &worker.handle.shared, |
945 | worker.index, |
946 | self.is_searching, |
947 | ); |
948 | |
        // The worker is no longer searching. Setting this only updates the
        // worker's local cache.
951 | self.is_searching = false; |
952 | |
953 | if is_last_searcher { |
954 | worker.handle.notify_if_work_pending(); |
955 | } |
956 | |
957 | true |
958 | } |
959 | |
960 | /// Returns `true` if the transition happened. |
961 | fn transition_from_parked(&mut self, worker: &Worker) -> bool { |
962 | // If a task is in the lifo slot/run queue, then we must unpark regardless of |
963 | // being notified |
964 | if self.has_tasks() { |
965 | // When a worker wakes, it should only transition to the "searching" |
966 | // state when the wake originates from another worker *or* a new task |
967 | // is pushed. We do *not* want the worker to transition to "searching" |
968 | // when it wakes when the I/O driver receives new events. |
969 | self.is_searching = !worker |
970 | .handle |
971 | .shared |
972 | .idle |
973 | .unpark_worker_by_id(&worker.handle.shared, worker.index); |
974 | return true; |
975 | } |
976 | |
977 | if worker |
978 | .handle |
979 | .shared |
980 | .idle |
981 | .is_parked(&worker.handle.shared, worker.index) |
982 | { |
983 | return false; |
984 | } |
985 | |
986 | // When unparked, the worker is in the searching state. |
987 | self.is_searching = true; |
988 | true |
989 | } |
990 | |
991 | /// Runs maintenance work such as checking the pool's state. |
992 | fn maintenance(&mut self, worker: &Worker) { |
993 | self.stats |
994 | .submit(&worker.handle.shared.worker_metrics[worker.index]); |
995 | |
996 | if !self.is_shutdown { |
997 | // Check if the scheduler has been shutdown |
998 | let synced = worker.handle.shared.synced.lock(); |
999 | self.is_shutdown = worker.inject().is_closed(&synced.inject); |
1000 | } |
1001 | |
1002 | if !self.is_traced { |
1003 | // Check if the worker should be tracing. |
1004 | self.is_traced = worker.handle.shared.trace_status.trace_requested(); |
1005 | } |
1006 | } |
1007 | |
1008 | /// Signals all tasks to shut down, and waits for them to complete. Must run |
1009 | /// before we enter the single-threaded phase of shutdown processing. |
1010 | fn pre_shutdown(&mut self, worker: &Worker) { |
1011 | // Start from a random inner list |
1012 | let start = self |
1013 | .rand |
1014 | .fastrand_n(worker.handle.shared.owned.get_shard_size() as u32); |
1015 | // Signal to all tasks to shut down. |
1016 | worker |
1017 | .handle |
1018 | .shared |
1019 | .owned |
1020 | .close_and_shutdown_all(start as usize); |
1021 | |
1022 | self.stats |
1023 | .submit(&worker.handle.shared.worker_metrics[worker.index]); |
1024 | } |
1025 | |
1026 | /// Shuts down the core. |
1027 | fn shutdown(&mut self, handle: &Handle) { |
        // Take the parker out of the core
        let mut park = self.park.take().expect("park missing");
1030 | |
1031 | // Drain the queue |
1032 | while self.next_local_task().is_some() {} |
1033 | |
1034 | park.shutdown(&handle.driver); |
1035 | } |
1036 | |
1037 | fn tune_global_queue_interval(&mut self, worker: &Worker) { |
1038 | let next = self |
1039 | .stats |
1040 | .tuned_global_queue_interval(&worker.handle.shared.config); |
1041 | |
1042 | // Smooth out jitter |
1043 | if u32::abs_diff(self.global_queue_interval, next) > 2 { |
1044 | self.global_queue_interval = next; |
1045 | } |
1046 | } |
1047 | } |
1048 | |
1049 | impl Worker { |
1050 | /// Returns a reference to the scheduler's injection queue. |
1051 | fn inject(&self) -> &inject::Shared<Arc<Handle>> { |
1052 | &self.handle.shared.inject |
1053 | } |
1054 | } |
1055 | |
1056 | // TODO: Move `Handle` impls into handle.rs |
1057 | impl task::Schedule for Arc<Handle> { |
1058 | fn release(&self, task: &Task) -> Option<Task> { |
1059 | self.shared.owned.remove(task) |
1060 | } |
1061 | |
1062 | fn schedule(&self, task: Notified) { |
        self.schedule_task(task, false);
1064 | } |
1065 | |
1066 | fn hooks(&self) -> TaskHarnessScheduleHooks { |
1067 | TaskHarnessScheduleHooks { |
1068 | task_terminate_callback: self.task_hooks.task_terminate_callback.clone(), |
1069 | } |
1070 | } |
1071 | |
1072 | fn yield_now(&self, task: Notified) { |
        self.schedule_task(task, true);
1074 | } |
1075 | } |
1076 | |
1077 | impl Handle { |
1078 | pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { |
1079 | with_current(|maybe_cx| { |
1080 | if let Some(cx) = maybe_cx { |
1081 | // Make sure the task is part of the **current** scheduler. |
1082 | if self.ptr_eq(&cx.worker.handle) { |
1083 | // And the current thread still holds a core |
1084 | if let Some(core) = cx.core.borrow_mut().as_mut() { |
1085 | self.schedule_local(core, task, is_yield); |
1086 | return; |
1087 | } |
1088 | } |
1089 | } |
1090 | |
1091 | // Otherwise, use the inject queue. |
1092 | self.push_remote_task(task); |
1093 | self.notify_parked_remote(); |
1094 | }); |
1095 | } |
1096 | |
1097 | pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) { |
1098 | if let Some(task) = task { |
1099 | self.schedule_task(task, false); |
1100 | } |
1101 | } |
1102 | |
1103 | fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) { |
1104 | core.stats.inc_local_schedule_count(); |
1105 | |
1106 | // Spawning from the worker thread. If scheduling a "yield" then the |
1107 | // task must always be pushed to the back of the queue, enabling other |
1108 | // tasks to be executed. If **not** a yield, then there is more |
1109 | // flexibility and the task may go to the front of the queue. |
1110 | let should_notify = if is_yield || !core.lifo_enabled { |
1111 | core.run_queue |
1112 | .push_back_or_overflow(task, self, &mut core.stats); |
1113 | true |
1114 | } else { |
1115 | // Push to the LIFO slot |
1116 | let prev = core.lifo_slot.take(); |
1117 | let ret = prev.is_some(); |
1118 | |
1119 | if let Some(prev) = prev { |
1120 | core.run_queue |
1121 | .push_back_or_overflow(prev, self, &mut core.stats); |
1122 | } |
1123 | |
1124 | core.lifo_slot = Some(task); |
1125 | |
1126 | ret |
1127 | }; |
1128 | |
1129 | // Only notify if not currently parked. If `park` is `None`, then the |
1130 | // scheduling is from a resource driver. As notifications often come in |
1131 | // batches, the notification is delayed until the park is complete. |
1132 | if should_notify && core.park.is_some() { |
1133 | self.notify_parked_local(); |
1134 | } |
1135 | } |
1136 | |
1137 | fn next_remote_task(&self) -> Option<Notified> { |
1138 | if self.shared.inject.is_empty() { |
1139 | return None; |
1140 | } |
1141 | |
1142 | let mut synced = self.shared.synced.lock(); |
        // safety: passing in correct `inject::Synced`
1144 | unsafe { self.shared.inject.pop(&mut synced.inject) } |
1145 | } |
1146 | |
1147 | fn push_remote_task(&self, task: Notified) { |
1148 | self.shared.scheduler_metrics.inc_remote_schedule_count(); |
1149 | |
1150 | let mut synced = self.shared.synced.lock(); |
        // safety: passing in correct `inject::Synced`
1152 | unsafe { |
1153 | self.shared.inject.push(&mut synced.inject, task); |
1154 | } |
1155 | } |
1156 | |
1157 | pub(super) fn close(&self) { |
1158 | if self |
1159 | .shared |
1160 | .inject |
1161 | .close(&mut self.shared.synced.lock().inject) |
1162 | { |
1163 | self.notify_all(); |
1164 | } |
1165 | } |
1166 | |
1167 | fn notify_parked_local(&self) { |
1168 | super::counters::inc_num_inc_notify_local(); |
1169 | |
1170 | if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) { |
1171 | super::counters::inc_num_unparks_local(); |
1172 | self.shared.remotes[index].unpark.unpark(&self.driver); |
1173 | } |
1174 | } |
1175 | |
1176 | fn notify_parked_remote(&self) { |
1177 | if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) { |
1178 | self.shared.remotes[index].unpark.unpark(&self.driver); |
1179 | } |
1180 | } |
1181 | |
1182 | pub(super) fn notify_all(&self) { |
1183 | for remote in &self.shared.remotes[..] { |
1184 | remote.unpark.unpark(&self.driver); |
1185 | } |
1186 | } |
1187 | |
1188 | fn notify_if_work_pending(&self) { |
1189 | for remote in &self.shared.remotes[..] { |
1190 | if !remote.steal.is_empty() { |
1191 | self.notify_parked_local(); |
1192 | return; |
1193 | } |
1194 | } |
1195 | |
1196 | if !self.shared.inject.is_empty() { |
1197 | self.notify_parked_local(); |
1198 | } |
1199 | } |
1200 | |
1201 | fn transition_worker_from_searching(&self) { |
1202 | if self.shared.idle.transition_worker_from_searching() { |
1203 | // We are the final searching worker. Because work was found, we |
1204 | // need to notify another worker. |
1205 | self.notify_parked_local(); |
1206 | } |
1207 | } |
1208 | |
1209 | /// Signals that a worker has observed the shutdown signal and has replaced |
1210 | /// its core back into its handle. |
1211 | /// |
1212 | /// If all workers have reached this point, the final cleanup is performed. |
1213 | fn shutdown_core(&self, core: Box<Core>) { |
1214 | let mut cores = self.shared.shutdown_cores.lock(); |
1215 | cores.push(core); |
1216 | |
1217 | if cores.len() != self.shared.remotes.len() { |
1218 | return; |
1219 | } |
1220 | |
1221 | debug_assert!(self.shared.owned.is_empty()); |
1222 | |
1223 | for mut core in cores.drain(..) { |
1224 | core.shutdown(self); |
1225 | } |
1226 | |
1227 | // Drain the injection queue |
1228 | // |
1229 | // We already shut down every task, so we can simply drop the tasks. |
1230 | while let Some(task) = self.next_remote_task() { |
1231 | drop(task); |
1232 | } |
1233 | } |
1234 | |
1235 | fn ptr_eq(&self, other: &Handle) -> bool { |
1236 | std::ptr::eq(self, other) |
1237 | } |
1238 | } |
1239 | |
1240 | impl Overflow<Arc<Handle>> for Handle { |
1241 | fn push(&self, task: task::Notified<Arc<Handle>>) { |
1242 | self.push_remote_task(task); |
1243 | } |
1244 | |
1245 | fn push_batch<I>(&self, iter: I) |
1246 | where |
1247 | I: Iterator<Item = task::Notified<Arc<Handle>>>, |
1248 | { |
1249 | unsafe { |
1250 | self.shared.inject.push_batch(self, iter); |
1251 | } |
1252 | } |
1253 | } |
1254 | |
1255 | pub(crate) struct InjectGuard<'a> { |
1256 | lock: crate::loom::sync::MutexGuard<'a, Synced>, |
1257 | } |
1258 | |
1259 | impl<'a> AsMut<inject::Synced> for InjectGuard<'a> { |
1260 | fn as_mut(&mut self) -> &mut inject::Synced { |
1261 | &mut self.lock.inject |
1262 | } |
1263 | } |
1264 | |
1265 | impl<'a> Lock<inject::Synced> for &'a Handle { |
1266 | type Handle = InjectGuard<'a>; |
1267 | |
1268 | fn lock(self) -> Self::Handle { |
1269 | InjectGuard { |
1270 | lock: self.shared.synced.lock(), |
1271 | } |
1272 | } |
1273 | } |
1274 | |
#[track_caller]
1276 | fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R { |
1277 | use scheduler::Context::MultiThread; |
1278 | |
    context::with_scheduler(|ctx| match ctx {
        Some(MultiThread(ctx)) => f(Some(ctx)),
1281 | _ => f(None), |
1282 | }) |
1283 | } |
1284 | |