1 | use crate::loom::sync::atomic::AtomicBool; |
2 | use crate::loom::sync::Arc; |
3 | use crate::runtime::driver::{self, Driver}; |
4 | use crate::runtime::scheduler::{self, Defer, Inject}; |
5 | use crate::runtime::task::{ |
6 | self, JoinHandle, OwnedTasks, Schedule, Task, TaskHarnessScheduleHooks, |
7 | }; |
8 | use crate::runtime::{ |
9 | blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics, |
10 | }; |
11 | use crate::sync::notify::Notify; |
12 | use crate::util::atomic_cell::AtomicCell; |
13 | use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef}; |
14 | |
15 | use std::cell::RefCell; |
16 | use std::collections::VecDeque; |
17 | use std::future::{poll_fn, Future}; |
18 | use std::sync::atomic::Ordering::{AcqRel, Release}; |
19 | use std::task::Poll::{Pending, Ready}; |
20 | use std::task::Waker; |
21 | use std::thread::ThreadId; |
22 | use std::time::Duration; |
23 | use std::{fmt, thread}; |
24 | |
25 | /// Executes tasks on the current thread |
26 | pub(crate) struct CurrentThread { |
27 | /// Core scheduler data is acquired by a thread entering `block_on`. |
28 | core: AtomicCell<Core>, |
29 | |
30 | /// Notifier for waking up other threads to steal the |
31 | /// driver. |
32 | notify: Notify, |
33 | } |
34 | |
35 | /// Handle to the current thread scheduler |
36 | pub(crate) struct Handle { |
37 | /// Scheduler state shared across threads |
38 | shared: Shared, |
39 | |
40 | /// Resource driver handles |
41 | pub(crate) driver: driver::Handle, |
42 | |
43 | /// Blocking pool spawner |
44 | pub(crate) blocking_spawner: blocking::Spawner, |
45 | |
46 | /// Current random number generator seed |
47 | pub(crate) seed_generator: RngSeedGenerator, |
48 | |
49 | /// User-supplied hooks to invoke for things |
50 | pub(crate) task_hooks: TaskHooks, |
51 | |
52 | /// If this is a `LocalRuntime`, flags the owning thread ID. |
53 | pub(crate) local_tid: Option<ThreadId>, |
54 | } |
55 | |
56 | /// Data required for executing the scheduler. The struct is passed around to |
57 | /// a function that will perform the scheduling work and acts as a capability token. |
58 | struct Core { |
59 | /// Scheduler run queue |
60 | tasks: VecDeque<Notified>, |
61 | |
62 | /// Current tick |
63 | tick: u32, |
64 | |
65 | /// Runtime driver |
66 | /// |
67 | /// The driver is removed before starting to park the thread |
68 | driver: Option<Driver>, |
69 | |
70 | /// Metrics batch |
71 | metrics: MetricsBatch, |
72 | |
73 | /// How often to check the global queue |
74 | global_queue_interval: u32, |
75 | |
76 | /// True if a task panicked without being handled and the runtime is |
77 | /// configured to shutdown on unhandled panic. |
78 | unhandled_panic: bool, |
79 | } |
80 | |
81 | /// Scheduler state shared between threads. |
82 | struct Shared { |
83 | /// Remote run queue |
84 | inject: Inject<Arc<Handle>>, |
85 | |
86 | /// Collection of all active tasks spawned onto this executor. |
87 | owned: OwnedTasks<Arc<Handle>>, |
88 | |
89 | /// Indicates whether the blocked on thread was woken. |
90 | woken: AtomicBool, |
91 | |
92 | /// Scheduler configuration options |
93 | config: Config, |
94 | |
95 | /// Keeps track of various runtime metrics. |
96 | scheduler_metrics: SchedulerMetrics, |
97 | |
98 | /// This scheduler only has one worker. |
99 | worker_metrics: WorkerMetrics, |
100 | } |
101 | |
102 | /// Thread-local context. |
103 | /// |
104 | /// pub(crate) to store in `runtime::context`. |
105 | pub(crate) struct Context { |
106 | /// Scheduler handle |
107 | handle: Arc<Handle>, |
108 | |
109 | /// Scheduler core, enabling the holder of `Context` to execute the |
110 | /// scheduler. |
111 | core: RefCell<Option<Box<Core>>>, |
112 | |
113 | /// Deferred tasks, usually ones that called `task::yield_now()`. |
114 | pub(crate) defer: Defer, |
115 | } |
116 | |
117 | type Notified = task::Notified<Arc<Handle>>; |
118 | |
119 | /// Initial queue capacity. |
120 | const INITIAL_CAPACITY: usize = 64; |
121 | |
122 | /// Used if none is specified. This is a temporary constant and will be removed |
123 | /// as we unify tuning logic between the multi-thread and current-thread |
124 | /// schedulers. |
125 | const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31; |
126 | |
127 | impl CurrentThread { |
128 | pub(crate) fn new( |
129 | driver: Driver, |
130 | driver_handle: driver::Handle, |
131 | blocking_spawner: blocking::Spawner, |
132 | seed_generator: RngSeedGenerator, |
133 | config: Config, |
134 | local_tid: Option<ThreadId>, |
135 | ) -> (CurrentThread, Arc<Handle>) { |
136 | let worker_metrics = WorkerMetrics::from_config(&config); |
137 | worker_metrics.set_thread_id(thread::current().id()); |
138 | |
139 | // Get the configured global queue interval, or use the default. |
140 | let global_queue_interval = config |
141 | .global_queue_interval |
142 | .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL); |
143 | |
144 | let handle = Arc::new(Handle { |
145 | task_hooks: TaskHooks { |
146 | task_spawn_callback: config.before_spawn.clone(), |
147 | task_terminate_callback: config.after_termination.clone(), |
148 | #[cfg (tokio_unstable)] |
149 | before_poll_callback: config.before_poll.clone(), |
150 | #[cfg (tokio_unstable)] |
151 | after_poll_callback: config.after_poll.clone(), |
152 | }, |
153 | shared: Shared { |
154 | inject: Inject::new(), |
155 | owned: OwnedTasks::new(1), |
156 | woken: AtomicBool::new(false), |
157 | config, |
158 | scheduler_metrics: SchedulerMetrics::new(), |
159 | worker_metrics, |
160 | }, |
161 | driver: driver_handle, |
162 | blocking_spawner, |
163 | seed_generator, |
164 | local_tid, |
165 | }); |
166 | |
167 | let core = AtomicCell::new(Some(Box::new(Core { |
168 | tasks: VecDeque::with_capacity(INITIAL_CAPACITY), |
169 | tick: 0, |
170 | driver: Some(driver), |
171 | metrics: MetricsBatch::new(&handle.shared.worker_metrics), |
172 | global_queue_interval, |
173 | unhandled_panic: false, |
174 | }))); |
175 | |
176 | let scheduler = CurrentThread { |
177 | core, |
178 | notify: Notify::new(), |
179 | }; |
180 | |
181 | (scheduler, handle) |
182 | } |
183 | |
184 | #[track_caller ] |
185 | pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output { |
186 | pin!(future); |
187 | |
188 | crate::runtime::context::enter_runtime(handle, false, |blocking| { |
189 | let handle = handle.as_current_thread(); |
190 | |
191 | // Attempt to steal the scheduler core and block_on the future if we can |
192 | // there, otherwise, lets select on a notification that the core is |
193 | // available or the future is complete. |
194 | loop { |
195 | if let Some(core) = self.take_core(handle) { |
196 | handle |
197 | .shared |
198 | .worker_metrics |
199 | .set_thread_id(thread::current().id()); |
200 | return core.block_on(future); |
201 | } else { |
202 | let notified = self.notify.notified(); |
203 | pin!(notified); |
204 | |
205 | if let Some(out) = blocking |
206 | .block_on(poll_fn(|cx| { |
207 | if notified.as_mut().poll(cx).is_ready() { |
208 | return Ready(None); |
209 | } |
210 | |
211 | if let Ready(out) = future.as_mut().poll(cx) { |
212 | return Ready(Some(out)); |
213 | } |
214 | |
215 | Pending |
216 | })) |
217 | .expect("Failed to `Enter::block_on`" ) |
218 | { |
219 | return out; |
220 | } |
221 | } |
222 | } |
223 | }) |
224 | } |
225 | |
226 | fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> { |
227 | let core = self.core.take()?; |
228 | |
229 | Some(CoreGuard { |
230 | context: scheduler::Context::CurrentThread(Context { |
231 | handle: handle.clone(), |
232 | core: RefCell::new(Some(core)), |
233 | defer: Defer::new(), |
234 | }), |
235 | scheduler: self, |
236 | }) |
237 | } |
238 | |
239 | pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) { |
240 | let handle = handle.as_current_thread(); |
241 | |
242 | // Avoid a double panic if we are currently panicking and |
243 | // the lock may be poisoned. |
244 | |
245 | let core = match self.take_core(handle) { |
246 | Some(core) => core, |
247 | None if std::thread::panicking() => return, |
248 | None => panic!("Oh no! We never placed the Core back, this is a bug!" ), |
249 | }; |
250 | |
251 | // Check that the thread-local is not being destroyed |
252 | let tls_available = context::with_current(|_| ()).is_ok(); |
253 | |
254 | if tls_available { |
255 | core.enter(|core, _context| { |
256 | let core = shutdown2(core, handle); |
257 | (core, ()) |
258 | }); |
259 | } else { |
260 | // Shutdown without setting the context. `tokio::spawn` calls will |
261 | // fail, but those will fail either way because the thread-local is |
262 | // not available anymore. |
263 | let context = core.context.expect_current_thread(); |
264 | let core = context.core.borrow_mut().take().unwrap(); |
265 | |
266 | let core = shutdown2(core, handle); |
267 | *context.core.borrow_mut() = Some(core); |
268 | } |
269 | } |
270 | } |
271 | |
272 | fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> { |
273 | // Drain the OwnedTasks collection. This call also closes the |
274 | // collection, ensuring that no tasks are ever pushed after this |
275 | // call returns. |
276 | handle.shared.owned.close_and_shutdown_all(0); |
277 | |
278 | // Drain local queue |
279 | // We already shut down every task, so we just need to drop the task. |
280 | while let Some(task) = core.next_local_task(handle) { |
281 | drop(task); |
282 | } |
283 | |
284 | // Close the injection queue |
285 | handle.shared.inject.close(); |
286 | |
287 | // Drain remote queue |
288 | while let Some(task) = handle.shared.inject.pop() { |
289 | drop(task); |
290 | } |
291 | |
292 | assert!(handle.shared.owned.is_empty()); |
293 | |
294 | // Submit metrics |
295 | core.submit_metrics(handle); |
296 | |
297 | // Shutdown the resource drivers |
298 | if let Some(driver) = core.driver.as_mut() { |
299 | driver.shutdown(&handle.driver); |
300 | } |
301 | |
302 | core |
303 | } |
304 | |
305 | impl fmt::Debug for CurrentThread { |
306 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
307 | fmt.debug_struct(name:"CurrentThread" ).finish() |
308 | } |
309 | } |
310 | |
311 | // ===== impl Core ===== |
312 | |
313 | impl Core { |
314 | /// Get and increment the current tick |
315 | fn tick(&mut self) { |
316 | self.tick = self.tick.wrapping_add(1); |
317 | } |
318 | |
319 | fn next_task(&mut self, handle: &Handle) -> Option<Notified> { |
320 | if self.tick % self.global_queue_interval == 0 { |
321 | handle |
322 | .next_remote_task() |
323 | .or_else(|| self.next_local_task(handle)) |
324 | } else { |
325 | self.next_local_task(handle) |
326 | .or_else(|| handle.next_remote_task()) |
327 | } |
328 | } |
329 | |
330 | fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> { |
331 | let ret = self.tasks.pop_front(); |
332 | handle |
333 | .shared |
334 | .worker_metrics |
335 | .set_queue_depth(self.tasks.len()); |
336 | ret |
337 | } |
338 | |
339 | fn push_task(&mut self, handle: &Handle, task: Notified) { |
340 | self.tasks.push_back(task); |
341 | self.metrics.inc_local_schedule_count(); |
342 | handle |
343 | .shared |
344 | .worker_metrics |
345 | .set_queue_depth(self.tasks.len()); |
346 | } |
347 | |
348 | fn submit_metrics(&mut self, handle: &Handle) { |
349 | self.metrics.submit(&handle.shared.worker_metrics, 0); |
350 | } |
351 | } |
352 | |
/// Takes the deferred wakers out of `context` and wakes each one of them.
#[cfg(tokio_taskdump)]
fn wake_deferred_tasks_and_free(context: &Context) {
    context
        .defer
        .take_deferred()
        .into_iter()
        .for_each(|waker| waker.wake());
}
360 | |
361 | // ===== impl Context ===== |
362 | |
363 | impl Context { |
364 | /// Execute the closure with the given scheduler core stored in the |
365 | /// thread-local context. |
366 | fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) { |
367 | core.metrics.start_poll(); |
368 | let mut ret = self.enter(core, || crate::task::coop::budget(f)); |
369 | ret.0.metrics.end_poll(); |
370 | ret |
371 | } |
372 | |
373 | /// Blocks the current thread until an event is received by the driver, |
374 | /// including I/O events, timer events, ... |
375 | fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> { |
376 | let mut driver = core.driver.take().expect("driver missing" ); |
377 | |
378 | if let Some(f) = &handle.shared.config.before_park { |
379 | let (c, ()) = self.enter(core, || f()); |
380 | core = c; |
381 | } |
382 | |
383 | // This check will fail if `before_park` spawns a task for us to run |
384 | // instead of parking the thread |
385 | if core.tasks.is_empty() { |
386 | // Park until the thread is signaled |
387 | core.metrics.about_to_park(); |
388 | core.submit_metrics(handle); |
389 | |
390 | let (c, ()) = self.enter(core, || { |
391 | driver.park(&handle.driver); |
392 | self.defer.wake(); |
393 | }); |
394 | |
395 | core = c; |
396 | |
397 | core.metrics.unparked(); |
398 | core.submit_metrics(handle); |
399 | } |
400 | |
401 | if let Some(f) = &handle.shared.config.after_unpark { |
402 | let (c, ()) = self.enter(core, || f()); |
403 | core = c; |
404 | } |
405 | |
406 | core.driver = Some(driver); |
407 | core |
408 | } |
409 | |
410 | /// Checks the driver for new events without blocking the thread. |
411 | fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> { |
412 | let mut driver = core.driver.take().expect("driver missing" ); |
413 | |
414 | core.submit_metrics(handle); |
415 | |
416 | let (mut core, ()) = self.enter(core, || { |
417 | driver.park_timeout(&handle.driver, Duration::from_millis(0)); |
418 | self.defer.wake(); |
419 | }); |
420 | |
421 | core.driver = Some(driver); |
422 | core |
423 | } |
424 | |
425 | fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) { |
426 | // Store the scheduler core in the thread-local context |
427 | // |
428 | // A drop-guard is employed at a higher level. |
429 | *self.core.borrow_mut() = Some(core); |
430 | |
431 | // Execute the closure while tracking the execution budget |
432 | let ret = f(); |
433 | |
434 | // Take the scheduler core back |
435 | let core = self.core.borrow_mut().take().expect("core missing" ); |
436 | (core, ret) |
437 | } |
438 | |
439 | pub(crate) fn defer(&self, waker: &Waker) { |
440 | self.defer.defer(waker); |
441 | } |
442 | } |
443 | |
444 | // ===== impl Handle ===== |
445 | |
446 | impl Handle { |
447 | /// Spawns a future onto the `CurrentThread` scheduler |
448 | pub(crate) fn spawn<F>( |
449 | me: &Arc<Self>, |
450 | future: F, |
451 | id: crate::runtime::task::Id, |
452 | ) -> JoinHandle<F::Output> |
453 | where |
454 | F: crate::future::Future + Send + 'static, |
455 | F::Output: Send + 'static, |
456 | { |
457 | let (handle, notified) = me.shared.owned.bind(future, me.clone(), id); |
458 | |
459 | me.task_hooks.spawn(&TaskMeta { |
460 | id, |
461 | _phantom: Default::default(), |
462 | }); |
463 | |
464 | if let Some(notified) = notified { |
465 | me.schedule(notified); |
466 | } |
467 | |
468 | handle |
469 | } |
470 | |
471 | /// Spawn a task which isn't safe to send across thread boundaries onto the runtime. |
472 | /// |
473 | /// # Safety |
474 | /// This should only be used when this is a `LocalRuntime` or in another case where the runtime |
475 | /// provably cannot be driven from or moved to different threads from the one on which the task |
476 | /// is spawned. |
477 | pub(crate) unsafe fn spawn_local<F>( |
478 | me: &Arc<Self>, |
479 | future: F, |
480 | id: crate::runtime::task::Id, |
481 | ) -> JoinHandle<F::Output> |
482 | where |
483 | F: crate::future::Future + 'static, |
484 | F::Output: 'static, |
485 | { |
486 | let (handle, notified) = me.shared.owned.bind_local(future, me.clone(), id); |
487 | |
488 | me.task_hooks.spawn(&TaskMeta { |
489 | id, |
490 | _phantom: Default::default(), |
491 | }); |
492 | |
493 | if let Some(notified) = notified { |
494 | me.schedule(notified); |
495 | } |
496 | |
497 | handle |
498 | } |
499 | |
500 | /// Capture a snapshot of this runtime's state. |
501 | #[cfg (all( |
502 | tokio_unstable, |
503 | tokio_taskdump, |
504 | target_os = "linux" , |
505 | any(target_arch = "aarch64" , target_arch = "x86" , target_arch = "x86_64" ) |
506 | ))] |
507 | pub(crate) fn dump(&self) -> crate::runtime::Dump { |
508 | use crate::runtime::dump; |
509 | use task::trace::trace_current_thread; |
510 | |
511 | let mut traces = vec![]; |
512 | |
513 | // todo: how to make this work outside of a runtime context? |
514 | context::with_scheduler(|maybe_context| { |
515 | // drain the local queue |
516 | let context = if let Some(context) = maybe_context { |
517 | context.expect_current_thread() |
518 | } else { |
519 | return; |
520 | }; |
521 | let mut maybe_core = context.core.borrow_mut(); |
522 | let core = if let Some(core) = maybe_core.as_mut() { |
523 | core |
524 | } else { |
525 | return; |
526 | }; |
527 | let local = &mut core.tasks; |
528 | |
529 | if self.shared.inject.is_closed() { |
530 | return; |
531 | } |
532 | |
533 | traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject) |
534 | .into_iter() |
535 | .map(|(id, trace)| dump::Task::new(id, trace)) |
536 | .collect(); |
537 | |
538 | // Avoid double borrow panic |
539 | drop(maybe_core); |
540 | |
541 | // Taking a taskdump could wakes every task, but we probably don't want |
542 | // the `yield_now` vector to be that large under normal circumstances. |
543 | // Therefore, we free its allocation. |
544 | wake_deferred_tasks_and_free(context); |
545 | }); |
546 | |
547 | dump::Dump::new(traces) |
548 | } |
549 | |
550 | fn next_remote_task(&self) -> Option<Notified> { |
551 | self.shared.inject.pop() |
552 | } |
553 | |
554 | fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> { |
555 | // Set woken to true when enter block_on, ensure outer future |
556 | // be polled for the first time when enter loop |
557 | me.shared.woken.store(true, Release); |
558 | waker_ref(me) |
559 | } |
560 | |
561 | // reset woken to false and return original value |
562 | pub(crate) fn reset_woken(&self) -> bool { |
563 | self.shared.woken.swap(false, AcqRel) |
564 | } |
565 | |
566 | pub(crate) fn num_alive_tasks(&self) -> usize { |
567 | self.shared.owned.num_alive_tasks() |
568 | } |
569 | |
570 | pub(crate) fn injection_queue_depth(&self) -> usize { |
571 | self.shared.inject.len() |
572 | } |
573 | } |
574 | |
575 | cfg_unstable_metrics! { |
576 | impl Handle { |
577 | pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { |
578 | &self.shared.scheduler_metrics |
579 | } |
580 | |
581 | pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { |
582 | assert_eq!(0, worker); |
583 | &self.shared.worker_metrics |
584 | } |
585 | |
586 | pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { |
587 | self.worker_metrics(worker).queue_depth() |
588 | } |
589 | |
590 | pub(crate) fn num_blocking_threads(&self) -> usize { |
591 | self.blocking_spawner.num_threads() |
592 | } |
593 | |
594 | pub(crate) fn num_idle_blocking_threads(&self) -> usize { |
595 | self.blocking_spawner.num_idle_threads() |
596 | } |
597 | |
598 | pub(crate) fn blocking_queue_depth(&self) -> usize { |
599 | self.blocking_spawner.queue_depth() |
600 | } |
601 | |
602 | cfg_64bit_metrics! { |
603 | pub(crate) fn spawned_tasks_count(&self) -> u64 { |
604 | self.shared.owned.spawned_tasks_count() |
605 | } |
606 | } |
607 | } |
608 | } |
609 | |
610 | cfg_unstable! { |
611 | use std::num::NonZeroU64; |
612 | |
613 | impl Handle { |
614 | pub(crate) fn owned_id(&self) -> NonZeroU64 { |
615 | self.shared.owned.id |
616 | } |
617 | } |
618 | } |
619 | |
620 | impl fmt::Debug for Handle { |
621 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
622 | fmt.debug_struct(name:"current_thread::Handle { ... }" ).finish() |
623 | } |
624 | } |
625 | |
// ===== impl Schedule =====
627 | |
628 | impl Schedule for Arc<Handle> { |
629 | fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { |
630 | self.shared.owned.remove(task) |
631 | } |
632 | |
633 | fn schedule(&self, task: task::Notified<Self>) { |
634 | use scheduler::Context::CurrentThread; |
635 | |
636 | context::with_scheduler(|maybe_cx| match maybe_cx { |
637 | Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => { |
638 | let mut core = cx.core.borrow_mut(); |
639 | |
640 | // If `None`, the runtime is shutting down, so there is no need |
641 | // to schedule the task. |
642 | if let Some(core) = core.as_mut() { |
643 | core.push_task(self, task); |
644 | } |
645 | } |
646 | _ => { |
647 | // Track that a task was scheduled from **outside** of the runtime. |
648 | self.shared.scheduler_metrics.inc_remote_schedule_count(); |
649 | |
650 | // Schedule the task |
651 | self.shared.inject.push(task); |
652 | self.driver.unpark(); |
653 | } |
654 | }); |
655 | } |
656 | |
657 | fn hooks(&self) -> TaskHarnessScheduleHooks { |
658 | TaskHarnessScheduleHooks { |
659 | task_terminate_callback: self.task_hooks.task_terminate_callback.clone(), |
660 | } |
661 | } |
662 | |
663 | cfg_unstable! { |
664 | fn unhandled_panic(&self) { |
665 | use crate::runtime::UnhandledPanic; |
666 | |
667 | match self.shared.config.unhandled_panic { |
668 | UnhandledPanic::Ignore => { |
669 | // Do nothing |
670 | } |
671 | UnhandledPanic::ShutdownRuntime => { |
672 | use scheduler::Context::CurrentThread; |
673 | |
674 | // This hook is only called from within the runtime, so |
675 | // `context::with_scheduler` should match with `&self`, i.e. |
676 | // there is no opportunity for a nested scheduler to be |
677 | // called. |
678 | context::with_scheduler(|maybe_cx| match maybe_cx { |
679 | Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => { |
680 | let mut core = cx.core.borrow_mut(); |
681 | |
682 | // If `None`, the runtime is shutting down, so there is no need to signal shutdown |
683 | if let Some(core) = core.as_mut() { |
684 | core.unhandled_panic = true; |
685 | self.shared.owned.close_and_shutdown_all(0); |
686 | } |
687 | } |
688 | _ => unreachable!("runtime core not set in CURRENT thread-local" ), |
689 | }) |
690 | } |
691 | } |
692 | } |
693 | } |
694 | } |
695 | |
696 | impl Wake for Handle { |
697 | fn wake(arc_self: Arc<Self>) { |
698 | Wake::wake_by_ref(&arc_self); |
699 | } |
700 | |
701 | /// Wake by reference |
702 | fn wake_by_ref(arc_self: &Arc<Self>) { |
703 | arc_self.shared.woken.store(val:true, order:Release); |
704 | arc_self.driver.unpark(); |
705 | } |
706 | } |
707 | |
708 | // ===== CoreGuard ===== |
709 | |
710 | /// Used to ensure we always place the `Core` value back into its slot in |
711 | /// `CurrentThread`, even if the future panics. |
712 | struct CoreGuard<'a> { |
713 | context: scheduler::Context, |
714 | scheduler: &'a CurrentThread, |
715 | } |
716 | |
717 | impl CoreGuard<'_> { |
718 | #[track_caller ] |
719 | fn block_on<F: Future>(self, future: F) -> F::Output { |
720 | let ret = self.enter(|mut core, context| { |
721 | let waker = Handle::waker_ref(&context.handle); |
722 | let mut cx = std::task::Context::from_waker(&waker); |
723 | |
724 | pin!(future); |
725 | |
726 | core.metrics.start_processing_scheduled_tasks(); |
727 | |
728 | 'outer: loop { |
729 | let handle = &context.handle; |
730 | |
731 | if handle.reset_woken() { |
732 | let (c, res) = context.enter(core, || { |
733 | crate::task::coop::budget(|| future.as_mut().poll(&mut cx)) |
734 | }); |
735 | |
736 | core = c; |
737 | |
738 | if let Ready(v) = res { |
739 | return (core, Some(v)); |
740 | } |
741 | } |
742 | |
743 | for _ in 0..handle.shared.config.event_interval { |
744 | // Make sure we didn't hit an unhandled_panic |
745 | if core.unhandled_panic { |
746 | return (core, None); |
747 | } |
748 | |
749 | core.tick(); |
750 | |
751 | let entry = core.next_task(handle); |
752 | |
753 | let task = match entry { |
754 | Some(entry) => entry, |
755 | None => { |
756 | core.metrics.end_processing_scheduled_tasks(); |
757 | |
758 | core = if !context.defer.is_empty() { |
759 | context.park_yield(core, handle) |
760 | } else { |
761 | context.park(core, handle) |
762 | }; |
763 | |
764 | core.metrics.start_processing_scheduled_tasks(); |
765 | |
766 | // Try polling the `block_on` future next |
767 | continue 'outer; |
768 | } |
769 | }; |
770 | |
771 | let task = context.handle.shared.owned.assert_owner(task); |
772 | |
773 | #[cfg (tokio_unstable)] |
774 | let task_id = task.task_id(); |
775 | |
776 | let (c, ()) = context.run_task(core, || { |
777 | #[cfg (tokio_unstable)] |
778 | context.handle.task_hooks.poll_start_callback(task_id); |
779 | |
780 | task.run(); |
781 | |
782 | #[cfg (tokio_unstable)] |
783 | context.handle.task_hooks.poll_stop_callback(task_id); |
784 | }); |
785 | |
786 | core = c; |
787 | } |
788 | |
789 | core.metrics.end_processing_scheduled_tasks(); |
790 | |
791 | // Yield to the driver, this drives the timer and pulls any |
792 | // pending I/O events. |
793 | core = context.park_yield(core, handle); |
794 | |
795 | core.metrics.start_processing_scheduled_tasks(); |
796 | } |
797 | }); |
798 | |
799 | match ret { |
800 | Some(ret) => ret, |
801 | None => { |
802 | // `block_on` panicked. |
803 | panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic" ); |
804 | } |
805 | } |
806 | } |
807 | |
808 | /// Enters the scheduler context. This sets the queue and other necessary |
809 | /// scheduler state in the thread-local. |
810 | fn enter<F, R>(self, f: F) -> R |
811 | where |
812 | F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R), |
813 | { |
814 | let context = self.context.expect_current_thread(); |
815 | |
816 | // Remove `core` from `context` to pass into the closure. |
817 | let core = context.core.borrow_mut().take().expect("core missing" ); |
818 | |
819 | // Call the closure and place `core` back |
820 | let (core, ret) = context::set_scheduler(&self.context, || f(core, context)); |
821 | |
822 | *context.core.borrow_mut() = Some(core); |
823 | |
824 | ret |
825 | } |
826 | } |
827 | |
828 | impl Drop for CoreGuard<'_> { |
829 | fn drop(&mut self) { |
830 | let context: &Context = self.context.expect_current_thread(); |
831 | |
832 | if let Some(core: Box) = context.core.borrow_mut().take() { |
833 | // Replace old scheduler back into the state to allow |
834 | // other threads to pick it up and drive it. |
835 | self.scheduler.core.set(val:core); |
836 | |
837 | // Wake up other possible threads that could steal the driver. |
838 | self.scheduler.notify.notify_one(); |
839 | } |
840 | } |
841 | } |
842 | |