1use crate::runtime::{context, scheduler, RuntimeFlavor};
2
3/// Handle to the runtime.
4///
5/// The handle is internally reference-counted and can be freely cloned. A handle can be
6/// obtained using the [`Runtime::handle`] method.
7///
8/// [`Runtime::handle`]: crate::runtime::Runtime::handle()
9#[derive(Debug, Clone)]
10// When the `rt` feature is *not* enabled, this type is still defined, but not
11// included in the public API.
12pub struct Handle {
13 pub(crate) inner: scheduler::Handle,
14}
15
16use crate::runtime::task::JoinHandle;
17use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR};
18
19use std::future::Future;
20use std::marker::PhantomData;
21use std::{error, fmt};
22
23/// Runtime context guard.
24///
25/// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits
26/// the runtime context on drop.
27///
28/// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter
29#[derive(Debug)]
30#[must_use = "Creating and dropping a guard does nothing"]
31pub struct EnterGuard<'a> {
32 _guard: context::SetCurrentGuard,
33 _handle_lifetime: PhantomData<&'a Handle>,
34}
35
36impl Handle {
37 /// Enters the runtime context. This allows you to construct types that must
38 /// have an executor available on creation such as [`Sleep`] or
39 /// [`TcpStream`]. It will also allow you to call methods such as
40 /// [`tokio::spawn`] and [`Handle::current`] without panicking.
41 ///
42 /// # Panics
43 ///
44 /// When calling `Handle::enter` multiple times, the returned guards
45 /// **must** be dropped in the reverse order that they were acquired.
46 /// Failure to do so will result in a panic and possible memory leaks.
47 ///
48 /// # Examples
49 ///
50 /// ```
51 /// use tokio::runtime::Runtime;
52 ///
53 /// let rt = Runtime::new().unwrap();
54 ///
55 /// let _guard = rt.enter();
56 /// tokio::spawn(async {
57 /// println!("Hello world!");
58 /// });
59 /// ```
60 ///
61 /// Do **not** do the following, this shows a scenario that will result in a
62 /// panic and possible memory leak.
63 ///
64 /// ```should_panic
65 /// use tokio::runtime::Runtime;
66 ///
67 /// let rt1 = Runtime::new().unwrap();
68 /// let rt2 = Runtime::new().unwrap();
69 ///
70 /// let enter1 = rt1.enter();
71 /// let enter2 = rt2.enter();
72 ///
73 /// drop(enter1);
74 /// drop(enter2);
75 /// ```
76 ///
77 /// [`Sleep`]: struct@crate::time::Sleep
78 /// [`TcpStream`]: struct@crate::net::TcpStream
79 /// [`tokio::spawn`]: fn@crate::spawn
80 pub fn enter(&self) -> EnterGuard<'_> {
81 EnterGuard {
82 _guard: match context::try_set_current(&self.inner) {
83 Some(guard) => guard,
84 None => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
85 },
86 _handle_lifetime: PhantomData,
87 }
88 }
89
90 /// Returns a `Handle` view over the currently running `Runtime`.
91 ///
92 /// # Panics
93 ///
94 /// This will panic if called outside the context of a Tokio runtime. That means that you must
95 /// call this on one of the threads **being run by the runtime**, or from a thread with an active
96 /// `EnterGuard`. Calling this from within a thread created by `std::thread::spawn` (for example)
97 /// will cause a panic unless that thread has an active `EnterGuard`.
98 ///
99 /// # Examples
100 ///
101 /// This can be used to obtain the handle of the surrounding runtime from an async
102 /// block or function running on that runtime.
103 ///
104 /// ```
105 /// # use std::thread;
106 /// # use tokio::runtime::Runtime;
107 /// # fn dox() {
108 /// # let rt = Runtime::new().unwrap();
109 /// # rt.spawn(async {
110 /// use tokio::runtime::Handle;
111 ///
112 /// // Inside an async block or function.
113 /// let handle = Handle::current();
114 /// handle.spawn(async {
115 /// println!("now running in the existing Runtime");
116 /// });
117 ///
118 /// # let handle =
119 /// thread::spawn(move || {
120 /// // Notice that the handle is created outside of this thread and then moved in
121 /// handle.spawn(async { /* ... */ });
122 /// // This next line would cause a panic because we haven't entered the runtime
123 /// // and created an EnterGuard
124 /// // let handle2 = Handle::current(); // panic
125 /// // So we create a guard here with Handle::enter();
126 /// let _guard = handle.enter();
127 /// // Now we can call Handle::current();
128 /// let handle2 = Handle::current();
129 /// });
130 /// # handle.join().unwrap();
131 /// # });
132 /// # }
133 /// ```
134 #[track_caller]
135 pub fn current() -> Self {
136 Handle {
137 inner: scheduler::Handle::current(),
138 }
139 }
140
141 /// Returns a Handle view over the currently running Runtime
142 ///
143 /// Returns an error if no Runtime has been started
144 ///
145 /// Contrary to `current`, this never panics
146 pub fn try_current() -> Result<Self, TryCurrentError> {
147 context::with_current(|inner| Handle {
148 inner: inner.clone(),
149 })
150 }
151
152 /// Spawns a future onto the Tokio runtime.
153 ///
154 /// This spawns the given future onto the runtime's executor, usually a
155 /// thread pool. The thread pool is then responsible for polling the future
156 /// until it completes.
157 ///
158 /// The provided future will start running in the background immediately
159 /// when `spawn` is called, even if you don't await the returned
160 /// `JoinHandle`.
161 ///
162 /// See [module level][mod] documentation for more details.
163 ///
164 /// [mod]: index.html
165 ///
166 /// # Examples
167 ///
168 /// ```
169 /// use tokio::runtime::Runtime;
170 ///
171 /// # fn dox() {
172 /// // Create the runtime
173 /// let rt = Runtime::new().unwrap();
174 /// // Get a handle from this runtime
175 /// let handle = rt.handle();
176 ///
177 /// // Spawn a future onto the runtime using the handle
178 /// handle.spawn(async {
179 /// println!("now running on a worker thread");
180 /// });
181 /// # }
182 /// ```
183 #[track_caller]
184 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
185 where
186 F: Future + Send + 'static,
187 F::Output: Send + 'static,
188 {
189 self.spawn_named(future, None)
190 }
191
192 /// Runs the provided function on an executor dedicated to blocking
193 /// operations.
194 ///
195 /// # Examples
196 ///
197 /// ```
198 /// use tokio::runtime::Runtime;
199 ///
200 /// # fn dox() {
201 /// // Create the runtime
202 /// let rt = Runtime::new().unwrap();
203 /// // Get a handle from this runtime
204 /// let handle = rt.handle();
205 ///
206 /// // Spawn a blocking function onto the runtime using the handle
207 /// handle.spawn_blocking(|| {
208 /// println!("now running on a worker thread");
209 /// });
210 /// # }
211 #[track_caller]
212 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
213 where
214 F: FnOnce() -> R + Send + 'static,
215 R: Send + 'static,
216 {
217 self.inner.blocking_spawner().spawn_blocking(self, func)
218 }
219
220 /// Runs a future to completion on this `Handle`'s associated `Runtime`.
221 ///
222 /// This runs the given future on the current thread, blocking until it is
223 /// complete, and yielding its resolved result. Any tasks or timers which
224 /// the future spawns internally will be executed on the runtime.
225 ///
226 /// When this is used on a `current_thread` runtime, only the
227 /// [`Runtime::block_on`] method can drive the IO and timer drivers, but the
228 /// `Handle::block_on` method cannot drive them. This means that, when using
229 /// this method on a current_thread runtime, anything that relies on IO or
230 /// timers will not work unless there is another thread currently calling
231 /// [`Runtime::block_on`] on the same runtime.
232 ///
233 /// # If the runtime has been shut down
234 ///
235 /// If the `Handle`'s associated `Runtime` has been shut down (through
236 /// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by
237 /// dropping it) and `Handle::block_on` is used it might return an error or
238 /// panic. Specifically IO resources will return an error and timers will
239 /// panic. Runtime independent futures will run as normal.
240 ///
241 /// # Panics
242 ///
243 /// This function panics if the provided future panics, if called within an
244 /// asynchronous execution context, or if a timer future is executed on a
245 /// runtime that has been shut down.
246 ///
247 /// # Examples
248 ///
249 /// ```
250 /// use tokio::runtime::Runtime;
251 ///
252 /// // Create the runtime
253 /// let rt = Runtime::new().unwrap();
254 ///
255 /// // Get a handle from this runtime
256 /// let handle = rt.handle();
257 ///
258 /// // Execute the future, blocking the current thread until completion
259 /// handle.block_on(async {
260 /// println!("hello");
261 /// });
262 /// ```
263 ///
264 /// Or using `Handle::current`:
265 ///
266 /// ```
267 /// use tokio::runtime::Handle;
268 ///
269 /// #[tokio::main]
270 /// async fn main () {
271 /// let handle = Handle::current();
272 /// std::thread::spawn(move || {
273 /// // Using Handle::block_on to run async code in the new thread.
274 /// handle.block_on(async {
275 /// println!("hello");
276 /// });
277 /// });
278 /// }
279 /// ```
280 ///
281 /// [`JoinError`]: struct@crate::task::JoinError
282 /// [`JoinHandle`]: struct@crate::task::JoinHandle
283 /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on
284 /// [`Runtime::shutdown_background`]: fn@crate::runtime::Runtime::shutdown_background
285 /// [`Runtime::shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
286 /// [`spawn_blocking`]: crate::task::spawn_blocking
287 /// [`tokio::fs`]: crate::fs
288 /// [`tokio::net`]: crate::net
289 /// [`tokio::time`]: crate::time
290 #[track_caller]
291 pub fn block_on<F: Future>(&self, future: F) -> F::Output {
292 #[cfg(all(
293 tokio_unstable,
294 tokio_taskdump,
295 feature = "rt",
296 target_os = "linux",
297 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
298 ))]
299 let future = super::task::trace::Trace::root(future);
300
301 #[cfg(all(tokio_unstable, feature = "tracing"))]
302 let future =
303 crate::util::trace::task(future, "block_on", None, super::task::Id::next().as_u64());
304
305 // Enter the runtime context. This sets the current driver handles and
306 // prevents blocking an existing runtime.
307 context::enter_runtime(&self.inner, true, |blocking| {
308 blocking.block_on(future).expect("failed to park thread")
309 })
310 }
311
312 #[track_caller]
313 pub(crate) fn spawn_named<F>(&self, future: F, _name: Option<&str>) -> JoinHandle<F::Output>
314 where
315 F: Future + Send + 'static,
316 F::Output: Send + 'static,
317 {
318 let id = crate::runtime::task::Id::next();
319 #[cfg(all(
320 tokio_unstable,
321 tokio_taskdump,
322 feature = "rt",
323 target_os = "linux",
324 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
325 ))]
326 let future = super::task::trace::Trace::root(future);
327 #[cfg(all(tokio_unstable, feature = "tracing"))]
328 let future = crate::util::trace::task(future, "task", _name, id.as_u64());
329 self.inner.spawn(future, id)
330 }
331
332 /// Returns the flavor of the current `Runtime`.
333 ///
334 /// # Examples
335 ///
336 /// ```
337 /// use tokio::runtime::{Handle, RuntimeFlavor};
338 ///
339 /// #[tokio::main(flavor = "current_thread")]
340 /// async fn main() {
341 /// assert_eq!(RuntimeFlavor::CurrentThread, Handle::current().runtime_flavor());
342 /// }
343 /// ```
344 ///
345 /// ```
346 /// use tokio::runtime::{Handle, RuntimeFlavor};
347 ///
348 /// #[tokio::main(flavor = "multi_thread", worker_threads = 4)]
349 /// async fn main() {
350 /// assert_eq!(RuntimeFlavor::MultiThread, Handle::current().runtime_flavor());
351 /// }
352 /// ```
353 pub fn runtime_flavor(&self) -> RuntimeFlavor {
354 match self.inner {
355 scheduler::Handle::CurrentThread(_) => RuntimeFlavor::CurrentThread,
356 #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
357 scheduler::Handle::MultiThread(_) => RuntimeFlavor::MultiThread,
358 }
359 }
360}
361
362cfg_metrics! {
363 use crate::runtime::RuntimeMetrics;
364
365 impl Handle {
366 /// Returns a view that lets you get information about how the runtime
367 /// is performing.
368 pub fn metrics(&self) -> RuntimeMetrics {
369 RuntimeMetrics::new(self.clone())
370 }
371 }
372}
373
374cfg_taskdump! {
375 impl Handle {
376 /// Captures a snapshot of the runtime's state.
377 ///
378 /// This functionality is experimental, and comes with a number of
379 /// requirements and limitations.
380 ///
381 /// # Examples
382 ///
383 /// This can be used to get call traces of each task in the runtime.
384 /// Calls to `Handle::dump` should usually be enclosed in a
385 /// [timeout][crate::time::timeout], so that dumping does not escalate a
386 /// single blocked runtime thread into an entirely blocked runtime.
387 ///
388 /// ```
389 /// # use tokio::runtime::Runtime;
390 /// # fn dox() {
391 /// # let rt = Runtime::new().unwrap();
392 /// # rt.spawn(async {
393 /// use tokio::runtime::Handle;
394 /// use tokio::time::{timeout, Duration};
395 ///
396 /// // Inside an async block or function.
397 /// let handle = Handle::current();
398 /// if let Ok(dump) = timeout(Duration::from_secs(2), handle.dump()).await {
399 /// for (i, task) in dump.tasks().iter().enumerate() {
400 /// let trace = task.trace();
401 /// println!("TASK {i}:");
402 /// println!("{trace}\n");
403 /// }
404 /// }
405 /// # });
406 /// # }
407 /// ```
408 ///
409 /// This produces highly detailed traces of tasks; e.g.:
410 ///
411 /// ```plain
412 /// TASK 0:
413 /// ╼ dump::main::{{closure}}::a::{{closure}} at /tokio/examples/dump.rs:18:20
414 /// └╼ dump::main::{{closure}}::b::{{closure}} at /tokio/examples/dump.rs:23:20
415 /// └╼ dump::main::{{closure}}::c::{{closure}} at /tokio/examples/dump.rs:28:24
416 /// └╼ tokio::sync::barrier::Barrier::wait::{{closure}} at /tokio/tokio/src/sync/barrier.rs:129:10
417 /// └╼ <tokio::util::trace::InstrumentedAsyncOp<F> as core::future::future::Future>::poll at /tokio/tokio/src/util/trace.rs:77:46
418 /// └╼ tokio::sync::barrier::Barrier::wait_internal::{{closure}} at /tokio/tokio/src/sync/barrier.rs:183:36
419 /// └╼ tokio::sync::watch::Receiver<T>::changed::{{closure}} at /tokio/tokio/src/sync/watch.rs:604:55
420 /// └╼ tokio::sync::watch::changed_impl::{{closure}} at /tokio/tokio/src/sync/watch.rs:755:18
421 /// └╼ <tokio::sync::notify::Notified as core::future::future::Future>::poll at /tokio/tokio/src/sync/notify.rs:1103:9
422 /// └╼ tokio::sync::notify::Notified::poll_notified at /tokio/tokio/src/sync/notify.rs:996:32
423 /// ```
424 ///
425 /// # Requirements
426 ///
427 /// ## Debug Info Must Be Available
428 ///
429 /// To produce task traces, the application must **not** be compiled
430 /// with split debuginfo. On Linux, including debuginfo within the
431 /// application binary is the (correct) default. You can further ensure
432 /// this behavior with the following directive in your `Cargo.toml`:
433 ///
434 /// ```toml
435 /// [profile.*]
436 /// split-debuginfo = "off"
437 /// ```
438 ///
439 /// ## Unstable Features
440 ///
441 /// This functionality is **unstable**, and requires both the
442 /// `tokio_unstable` and `tokio_taskdump` cfg flags to be set.
443 ///
444 /// You can do this by setting the `RUSTFLAGS` environment variable
445 /// before invoking `cargo`; e.g.:
446 /// ```bash
447 /// RUSTFLAGS="--cfg tokio_unstable --cfg tokio_taskdump" cargo run --example dump
448 /// ```
449 ///
450 /// Or by [configuring][cargo-config] `rustflags` in
451 /// `.cargo/config.toml`:
452 /// ```text
453 /// [build]
454 /// rustflags = ["--cfg tokio_unstable", "--cfg tokio_taskdump"]
455 /// ```
456 ///
457 /// [cargo-config]:
458 /// https://doc.rust-lang.org/cargo/reference/config.html
459 ///
460 /// ## Platform Requirements
461 ///
462 /// Task dumps are supported on Linux atop aarch64, x86 and x86_64.
463 ///
464 /// ## Current Thread Runtime Requirements
465 ///
466 /// On the `current_thread` runtime, task dumps may only be requested
467 /// from *within* the context of the runtime being dumped. Do not, for
468 /// example, await `Handle::dump()` on a different runtime.
469 ///
470 /// # Limitations
471 ///
472 /// ## Performance
473 ///
474 /// Although enabling the `tokio_taskdump` feature imposes virtually no
475 /// additional runtime overhead, actually calling `Handle::dump` is
476 /// expensive. The runtime must synchronize and pause its workers, then
477 /// re-poll every task in a special tracing mode. Avoid requesting dumps
478 /// often.
479 ///
480 /// ## Local Executors
481 ///
482 /// Tasks managed by local executors (e.g., `FuturesUnordered` and
483 /// [`LocalSet`][crate::task::LocalSet]) may not appear in task dumps.
484 ///
485 /// ## Non-Termination When Workers Are Blocked
486 ///
487 /// The future produced by `Handle::dump` may never produce `Ready` if
488 /// another runtime worker is blocked for more than 250ms. This may
489 /// occur if a dump is requested during shutdown, or if another runtime
490 /// worker is infinite looping or synchronously deadlocked. For these
491 /// reasons, task dumping should usually be paired with an explicit
492 /// [timeout][crate::time::timeout].
493 pub async fn dump(&self) -> crate::runtime::Dump {
494 match &self.inner {
495 scheduler::Handle::CurrentThread(handle) => handle.dump(),
496 #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
497 scheduler::Handle::MultiThread(handle) => {
498 // perform the trace in a separate thread so that the
499 // trace itself does not appear in the taskdump.
500 let handle = handle.clone();
501 spawn_thread(async {
502 let handle = handle;
503 handle.dump().await
504 }).await
505 },
506 }
507 }
508 }
509
510 cfg_rt_multi_thread! {
511 /// Spawn a new thread and asynchronously await on its result.
512 async fn spawn_thread<F>(f: F) -> <F as Future>::Output
513 where
514 F: Future + Send + 'static,
515 <F as Future>::Output: Send + 'static
516 {
517 let (tx, rx) = crate::sync::oneshot::channel();
518 crate::loom::thread::spawn(|| {
519 let rt = crate::runtime::Builder::new_current_thread().build().unwrap();
520 rt.block_on(async {
521 let _ = tx.send(f.await);
522 });
523 });
524 rx.await.unwrap()
525 }
526 }
527}
528
529/// Error returned by `try_current` when no Runtime has been started
530#[derive(Debug)]
531pub struct TryCurrentError {
532 kind: TryCurrentErrorKind,
533}
534
535impl TryCurrentError {
536 pub(crate) fn new_no_context() -> Self {
537 Self {
538 kind: TryCurrentErrorKind::NoContext,
539 }
540 }
541
542 pub(crate) fn new_thread_local_destroyed() -> Self {
543 Self {
544 kind: TryCurrentErrorKind::ThreadLocalDestroyed,
545 }
546 }
547
548 /// Returns true if the call failed because there is currently no runtime in
549 /// the Tokio context.
550 pub fn is_missing_context(&self) -> bool {
551 matches!(self.kind, TryCurrentErrorKind::NoContext)
552 }
553
554 /// Returns true if the call failed because the Tokio context thread-local
555 /// had been destroyed. This can usually only happen if in the destructor of
556 /// other thread-locals.
557 pub fn is_thread_local_destroyed(&self) -> bool {
558 matches!(self.kind, TryCurrentErrorKind::ThreadLocalDestroyed)
559 }
560}
561
562enum TryCurrentErrorKind {
563 NoContext,
564 ThreadLocalDestroyed,
565}
566
567impl fmt::Debug for TryCurrentErrorKind {
568 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
569 use TryCurrentErrorKind::*;
570 match self {
571 NoContext => f.write_str(data:"NoContext"),
572 ThreadLocalDestroyed => f.write_str(data:"ThreadLocalDestroyed"),
573 }
574 }
575}
576
577impl fmt::Display for TryCurrentError {
578 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
579 use TryCurrentErrorKind::*;
580 match self.kind {
581 NoContext => f.write_str(CONTEXT_MISSING_ERROR),
582 ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR),
583 }
584 }
585}
586
587impl error::Error for TryCurrentError {}
588