1#[cfg(tokio_unstable)]
2use crate::runtime;
3use crate::runtime::{context, scheduler, RuntimeFlavor};
4
5/// Handle to the runtime.
6///
7/// The handle is internally reference-counted and can be freely cloned. A handle can be
8/// obtained using the [`Runtime::handle`] method.
9///
10/// [`Runtime::handle`]: crate::runtime::Runtime::handle()
11#[derive(Debug, Clone)]
12// When the `rt` feature is *not* enabled, this type is still defined, but not
13// included in the public API.
14pub struct Handle {
15 pub(crate) inner: scheduler::Handle,
16}
17
18use crate::runtime::task::JoinHandle;
19use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR};
20
21use std::future::Future;
22use std::marker::PhantomData;
23use std::{error, fmt};
24
25/// Runtime context guard.
26///
27/// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits
28/// the runtime context on drop.
29///
30/// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter
31#[derive(Debug)]
32#[must_use = "Creating and dropping a guard does nothing"]
33pub struct EnterGuard<'a> {
34 _guard: context::SetCurrentGuard,
35 _handle_lifetime: PhantomData<&'a Handle>,
36}
37
38impl Handle {
39 /// Enters the runtime context. This allows you to construct types that must
40 /// have an executor available on creation such as [`Sleep`] or
41 /// [`TcpStream`]. It will also allow you to call methods such as
42 /// [`tokio::spawn`] and [`Handle::current`] without panicking.
43 ///
44 /// # Panics
45 ///
46 /// When calling `Handle::enter` multiple times, the returned guards
47 /// **must** be dropped in the reverse order that they were acquired.
48 /// Failure to do so will result in a panic and possible memory leaks.
49 ///
50 /// # Examples
51 ///
52 /// ```
53 /// use tokio::runtime::Runtime;
54 ///
55 /// let rt = Runtime::new().unwrap();
56 ///
57 /// let _guard = rt.enter();
58 /// tokio::spawn(async {
59 /// println!("Hello world!");
60 /// });
61 /// ```
62 ///
63 /// Do **not** do the following, this shows a scenario that will result in a
64 /// panic and possible memory leak.
65 ///
66 /// ```should_panic
67 /// use tokio::runtime::Runtime;
68 ///
69 /// let rt1 = Runtime::new().unwrap();
70 /// let rt2 = Runtime::new().unwrap();
71 ///
72 /// let enter1 = rt1.enter();
73 /// let enter2 = rt2.enter();
74 ///
75 /// drop(enter1);
76 /// drop(enter2);
77 /// ```
78 ///
79 /// [`Sleep`]: struct@crate::time::Sleep
80 /// [`TcpStream`]: struct@crate::net::TcpStream
81 /// [`tokio::spawn`]: fn@crate::spawn
82 pub fn enter(&self) -> EnterGuard<'_> {
83 EnterGuard {
84 _guard: match context::try_set_current(&self.inner) {
85 Some(guard) => guard,
86 None => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
87 },
88 _handle_lifetime: PhantomData,
89 }
90 }
91
92 /// Returns a `Handle` view over the currently running `Runtime`.
93 ///
94 /// # Panics
95 ///
96 /// This will panic if called outside the context of a Tokio runtime. That means that you must
97 /// call this on one of the threads **being run by the runtime**, or from a thread with an active
98 /// `EnterGuard`. Calling this from within a thread created by `std::thread::spawn` (for example)
99 /// will cause a panic unless that thread has an active `EnterGuard`.
100 ///
101 /// # Examples
102 ///
103 /// This can be used to obtain the handle of the surrounding runtime from an async
104 /// block or function running on that runtime.
105 ///
106 /// ```
107 /// # use std::thread;
108 /// # use tokio::runtime::Runtime;
109 /// # fn dox() {
110 /// # let rt = Runtime::new().unwrap();
111 /// # rt.spawn(async {
112 /// use tokio::runtime::Handle;
113 ///
114 /// // Inside an async block or function.
115 /// let handle = Handle::current();
116 /// handle.spawn(async {
117 /// println!("now running in the existing Runtime");
118 /// });
119 ///
120 /// # let handle =
121 /// thread::spawn(move || {
122 /// // Notice that the handle is created outside of this thread and then moved in
123 /// handle.spawn(async { /* ... */ });
124 /// // This next line would cause a panic because we haven't entered the runtime
125 /// // and created an EnterGuard
126 /// // let handle2 = Handle::current(); // panic
127 /// // So we create a guard here with Handle::enter();
128 /// let _guard = handle.enter();
129 /// // Now we can call Handle::current();
130 /// let handle2 = Handle::current();
131 /// });
132 /// # handle.join().unwrap();
133 /// # });
134 /// # }
135 /// ```
136 #[track_caller]
137 pub fn current() -> Self {
138 Handle {
139 inner: scheduler::Handle::current(),
140 }
141 }
142
143 /// Returns a Handle view over the currently running Runtime
144 ///
145 /// Returns an error if no Runtime has been started
146 ///
147 /// Contrary to `current`, this never panics
148 pub fn try_current() -> Result<Self, TryCurrentError> {
149 context::with_current(|inner| Handle {
150 inner: inner.clone(),
151 })
152 }
153
154 /// Spawns a future onto the Tokio runtime.
155 ///
156 /// This spawns the given future onto the runtime's executor, usually a
157 /// thread pool. The thread pool is then responsible for polling the future
158 /// until it completes.
159 ///
160 /// The provided future will start running in the background immediately
161 /// when `spawn` is called, even if you don't await the returned
162 /// `JoinHandle`.
163 ///
164 /// See [module level][mod] documentation for more details.
165 ///
166 /// [mod]: index.html
167 ///
168 /// # Examples
169 ///
170 /// ```
171 /// use tokio::runtime::Runtime;
172 ///
173 /// # fn dox() {
174 /// // Create the runtime
175 /// let rt = Runtime::new().unwrap();
176 /// // Get a handle from this runtime
177 /// let handle = rt.handle();
178 ///
179 /// // Spawn a future onto the runtime using the handle
180 /// handle.spawn(async {
181 /// println!("now running on a worker thread");
182 /// });
183 /// # }
184 /// ```
185 #[track_caller]
186 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
187 where
188 F: Future + Send + 'static,
189 F::Output: Send + 'static,
190 {
191 self.spawn_named(future, None)
192 }
193
194 /// Runs the provided function on an executor dedicated to blocking
195 /// operations.
196 ///
197 /// # Examples
198 ///
199 /// ```
200 /// use tokio::runtime::Runtime;
201 ///
202 /// # fn dox() {
203 /// // Create the runtime
204 /// let rt = Runtime::new().unwrap();
205 /// // Get a handle from this runtime
206 /// let handle = rt.handle();
207 ///
208 /// // Spawn a blocking function onto the runtime using the handle
209 /// handle.spawn_blocking(|| {
210 /// println!("now running on a worker thread");
211 /// });
212 /// # }
213 #[track_caller]
214 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
215 where
216 F: FnOnce() -> R + Send + 'static,
217 R: Send + 'static,
218 {
219 self.inner.blocking_spawner().spawn_blocking(self, func)
220 }
221
222 /// Runs a future to completion on this `Handle`'s associated `Runtime`.
223 ///
224 /// This runs the given future on the current thread, blocking until it is
225 /// complete, and yielding its resolved result. Any tasks or timers which
226 /// the future spawns internally will be executed on the runtime.
227 ///
228 /// When this is used on a `current_thread` runtime, only the
229 /// [`Runtime::block_on`] method can drive the IO and timer drivers, but the
230 /// `Handle::block_on` method cannot drive them. This means that, when using
231 /// this method on a `current_thread` runtime, anything that relies on IO or
232 /// timers will not work unless there is another thread currently calling
233 /// [`Runtime::block_on`] on the same runtime.
234 ///
235 /// # If the runtime has been shut down
236 ///
237 /// If the `Handle`'s associated `Runtime` has been shut down (through
238 /// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by
239 /// dropping it) and `Handle::block_on` is used it might return an error or
240 /// panic. Specifically IO resources will return an error and timers will
241 /// panic. Runtime independent futures will run as normal.
242 ///
243 /// # Panics
244 ///
245 /// This function panics if the provided future panics, if called within an
246 /// asynchronous execution context, or if a timer future is executed on a
247 /// runtime that has been shut down.
248 ///
249 /// # Examples
250 ///
251 /// ```
252 /// use tokio::runtime::Runtime;
253 ///
254 /// // Create the runtime
255 /// let rt = Runtime::new().unwrap();
256 ///
257 /// // Get a handle from this runtime
258 /// let handle = rt.handle();
259 ///
260 /// // Execute the future, blocking the current thread until completion
261 /// handle.block_on(async {
262 /// println!("hello");
263 /// });
264 /// ```
265 ///
266 /// Or using `Handle::current`:
267 ///
268 /// ```
269 /// use tokio::runtime::Handle;
270 ///
271 /// #[tokio::main]
272 /// async fn main () {
273 /// let handle = Handle::current();
274 /// std::thread::spawn(move || {
275 /// // Using Handle::block_on to run async code in the new thread.
276 /// handle.block_on(async {
277 /// println!("hello");
278 /// });
279 /// });
280 /// }
281 /// ```
282 ///
283 /// [`JoinError`]: struct@crate::task::JoinError
284 /// [`JoinHandle`]: struct@crate::task::JoinHandle
285 /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on
286 /// [`Runtime::shutdown_background`]: fn@crate::runtime::Runtime::shutdown_background
287 /// [`Runtime::shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
288 /// [`spawn_blocking`]: crate::task::spawn_blocking
289 /// [`tokio::fs`]: crate::fs
290 /// [`tokio::net`]: crate::net
291 /// [`tokio::time`]: crate::time
292 #[track_caller]
293 pub fn block_on<F: Future>(&self, future: F) -> F::Output {
294 #[cfg(all(
295 tokio_unstable,
296 tokio_taskdump,
297 feature = "rt",
298 target_os = "linux",
299 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
300 ))]
301 let future = super::task::trace::Trace::root(future);
302
303 #[cfg(all(tokio_unstable, feature = "tracing"))]
304 let future =
305 crate::util::trace::task(future, "block_on", None, super::task::Id::next().as_u64());
306
307 // Enter the runtime context. This sets the current driver handles and
308 // prevents blocking an existing runtime.
309 context::enter_runtime(&self.inner, true, |blocking| {
310 blocking.block_on(future).expect("failed to park thread")
311 })
312 }
313
314 #[track_caller]
315 pub(crate) fn spawn_named<F>(&self, future: F, _name: Option<&str>) -> JoinHandle<F::Output>
316 where
317 F: Future + Send + 'static,
318 F::Output: Send + 'static,
319 {
320 let id = crate::runtime::task::Id::next();
321 #[cfg(all(
322 tokio_unstable,
323 tokio_taskdump,
324 feature = "rt",
325 target_os = "linux",
326 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
327 ))]
328 let future = super::task::trace::Trace::root(future);
329 #[cfg(all(tokio_unstable, feature = "tracing"))]
330 let future = crate::util::trace::task(future, "task", _name, id.as_u64());
331 self.inner.spawn(future, id)
332 }
333
334 /// Returns the flavor of the current `Runtime`.
335 ///
336 /// # Examples
337 ///
338 /// ```
339 /// use tokio::runtime::{Handle, RuntimeFlavor};
340 ///
341 /// #[tokio::main(flavor = "current_thread")]
342 /// async fn main() {
343 /// assert_eq!(RuntimeFlavor::CurrentThread, Handle::current().runtime_flavor());
344 /// }
345 /// ```
346 ///
347 /// ```
348 /// use tokio::runtime::{Handle, RuntimeFlavor};
349 ///
350 /// #[tokio::main(flavor = "multi_thread", worker_threads = 4)]
351 /// async fn main() {
352 /// assert_eq!(RuntimeFlavor::MultiThread, Handle::current().runtime_flavor());
353 /// }
354 /// ```
355 pub fn runtime_flavor(&self) -> RuntimeFlavor {
356 match self.inner {
357 scheduler::Handle::CurrentThread(_) => RuntimeFlavor::CurrentThread,
358 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
359 scheduler::Handle::MultiThread(_) => RuntimeFlavor::MultiThread,
360 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
361 scheduler::Handle::MultiThreadAlt(_) => RuntimeFlavor::MultiThreadAlt,
362 }
363 }
364
365 cfg_unstable! {
366 /// Returns the [`Id`] of the current `Runtime`.
367 ///
368 /// # Examples
369 ///
370 /// ```
371 /// use tokio::runtime::Handle;
372 ///
373 /// #[tokio::main(flavor = "current_thread")]
374 /// async fn main() {
375 /// println!("Current runtime id: {}", Handle::current().id());
376 /// }
377 /// ```
378 ///
379 /// **Note**: This is an [unstable API][unstable]. The public API of this type
380 /// may break in 1.x releases. See [the documentation on unstable
381 /// features][unstable] for details.
382 ///
383 /// [unstable]: crate#unstable-features
384 /// [`Id`]: struct@crate::runtime::Id
385 pub fn id(&self) -> runtime::Id {
386 let owned_id = match &self.inner {
387 scheduler::Handle::CurrentThread(handle) => handle.owned_id(),
388 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
389 scheduler::Handle::MultiThread(handle) => handle.owned_id(),
390 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
391 scheduler::Handle::MultiThreadAlt(handle) => handle.owned_id(),
392 };
393 owned_id.into()
394 }
395 }
396}
397
398cfg_metrics! {
399 use crate::runtime::RuntimeMetrics;
400
401 impl Handle {
402 /// Returns a view that lets you get information about how the runtime
403 /// is performing.
404 pub fn metrics(&self) -> RuntimeMetrics {
405 RuntimeMetrics::new(self.clone())
406 }
407 }
408}
409
410cfg_taskdump! {
411 impl Handle {
412 /// Captures a snapshot of the runtime's state.
413 ///
414 /// This functionality is experimental, and comes with a number of
415 /// requirements and limitations.
416 ///
417 /// # Examples
418 ///
419 /// This can be used to get call traces of each task in the runtime.
420 /// Calls to `Handle::dump` should usually be enclosed in a
421 /// [timeout][crate::time::timeout], so that dumping does not escalate a
422 /// single blocked runtime thread into an entirely blocked runtime.
423 ///
424 /// ```
425 /// # use tokio::runtime::Runtime;
426 /// # fn dox() {
427 /// # let rt = Runtime::new().unwrap();
428 /// # rt.spawn(async {
429 /// use tokio::runtime::Handle;
430 /// use tokio::time::{timeout, Duration};
431 ///
432 /// // Inside an async block or function.
433 /// let handle = Handle::current();
434 /// if let Ok(dump) = timeout(Duration::from_secs(2), handle.dump()).await {
435 /// for (i, task) in dump.tasks().iter().enumerate() {
436 /// let trace = task.trace();
437 /// println!("TASK {i}:");
438 /// println!("{trace}\n");
439 /// }
440 /// }
441 /// # });
442 /// # }
443 /// ```
444 ///
445 /// This produces highly detailed traces of tasks; e.g.:
446 ///
447 /// ```plain
448 /// TASK 0:
449 /// ╼ dump::main::{{closure}}::a::{{closure}} at /tokio/examples/dump.rs:18:20
450 /// └╼ dump::main::{{closure}}::b::{{closure}} at /tokio/examples/dump.rs:23:20
451 /// └╼ dump::main::{{closure}}::c::{{closure}} at /tokio/examples/dump.rs:28:24
452 /// └╼ tokio::sync::barrier::Barrier::wait::{{closure}} at /tokio/tokio/src/sync/barrier.rs:129:10
453 /// └╼ <tokio::util::trace::InstrumentedAsyncOp<F> as core::future::future::Future>::poll at /tokio/tokio/src/util/trace.rs:77:46
454 /// └╼ tokio::sync::barrier::Barrier::wait_internal::{{closure}} at /tokio/tokio/src/sync/barrier.rs:183:36
455 /// └╼ tokio::sync::watch::Receiver<T>::changed::{{closure}} at /tokio/tokio/src/sync/watch.rs:604:55
456 /// └╼ tokio::sync::watch::changed_impl::{{closure}} at /tokio/tokio/src/sync/watch.rs:755:18
457 /// └╼ <tokio::sync::notify::Notified as core::future::future::Future>::poll at /tokio/tokio/src/sync/notify.rs:1103:9
458 /// └╼ tokio::sync::notify::Notified::poll_notified at /tokio/tokio/src/sync/notify.rs:996:32
459 /// ```
460 ///
461 /// # Requirements
462 ///
463 /// ## Debug Info Must Be Available
464 ///
465 /// To produce task traces, the application must **not** be compiled
466 /// with split debuginfo. On Linux, including debuginfo within the
467 /// application binary is the (correct) default. You can further ensure
468 /// this behavior with the following directive in your `Cargo.toml`:
469 ///
470 /// ```toml
471 /// [profile.*]
472 /// split-debuginfo = "off"
473 /// ```
474 ///
475 /// ## Unstable Features
476 ///
477 /// This functionality is **unstable**, and requires both the
478 /// `tokio_unstable` and `tokio_taskdump` cfg flags to be set.
479 ///
480 /// You can do this by setting the `RUSTFLAGS` environment variable
481 /// before invoking `cargo`; e.g.:
482 /// ```bash
483 /// RUSTFLAGS="--cfg tokio_unstable --cfg tokio_taskdump" cargo run --example dump
484 /// ```
485 ///
486 /// Or by [configuring][cargo-config] `rustflags` in
487 /// `.cargo/config.toml`:
488 /// ```text
489 /// [build]
490 /// rustflags = ["--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"]
491 /// ```
492 ///
493 /// [cargo-config]:
494 /// https://doc.rust-lang.org/cargo/reference/config.html
495 ///
496 /// ## Platform Requirements
497 ///
498 /// Task dumps are supported on Linux atop aarch64, x86 and x86_64.
499 ///
500 /// ## Current Thread Runtime Requirements
501 ///
502 /// On the `current_thread` runtime, task dumps may only be requested
503 /// from *within* the context of the runtime being dumped. Do not, for
504 /// example, await `Handle::dump()` on a different runtime.
505 ///
506 /// # Limitations
507 ///
508 /// ## Performance
509 ///
510 /// Although enabling the `tokio_taskdump` feature imposes virtually no
511 /// additional runtime overhead, actually calling `Handle::dump` is
512 /// expensive. The runtime must synchronize and pause its workers, then
513 /// re-poll every task in a special tracing mode. Avoid requesting dumps
514 /// often.
515 ///
516 /// ## Local Executors
517 ///
518 /// Tasks managed by local executors (e.g., `FuturesUnordered` and
519 /// [`LocalSet`][crate::task::LocalSet]) may not appear in task dumps.
520 ///
521 /// ## Non-Termination When Workers Are Blocked
522 ///
523 /// The future produced by `Handle::dump` may never produce `Ready` if
524 /// another runtime worker is blocked for more than 250ms. This may
525 /// occur if a dump is requested during shutdown, or if another runtime
526 /// worker is infinite looping or synchronously deadlocked. For these
527 /// reasons, task dumping should usually be paired with an explicit
528 /// [timeout][crate::time::timeout].
529 pub async fn dump(&self) -> crate::runtime::Dump {
530 match &self.inner {
531 scheduler::Handle::CurrentThread(handle) => handle.dump(),
532 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
533 scheduler::Handle::MultiThread(handle) => {
534 // perform the trace in a separate thread so that the
535 // trace itself does not appear in the taskdump.
536 let handle = handle.clone();
537 spawn_thread(async {
538 let handle = handle;
539 handle.dump().await
540 }).await
541 },
542 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
543 scheduler::Handle::MultiThreadAlt(_) => panic!("task dump not implemented for this runtime flavor"),
544 }
545 }
546
547 /// Produces `true` if the current task is being traced for a dump;
548 /// otherwise false. This function is only public for integration
549 /// testing purposes. Do not rely on it.
550 #[doc(hidden)]
551 pub fn is_tracing() -> bool {
552 super::task::trace::Context::is_tracing()
553 }
554 }
555
556 cfg_rt_multi_thread! {
557 /// Spawn a new thread and asynchronously await on its result.
558 async fn spawn_thread<F>(f: F) -> <F as Future>::Output
559 where
560 F: Future + Send + 'static,
561 <F as Future>::Output: Send + 'static
562 {
563 let (tx, rx) = crate::sync::oneshot::channel();
564 crate::loom::thread::spawn(|| {
565 let rt = crate::runtime::Builder::new_current_thread().build().unwrap();
566 rt.block_on(async {
567 let _ = tx.send(f.await);
568 });
569 });
570 rx.await.unwrap()
571 }
572 }
573}
574
575/// Error returned by `try_current` when no Runtime has been started
576#[derive(Debug)]
577pub struct TryCurrentError {
578 kind: TryCurrentErrorKind,
579}
580
581impl TryCurrentError {
582 pub(crate) fn new_no_context() -> Self {
583 Self {
584 kind: TryCurrentErrorKind::NoContext,
585 }
586 }
587
588 pub(crate) fn new_thread_local_destroyed() -> Self {
589 Self {
590 kind: TryCurrentErrorKind::ThreadLocalDestroyed,
591 }
592 }
593
594 /// Returns true if the call failed because there is currently no runtime in
595 /// the Tokio context.
596 pub fn is_missing_context(&self) -> bool {
597 matches!(self.kind, TryCurrentErrorKind::NoContext)
598 }
599
600 /// Returns true if the call failed because the Tokio context thread-local
601 /// had been destroyed. This can usually only happen if in the destructor of
602 /// other thread-locals.
603 pub fn is_thread_local_destroyed(&self) -> bool {
604 matches!(self.kind, TryCurrentErrorKind::ThreadLocalDestroyed)
605 }
606}
607
608enum TryCurrentErrorKind {
609 NoContext,
610 ThreadLocalDestroyed,
611}
612
613impl fmt::Debug for TryCurrentErrorKind {
614 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
615 match self {
616 TryCurrentErrorKind::NoContext => f.write_str("NoContext"),
617 TryCurrentErrorKind::ThreadLocalDestroyed => f.write_str("ThreadLocalDestroyed"),
618 }
619 }
620}
621
622impl fmt::Display for TryCurrentError {
623 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
624 use TryCurrentErrorKind as E;
625 match self.kind {
626 E::NoContext => f.write_str(CONTEXT_MISSING_ERROR),
627 E::ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR),
628 }
629 }
630}
631
632impl error::Error for TryCurrentError {}
633