1 | use crate::future::poll_fn; |
2 | use crate::loom::sync::atomic::AtomicBool; |
3 | use crate::loom::sync::Arc; |
4 | use crate::runtime::driver::{self, Driver}; |
5 | use crate::runtime::scheduler::{self, Defer, Inject}; |
6 | use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; |
7 | use crate::runtime::{blocking, context, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics}; |
8 | use crate::sync::notify::Notify; |
9 | use crate::util::atomic_cell::AtomicCell; |
10 | use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef}; |
11 | |
12 | use std::cell::RefCell; |
13 | use std::collections::VecDeque; |
14 | use std::fmt; |
15 | use std::future::Future; |
16 | use std::sync::atomic::Ordering::{AcqRel, Release}; |
17 | use std::task::Poll::{Pending, Ready}; |
18 | use std::task::Waker; |
19 | use std::time::Duration; |
20 | |
21 | /// Executes tasks on the current thread |
22 | pub(crate) struct CurrentThread { |
23 | /// Core scheduler data is acquired by a thread entering `block_on`. |
24 | core: AtomicCell<Core>, |
25 | |
26 | /// Notifier for waking up other threads to steal the |
27 | /// driver. |
28 | notify: Notify, |
29 | } |
30 | |
31 | /// Handle to the current thread scheduler |
32 | pub(crate) struct Handle { |
33 | /// Scheduler state shared across threads |
34 | shared: Shared, |
35 | |
36 | /// Resource driver handles |
37 | pub(crate) driver: driver::Handle, |
38 | |
39 | /// Blocking pool spawner |
40 | pub(crate) blocking_spawner: blocking::Spawner, |
41 | |
42 | /// Current random number generator seed |
43 | pub(crate) seed_generator: RngSeedGenerator, |
44 | } |
45 | |
46 | /// Data required for executing the scheduler. The struct is passed around to |
47 | /// a function that will perform the scheduling work and acts as a capability token. |
48 | struct Core { |
49 | /// Scheduler run queue |
50 | tasks: VecDeque<Notified>, |
51 | |
52 | /// Current tick |
53 | tick: u32, |
54 | |
55 | /// Runtime driver |
56 | /// |
57 | /// The driver is removed before starting to park the thread |
58 | driver: Option<Driver>, |
59 | |
60 | /// Metrics batch |
61 | metrics: MetricsBatch, |
62 | |
63 | /// How often to check the global queue |
64 | global_queue_interval: u32, |
65 | |
66 | /// True if a task panicked without being handled and the runtime is |
67 | /// configured to shutdown on unhandled panic. |
68 | unhandled_panic: bool, |
69 | } |
70 | |
71 | /// Scheduler state shared between threads. |
72 | struct Shared { |
73 | /// Remote run queue |
74 | inject: Inject<Arc<Handle>>, |
75 | |
76 | /// Collection of all active tasks spawned onto this executor. |
77 | owned: OwnedTasks<Arc<Handle>>, |
78 | |
79 | /// Indicates whether the blocked on thread was woken. |
80 | woken: AtomicBool, |
81 | |
82 | /// Scheduler configuration options |
83 | config: Config, |
84 | |
85 | /// Keeps track of various runtime metrics. |
86 | scheduler_metrics: SchedulerMetrics, |
87 | |
88 | /// This scheduler only has one worker. |
89 | worker_metrics: WorkerMetrics, |
90 | } |
91 | |
92 | /// Thread-local context. |
93 | /// |
94 | /// pub(crate) to store in `runtime::context`. |
95 | pub(crate) struct Context { |
96 | /// Scheduler handle |
97 | handle: Arc<Handle>, |
98 | |
99 | /// Scheduler core, enabling the holder of `Context` to execute the |
100 | /// scheduler. |
101 | core: RefCell<Option<Box<Core>>>, |
102 | |
103 | /// Deferred tasks, usually ones that called `task::yield_now()`. |
104 | pub(crate) defer: Defer, |
105 | } |
106 | |
107 | type Notified = task::Notified<Arc<Handle>>; |
108 | |
109 | /// Initial queue capacity. |
110 | const INITIAL_CAPACITY: usize = 64; |
111 | |
112 | /// Used if none is specified. This is a temporary constant and will be removed |
113 | /// as we unify tuning logic between the multi-thread and current-thread |
114 | /// schedulers. |
115 | const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31; |
116 | |
117 | impl CurrentThread { |
118 | pub(crate) fn new( |
119 | driver: Driver, |
120 | driver_handle: driver::Handle, |
121 | blocking_spawner: blocking::Spawner, |
122 | seed_generator: RngSeedGenerator, |
123 | config: Config, |
124 | ) -> (CurrentThread, Arc<Handle>) { |
125 | let worker_metrics = WorkerMetrics::from_config(&config); |
126 | |
127 | // Get the configured global queue interval, or use the default. |
128 | let global_queue_interval = config |
129 | .global_queue_interval |
130 | .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL); |
131 | |
132 | let handle = Arc::new(Handle { |
133 | shared: Shared { |
134 | inject: Inject::new(), |
135 | owned: OwnedTasks::new(1), |
136 | woken: AtomicBool::new(false), |
137 | config, |
138 | scheduler_metrics: SchedulerMetrics::new(), |
139 | worker_metrics, |
140 | }, |
141 | driver: driver_handle, |
142 | blocking_spawner, |
143 | seed_generator, |
144 | }); |
145 | |
146 | let core = AtomicCell::new(Some(Box::new(Core { |
147 | tasks: VecDeque::with_capacity(INITIAL_CAPACITY), |
148 | tick: 0, |
149 | driver: Some(driver), |
150 | metrics: MetricsBatch::new(&handle.shared.worker_metrics), |
151 | global_queue_interval, |
152 | unhandled_panic: false, |
153 | }))); |
154 | |
155 | let scheduler = CurrentThread { |
156 | core, |
157 | notify: Notify::new(), |
158 | }; |
159 | |
160 | (scheduler, handle) |
161 | } |
162 | |
163 | #[track_caller ] |
164 | pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output { |
165 | pin!(future); |
166 | |
167 | crate::runtime::context::enter_runtime(handle, false, |blocking| { |
168 | let handle = handle.as_current_thread(); |
169 | |
170 | // Attempt to steal the scheduler core and block_on the future if we can |
171 | // there, otherwise, lets select on a notification that the core is |
172 | // available or the future is complete. |
173 | loop { |
174 | if let Some(core) = self.take_core(handle) { |
175 | return core.block_on(future); |
176 | } else { |
177 | let notified = self.notify.notified(); |
178 | pin!(notified); |
179 | |
180 | if let Some(out) = blocking |
181 | .block_on(poll_fn(|cx| { |
182 | if notified.as_mut().poll(cx).is_ready() { |
183 | return Ready(None); |
184 | } |
185 | |
186 | if let Ready(out) = future.as_mut().poll(cx) { |
187 | return Ready(Some(out)); |
188 | } |
189 | |
190 | Pending |
191 | })) |
192 | .expect("Failed to `Enter::block_on`" ) |
193 | { |
194 | return out; |
195 | } |
196 | } |
197 | } |
198 | }) |
199 | } |
200 | |
201 | fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> { |
202 | let core = self.core.take()?; |
203 | |
204 | Some(CoreGuard { |
205 | context: scheduler::Context::CurrentThread(Context { |
206 | handle: handle.clone(), |
207 | core: RefCell::new(Some(core)), |
208 | defer: Defer::new(), |
209 | }), |
210 | scheduler: self, |
211 | }) |
212 | } |
213 | |
214 | pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) { |
215 | let handle = handle.as_current_thread(); |
216 | |
217 | // Avoid a double panic if we are currently panicking and |
218 | // the lock may be poisoned. |
219 | |
220 | let core = match self.take_core(handle) { |
221 | Some(core) => core, |
222 | None if std::thread::panicking() => return, |
223 | None => panic!("Oh no! We never placed the Core back, this is a bug!" ), |
224 | }; |
225 | |
226 | // Check that the thread-local is not being destroyed |
227 | let tls_available = context::with_current(|_| ()).is_ok(); |
228 | |
229 | if tls_available { |
230 | core.enter(|core, _context| { |
231 | let core = shutdown2(core, handle); |
232 | (core, ()) |
233 | }); |
234 | } else { |
235 | // Shutdown without setting the context. `tokio::spawn` calls will |
236 | // fail, but those will fail either way because the thread-local is |
237 | // not available anymore. |
238 | let context = core.context.expect_current_thread(); |
239 | let core = context.core.borrow_mut().take().unwrap(); |
240 | |
241 | let core = shutdown2(core, handle); |
242 | *context.core.borrow_mut() = Some(core); |
243 | } |
244 | } |
245 | } |
246 | |
247 | fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> { |
248 | // Drain the OwnedTasks collection. This call also closes the |
249 | // collection, ensuring that no tasks are ever pushed after this |
250 | // call returns. |
251 | handle.shared.owned.close_and_shutdown_all(0); |
252 | |
253 | // Drain local queue |
254 | // We already shut down every task, so we just need to drop the task. |
255 | while let Some(task) = core.next_local_task(handle) { |
256 | drop(task); |
257 | } |
258 | |
259 | // Close the injection queue |
260 | handle.shared.inject.close(); |
261 | |
262 | // Drain remote queue |
263 | while let Some(task) = handle.shared.inject.pop() { |
264 | drop(task); |
265 | } |
266 | |
267 | assert!(handle.shared.owned.is_empty()); |
268 | |
269 | // Submit metrics |
270 | core.submit_metrics(handle); |
271 | |
272 | // Shutdown the resource drivers |
273 | if let Some(driver) = core.driver.as_mut() { |
274 | driver.shutdown(&handle.driver); |
275 | } |
276 | |
277 | core |
278 | } |
279 | |
280 | impl fmt::Debug for CurrentThread { |
281 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
282 | fmt.debug_struct(name:"CurrentThread" ).finish() |
283 | } |
284 | } |
285 | |
286 | // ===== impl Core ===== |
287 | |
288 | impl Core { |
289 | /// Get and increment the current tick |
290 | fn tick(&mut self) { |
291 | self.tick = self.tick.wrapping_add(1); |
292 | } |
293 | |
294 | fn next_task(&mut self, handle: &Handle) -> Option<Notified> { |
295 | if self.tick % self.global_queue_interval == 0 { |
296 | handle |
297 | .next_remote_task() |
298 | .or_else(|| self.next_local_task(handle)) |
299 | } else { |
300 | self.next_local_task(handle) |
301 | .or_else(|| handle.next_remote_task()) |
302 | } |
303 | } |
304 | |
305 | fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> { |
306 | let ret = self.tasks.pop_front(); |
307 | handle |
308 | .shared |
309 | .worker_metrics |
310 | .set_queue_depth(self.tasks.len()); |
311 | ret |
312 | } |
313 | |
314 | fn push_task(&mut self, handle: &Handle, task: Notified) { |
315 | self.tasks.push_back(task); |
316 | self.metrics.inc_local_schedule_count(); |
317 | handle |
318 | .shared |
319 | .worker_metrics |
320 | .set_queue_depth(self.tasks.len()); |
321 | } |
322 | |
323 | fn submit_metrics(&mut self, handle: &Handle) { |
324 | self.metrics.submit(&handle.shared.worker_metrics, 0); |
325 | } |
326 | } |
327 | |
328 | #[cfg (tokio_taskdump)] |
329 | fn wake_deferred_tasks_and_free(context: &Context) { |
330 | let wakers = context.defer.take_deferred(); |
331 | for waker in wakers { |
332 | waker.wake(); |
333 | } |
334 | } |
335 | |
336 | // ===== impl Context ===== |
337 | |
338 | impl Context { |
339 | /// Execute the closure with the given scheduler core stored in the |
340 | /// thread-local context. |
341 | fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) { |
342 | core.metrics.start_poll(); |
343 | let mut ret = self.enter(core, || crate::runtime::coop::budget(f)); |
344 | ret.0.metrics.end_poll(); |
345 | ret |
346 | } |
347 | |
348 | /// Blocks the current thread until an event is received by the driver, |
349 | /// including I/O events, timer events, ... |
350 | fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> { |
351 | let mut driver = core.driver.take().expect("driver missing" ); |
352 | |
353 | if let Some(f) = &handle.shared.config.before_park { |
354 | let (c, ()) = self.enter(core, || f()); |
355 | core = c; |
356 | } |
357 | |
358 | // This check will fail if `before_park` spawns a task for us to run |
359 | // instead of parking the thread |
360 | if core.tasks.is_empty() { |
361 | // Park until the thread is signaled |
362 | core.metrics.about_to_park(); |
363 | core.submit_metrics(handle); |
364 | |
365 | let (c, ()) = self.enter(core, || { |
366 | driver.park(&handle.driver); |
367 | self.defer.wake(); |
368 | }); |
369 | |
370 | core = c; |
371 | } |
372 | |
373 | if let Some(f) = &handle.shared.config.after_unpark { |
374 | let (c, ()) = self.enter(core, || f()); |
375 | core = c; |
376 | } |
377 | |
378 | core.driver = Some(driver); |
379 | core |
380 | } |
381 | |
382 | /// Checks the driver for new events without blocking the thread. |
383 | fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> { |
384 | let mut driver = core.driver.take().expect("driver missing" ); |
385 | |
386 | core.submit_metrics(handle); |
387 | |
388 | let (mut core, ()) = self.enter(core, || { |
389 | driver.park_timeout(&handle.driver, Duration::from_millis(0)); |
390 | self.defer.wake(); |
391 | }); |
392 | |
393 | core.driver = Some(driver); |
394 | core |
395 | } |
396 | |
397 | fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) { |
398 | // Store the scheduler core in the thread-local context |
399 | // |
400 | // A drop-guard is employed at a higher level. |
401 | *self.core.borrow_mut() = Some(core); |
402 | |
403 | // Execute the closure while tracking the execution budget |
404 | let ret = f(); |
405 | |
406 | // Take the scheduler core back |
407 | let core = self.core.borrow_mut().take().expect("core missing" ); |
408 | (core, ret) |
409 | } |
410 | |
411 | pub(crate) fn defer(&self, waker: &Waker) { |
412 | self.defer.defer(waker); |
413 | } |
414 | } |
415 | |
416 | // ===== impl Handle ===== |
417 | |
418 | impl Handle { |
419 | /// Spawns a future onto the `CurrentThread` scheduler |
420 | pub(crate) fn spawn<F>( |
421 | me: &Arc<Self>, |
422 | future: F, |
423 | id: crate::runtime::task::Id, |
424 | ) -> JoinHandle<F::Output> |
425 | where |
426 | F: crate::future::Future + Send + 'static, |
427 | F::Output: Send + 'static, |
428 | { |
429 | let (handle, notified) = me.shared.owned.bind(future, me.clone(), id); |
430 | |
431 | if let Some(notified) = notified { |
432 | me.schedule(notified); |
433 | } |
434 | |
435 | handle |
436 | } |
437 | |
438 | /// Capture a snapshot of this runtime's state. |
439 | #[cfg (all( |
440 | tokio_unstable, |
441 | tokio_taskdump, |
442 | target_os = "linux" , |
443 | any(target_arch = "aarch64" , target_arch = "x86" , target_arch = "x86_64" ) |
444 | ))] |
445 | pub(crate) fn dump(&self) -> crate::runtime::Dump { |
446 | use crate::runtime::dump; |
447 | use task::trace::trace_current_thread; |
448 | |
449 | let mut traces = vec![]; |
450 | |
451 | // todo: how to make this work outside of a runtime context? |
452 | context::with_scheduler(|maybe_context| { |
453 | // drain the local queue |
454 | let context = if let Some(context) = maybe_context { |
455 | context.expect_current_thread() |
456 | } else { |
457 | return; |
458 | }; |
459 | let mut maybe_core = context.core.borrow_mut(); |
460 | let core = if let Some(core) = maybe_core.as_mut() { |
461 | core |
462 | } else { |
463 | return; |
464 | }; |
465 | let local = &mut core.tasks; |
466 | |
467 | if self.shared.inject.is_closed() { |
468 | return; |
469 | } |
470 | |
471 | traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject) |
472 | .into_iter() |
473 | .map(dump::Task::new) |
474 | .collect(); |
475 | |
476 | // Avoid double borrow panic |
477 | drop(maybe_core); |
478 | |
479 | // Taking a taskdump could wakes every task, but we probably don't want |
480 | // the `yield_now` vector to be that large under normal circumstances. |
481 | // Therefore, we free its allocation. |
482 | wake_deferred_tasks_and_free(context); |
483 | }); |
484 | |
485 | dump::Dump::new(traces) |
486 | } |
487 | |
488 | fn next_remote_task(&self) -> Option<Notified> { |
489 | self.shared.inject.pop() |
490 | } |
491 | |
492 | fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> { |
493 | // Set woken to true when enter block_on, ensure outer future |
494 | // be polled for the first time when enter loop |
495 | me.shared.woken.store(true, Release); |
496 | waker_ref(me) |
497 | } |
498 | |
499 | // reset woken to false and return original value |
500 | pub(crate) fn reset_woken(&self) -> bool { |
501 | self.shared.woken.swap(false, AcqRel) |
502 | } |
503 | } |
504 | |
505 | cfg_metrics! { |
506 | impl Handle { |
507 | pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { |
508 | &self.shared.scheduler_metrics |
509 | } |
510 | |
511 | pub(crate) fn injection_queue_depth(&self) -> usize { |
512 | self.shared.inject.len() |
513 | } |
514 | |
515 | pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { |
516 | assert_eq!(0, worker); |
517 | &self.shared.worker_metrics |
518 | } |
519 | |
520 | pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { |
521 | self.worker_metrics(worker).queue_depth() |
522 | } |
523 | |
524 | pub(crate) fn num_blocking_threads(&self) -> usize { |
525 | self.blocking_spawner.num_threads() |
526 | } |
527 | |
528 | pub(crate) fn num_idle_blocking_threads(&self) -> usize { |
529 | self.blocking_spawner.num_idle_threads() |
530 | } |
531 | |
532 | pub(crate) fn blocking_queue_depth(&self) -> usize { |
533 | self.blocking_spawner.queue_depth() |
534 | } |
535 | |
536 | pub(crate) fn active_tasks_count(&self) -> usize { |
537 | self.shared.owned.active_tasks_count() |
538 | } |
539 | } |
540 | } |
541 | |
542 | cfg_unstable! { |
543 | use std::num::NonZeroU64; |
544 | |
545 | impl Handle { |
546 | pub(crate) fn owned_id(&self) -> NonZeroU64 { |
547 | self.shared.owned.id |
548 | } |
549 | } |
550 | } |
551 | |
552 | impl fmt::Debug for Handle { |
553 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
554 | fmt.debug_struct(name:"current_thread::Handle { ... }" ).finish() |
555 | } |
556 | } |
557 | |
558 | // ===== impl Shared ===== |
559 | |
560 | impl Schedule for Arc<Handle> { |
561 | fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { |
562 | self.shared.owned.remove(task) |
563 | } |
564 | |
565 | fn schedule(&self, task: task::Notified<Self>) { |
566 | use scheduler::Context::CurrentThread; |
567 | |
568 | context::with_scheduler(|maybe_cx| match maybe_cx { |
569 | Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => { |
570 | let mut core = cx.core.borrow_mut(); |
571 | |
572 | // If `None`, the runtime is shutting down, so there is no need |
573 | // to schedule the task. |
574 | if let Some(core) = core.as_mut() { |
575 | core.push_task(self, task); |
576 | } |
577 | } |
578 | _ => { |
579 | // Track that a task was scheduled from **outside** of the runtime. |
580 | self.shared.scheduler_metrics.inc_remote_schedule_count(); |
581 | |
582 | // Schedule the task |
583 | self.shared.inject.push(task); |
584 | self.driver.unpark(); |
585 | } |
586 | }); |
587 | } |
588 | |
589 | cfg_unstable! { |
590 | fn unhandled_panic(&self) { |
591 | use crate::runtime::UnhandledPanic; |
592 | |
593 | match self.shared.config.unhandled_panic { |
594 | UnhandledPanic::Ignore => { |
595 | // Do nothing |
596 | } |
597 | UnhandledPanic::ShutdownRuntime => { |
598 | use scheduler::Context::CurrentThread; |
599 | |
600 | // This hook is only called from within the runtime, so |
601 | // `context::with_scheduler` should match with `&self`, i.e. |
602 | // there is no opportunity for a nested scheduler to be |
603 | // called. |
604 | context::with_scheduler(|maybe_cx| match maybe_cx { |
605 | Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => { |
606 | let mut core = cx.core.borrow_mut(); |
607 | |
608 | // If `None`, the runtime is shutting down, so there is no need to signal shutdown |
609 | if let Some(core) = core.as_mut() { |
610 | core.unhandled_panic = true; |
611 | self.shared.owned.close_and_shutdown_all(0); |
612 | } |
613 | } |
614 | _ => unreachable!("runtime core not set in CURRENT thread-local" ), |
615 | }) |
616 | } |
617 | } |
618 | } |
619 | } |
620 | } |
621 | |
622 | impl Wake for Handle { |
623 | fn wake(arc_self: Arc<Self>) { |
624 | Wake::wake_by_ref(&arc_self); |
625 | } |
626 | |
627 | /// Wake by reference |
628 | fn wake_by_ref(arc_self: &Arc<Self>) { |
629 | arc_self.shared.woken.store(val:true, order:Release); |
630 | arc_self.driver.unpark(); |
631 | } |
632 | } |
633 | |
634 | // ===== CoreGuard ===== |
635 | |
636 | /// Used to ensure we always place the `Core` value back into its slot in |
637 | /// `CurrentThread`, even if the future panics. |
638 | struct CoreGuard<'a> { |
639 | context: scheduler::Context, |
640 | scheduler: &'a CurrentThread, |
641 | } |
642 | |
643 | impl CoreGuard<'_> { |
644 | #[track_caller ] |
645 | fn block_on<F: Future>(self, future: F) -> F::Output { |
646 | let ret = self.enter(|mut core, context| { |
647 | let waker = Handle::waker_ref(&context.handle); |
648 | let mut cx = std::task::Context::from_waker(&waker); |
649 | |
650 | pin!(future); |
651 | |
652 | core.metrics.start_processing_scheduled_tasks(); |
653 | |
654 | 'outer: loop { |
655 | let handle = &context.handle; |
656 | |
657 | if handle.reset_woken() { |
658 | let (c, res) = context.enter(core, || { |
659 | crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx)) |
660 | }); |
661 | |
662 | core = c; |
663 | |
664 | if let Ready(v) = res { |
665 | return (core, Some(v)); |
666 | } |
667 | } |
668 | |
669 | for _ in 0..handle.shared.config.event_interval { |
670 | // Make sure we didn't hit an unhandled_panic |
671 | if core.unhandled_panic { |
672 | return (core, None); |
673 | } |
674 | |
675 | core.tick(); |
676 | |
677 | let entry = core.next_task(handle); |
678 | |
679 | let task = match entry { |
680 | Some(entry) => entry, |
681 | None => { |
682 | core.metrics.end_processing_scheduled_tasks(); |
683 | |
684 | core = if !context.defer.is_empty() { |
685 | context.park_yield(core, handle) |
686 | } else { |
687 | context.park(core, handle) |
688 | }; |
689 | |
690 | core.metrics.start_processing_scheduled_tasks(); |
691 | |
692 | // Try polling the `block_on` future next |
693 | continue 'outer; |
694 | } |
695 | }; |
696 | |
697 | let task = context.handle.shared.owned.assert_owner(task); |
698 | |
699 | let (c, ()) = context.run_task(core, || { |
700 | task.run(); |
701 | }); |
702 | |
703 | core = c; |
704 | } |
705 | |
706 | core.metrics.end_processing_scheduled_tasks(); |
707 | |
708 | // Yield to the driver, this drives the timer and pulls any |
709 | // pending I/O events. |
710 | core = context.park_yield(core, handle); |
711 | |
712 | core.metrics.start_processing_scheduled_tasks(); |
713 | } |
714 | }); |
715 | |
716 | match ret { |
717 | Some(ret) => ret, |
718 | None => { |
719 | // `block_on` panicked. |
720 | panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic" ); |
721 | } |
722 | } |
723 | } |
724 | |
725 | /// Enters the scheduler context. This sets the queue and other necessary |
726 | /// scheduler state in the thread-local. |
727 | fn enter<F, R>(self, f: F) -> R |
728 | where |
729 | F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R), |
730 | { |
731 | let context = self.context.expect_current_thread(); |
732 | |
733 | // Remove `core` from `context` to pass into the closure. |
734 | let core = context.core.borrow_mut().take().expect("core missing" ); |
735 | |
736 | // Call the closure and place `core` back |
737 | let (core, ret) = context::set_scheduler(&self.context, || f(core, context)); |
738 | |
739 | *context.core.borrow_mut() = Some(core); |
740 | |
741 | ret |
742 | } |
743 | } |
744 | |
745 | impl Drop for CoreGuard<'_> { |
746 | fn drop(&mut self) { |
747 | let context: &Context = self.context.expect_current_thread(); |
748 | |
749 | if let Some(core: Box) = context.core.borrow_mut().take() { |
750 | // Replace old scheduler back into the state to allow |
751 | // other threads to pick it up and drive it. |
752 | self.scheduler.core.set(val:core); |
753 | |
754 | // Wake up other possible threads that could steal the driver. |
755 | self.scheduler.notify.notify_one(); |
756 | } |
757 | } |
758 | } |
759 | |