| 1 | use super::BOX_FUTURE_THRESHOLD; |
| 2 | use crate::runtime::blocking::BlockingPool; |
| 3 | use crate::runtime::scheduler::CurrentThread; |
| 4 | use crate::runtime::{context, EnterGuard, Handle}; |
| 5 | use crate::task::JoinHandle; |
| 6 | use crate::util::trace::SpawnMeta; |
| 7 | |
| 8 | use std::future::Future; |
| 9 | use std::mem; |
| 10 | use std::time::Duration; |
| 11 | |
| 12 | cfg_rt_multi_thread! { |
| 13 | use crate::runtime::Builder; |
| 14 | use crate::runtime::scheduler::MultiThread; |
| 15 | |
| 16 | cfg_unstable! { |
| 17 | use crate::runtime::scheduler::MultiThreadAlt; |
| 18 | } |
| 19 | } |
| 20 | |
| 21 | /// The Tokio runtime. |
| 22 | /// |
| 23 | /// The runtime provides an I/O driver, task scheduler, [timer], and |
| 24 | /// blocking pool, necessary for running asynchronous tasks. |
| 25 | /// |
| 26 | /// Instances of `Runtime` can be created using [`new`], or [`Builder`]. |
| 27 | /// However, most users will use the [`#[tokio::main]`][main] annotation on |
| 28 | /// their entry point instead. |
| 29 | /// |
| 30 | /// See [module level][mod] documentation for more details. |
| 31 | /// |
| 32 | /// # Shutdown |
| 33 | /// |
| 34 | /// Shutting down the runtime is done by dropping the value, or calling |
| 35 | /// [`shutdown_background`] or [`shutdown_timeout`]. |
| 36 | /// |
| 37 | /// Tasks spawned through [`Runtime::spawn`] keep running until they yield. |
| 38 | /// Then they are dropped. They are not *guaranteed* to run to completion, but |
| 39 | /// *might* do so if they do not yield until completion. |
| 40 | /// |
| 41 | /// Blocking functions spawned through [`Runtime::spawn_blocking`] keep running |
| 42 | /// until they return. |
| 43 | /// |
| 44 | /// The thread initiating the shutdown blocks until all spawned work has been |
| 45 | /// stopped. This can take an indefinite amount of time. The `Drop` |
| 46 | /// implementation waits forever for this. |
| 47 | /// |
| 48 | /// The [`shutdown_background`] and [`shutdown_timeout`] methods can be used if |
| 49 | /// waiting forever is undesired. When the timeout is reached, spawned work that |
| 50 | /// did not stop in time and threads running it are leaked. The work continues |
| 51 | /// to run until one of the stopping conditions is fulfilled, but the thread |
| 52 | /// initiating the shutdown is unblocked. |
| 53 | /// |
| 54 | /// Once the runtime has been dropped, any outstanding I/O resources bound to |
| 55 | /// it will no longer function. Calling any method on them will result in an |
| 56 | /// error. |
| 57 | /// |
| 58 | /// # Sharing |
| 59 | /// |
| 60 | /// There are several ways to establish shared access to a Tokio runtime: |
| 61 | /// |
| 62 | /// * Using an <code>[Arc]\<Runtime></code>. |
| 63 | /// * Using a [`Handle`]. |
| 64 | /// * Entering the runtime context. |
| 65 | /// |
| 66 | /// Using an <code>[Arc]\<Runtime></code> or [`Handle`] allows you to do various |
| 67 | /// things with the runtime such as spawning new tasks or entering the runtime |
| 68 | /// context. Both types can be cloned to create a new handle that allows access |
| 69 | /// to the same runtime. By passing clones into different tasks or threads, you |
| 70 | /// will be able to access the runtime from those tasks or threads. |
| 71 | /// |
| 72 | /// The difference between <code>[Arc]\<Runtime></code> and [`Handle`] is that |
| 73 | /// an <code>[Arc]\<Runtime></code> will prevent the runtime from shutting down, |
| 74 | /// whereas a [`Handle`] does not prevent that. This is because shutdown of the |
| 75 | /// runtime happens when the destructor of the `Runtime` object runs. |
| 76 | /// |
| 77 | /// Calls to [`shutdown_background`] and [`shutdown_timeout`] require exclusive |
| 78 | /// ownership of the `Runtime` type. When using an <code>[Arc]\<Runtime></code>, |
| 79 | /// this can be achieved via [`Arc::try_unwrap`] when only one strong count |
| 80 | /// reference is left over. |
| 81 | /// |
| 82 | /// The runtime context is entered using the [`Runtime::enter`] or |
| 83 | /// [`Handle::enter`] methods, which use a thread-local variable to store the |
| 84 | /// current runtime. Whenever you are inside the runtime context, methods such |
| 85 | /// as [`tokio::spawn`] will use the runtime whose context you are inside. |
| 86 | /// |
| 87 | /// [timer]: crate::time |
| 88 | /// [mod]: index.html |
| 89 | /// [`new`]: method@Self::new |
| 90 | /// [`Builder`]: struct@Builder |
| 91 | /// [`Handle`]: struct@Handle |
| 92 | /// [main]: macro@crate::main |
| 93 | /// [`tokio::spawn`]: crate::spawn |
| 94 | /// [`Arc::try_unwrap`]: std::sync::Arc::try_unwrap |
| 95 | /// [Arc]: std::sync::Arc |
| 96 | /// [`shutdown_background`]: method@Runtime::shutdown_background |
| 97 | /// [`shutdown_timeout`]: method@Runtime::shutdown_timeout |
| 98 | #[derive (Debug)] |
| 99 | pub struct Runtime { |
| 100 | /// Task scheduler |
| 101 | scheduler: Scheduler, |
| 102 | |
| 103 | /// Handle to runtime, also contains driver handles |
| 104 | handle: Handle, |
| 105 | |
| 106 | /// Blocking pool handle, used to signal shutdown |
| 107 | blocking_pool: BlockingPool, |
| 108 | } |
| 109 | |
| 110 | /// The flavor of a `Runtime`. |
| 111 | /// |
| 112 | /// This is the return type for [`Handle::runtime_flavor`](crate::runtime::Handle::runtime_flavor()). |
| 113 | #[derive (Debug, PartialEq, Eq)] |
| 114 | #[non_exhaustive ] |
| 115 | pub enum RuntimeFlavor { |
| 116 | /// The flavor that executes all tasks on the current thread. |
| 117 | CurrentThread, |
| 118 | /// The flavor that executes tasks across multiple threads. |
| 119 | MultiThread, |
| 120 | /// The flavor that executes tasks across multiple threads. |
| 121 | #[cfg (tokio_unstable)] |
| 122 | #[cfg_attr (docsrs, doc(cfg(tokio_unstable)))] |
| 123 | MultiThreadAlt, |
| 124 | } |
| 125 | |
| 126 | /// The runtime scheduler is either a multi-thread or a current-thread executor. |
| 127 | #[derive (Debug)] |
| 128 | pub(super) enum Scheduler { |
| 129 | /// Execute all tasks on the current-thread. |
| 130 | CurrentThread(CurrentThread), |
| 131 | |
| 132 | /// Execute tasks across multiple threads. |
| 133 | #[cfg (feature = "rt-multi-thread" )] |
| 134 | MultiThread(MultiThread), |
| 135 | |
| 136 | /// Execute tasks across multiple threads. |
| 137 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" ))] |
| 138 | MultiThreadAlt(MultiThreadAlt), |
| 139 | } |
| 140 | |
| 141 | impl Runtime { |
| 142 | pub(super) fn from_parts( |
| 143 | scheduler: Scheduler, |
| 144 | handle: Handle, |
| 145 | blocking_pool: BlockingPool, |
| 146 | ) -> Runtime { |
| 147 | Runtime { |
| 148 | scheduler, |
| 149 | handle, |
| 150 | blocking_pool, |
| 151 | } |
| 152 | } |
| 153 | |
| 154 | /// Creates a new runtime instance with default configuration values. |
| 155 | /// |
| 156 | /// This results in the multi threaded scheduler, I/O driver, and time driver being |
| 157 | /// initialized. |
| 158 | /// |
| 159 | /// Most applications will not need to call this function directly. Instead, |
| 160 | /// they will use the [`#[tokio::main]` attribute][main]. When a more complex |
| 161 | /// configuration is necessary, the [runtime builder] may be used. |
| 162 | /// |
| 163 | /// See [module level][mod] documentation for more details. |
| 164 | /// |
| 165 | /// # Examples |
| 166 | /// |
| 167 | /// Creating a new `Runtime` with default configuration values. |
| 168 | /// |
| 169 | /// ``` |
| 170 | /// use tokio::runtime::Runtime; |
| 171 | /// |
| 172 | /// let rt = Runtime::new() |
| 173 | /// .unwrap(); |
| 174 | /// |
| 175 | /// // Use the runtime... |
| 176 | /// ``` |
| 177 | /// |
| 178 | /// [mod]: index.html |
| 179 | /// [main]: ../attr.main.html |
| 180 | /// [threaded scheduler]: index.html#threaded-scheduler |
| 181 | /// [runtime builder]: crate::runtime::Builder |
| 182 | #[cfg (feature = "rt-multi-thread" )] |
| 183 | #[cfg_attr (docsrs, doc(cfg(feature = "rt-multi-thread" )))] |
| 184 | pub fn new() -> std::io::Result<Runtime> { |
| 185 | Builder::new_multi_thread().enable_all().build() |
| 186 | } |
| 187 | |
| 188 | /// Returns a handle to the runtime's spawner. |
| 189 | /// |
| 190 | /// The returned handle can be used to spawn tasks that run on this runtime, and can |
| 191 | /// be cloned to allow moving the `Handle` to other threads. |
| 192 | /// |
| 193 | /// Calling [`Handle::block_on`] on a handle to a `current_thread` runtime is error-prone. |
| 194 | /// Refer to the documentation of [`Handle::block_on`] for more. |
| 195 | /// |
| 196 | /// # Examples |
| 197 | /// |
| 198 | /// ``` |
| 199 | /// use tokio::runtime::Runtime; |
| 200 | /// |
| 201 | /// let rt = Runtime::new() |
| 202 | /// .unwrap(); |
| 203 | /// |
| 204 | /// let handle = rt.handle(); |
| 205 | /// |
| 206 | /// // Use the handle... |
| 207 | /// ``` |
| 208 | pub fn handle(&self) -> &Handle { |
| 209 | &self.handle |
| 210 | } |
| 211 | |
| 212 | /// Spawns a future onto the Tokio runtime. |
| 213 | /// |
| 214 | /// This spawns the given future onto the runtime's executor, usually a |
| 215 | /// thread pool. The thread pool is then responsible for polling the future |
| 216 | /// until it completes. |
| 217 | /// |
| 218 | /// The provided future will start running in the background immediately |
| 219 | /// when `spawn` is called, even if you don't await the returned |
| 220 | /// `JoinHandle`. |
| 221 | /// |
| 222 | /// See [module level][mod] documentation for more details. |
| 223 | /// |
| 224 | /// [mod]: index.html |
| 225 | /// |
| 226 | /// # Examples |
| 227 | /// |
| 228 | /// ``` |
| 229 | /// use tokio::runtime::Runtime; |
| 230 | /// |
| 231 | /// # fn dox() { |
| 232 | /// // Create the runtime |
| 233 | /// let rt = Runtime::new().unwrap(); |
| 234 | /// |
| 235 | /// // Spawn a future onto the runtime |
| 236 | /// rt.spawn(async { |
| 237 | /// println!("now running on a worker thread" ); |
| 238 | /// }); |
| 239 | /// # } |
| 240 | /// ``` |
| 241 | #[track_caller ] |
| 242 | pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> |
| 243 | where |
| 244 | F: Future + Send + 'static, |
| 245 | F::Output: Send + 'static, |
| 246 | { |
| 247 | let fut_size = mem::size_of::<F>(); |
| 248 | if fut_size > BOX_FUTURE_THRESHOLD { |
| 249 | self.handle |
| 250 | .spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) |
| 251 | } else { |
| 252 | self.handle |
| 253 | .spawn_named(future, SpawnMeta::new_unnamed(fut_size)) |
| 254 | } |
| 255 | } |
| 256 | |
| 257 | /// Runs the provided function on an executor dedicated to blocking operations. |
| 258 | /// |
| 259 | /// # Examples |
| 260 | /// |
| 261 | /// ``` |
| 262 | /// use tokio::runtime::Runtime; |
| 263 | /// |
| 264 | /// # fn dox() { |
| 265 | /// // Create the runtime |
| 266 | /// let rt = Runtime::new().unwrap(); |
| 267 | /// |
| 268 | /// // Spawn a blocking function onto the runtime |
| 269 | /// rt.spawn_blocking(|| { |
| 270 | /// println!("now running on a worker thread" ); |
| 271 | /// }); |
| 272 | /// # } |
| 273 | /// ``` |
| 274 | #[track_caller ] |
| 275 | pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> |
| 276 | where |
| 277 | F: FnOnce() -> R + Send + 'static, |
| 278 | R: Send + 'static, |
| 279 | { |
| 280 | self.handle.spawn_blocking(func) |
| 281 | } |
| 282 | |
| 283 | /// Runs a future to completion on the Tokio runtime. This is the |
| 284 | /// runtime's entry point. |
| 285 | /// |
| 286 | /// This runs the given future on the current thread, blocking until it is |
| 287 | /// complete, and yielding its resolved result. Any tasks or timers |
| 288 | /// which the future spawns internally will be executed on the runtime. |
| 289 | /// |
| 290 | /// # Non-worker future |
| 291 | /// |
| 292 | /// Note that the future required by this function does not run as a |
| 293 | /// worker. The expectation is that other tasks are spawned by the future here. |
| 294 | /// Awaiting on other futures from the future provided here will not |
| 295 | /// perform as fast as those spawned as workers. |
| 296 | /// |
| 297 | /// # Multi thread scheduler |
| 298 | /// |
| 299 | /// When the multi thread scheduler is used this will allow futures |
| 300 | /// to run within the io driver and timer context of the overall runtime. |
| 301 | /// |
| 302 | /// Any spawned tasks will continue running after `block_on` returns. |
| 303 | /// |
| 304 | /// # Current thread scheduler |
| 305 | /// |
| 306 | /// When the current thread scheduler is enabled `block_on` |
| 307 | /// can be called concurrently from multiple threads. The first call |
| 308 | /// will take ownership of the io and timer drivers. This means |
| 309 | /// other threads which do not own the drivers will hook into that one. |
| 310 | /// When the first `block_on` completes, other threads will be able to |
| 311 | /// "steal" the driver to allow continued execution of their futures. |
| 312 | /// |
| 313 | /// Any spawned tasks will be suspended after `block_on` returns. Calling |
| 314 | /// `block_on` again will resume previously spawned tasks. |
| 315 | /// |
| 316 | /// # Panics |
| 317 | /// |
| 318 | /// This function panics if the provided future panics, or if called within an |
| 319 | /// asynchronous execution context. |
| 320 | /// |
| 321 | /// # Examples |
| 322 | /// |
| 323 | /// ```no_run |
| 324 | /// use tokio::runtime::Runtime; |
| 325 | /// |
| 326 | /// // Create the runtime |
| 327 | /// let rt = Runtime::new().unwrap(); |
| 328 | /// |
| 329 | /// // Execute the future, blocking the current thread until completion |
| 330 | /// rt.block_on(async { |
| 331 | /// println!("hello" ); |
| 332 | /// }); |
| 333 | /// ``` |
| 334 | /// |
| 335 | /// [handle]: fn@Handle::block_on |
| 336 | #[track_caller ] |
| 337 | pub fn block_on<F: Future>(&self, future: F) -> F::Output { |
| 338 | let fut_size = mem::size_of::<F>(); |
| 339 | if fut_size > BOX_FUTURE_THRESHOLD { |
| 340 | self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) |
| 341 | } else { |
| 342 | self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size)) |
| 343 | } |
| 344 | } |
| 345 | |
| 346 | #[track_caller ] |
| 347 | fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output { |
| 348 | #[cfg (all( |
| 349 | tokio_unstable, |
| 350 | tokio_taskdump, |
| 351 | feature = "rt" , |
| 352 | target_os = "linux" , |
| 353 | any(target_arch = "aarch64" , target_arch = "x86" , target_arch = "x86_64" ) |
| 354 | ))] |
| 355 | let future = super::task::trace::Trace::root(future); |
| 356 | |
| 357 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
| 358 | let future = crate::util::trace::task( |
| 359 | future, |
| 360 | "block_on" , |
| 361 | _meta, |
| 362 | crate::runtime::task::Id::next().as_u64(), |
| 363 | ); |
| 364 | |
| 365 | let _enter = self.enter(); |
| 366 | |
| 367 | match &self.scheduler { |
| 368 | Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future), |
| 369 | #[cfg (feature = "rt-multi-thread" )] |
| 370 | Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future), |
| 371 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" ))] |
| 372 | Scheduler::MultiThreadAlt(exec) => exec.block_on(&self.handle.inner, future), |
| 373 | } |
| 374 | } |
| 375 | |
| 376 | /// Enters the runtime context. |
| 377 | /// |
| 378 | /// This allows you to construct types that must have an executor |
| 379 | /// available on creation such as [`Sleep`] or [`TcpStream`]. It will |
| 380 | /// also allow you to call methods such as [`tokio::spawn`]. |
| 381 | /// |
| 382 | /// [`Sleep`]: struct@crate::time::Sleep |
| 383 | /// [`TcpStream`]: struct@crate::net::TcpStream |
| 384 | /// [`tokio::spawn`]: fn@crate::spawn |
| 385 | /// |
| 386 | /// # Example |
| 387 | /// |
| 388 | /// ``` |
| 389 | /// use tokio::runtime::Runtime; |
| 390 | /// use tokio::task::JoinHandle; |
| 391 | /// |
| 392 | /// fn function_that_spawns(msg: String) -> JoinHandle<()> { |
| 393 | /// // Had we not used `rt.enter` below, this would panic. |
| 394 | /// tokio::spawn(async move { |
| 395 | /// println!("{}" , msg); |
| 396 | /// }) |
| 397 | /// } |
| 398 | /// |
| 399 | /// fn main() { |
| 400 | /// let rt = Runtime::new().unwrap(); |
| 401 | /// |
| 402 | /// let s = "Hello World!" .to_string(); |
| 403 | /// |
| 404 | /// // By entering the context, we tie `tokio::spawn` to this executor. |
| 405 | /// let _guard = rt.enter(); |
| 406 | /// let handle = function_that_spawns(s); |
| 407 | /// |
| 408 | /// // Wait for the task before we end the test. |
| 409 | /// rt.block_on(handle).unwrap(); |
| 410 | /// } |
| 411 | /// ``` |
| 412 | pub fn enter(&self) -> EnterGuard<'_> { |
| 413 | self.handle.enter() |
| 414 | } |
| 415 | |
| 416 | /// Shuts down the runtime, waiting for at most `duration` for all spawned |
| 417 | /// work to stop. |
| 418 | /// |
| 419 | /// See the [struct level documentation](Runtime#shutdown) for more details. |
| 420 | /// |
| 421 | /// # Examples |
| 422 | /// |
| 423 | /// ``` |
| 424 | /// # if cfg!(miri) { return } // Miri reports error when main thread terminated without waiting all remaining threads. |
| 425 | /// use tokio::runtime::Runtime; |
| 426 | /// use tokio::task; |
| 427 | /// |
| 428 | /// use std::thread; |
| 429 | /// use std::time::Duration; |
| 430 | /// |
| 431 | /// fn main() { |
| 432 | /// let runtime = Runtime::new().unwrap(); |
| 433 | /// |
| 434 | /// runtime.block_on(async move { |
| 435 | /// task::spawn_blocking(move || { |
| 436 | /// thread::sleep(Duration::from_secs(10_000)); |
| 437 | /// }); |
| 438 | /// }); |
| 439 | /// |
| 440 | /// runtime.shutdown_timeout(Duration::from_millis(100)); |
| 441 | /// } |
| 442 | /// ``` |
| 443 | pub fn shutdown_timeout(mut self, duration: Duration) { |
| 444 | // Wakeup and shutdown all the worker threads |
| 445 | self.handle.inner.shutdown(); |
| 446 | self.blocking_pool.shutdown(Some(duration)); |
| 447 | } |
| 448 | |
| 449 | /// Shuts down the runtime, without waiting for any spawned work to stop. |
| 450 | /// |
| 451 | /// This can be useful if you want to drop a runtime from within another runtime. |
| 452 | /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks |
| 453 | /// to complete, which would normally not be permitted within an asynchronous context. |
| 454 | /// By calling `shutdown_background()`, you can drop the runtime from such a context. |
| 455 | /// |
| 456 | /// Note however, that because we do not wait for any blocking tasks to complete, this |
| 457 | /// may result in a resource leak (in that any blocking tasks are still running until they |
| 458 | /// return. |
| 459 | /// |
| 460 | /// See the [struct level documentation](Runtime#shutdown) for more details. |
| 461 | /// |
| 462 | /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`. |
| 463 | /// |
| 464 | /// ``` |
| 465 | /// use tokio::runtime::Runtime; |
| 466 | /// |
| 467 | /// fn main() { |
| 468 | /// let runtime = Runtime::new().unwrap(); |
| 469 | /// |
| 470 | /// runtime.block_on(async move { |
| 471 | /// let inner_runtime = Runtime::new().unwrap(); |
| 472 | /// // ... |
| 473 | /// inner_runtime.shutdown_background(); |
| 474 | /// }); |
| 475 | /// } |
| 476 | /// ``` |
| 477 | pub fn shutdown_background(self) { |
| 478 | self.shutdown_timeout(Duration::from_nanos(0)); |
| 479 | } |
| 480 | |
| 481 | /// Returns a view that lets you get information about how the runtime |
| 482 | /// is performing. |
| 483 | pub fn metrics(&self) -> crate::runtime::RuntimeMetrics { |
| 484 | self.handle.metrics() |
| 485 | } |
| 486 | } |
| 487 | |
| 488 | #[allow (clippy::single_match)] // there are comments in the error branch, so we don't want if-let |
| 489 | impl Drop for Runtime { |
| 490 | fn drop(&mut self) { |
| 491 | match &mut self.scheduler { |
| 492 | Scheduler::CurrentThread(current_thread: &mut CurrentThread) => { |
| 493 | // This ensures that tasks spawned on the current-thread |
| 494 | // runtime are dropped inside the runtime's context. |
| 495 | let _guard: Option = context::try_set_current(&self.handle.inner); |
| 496 | current_thread.shutdown(&self.handle.inner); |
| 497 | } |
| 498 | #[cfg (feature = "rt-multi-thread" )] |
| 499 | Scheduler::MultiThread(multi_thread: &mut MultiThread) => { |
| 500 | // The threaded scheduler drops its tasks on its worker threads, which is |
| 501 | // already in the runtime's context. |
| 502 | multi_thread.shutdown(&self.handle.inner); |
| 503 | } |
| 504 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" ))] |
| 505 | Scheduler::MultiThreadAlt(multi_thread) => { |
| 506 | // The threaded scheduler drops its tasks on its worker threads, which is |
| 507 | // already in the runtime's context. |
| 508 | multi_thread.shutdown(&self.handle.inner); |
| 509 | } |
| 510 | } |
| 511 | } |
| 512 | } |
| 513 | |
| 514 | impl std::panic::UnwindSafe for Runtime {} |
| 515 | |
| 516 | impl std::panic::RefUnwindSafe for Runtime {} |
| 517 | |