1 | use crate::future::poll_fn; |
2 | use crate::loom::sync::atomic::AtomicBool; |
3 | use crate::loom::sync::Arc; |
4 | use crate::runtime::driver::{self, Driver}; |
5 | use crate::runtime::scheduler::{self, Defer, Inject}; |
6 | use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; |
7 | use crate::runtime::{blocking, context, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics}; |
8 | use crate::sync::notify::Notify; |
9 | use crate::util::atomic_cell::AtomicCell; |
10 | use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef}; |
11 | |
12 | use std::cell::RefCell; |
13 | use std::collections::VecDeque; |
14 | use std::fmt; |
15 | use std::future::Future; |
16 | use std::sync::atomic::Ordering::{AcqRel, Release}; |
17 | use std::task::Poll::{Pending, Ready}; |
18 | use std::task::Waker; |
19 | use std::time::Duration; |
20 | |
21 | /// Executes tasks on the current thread |
22 | pub(crate) struct CurrentThread { |
23 | /// Core scheduler data is acquired by a thread entering `block_on`. |
24 | core: AtomicCell<Core>, |
25 | |
26 | /// Notifier for waking up other threads to steal the |
27 | /// driver. |
28 | notify: Notify, |
29 | } |
30 | |
31 | /// Handle to the current thread scheduler |
32 | pub(crate) struct Handle { |
33 | /// Scheduler state shared across threads |
34 | shared: Shared, |
35 | |
36 | /// Resource driver handles |
37 | pub(crate) driver: driver::Handle, |
38 | |
39 | /// Blocking pool spawner |
40 | pub(crate) blocking_spawner: blocking::Spawner, |
41 | |
42 | /// Current random number generator seed |
43 | pub(crate) seed_generator: RngSeedGenerator, |
44 | } |
45 | |
46 | /// Data required for executing the scheduler. The struct is passed around to |
47 | /// a function that will perform the scheduling work and acts as a capability token. |
48 | struct Core { |
49 | /// Scheduler run queue |
50 | tasks: VecDeque<Notified>, |
51 | |
52 | /// Current tick |
53 | tick: u32, |
54 | |
55 | /// Runtime driver |
56 | /// |
57 | /// The driver is removed before starting to park the thread |
58 | driver: Option<Driver>, |
59 | |
60 | /// Metrics batch |
61 | metrics: MetricsBatch, |
62 | |
63 | /// How often to check the global queue |
64 | global_queue_interval: u32, |
65 | |
66 | /// True if a task panicked without being handled and the runtime is |
67 | /// configured to shutdown on unhandled panic. |
68 | unhandled_panic: bool, |
69 | } |
70 | |
71 | /// Scheduler state shared between threads. |
72 | struct Shared { |
73 | /// Remote run queue |
74 | inject: Inject<Arc<Handle>>, |
75 | |
76 | /// Collection of all active tasks spawned onto this executor. |
77 | owned: OwnedTasks<Arc<Handle>>, |
78 | |
79 | /// Indicates whether the blocked on thread was woken. |
80 | woken: AtomicBool, |
81 | |
82 | /// Scheduler configuration options |
83 | config: Config, |
84 | |
85 | /// Keeps track of various runtime metrics. |
86 | scheduler_metrics: SchedulerMetrics, |
87 | |
88 | /// This scheduler only has one worker. |
89 | worker_metrics: WorkerMetrics, |
90 | } |
91 | |
92 | /// Thread-local context. |
93 | /// |
94 | /// pub(crate) to store in `runtime::context`. |
95 | pub(crate) struct Context { |
96 | /// Scheduler handle |
97 | handle: Arc<Handle>, |
98 | |
99 | /// Scheduler core, enabling the holder of `Context` to execute the |
100 | /// scheduler. |
101 | core: RefCell<Option<Box<Core>>>, |
102 | |
103 | /// Deferred tasks, usually ones that called `task::yield_now()`. |
104 | pub(crate) defer: Defer, |
105 | } |
106 | |
107 | type Notified = task::Notified<Arc<Handle>>; |
108 | |
/// Capacity the local run queue is created with.
const INITIAL_CAPACITY: usize = 64;
111 | |
/// Global queue interval applied when the configuration does not specify
/// one. This constant is temporary and will go away once the tuning logic of
/// the multi-thread and current-thread schedulers is unified.
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;
116 | |
117 | impl CurrentThread { |
118 | pub(crate) fn new( |
119 | driver: Driver, |
120 | driver_handle: driver::Handle, |
121 | blocking_spawner: blocking::Spawner, |
122 | seed_generator: RngSeedGenerator, |
123 | config: Config, |
124 | ) -> (CurrentThread, Arc<Handle>) { |
125 | let worker_metrics = WorkerMetrics::from_config(&config); |
126 | |
127 | // Get the configured global queue interval, or use the default. |
128 | let global_queue_interval = config |
129 | .global_queue_interval |
130 | .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL); |
131 | |
132 | let handle = Arc::new(Handle { |
133 | shared: Shared { |
134 | inject: Inject::new(), |
135 | owned: OwnedTasks::new(), |
136 | woken: AtomicBool::new(false), |
137 | config, |
138 | scheduler_metrics: SchedulerMetrics::new(), |
139 | worker_metrics, |
140 | }, |
141 | driver: driver_handle, |
142 | blocking_spawner, |
143 | seed_generator, |
144 | }); |
145 | |
146 | let core = AtomicCell::new(Some(Box::new(Core { |
147 | tasks: VecDeque::with_capacity(INITIAL_CAPACITY), |
148 | tick: 0, |
149 | driver: Some(driver), |
150 | metrics: MetricsBatch::new(&handle.shared.worker_metrics), |
151 | global_queue_interval, |
152 | unhandled_panic: false, |
153 | }))); |
154 | |
155 | let scheduler = CurrentThread { |
156 | core, |
157 | notify: Notify::new(), |
158 | }; |
159 | |
160 | (scheduler, handle) |
161 | } |
162 | |
163 | #[track_caller ] |
164 | pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output { |
165 | pin!(future); |
166 | |
167 | crate::runtime::context::enter_runtime(handle, false, |blocking| { |
168 | let handle = handle.as_current_thread(); |
169 | |
170 | // Attempt to steal the scheduler core and block_on the future if we can |
171 | // there, otherwise, lets select on a notification that the core is |
172 | // available or the future is complete. |
173 | loop { |
174 | if let Some(core) = self.take_core(handle) { |
175 | return core.block_on(future); |
176 | } else { |
177 | let notified = self.notify.notified(); |
178 | pin!(notified); |
179 | |
180 | if let Some(out) = blocking |
181 | .block_on(poll_fn(|cx| { |
182 | if notified.as_mut().poll(cx).is_ready() { |
183 | return Ready(None); |
184 | } |
185 | |
186 | if let Ready(out) = future.as_mut().poll(cx) { |
187 | return Ready(Some(out)); |
188 | } |
189 | |
190 | Pending |
191 | })) |
192 | .expect("Failed to `Enter::block_on`" ) |
193 | { |
194 | return out; |
195 | } |
196 | } |
197 | } |
198 | }) |
199 | } |
200 | |
201 | fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> { |
202 | let core = self.core.take()?; |
203 | |
204 | Some(CoreGuard { |
205 | context: scheduler::Context::CurrentThread(Context { |
206 | handle: handle.clone(), |
207 | core: RefCell::new(Some(core)), |
208 | defer: Defer::new(), |
209 | }), |
210 | scheduler: self, |
211 | }) |
212 | } |
213 | |
214 | pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) { |
215 | let handle = handle.as_current_thread(); |
216 | |
217 | // Avoid a double panic if we are currently panicking and |
218 | // the lock may be poisoned. |
219 | |
220 | let core = match self.take_core(handle) { |
221 | Some(core) => core, |
222 | None if std::thread::panicking() => return, |
223 | None => panic!("Oh no! We never placed the Core back, this is a bug!" ), |
224 | }; |
225 | |
226 | // Check that the thread-local is not being destroyed |
227 | let tls_available = context::with_current(|_| ()).is_ok(); |
228 | |
229 | if tls_available { |
230 | core.enter(|core, _context| { |
231 | let core = shutdown2(core, handle); |
232 | (core, ()) |
233 | }); |
234 | } else { |
235 | // Shutdown without setting the context. `tokio::spawn` calls will |
236 | // fail, but those will fail either way because the thread-local is |
237 | // not available anymore. |
238 | let context = core.context.expect_current_thread(); |
239 | let core = context.core.borrow_mut().take().unwrap(); |
240 | |
241 | let core = shutdown2(core, handle); |
242 | *context.core.borrow_mut() = Some(core); |
243 | } |
244 | } |
245 | } |
246 | |
247 | fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> { |
248 | // Drain the OwnedTasks collection. This call also closes the |
249 | // collection, ensuring that no tasks are ever pushed after this |
250 | // call returns. |
251 | handle.shared.owned.close_and_shutdown_all(); |
252 | |
253 | // Drain local queue |
254 | // We already shut down every task, so we just need to drop the task. |
255 | while let Some(task) = core.next_local_task(handle) { |
256 | drop(task); |
257 | } |
258 | |
259 | // Close the injection queue |
260 | handle.shared.inject.close(); |
261 | |
262 | // Drain remote queue |
263 | while let Some(task) = handle.shared.inject.pop() { |
264 | drop(task); |
265 | } |
266 | |
267 | assert!(handle.shared.owned.is_empty()); |
268 | |
269 | // Submit metrics |
270 | core.submit_metrics(handle); |
271 | |
272 | // Shutdown the resource drivers |
273 | if let Some(driver) = core.driver.as_mut() { |
274 | driver.shutdown(&handle.driver); |
275 | } |
276 | |
277 | core |
278 | } |
279 | |
280 | impl fmt::Debug for CurrentThread { |
281 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
282 | fmt.debug_struct(name:"CurrentThread" ).finish() |
283 | } |
284 | } |
285 | |
286 | // ===== impl Core ===== |
287 | |
288 | impl Core { |
289 | /// Get and increment the current tick |
290 | fn tick(&mut self) { |
291 | self.tick = self.tick.wrapping_add(1); |
292 | } |
293 | |
294 | fn next_task(&mut self, handle: &Handle) -> Option<Notified> { |
295 | if self.tick % self.global_queue_interval == 0 { |
296 | handle |
297 | .next_remote_task() |
298 | .or_else(|| self.next_local_task(handle)) |
299 | } else { |
300 | self.next_local_task(handle) |
301 | .or_else(|| handle.next_remote_task()) |
302 | } |
303 | } |
304 | |
305 | fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> { |
306 | let ret = self.tasks.pop_front(); |
307 | handle |
308 | .shared |
309 | .worker_metrics |
310 | .set_queue_depth(self.tasks.len()); |
311 | ret |
312 | } |
313 | |
314 | fn push_task(&mut self, handle: &Handle, task: Notified) { |
315 | self.tasks.push_back(task); |
316 | self.metrics.inc_local_schedule_count(); |
317 | handle |
318 | .shared |
319 | .worker_metrics |
320 | .set_queue_depth(self.tasks.len()); |
321 | } |
322 | |
323 | fn submit_metrics(&mut self, handle: &Handle) { |
324 | self.metrics.submit(&handle.shared.worker_metrics); |
325 | } |
326 | } |
327 | |
/// Takes the deferred wakers out of `context.defer` and wakes each of them.
#[cfg(tokio_taskdump)]
fn wake_deferred_tasks_and_free(context: &Context) {
    for waker in context.defer.take_deferred() {
        waker.wake();
    }
}
335 | |
336 | // ===== impl Context ===== |
337 | |
338 | impl Context { |
339 | /// Execute the closure with the given scheduler core stored in the |
340 | /// thread-local context. |
341 | fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) { |
342 | core.metrics.start_poll(); |
343 | let mut ret = self.enter(core, || crate::runtime::coop::budget(f)); |
344 | ret.0.metrics.end_poll(); |
345 | ret |
346 | } |
347 | |
348 | /// Blocks the current thread until an event is received by the driver, |
349 | /// including I/O events, timer events, ... |
350 | fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> { |
351 | let mut driver = core.driver.take().expect("driver missing" ); |
352 | |
353 | if let Some(f) = &handle.shared.config.before_park { |
354 | // Incorrect lint, the closures are actually different types so `f` |
355 | // cannot be passed as an argument to `enter`. |
356 | #[allow (clippy::redundant_closure)] |
357 | let (c, _) = self.enter(core, || f()); |
358 | core = c; |
359 | } |
360 | |
361 | // This check will fail if `before_park` spawns a task for us to run |
362 | // instead of parking the thread |
363 | if core.tasks.is_empty() { |
364 | // Park until the thread is signaled |
365 | core.metrics.about_to_park(); |
366 | core.submit_metrics(handle); |
367 | |
368 | let (c, _) = self.enter(core, || { |
369 | driver.park(&handle.driver); |
370 | self.defer.wake(); |
371 | }); |
372 | |
373 | core = c; |
374 | } |
375 | |
376 | if let Some(f) = &handle.shared.config.after_unpark { |
377 | // Incorrect lint, the closures are actually different types so `f` |
378 | // cannot be passed as an argument to `enter`. |
379 | #[allow (clippy::redundant_closure)] |
380 | let (c, _) = self.enter(core, || f()); |
381 | core = c; |
382 | } |
383 | |
384 | core.driver = Some(driver); |
385 | core |
386 | } |
387 | |
388 | /// Checks the driver for new events without blocking the thread. |
389 | fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> { |
390 | let mut driver = core.driver.take().expect("driver missing" ); |
391 | |
392 | core.submit_metrics(handle); |
393 | |
394 | let (mut core, _) = self.enter(core, || { |
395 | driver.park_timeout(&handle.driver, Duration::from_millis(0)); |
396 | self.defer.wake(); |
397 | }); |
398 | |
399 | core.driver = Some(driver); |
400 | core |
401 | } |
402 | |
403 | fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) { |
404 | // Store the scheduler core in the thread-local context |
405 | // |
406 | // A drop-guard is employed at a higher level. |
407 | *self.core.borrow_mut() = Some(core); |
408 | |
409 | // Execute the closure while tracking the execution budget |
410 | let ret = f(); |
411 | |
412 | // Take the scheduler core back |
413 | let core = self.core.borrow_mut().take().expect("core missing" ); |
414 | (core, ret) |
415 | } |
416 | |
417 | pub(crate) fn defer(&self, waker: &Waker) { |
418 | self.defer.defer(waker); |
419 | } |
420 | } |
421 | |
422 | // ===== impl Handle ===== |
423 | |
424 | impl Handle { |
425 | /// Spawns a future onto the `CurrentThread` scheduler |
426 | pub(crate) fn spawn<F>( |
427 | me: &Arc<Self>, |
428 | future: F, |
429 | id: crate::runtime::task::Id, |
430 | ) -> JoinHandle<F::Output> |
431 | where |
432 | F: crate::future::Future + Send + 'static, |
433 | F::Output: Send + 'static, |
434 | { |
435 | let (handle, notified) = me.shared.owned.bind(future, me.clone(), id); |
436 | |
437 | if let Some(notified) = notified { |
438 | me.schedule(notified); |
439 | } |
440 | |
441 | handle |
442 | } |
443 | |
444 | /// Capture a snapshot of this runtime's state. |
445 | #[cfg (all( |
446 | tokio_unstable, |
447 | tokio_taskdump, |
448 | target_os = "linux" , |
449 | any(target_arch = "aarch64" , target_arch = "x86" , target_arch = "x86_64" ) |
450 | ))] |
451 | pub(crate) fn dump(&self) -> crate::runtime::Dump { |
452 | use crate::runtime::dump; |
453 | use task::trace::trace_current_thread; |
454 | |
455 | let mut traces = vec![]; |
456 | |
457 | // todo: how to make this work outside of a runtime context? |
458 | context::with_scheduler(|maybe_context| { |
459 | // drain the local queue |
460 | let context = if let Some(context) = maybe_context { |
461 | context.expect_current_thread() |
462 | } else { |
463 | return; |
464 | }; |
465 | let mut maybe_core = context.core.borrow_mut(); |
466 | let core = if let Some(core) = maybe_core.as_mut() { |
467 | core |
468 | } else { |
469 | return; |
470 | }; |
471 | let local = &mut core.tasks; |
472 | |
473 | if self.shared.inject.is_closed() { |
474 | return; |
475 | } |
476 | |
477 | traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject) |
478 | .into_iter() |
479 | .map(dump::Task::new) |
480 | .collect(); |
481 | |
482 | // Avoid double borrow panic |
483 | drop(maybe_core); |
484 | |
485 | // Taking a taskdump could wakes every task, but we probably don't want |
486 | // the `yield_now` vector to be that large under normal circumstances. |
487 | // Therefore, we free its allocation. |
488 | wake_deferred_tasks_and_free(context); |
489 | }); |
490 | |
491 | dump::Dump::new(traces) |
492 | } |
493 | |
494 | fn next_remote_task(&self) -> Option<Notified> { |
495 | self.shared.inject.pop() |
496 | } |
497 | |
498 | fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> { |
499 | // Set woken to true when enter block_on, ensure outer future |
500 | // be polled for the first time when enter loop |
501 | me.shared.woken.store(true, Release); |
502 | waker_ref(me) |
503 | } |
504 | |
505 | // reset woken to false and return original value |
506 | pub(crate) fn reset_woken(&self) -> bool { |
507 | self.shared.woken.swap(false, AcqRel) |
508 | } |
509 | } |
510 | |
511 | cfg_metrics! { |
512 | impl Handle { |
513 | pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { |
514 | &self.shared.scheduler_metrics |
515 | } |
516 | |
517 | pub(crate) fn injection_queue_depth(&self) -> usize { |
518 | self.shared.inject.len() |
519 | } |
520 | |
521 | pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { |
522 | assert_eq!(0, worker); |
523 | &self.shared.worker_metrics |
524 | } |
525 | |
526 | pub(crate) fn num_blocking_threads(&self) -> usize { |
527 | self.blocking_spawner.num_threads() |
528 | } |
529 | |
530 | pub(crate) fn num_idle_blocking_threads(&self) -> usize { |
531 | self.blocking_spawner.num_idle_threads() |
532 | } |
533 | |
534 | pub(crate) fn blocking_queue_depth(&self) -> usize { |
535 | self.blocking_spawner.queue_depth() |
536 | } |
537 | |
538 | pub(crate) fn active_tasks_count(&self) -> usize { |
539 | self.shared.owned.active_tasks_count() |
540 | } |
541 | } |
542 | } |
543 | |
544 | impl fmt::Debug for Handle { |
545 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
546 | fmt.debug_struct(name:"current_thread::Handle { ... }" ).finish() |
547 | } |
548 | } |
549 | |
// ===== impl Schedule for Arc<Handle> =====
551 | |
552 | impl Schedule for Arc<Handle> { |
553 | fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { |
554 | self.shared.owned.remove(task) |
555 | } |
556 | |
557 | fn schedule(&self, task: task::Notified<Self>) { |
558 | use scheduler::Context::CurrentThread; |
559 | |
560 | context::with_scheduler(|maybe_cx| match maybe_cx { |
561 | Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => { |
562 | let mut core = cx.core.borrow_mut(); |
563 | |
564 | // If `None`, the runtime is shutting down, so there is no need |
565 | // to schedule the task. |
566 | if let Some(core) = core.as_mut() { |
567 | core.push_task(self, task); |
568 | } |
569 | } |
570 | _ => { |
571 | // Track that a task was scheduled from **outside** of the runtime. |
572 | self.shared.scheduler_metrics.inc_remote_schedule_count(); |
573 | |
574 | // Schedule the task |
575 | self.shared.inject.push(task); |
576 | self.driver.unpark(); |
577 | } |
578 | }); |
579 | } |
580 | |
581 | cfg_unstable! { |
582 | fn unhandled_panic(&self) { |
583 | use crate::runtime::UnhandledPanic; |
584 | |
585 | match self.shared.config.unhandled_panic { |
586 | UnhandledPanic::Ignore => { |
587 | // Do nothing |
588 | } |
589 | UnhandledPanic::ShutdownRuntime => { |
590 | use scheduler::Context::CurrentThread; |
591 | |
592 | // This hook is only called from within the runtime, so |
593 | // `context::with_scheduler` should match with `&self`, i.e. |
594 | // there is no opportunity for a nested scheduler to be |
595 | // called. |
596 | context::with_scheduler(|maybe_cx| match maybe_cx { |
597 | Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => { |
598 | let mut core = cx.core.borrow_mut(); |
599 | |
600 | // If `None`, the runtime is shutting down, so there is no need to signal shutdown |
601 | if let Some(core) = core.as_mut() { |
602 | core.unhandled_panic = true; |
603 | self.shared.owned.close_and_shutdown_all(); |
604 | } |
605 | } |
606 | _ => unreachable!("runtime core not set in CURRENT thread-local" ), |
607 | }) |
608 | } |
609 | } |
610 | } |
611 | } |
612 | } |
613 | |
614 | impl Wake for Handle { |
615 | fn wake(arc_self: Arc<Self>) { |
616 | Wake::wake_by_ref(&arc_self) |
617 | } |
618 | |
619 | /// Wake by reference |
620 | fn wake_by_ref(arc_self: &Arc<Self>) { |
621 | arc_self.shared.woken.store(val:true, order:Release); |
622 | arc_self.driver.unpark(); |
623 | } |
624 | } |
625 | |
626 | // ===== CoreGuard ===== |
627 | |
628 | /// Used to ensure we always place the `Core` value back into its slot in |
629 | /// `CurrentThread`, even if the future panics. |
630 | struct CoreGuard<'a> { |
631 | context: scheduler::Context, |
632 | scheduler: &'a CurrentThread, |
633 | } |
634 | |
635 | impl CoreGuard<'_> { |
636 | #[track_caller ] |
637 | fn block_on<F: Future>(self, future: F) -> F::Output { |
638 | let ret = self.enter(|mut core, context| { |
639 | let waker = Handle::waker_ref(&context.handle); |
640 | let mut cx = std::task::Context::from_waker(&waker); |
641 | |
642 | pin!(future); |
643 | |
644 | core.metrics.start_processing_scheduled_tasks(); |
645 | |
646 | 'outer: loop { |
647 | let handle = &context.handle; |
648 | |
649 | if handle.reset_woken() { |
650 | let (c, res) = context.enter(core, || { |
651 | crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx)) |
652 | }); |
653 | |
654 | core = c; |
655 | |
656 | if let Ready(v) = res { |
657 | return (core, Some(v)); |
658 | } |
659 | } |
660 | |
661 | for _ in 0..handle.shared.config.event_interval { |
662 | // Make sure we didn't hit an unhandled_panic |
663 | if core.unhandled_panic { |
664 | return (core, None); |
665 | } |
666 | |
667 | core.tick(); |
668 | |
669 | let entry = core.next_task(handle); |
670 | |
671 | let task = match entry { |
672 | Some(entry) => entry, |
673 | None => { |
674 | core.metrics.end_processing_scheduled_tasks(); |
675 | |
676 | core = if !context.defer.is_empty() { |
677 | context.park_yield(core, handle) |
678 | } else { |
679 | context.park(core, handle) |
680 | }; |
681 | |
682 | core.metrics.start_processing_scheduled_tasks(); |
683 | |
684 | // Try polling the `block_on` future next |
685 | continue 'outer; |
686 | } |
687 | }; |
688 | |
689 | let task = context.handle.shared.owned.assert_owner(task); |
690 | |
691 | let (c, _) = context.run_task(core, || { |
692 | task.run(); |
693 | }); |
694 | |
695 | core = c; |
696 | } |
697 | |
698 | core.metrics.end_processing_scheduled_tasks(); |
699 | |
700 | // Yield to the driver, this drives the timer and pulls any |
701 | // pending I/O events. |
702 | core = context.park_yield(core, handle); |
703 | |
704 | core.metrics.start_processing_scheduled_tasks(); |
705 | } |
706 | }); |
707 | |
708 | match ret { |
709 | Some(ret) => ret, |
710 | None => { |
711 | // `block_on` panicked. |
712 | panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic" ); |
713 | } |
714 | } |
715 | } |
716 | |
717 | /// Enters the scheduler context. This sets the queue and other necessary |
718 | /// scheduler state in the thread-local. |
719 | fn enter<F, R>(self, f: F) -> R |
720 | where |
721 | F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R), |
722 | { |
723 | let context = self.context.expect_current_thread(); |
724 | |
725 | // Remove `core` from `context` to pass into the closure. |
726 | let core = context.core.borrow_mut().take().expect("core missing" ); |
727 | |
728 | // Call the closure and place `core` back |
729 | let (core, ret) = context::set_scheduler(&self.context, || f(core, context)); |
730 | |
731 | *context.core.borrow_mut() = Some(core); |
732 | |
733 | ret |
734 | } |
735 | } |
736 | |
737 | impl Drop for CoreGuard<'_> { |
738 | fn drop(&mut self) { |
739 | let context: &Context = self.context.expect_current_thread(); |
740 | |
741 | if let Some(core: Box) = context.core.borrow_mut().take() { |
742 | // Replace old scheduler back into the state to allow |
743 | // other threads to pick it up and drive it. |
744 | self.scheduler.core.set(val:core); |
745 | |
746 | // Wake up other possible threads that could steal the driver. |
747 | self.scheduler.notify.notify_one() |
748 | } |
749 | } |
750 | } |
751 | |