| 1 | #![cfg_attr (loom, allow(unused_imports))] |
| 2 | |
| 3 | use crate::runtime::handle::Handle; |
| 4 | use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback}; |
| 5 | #[cfg (tokio_unstable)] |
| 6 | use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta}; |
| 7 | use crate::util::rand::{RngSeed, RngSeedGenerator}; |
| 8 | |
| 9 | use crate::runtime::blocking::BlockingPool; |
| 10 | use crate::runtime::scheduler::CurrentThread; |
| 11 | use std::fmt; |
| 12 | use std::io; |
| 13 | use std::thread::ThreadId; |
| 14 | use std::time::Duration; |
| 15 | |
| 16 | /// Builds Tokio Runtime with custom configuration values. |
| 17 | /// |
| 18 | /// Methods can be chained in order to set the configuration values. The |
| 19 | /// Runtime is constructed by calling [`build`]. |
| 20 | /// |
| 21 | /// New instances of `Builder` are obtained via [`Builder::new_multi_thread`] |
| 22 | /// or [`Builder::new_current_thread`]. |
| 23 | /// |
| 24 | /// See function level documentation for details on the various configuration |
| 25 | /// settings. |
| 26 | /// |
| 27 | /// [`build`]: method@Self::build |
| 28 | /// [`Builder::new_multi_thread`]: method@Self::new_multi_thread |
| 29 | /// [`Builder::new_current_thread`]: method@Self::new_current_thread |
| 30 | /// |
| 31 | /// # Examples |
| 32 | /// |
| 33 | /// ``` |
| 34 | /// use tokio::runtime::Builder; |
| 35 | /// |
| 36 | /// fn main() { |
| 37 | /// // build runtime |
| 38 | /// let runtime = Builder::new_multi_thread() |
| 39 | /// .worker_threads(4) |
| 40 | /// .thread_name("my-custom-name" ) |
| 41 | /// .thread_stack_size(3 * 1024 * 1024) |
| 42 | /// .build() |
| 43 | /// .unwrap(); |
| 44 | /// |
| 45 | /// // use runtime ... |
| 46 | /// } |
| 47 | /// ``` |
| 48 | pub struct Builder { |
| 49 | /// Runtime type |
| 50 | kind: Kind, |
| 51 | |
| 52 | /// Whether or not to enable the I/O driver |
| 53 | enable_io: bool, |
| 54 | nevents: usize, |
| 55 | |
| 56 | /// Whether or not to enable the time driver |
| 57 | enable_time: bool, |
| 58 | |
| 59 | /// Whether or not the clock should start paused. |
| 60 | start_paused: bool, |
| 61 | |
| 62 | /// The number of worker threads, used by Runtime. |
| 63 | /// |
| 64 | /// Only used when not using the current-thread executor. |
| 65 | worker_threads: Option<usize>, |
| 66 | |
| 67 | /// Cap on thread usage. |
| 68 | max_blocking_threads: usize, |
| 69 | |
| 70 | /// Name fn used for threads spawned by the runtime. |
| 71 | pub(super) thread_name: ThreadNameFn, |
| 72 | |
| 73 | /// Stack size used for threads spawned by the runtime. |
| 74 | pub(super) thread_stack_size: Option<usize>, |
| 75 | |
| 76 | /// Callback to run after each thread starts. |
| 77 | pub(super) after_start: Option<Callback>, |
| 78 | |
| 79 | /// To run before each worker thread stops |
| 80 | pub(super) before_stop: Option<Callback>, |
| 81 | |
| 82 | /// To run before each worker thread is parked. |
| 83 | pub(super) before_park: Option<Callback>, |
| 84 | |
| 85 | /// To run after each thread is unparked. |
| 86 | pub(super) after_unpark: Option<Callback>, |
| 87 | |
| 88 | /// To run before each task is spawned. |
| 89 | pub(super) before_spawn: Option<TaskCallback>, |
| 90 | |
| 91 | /// To run before each poll |
| 92 | #[cfg (tokio_unstable)] |
| 93 | pub(super) before_poll: Option<TaskCallback>, |
| 94 | |
| 95 | /// To run after each poll |
| 96 | #[cfg (tokio_unstable)] |
| 97 | pub(super) after_poll: Option<TaskCallback>, |
| 98 | |
| 99 | /// To run after each task is terminated. |
| 100 | pub(super) after_termination: Option<TaskCallback>, |
| 101 | |
| 102 | /// Customizable keep alive timeout for `BlockingPool` |
| 103 | pub(super) keep_alive: Option<Duration>, |
| 104 | |
| 105 | /// How many ticks before pulling a task from the global/remote queue? |
| 106 | /// |
| 107 | /// When `None`, the value is unspecified and behavior details are left to |
| 108 | /// the scheduler. Each scheduler flavor could choose to either pick its own |
| 109 | /// default value or use some other strategy to decide when to poll from the |
| 110 | /// global queue. For example, the multi-threaded scheduler uses a |
| 111 | /// self-tuning strategy based on mean task poll times. |
| 112 | pub(super) global_queue_interval: Option<u32>, |
| 113 | |
| 114 | /// How many ticks before yielding to the driver for timer and I/O events? |
| 115 | pub(super) event_interval: u32, |
| 116 | |
| 117 | pub(super) local_queue_capacity: usize, |
| 118 | |
| 119 | /// When true, the multi-threade scheduler LIFO slot should not be used. |
| 120 | /// |
| 121 | /// This option should only be exposed as unstable. |
| 122 | pub(super) disable_lifo_slot: bool, |
| 123 | |
| 124 | /// Specify a random number generator seed to provide deterministic results |
| 125 | pub(super) seed_generator: RngSeedGenerator, |
| 126 | |
| 127 | /// When true, enables task poll count histogram instrumentation. |
| 128 | pub(super) metrics_poll_count_histogram_enable: bool, |
| 129 | |
| 130 | /// Configures the task poll count histogram |
| 131 | pub(super) metrics_poll_count_histogram: HistogramBuilder, |
| 132 | |
| 133 | #[cfg (tokio_unstable)] |
| 134 | pub(super) unhandled_panic: UnhandledPanic, |
| 135 | } |
| 136 | |
| 137 | cfg_unstable! { |
| 138 | /// How the runtime should respond to unhandled panics. |
| 139 | /// |
| 140 | /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic` |
| 141 | /// to configure the runtime behavior when a spawned task panics. |
| 142 | /// |
| 143 | /// See [`Builder::unhandled_panic`] for more details. |
| 144 | #[derive (Debug, Clone)] |
| 145 | #[non_exhaustive ] |
| 146 | pub enum UnhandledPanic { |
| 147 | /// The runtime should ignore panics on spawned tasks. |
| 148 | /// |
| 149 | /// The panic is forwarded to the task's [`JoinHandle`] and all spawned |
| 150 | /// tasks continue running normally. |
| 151 | /// |
| 152 | /// This is the default behavior. |
| 153 | /// |
| 154 | /// # Examples |
| 155 | /// |
| 156 | /// ``` |
| 157 | /// use tokio::runtime::{self, UnhandledPanic}; |
| 158 | /// |
| 159 | /// # pub fn main() { |
| 160 | /// let rt = runtime::Builder::new_current_thread() |
| 161 | /// .unhandled_panic(UnhandledPanic::Ignore) |
| 162 | /// .build() |
| 163 | /// .unwrap(); |
| 164 | /// |
| 165 | /// let task1 = rt.spawn(async { panic!("boom"); }); |
| 166 | /// let task2 = rt.spawn(async { |
| 167 | /// // This task completes normally |
| 168 | /// "done" |
| 169 | /// }); |
| 170 | /// |
| 171 | /// rt.block_on(async { |
| 172 | /// // The panic on the first task is forwarded to the `JoinHandle` |
| 173 | /// assert!(task1.await.is_err()); |
| 174 | /// |
| 175 | /// // The second task completes normally |
| 176 | /// assert!(task2.await.is_ok()); |
| 177 | /// }) |
| 178 | /// # } |
| 179 | /// ``` |
| 180 | /// |
| 181 | /// [`JoinHandle`]: struct@crate::task::JoinHandle |
| 182 | Ignore, |
| 183 | |
| 184 | /// The runtime should immediately shutdown if a spawned task panics. |
| 185 | /// |
| 186 | /// The runtime will immediately shutdown even if the panicked task's |
| 187 | /// [`JoinHandle`] is still available. All further spawned tasks will be |
| 188 | /// immediately dropped and call to [`Runtime::block_on`] will panic. |
| 189 | /// |
| 190 | /// # Examples |
| 191 | /// |
| 192 | /// ```should_panic |
| 193 | /// use tokio::runtime::{self, UnhandledPanic}; |
| 194 | /// |
| 195 | /// # pub fn main() { |
| 196 | /// let rt = runtime::Builder::new_current_thread() |
| 197 | /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) |
| 198 | /// .build() |
| 199 | /// .unwrap(); |
| 200 | /// |
| 201 | /// rt.spawn(async { panic!("boom"); }); |
| 202 | /// rt.spawn(async { |
| 203 | /// // This task never completes. |
| 204 | /// }); |
| 205 | /// |
| 206 | /// rt.block_on(async { |
| 207 | /// // Do some work |
| 208 | /// # loop { tokio::task::yield_now().await; } |
| 209 | /// }) |
| 210 | /// # } |
| 211 | /// ``` |
| 212 | /// |
| 213 | /// [`JoinHandle`]: struct@crate::task::JoinHandle |
| 214 | ShutdownRuntime, |
| 215 | } |
| 216 | } |
| 217 | |
/// Shared, thread-safe function that yields the name for a thread spawned by
/// the runtime.
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
| 219 | |
/// Scheduler flavor selected for a `Builder`.
#[derive(Clone, Copy)]
pub(crate) enum Kind {
    /// Selected by `Builder::new_current_thread`.
    CurrentThread,
    /// Selected by `Builder::new_multi_thread`.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread,
    /// Selected by `Builder::new_multi_thread_alt`.
    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
    MultiThreadAlt,
}
| 228 | |
| 229 | impl Builder { |
| 230 | /// Returns a new builder with the current thread scheduler selected. |
| 231 | /// |
| 232 | /// Configuration methods can be chained on the return value. |
| 233 | /// |
| 234 | /// To spawn non-`Send` tasks on the resulting runtime, combine it with a |
| 235 | /// [`LocalSet`]. |
| 236 | /// |
| 237 | /// [`LocalSet`]: crate::task::LocalSet |
| 238 | pub fn new_current_thread() -> Builder { |
| 239 | #[cfg (loom)] |
| 240 | const EVENT_INTERVAL: u32 = 4; |
| 241 | // The number `61` is fairly arbitrary. I believe this value was copied from golang. |
| 242 | #[cfg (not(loom))] |
| 243 | const EVENT_INTERVAL: u32 = 61; |
| 244 | |
| 245 | Builder::new(Kind::CurrentThread, EVENT_INTERVAL) |
| 246 | } |
| 247 | |
/// Creates a builder that has the multi thread scheduler selected.
///
/// The returned value accepts chained configuration method calls.
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread() -> Builder {
    // The number `61` is fairly arbitrary. I believe this value was copied from golang.
    const EVENT_INTERVAL: u32 = 61;

    Builder::new(Kind::MultiThread, EVENT_INTERVAL)
}
| 257 | |
| 258 | cfg_unstable! { |
| 259 | /// Returns a new builder with the alternate multi thread scheduler |
| 260 | /// selected. |
| 261 | /// |
| 262 | /// The alternate multi threaded scheduler is an in-progress |
| 263 | /// candidate to replace the existing multi threaded scheduler. It |
| 264 | /// currently does not scale as well to 16+ processors. |
| 265 | /// |
| 266 | /// This runtime flavor is currently **not considered production |
| 267 | /// ready**. |
| 268 | /// |
| 269 | /// Configuration methods can be chained on the return value. |
| 270 | #[cfg (feature = "rt-multi-thread" )] |
| 271 | #[cfg_attr (docsrs, doc(cfg(feature = "rt-multi-thread" )))] |
| 272 | pub fn new_multi_thread_alt() -> Builder { |
| 273 | // The number `61` is fairly arbitrary. I believe this value was copied from golang. |
| 274 | Builder::new(Kind::MultiThreadAlt, 61) |
| 275 | } |
| 276 | } |
| 277 | |
| 278 | /// Returns a new runtime builder initialized with default configuration |
| 279 | /// values. |
| 280 | /// |
| 281 | /// Configuration methods can be chained on the return value. |
| 282 | pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder { |
| 283 | Builder { |
| 284 | kind, |
| 285 | |
| 286 | // I/O defaults to "off" |
| 287 | enable_io: false, |
| 288 | nevents: 1024, |
| 289 | |
| 290 | // Time defaults to "off" |
| 291 | enable_time: false, |
| 292 | |
| 293 | // The clock starts not-paused |
| 294 | start_paused: false, |
| 295 | |
| 296 | // Read from environment variable first in multi-threaded mode. |
| 297 | // Default to lazy auto-detection (one thread per CPU core) |
| 298 | worker_threads: None, |
| 299 | |
| 300 | max_blocking_threads: 512, |
| 301 | |
| 302 | // Default thread name |
| 303 | thread_name: std::sync::Arc::new(|| "tokio-runtime-worker" .into()), |
| 304 | |
| 305 | // Do not set a stack size by default |
| 306 | thread_stack_size: None, |
| 307 | |
| 308 | // No worker thread callbacks |
| 309 | after_start: None, |
| 310 | before_stop: None, |
| 311 | before_park: None, |
| 312 | after_unpark: None, |
| 313 | |
| 314 | before_spawn: None, |
| 315 | after_termination: None, |
| 316 | |
| 317 | #[cfg (tokio_unstable)] |
| 318 | before_poll: None, |
| 319 | #[cfg (tokio_unstable)] |
| 320 | after_poll: None, |
| 321 | |
| 322 | keep_alive: None, |
| 323 | |
| 324 | // Defaults for these values depend on the scheduler kind, so we get them |
| 325 | // as parameters. |
| 326 | global_queue_interval: None, |
| 327 | event_interval, |
| 328 | |
| 329 | #[cfg (not(loom))] |
| 330 | local_queue_capacity: 256, |
| 331 | |
| 332 | #[cfg (loom)] |
| 333 | local_queue_capacity: 4, |
| 334 | |
| 335 | seed_generator: RngSeedGenerator::new(RngSeed::new()), |
| 336 | |
| 337 | #[cfg (tokio_unstable)] |
| 338 | unhandled_panic: UnhandledPanic::Ignore, |
| 339 | |
| 340 | metrics_poll_count_histogram_enable: false, |
| 341 | |
| 342 | metrics_poll_count_histogram: HistogramBuilder::default(), |
| 343 | |
| 344 | disable_lifo_slot: false, |
| 345 | } |
| 346 | } |
| 347 | |
| 348 | /// Enables both I/O and time drivers. |
| 349 | /// |
| 350 | /// Doing this is a shorthand for calling `enable_io` and `enable_time` |
| 351 | /// individually. If additional components are added to Tokio in the future, |
| 352 | /// `enable_all` will include these future components. |
| 353 | /// |
| 354 | /// # Examples |
| 355 | /// |
| 356 | /// ``` |
| 357 | /// use tokio::runtime; |
| 358 | /// |
| 359 | /// let rt = runtime::Builder::new_multi_thread() |
| 360 | /// .enable_all() |
| 361 | /// .build() |
| 362 | /// .unwrap(); |
| 363 | /// ``` |
| 364 | pub fn enable_all(&mut self) -> &mut Self { |
| 365 | #[cfg (any( |
| 366 | feature = "net" , |
| 367 | all(unix, feature = "process" ), |
| 368 | all(unix, feature = "signal" ) |
| 369 | ))] |
| 370 | self.enable_io(); |
| 371 | #[cfg (feature = "time" )] |
| 372 | self.enable_time(); |
| 373 | |
| 374 | self |
| 375 | } |
| 376 | |
| 377 | /// Sets the number of worker threads the `Runtime` will use. |
| 378 | /// |
| 379 | /// This can be any number above 0 though it is advised to keep this value |
| 380 | /// on the smaller side. |
| 381 | /// |
| 382 | /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`. |
| 383 | /// |
| 384 | /// # Default |
| 385 | /// |
| 386 | /// The default value is the number of cores available to the system. |
| 387 | /// |
| 388 | /// When using the `current_thread` runtime this method has no effect. |
| 389 | /// |
| 390 | /// # Examples |
| 391 | /// |
| 392 | /// ## Multi threaded runtime with 4 threads |
| 393 | /// |
| 394 | /// ``` |
| 395 | /// use tokio::runtime; |
| 396 | /// |
| 397 | /// // This will spawn a work-stealing runtime with 4 worker threads. |
| 398 | /// let rt = runtime::Builder::new_multi_thread() |
| 399 | /// .worker_threads(4) |
| 400 | /// .build() |
| 401 | /// .unwrap(); |
| 402 | /// |
| 403 | /// rt.spawn(async move {}); |
| 404 | /// ``` |
| 405 | /// |
| 406 | /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`) |
| 407 | /// |
| 408 | /// ``` |
| 409 | /// use tokio::runtime; |
| 410 | /// |
| 411 | /// // Create a runtime that _must_ be driven from a call |
| 412 | /// // to `Runtime::block_on`. |
| 413 | /// let rt = runtime::Builder::new_current_thread() |
| 414 | /// .build() |
| 415 | /// .unwrap(); |
| 416 | /// |
| 417 | /// // This will run the runtime and future on the current thread |
| 418 | /// rt.block_on(async move {}); |
| 419 | /// ``` |
| 420 | /// |
| 421 | /// # Panics |
| 422 | /// |
| 423 | /// This will panic if `val` is not larger than `0`. |
| 424 | #[track_caller ] |
| 425 | pub fn worker_threads(&mut self, val: usize) -> &mut Self { |
| 426 | assert!(val > 0, "Worker threads cannot be set to 0" ); |
| 427 | self.worker_threads = Some(val); |
| 428 | self |
| 429 | } |
| 430 | |
| 431 | /// Specifies the limit for additional threads spawned by the Runtime. |
| 432 | /// |
| 433 | /// These threads are used for blocking operations like tasks spawned |
| 434 | /// through [`spawn_blocking`], this includes but is not limited to: |
| 435 | /// - [`fs`] operations |
| 436 | /// - dns resolution through [`ToSocketAddrs`] |
| 437 | /// - writing to [`Stdout`] or [`Stderr`] |
| 438 | /// - reading from [`Stdin`] |
| 439 | /// |
| 440 | /// Unlike the [`worker_threads`], they are not always active and will exit |
| 441 | /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`]. |
| 442 | /// |
| 443 | /// It's recommended to not set this limit too low in order to avoid hanging on operations |
| 444 | /// requiring [`spawn_blocking`]. |
| 445 | /// |
| 446 | /// The default value is 512. |
| 447 | /// |
| 448 | /// # Panics |
| 449 | /// |
| 450 | /// This will panic if `val` is not larger than `0`. |
| 451 | /// |
| 452 | /// # Upgrading from 0.x |
| 453 | /// |
| 454 | /// In old versions `max_threads` limited both blocking and worker threads, but the |
| 455 | /// current `max_blocking_threads` does not include async worker threads in the count. |
| 456 | /// |
| 457 | /// [`spawn_blocking`]: fn@crate::task::spawn_blocking |
| 458 | /// [`fs`]: mod@crate::fs |
| 459 | /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs |
| 460 | /// [`Stdout`]: struct@crate::io::Stdout |
| 461 | /// [`Stdin`]: struct@crate::io::Stdin |
| 462 | /// [`Stderr`]: struct@crate::io::Stderr |
| 463 | /// [`worker_threads`]: Self::worker_threads |
| 464 | /// [`thread_keep_alive`]: Self::thread_keep_alive |
| 465 | #[track_caller ] |
| 466 | #[cfg_attr (docsrs, doc(alias = "max_threads" ))] |
| 467 | pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self { |
| 468 | assert!(val > 0, "Max blocking threads cannot be set to 0" ); |
| 469 | self.max_blocking_threads = val; |
| 470 | self |
| 471 | } |
| 472 | |
| 473 | /// Sets name of threads spawned by the `Runtime`'s thread pool. |
| 474 | /// |
| 475 | /// The default name is "tokio-runtime-worker". |
| 476 | /// |
| 477 | /// # Examples |
| 478 | /// |
| 479 | /// ``` |
| 480 | /// # use tokio::runtime; |
| 481 | /// |
| 482 | /// # pub fn main() { |
| 483 | /// let rt = runtime::Builder::new_multi_thread() |
| 484 | /// .thread_name("my-pool" ) |
| 485 | /// .build(); |
| 486 | /// # } |
| 487 | /// ``` |
| 488 | pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self { |
| 489 | let val = val.into(); |
| 490 | self.thread_name = std::sync::Arc::new(move || val.clone()); |
| 491 | self |
| 492 | } |
| 493 | |
| 494 | /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool. |
| 495 | /// |
| 496 | /// The default name fn is `|| "tokio-runtime-worker".into()`. |
| 497 | /// |
| 498 | /// # Examples |
| 499 | /// |
| 500 | /// ``` |
| 501 | /// # use tokio::runtime; |
| 502 | /// # use std::sync::atomic::{AtomicUsize, Ordering}; |
| 503 | /// # pub fn main() { |
| 504 | /// let rt = runtime::Builder::new_multi_thread() |
| 505 | /// .thread_name_fn(|| { |
| 506 | /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); |
| 507 | /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); |
| 508 | /// format!("my-pool-{}" , id) |
| 509 | /// }) |
| 510 | /// .build(); |
| 511 | /// # } |
| 512 | /// ``` |
| 513 | pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self |
| 514 | where |
| 515 | F: Fn() -> String + Send + Sync + 'static, |
| 516 | { |
| 517 | self.thread_name = std::sync::Arc::new(f); |
| 518 | self |
| 519 | } |
| 520 | |
| 521 | /// Sets the stack size (in bytes) for worker threads. |
| 522 | /// |
| 523 | /// The actual stack size may be greater than this value if the platform |
| 524 | /// specifies minimal stack size. |
| 525 | /// |
| 526 | /// The default stack size for spawned threads is 2 MiB, though this |
| 527 | /// particular stack size is subject to change in the future. |
| 528 | /// |
| 529 | /// # Examples |
| 530 | /// |
| 531 | /// ``` |
| 532 | /// # use tokio::runtime; |
| 533 | /// |
| 534 | /// # pub fn main() { |
| 535 | /// let rt = runtime::Builder::new_multi_thread() |
| 536 | /// .thread_stack_size(32 * 1024) |
| 537 | /// .build(); |
| 538 | /// # } |
| 539 | /// ``` |
| 540 | pub fn thread_stack_size(&mut self, val: usize) -> &mut Self { |
| 541 | self.thread_stack_size = Some(val); |
| 542 | self |
| 543 | } |
| 544 | |
| 545 | /// Executes function `f` after each thread is started but before it starts |
| 546 | /// doing work. |
| 547 | /// |
| 548 | /// This is intended for bookkeeping and monitoring use cases. |
| 549 | /// |
| 550 | /// # Examples |
| 551 | /// |
| 552 | /// ``` |
| 553 | /// # use tokio::runtime; |
| 554 | /// # pub fn main() { |
| 555 | /// let runtime = runtime::Builder::new_multi_thread() |
| 556 | /// .on_thread_start(|| { |
| 557 | /// println!("thread started" ); |
| 558 | /// }) |
| 559 | /// .build(); |
| 560 | /// # } |
| 561 | /// ``` |
| 562 | #[cfg (not(loom))] |
| 563 | pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self |
| 564 | where |
| 565 | F: Fn() + Send + Sync + 'static, |
| 566 | { |
| 567 | self.after_start = Some(std::sync::Arc::new(f)); |
| 568 | self |
| 569 | } |
| 570 | |
| 571 | /// Executes function `f` before each thread stops. |
| 572 | /// |
| 573 | /// This is intended for bookkeeping and monitoring use cases. |
| 574 | /// |
| 575 | /// # Examples |
| 576 | /// |
| 577 | /// ``` |
| 578 | /// # use tokio::runtime; |
| 579 | /// # pub fn main() { |
| 580 | /// let runtime = runtime::Builder::new_multi_thread() |
| 581 | /// .on_thread_stop(|| { |
| 582 | /// println!("thread stopping" ); |
| 583 | /// }) |
| 584 | /// .build(); |
| 585 | /// # } |
| 586 | /// ``` |
| 587 | #[cfg (not(loom))] |
| 588 | pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self |
| 589 | where |
| 590 | F: Fn() + Send + Sync + 'static, |
| 591 | { |
| 592 | self.before_stop = Some(std::sync::Arc::new(f)); |
| 593 | self |
| 594 | } |
| 595 | |
| 596 | /// Executes function `f` just before a thread is parked (goes idle). |
| 597 | /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn) |
| 598 | /// can be called, and may result in this thread being unparked immediately. |
| 599 | /// |
| 600 | /// This can be used to start work only when the executor is idle, or for bookkeeping |
| 601 | /// and monitoring purposes. |
| 602 | /// |
| 603 | /// Note: There can only be one park callback for a runtime; calling this function |
| 604 | /// more than once replaces the last callback defined, rather than adding to it. |
| 605 | /// |
| 606 | /// # Examples |
| 607 | /// |
| 608 | /// ## Multithreaded executor |
| 609 | /// ``` |
| 610 | /// # use std::sync::Arc; |
| 611 | /// # use std::sync::atomic::{AtomicBool, Ordering}; |
| 612 | /// # use tokio::runtime; |
| 613 | /// # use tokio::sync::Barrier; |
| 614 | /// # pub fn main() { |
| 615 | /// let once = AtomicBool::new(true); |
| 616 | /// let barrier = Arc::new(Barrier::new(2)); |
| 617 | /// |
| 618 | /// let runtime = runtime::Builder::new_multi_thread() |
| 619 | /// .worker_threads(1) |
| 620 | /// .on_thread_park({ |
| 621 | /// let barrier = barrier.clone(); |
| 622 | /// move || { |
| 623 | /// let barrier = barrier.clone(); |
| 624 | /// if once.swap(false, Ordering::Relaxed) { |
| 625 | /// tokio::spawn(async move { barrier.wait().await; }); |
| 626 | /// } |
| 627 | /// } |
| 628 | /// }) |
| 629 | /// .build() |
| 630 | /// .unwrap(); |
| 631 | /// |
| 632 | /// runtime.block_on(async { |
| 633 | /// barrier.wait().await; |
| 634 | /// }) |
| 635 | /// # } |
| 636 | /// ``` |
| 637 | /// ## Current thread executor |
| 638 | /// ``` |
| 639 | /// # use std::sync::Arc; |
| 640 | /// # use std::sync::atomic::{AtomicBool, Ordering}; |
| 641 | /// # use tokio::runtime; |
| 642 | /// # use tokio::sync::Barrier; |
| 643 | /// # pub fn main() { |
| 644 | /// let once = AtomicBool::new(true); |
| 645 | /// let barrier = Arc::new(Barrier::new(2)); |
| 646 | /// |
| 647 | /// let runtime = runtime::Builder::new_current_thread() |
| 648 | /// .on_thread_park({ |
| 649 | /// let barrier = barrier.clone(); |
| 650 | /// move || { |
| 651 | /// let barrier = barrier.clone(); |
| 652 | /// if once.swap(false, Ordering::Relaxed) { |
| 653 | /// tokio::spawn(async move { barrier.wait().await; }); |
| 654 | /// } |
| 655 | /// } |
| 656 | /// }) |
| 657 | /// .build() |
| 658 | /// .unwrap(); |
| 659 | /// |
| 660 | /// runtime.block_on(async { |
| 661 | /// barrier.wait().await; |
| 662 | /// }) |
| 663 | /// # } |
| 664 | /// ``` |
| 665 | #[cfg (not(loom))] |
| 666 | pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self |
| 667 | where |
| 668 | F: Fn() + Send + Sync + 'static, |
| 669 | { |
| 670 | self.before_park = Some(std::sync::Arc::new(f)); |
| 671 | self |
| 672 | } |
| 673 | |
| 674 | /// Executes function `f` just after a thread unparks (starts executing tasks). |
| 675 | /// |
| 676 | /// This is intended for bookkeeping and monitoring use cases; note that work |
| 677 | /// in this callback will increase latencies when the application has allowed one or |
| 678 | /// more runtime threads to go idle. |
| 679 | /// |
| 680 | /// Note: There can only be one unpark callback for a runtime; calling this function |
| 681 | /// more than once replaces the last callback defined, rather than adding to it. |
| 682 | /// |
| 683 | /// # Examples |
| 684 | /// |
| 685 | /// ``` |
| 686 | /// # use tokio::runtime; |
| 687 | /// # pub fn main() { |
| 688 | /// let runtime = runtime::Builder::new_multi_thread() |
| 689 | /// .on_thread_unpark(|| { |
| 690 | /// println!("thread unparking" ); |
| 691 | /// }) |
| 692 | /// .build(); |
| 693 | /// |
| 694 | /// runtime.unwrap().block_on(async { |
| 695 | /// tokio::task::yield_now().await; |
| 696 | /// println!("Hello from Tokio!" ); |
| 697 | /// }) |
| 698 | /// # } |
| 699 | /// ``` |
| 700 | #[cfg (not(loom))] |
| 701 | pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self |
| 702 | where |
| 703 | F: Fn() + Send + Sync + 'static, |
| 704 | { |
| 705 | self.after_unpark = Some(std::sync::Arc::new(f)); |
| 706 | self |
| 707 | } |
| 708 | |
/// Registers function `f` to run just before a task is spawned.
///
/// `f` is called within the Tokio context, so functions like
/// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
/// invoked immediately.
///
/// Use it for bookkeeping or monitoring purposes.
///
/// Note: A runtime has a single spawn callback; calling this function again
/// replaces the callback defined last instead of adding to it.
///
/// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [unstable]: crate#unstable-features
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
/// # pub fn main() {
/// let runtime = runtime::Builder::new_current_thread()
///     .on_task_spawn(|_| {
///         println!("spawning task");
///     })
///     .build()
///     .unwrap();
///
/// runtime.block_on(async {
///     tokio::task::spawn(std::future::ready(()));
///
///     for _ in 0..64 {
///         tokio::task::yield_now().await;
///     }
/// })
/// # }
/// ```
#[cfg(all(not(loom), tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
where
    F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
{
    let hook: TaskCallback = std::sync::Arc::new(f);
    self.before_spawn = Some(hook);
    self
}
| 758 | |
/// Executes function `f` just before a task is polled.
///
/// `f` is called within the Tokio context, so functions like
/// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
/// invoked immediately.
///
/// Note: There can only be one before-poll callback for a runtime; calling this
/// function more than once replaces the last callback defined, rather than adding to it.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [unstable]: crate#unstable-features
///
/// # Examples
///
/// ```
/// # use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
/// # use tokio::task::yield_now;
/// # pub fn main() {
/// let poll_start_counter = Arc::new(AtomicUsize::new(0));
/// let poll_start = poll_start_counter.clone();
/// let rt = tokio::runtime::Builder::new_multi_thread()
///     .enable_all()
///     .on_before_task_poll(move |meta| {
///         // Count every poll that is about to begin.
///         poll_start.fetch_add(1, Ordering::Relaxed);
///         println!("task {} is about to be polled", meta.id())
///     })
///     .build()
///     .unwrap();
/// let task = rt.spawn(async {
///     yield_now().await;
/// });
/// let _ = rt.block_on(task);
///
/// println!(
///     "observed {} poll starts",
///     poll_start_counter.load(Ordering::Relaxed)
/// );
/// # }
/// ```
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
where
    F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
{
    // Assignment overwrites any callback registered earlier.
    self.before_poll = Some(std::sync::Arc::new(f));
    self
}
| 801 | |
/// Executes function `f` just after a task is polled.
///
/// `f` is called within the Tokio context, so functions like
/// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
/// invoked immediately.
///
/// Note: There can only be one after-poll callback for a runtime; calling this
/// function more than once replaces the last callback defined, rather than adding to it.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [unstable]: crate#unstable-features
///
/// # Examples
///
/// ```
/// # use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
/// # use tokio::task::yield_now;
/// # pub fn main() {
/// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
/// let poll_stop = poll_stop_counter.clone();
/// let rt = tokio::runtime::Builder::new_multi_thread()
///     .enable_all()
///     .on_after_task_poll(move |meta| {
///         // Count every poll that has just finished.
///         poll_stop.fetch_add(1, Ordering::Relaxed);
///         println!("task {} completed polling", meta.id());
///     })
///     .build()
///     .unwrap();
/// let task = rt.spawn(async {
///     yield_now().await;
/// });
/// let _ = rt.block_on(task);
///
/// println!(
///     "observed {} finished polls so far",
///     poll_stop_counter.load(Ordering::Relaxed)
/// );
/// # }
/// ```
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
where
    F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
{
    // Assignment overwrites any callback registered earlier.
    self.after_poll = Some(std::sync::Arc::new(f));
    self
}
| 844 | |
| 845 | /// Executes function `f` just after a task is terminated. |
| 846 | /// |
| 847 | /// `f` is called within the Tokio context, so functions like |
| 848 | /// [`tokio::spawn`](crate::spawn) can be called. |
| 849 | /// |
| 850 | /// This can be used for bookkeeping or monitoring purposes. |
| 851 | /// |
| 852 | /// Note: There can only be one task termination callback for a runtime; calling this |
| 853 | /// function more than once replaces the last callback defined, rather than adding to it. |
| 854 | /// |
| 855 | /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time. |
| 856 | /// |
| 857 | /// **Note**: This is an [unstable API][unstable]. The public API of this type |
| 858 | /// may break in 1.x releases. See [the documentation on unstable |
| 859 | /// features][unstable] for details. |
| 860 | /// |
| 861 | /// [unstable]: crate#unstable-features |
| 862 | /// |
| 863 | /// # Examples |
| 864 | /// |
| 865 | /// ``` |
| 866 | /// # use tokio::runtime; |
| 867 | /// # pub fn main() { |
| 868 | /// let runtime = runtime::Builder::new_current_thread() |
| 869 | /// .on_task_terminate(|_| { |
| 870 | /// println!("killing task"); |
| 871 | /// }) |
| 872 | /// .build() |
| 873 | /// .unwrap(); |
| 874 | /// |
| 875 | /// runtime.block_on(async { |
| 876 | /// tokio::task::spawn(std::future::ready(())); |
| 877 | /// |
| 878 | /// for _ in 0..64 { |
| 879 | /// tokio::task::yield_now().await; |
| 880 | /// } |
| 881 | /// }) |
| 882 | /// # } |
| 883 | /// ``` |
#[cfg(all(not(loom), tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
where
    F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
{
    use std::sync::Arc;

    // A plain assignment: any termination callback set previously is dropped
    // in favour of this one.
    self.after_termination = Some(Arc::new(f));
    self
}
| 893 | |
| 894 | /// Creates the configured `Runtime`. |
| 895 | /// |
| 896 | /// The returned `Runtime` instance is ready to spawn tasks. |
| 897 | /// |
| 898 | /// # Examples |
| 899 | /// |
| 900 | /// ``` |
| 901 | /// use tokio::runtime::Builder; |
| 902 | /// |
| 903 | /// let rt = Builder::new_multi_thread().build().unwrap(); |
| 904 | /// |
| 905 | /// rt.block_on(async { |
| 906 | /// println!("Hello from the Tokio runtime" ); |
| 907 | /// }); |
| 908 | /// ``` |
| 909 | pub fn build(&mut self) -> io::Result<Runtime> { |
| 910 | match &self.kind { |
| 911 | Kind::CurrentThread => self.build_current_thread_runtime(), |
| 912 | #[cfg (feature = "rt-multi-thread" )] |
| 913 | Kind::MultiThread => self.build_threaded_runtime(), |
| 914 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" ))] |
| 915 | Kind::MultiThreadAlt => self.build_alt_threaded_runtime(), |
| 916 | } |
| 917 | } |
| 918 | |
/// Creates the configured `LocalRuntime`.
///
/// The returned `LocalRuntime` instance is ready to spawn tasks.
///
/// # Panics
///
/// This will panic if `current_thread` is not the selected runtime flavor.
/// All other runtime flavors are unsupported by [`LocalRuntime`].
///
/// [`LocalRuntime`]: crate::runtime::LocalRuntime
///
/// # Examples
///
/// ```
/// use tokio::runtime::Builder;
///
/// let rt = Builder::new_current_thread().build_local(&Default::default()).unwrap();
///
/// rt.block_on(async {
///     println!("Hello from the Tokio runtime");
/// });
/// ```
#[allow(unused_variables, unreachable_patterns)]
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn build_local(&mut self, options: &LocalOptions) -> io::Result<LocalRuntime> {
    // `options` is part of the signature but is not read by this body, which
    // is why `unused_variables` is allowed above.
    match &self.kind {
        Kind::CurrentThread => self.build_current_thread_local_runtime(),
        // When `CurrentThread` is the only variant compiled in, this arm can
        // never match; `unreachable_patterns` is allowed for that build.
        _ => panic!("Only current_thread is supported when building a local runtime"),
    }
}
| 949 | |
| 950 | fn get_cfg(&self, workers: usize) -> driver::Cfg { |
| 951 | driver::Cfg { |
| 952 | enable_pause_time: match self.kind { |
| 953 | Kind::CurrentThread => true, |
| 954 | #[cfg (feature = "rt-multi-thread" )] |
| 955 | Kind::MultiThread => false, |
| 956 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" ))] |
| 957 | Kind::MultiThreadAlt => false, |
| 958 | }, |
| 959 | enable_io: self.enable_io, |
| 960 | enable_time: self.enable_time, |
| 961 | start_paused: self.start_paused, |
| 962 | nevents: self.nevents, |
| 963 | workers, |
| 964 | } |
| 965 | } |
| 966 | |
| 967 | /// Sets a custom timeout for a thread in the blocking pool. |
| 968 | /// |
| 969 | /// By default, the timeout for a thread is set to 10 seconds. This can |
| 970 | /// be overridden using `.thread_keep_alive()`. |
| 971 | /// |
| 972 | /// # Example |
| 973 | /// |
| 974 | /// ``` |
| 975 | /// # use tokio::runtime; |
| 976 | /// # use std::time::Duration; |
| 977 | /// # pub fn main() { |
| 978 | /// let rt = runtime::Builder::new_multi_thread() |
| 979 | /// .thread_keep_alive(Duration::from_millis(100)) |
| 980 | /// .build(); |
| 981 | /// # } |
| 982 | /// ``` |
| 983 | pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self { |
| 984 | self.keep_alive = Some(duration); |
| 985 | self |
| 986 | } |
| 987 | |
| 988 | /// Sets the number of scheduler ticks after which the scheduler will poll the global |
| 989 | /// task queue. |
| 990 | /// |
| 991 | /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task. |
| 992 | /// |
| 993 | /// By default the global queue interval is 31 for the current-thread scheduler. Please see |
| 994 | /// [the module documentation] for the default behavior of the multi-thread scheduler. |
| 995 | /// |
| 996 | /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming |
| 997 | /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler, |
| 998 | /// at the cost of more synchronization overhead. That can be beneficial for prioritizing |
| 999 | /// getting started on new work, especially if tasks frequently yield rather than complete |
| 1000 | /// or await on further I/O. Conversely, a higher value prioritizes existing work, and |
| 1001 | /// is a good choice when most tasks quickly complete polling. |
| 1002 | /// |
| 1003 | /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing |
| 1004 | /// |
| 1005 | /// # Panics |
| 1006 | /// |
| 1007 | /// This function will panic if 0 is passed as an argument. |
| 1008 | /// |
| 1009 | /// # Examples |
| 1010 | /// |
| 1011 | /// ``` |
| 1012 | /// # use tokio::runtime; |
| 1013 | /// # pub fn main() { |
| 1014 | /// let rt = runtime::Builder::new_multi_thread() |
| 1015 | /// .global_queue_interval(31) |
| 1016 | /// .build(); |
| 1017 | /// # } |
| 1018 | /// ``` |
| 1019 | #[track_caller ] |
| 1020 | pub fn global_queue_interval(&mut self, val: u32) -> &mut Self { |
| 1021 | assert!(val > 0, "global_queue_interval must be greater than 0" ); |
| 1022 | self.global_queue_interval = Some(val); |
| 1023 | self |
| 1024 | } |
| 1025 | |
| 1026 | /// Sets the number of scheduler ticks after which the scheduler will poll for |
| 1027 | /// external events (timers, I/O, and so on). |
| 1028 | /// |
| 1029 | /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task. |
| 1030 | /// |
| 1031 | /// By default, the event interval is `61` for all scheduler types. |
| 1032 | /// |
| 1033 | /// Setting the event interval determines the effective "priority" of delivering |
| 1034 | /// these external events (which may wake up additional tasks), compared to |
| 1035 | /// executing tasks that are currently ready to run. A smaller value is useful |
| 1036 | /// when tasks frequently spend a long time in polling, or frequently yield, |
| 1037 | /// which can result in overly long delays picking up I/O events. Conversely, |
| 1038 | /// picking up new events requires extra synchronization and syscall overhead, |
| 1039 | /// so if tasks generally complete their polling quickly, a higher event interval |
| 1040 | /// will minimize that overhead while still keeping the scheduler responsive to |
| 1041 | /// events. |
| 1042 | /// |
| 1043 | /// # Examples |
| 1044 | /// |
| 1045 | /// ``` |
| 1046 | /// # use tokio::runtime; |
| 1047 | /// # pub fn main() { |
| 1048 | /// let rt = runtime::Builder::new_multi_thread() |
| 1049 | /// .event_interval(31) |
| 1050 | /// .build(); |
| 1051 | /// # } |
| 1052 | /// ``` |
| 1053 | pub fn event_interval(&mut self, val: u32) -> &mut Self { |
| 1054 | self.event_interval = val; |
| 1055 | self |
| 1056 | } |
| 1057 | |
| 1058 | cfg_unstable! { |
| 1059 | /// Configure how the runtime responds to an unhandled panic on a |
| 1060 | /// spawned task. |
| 1061 | /// |
| 1062 | /// By default, an unhandled panic (i.e. a panic not caught by |
| 1063 | /// [`std::panic::catch_unwind`]) has no impact on the runtime's |
| 1064 | /// execution. The panic's error value is forwarded to the task's |
| 1065 | /// [`JoinHandle`] and all other spawned tasks continue running. |
| 1066 | /// |
| 1067 | /// The `unhandled_panic` option enables configuring this behavior. |
| 1068 | /// |
| 1069 | /// * `UnhandledPanic::Ignore` is the default behavior. Panics on |
| 1070 | /// spawned tasks have no impact on the runtime's execution. |
| 1071 | /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to |
| 1072 | /// shutdown immediately when a spawned task panics even if that |
| 1073 | /// task's `JoinHandle` has not been dropped. All other spawned tasks |
| 1074 | /// will immediately terminate and further calls to |
| 1075 | /// [`Runtime::block_on`] will panic. |
| 1076 | /// |
| 1077 | /// # Panics |
| 1078 | /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`] |
| 1079 | /// on a runtime other than the current thread runtime. |
| 1080 | /// |
| 1081 | /// # Unstable |
| 1082 | /// |
| 1083 | /// This option is currently unstable and its implementation is |
| 1084 | /// incomplete. The API may change or be removed in the future. See |
| 1085 | /// issue [tokio-rs/tokio#4516] for more details. |
| 1086 | /// |
| 1087 | /// # Examples |
| 1088 | /// |
| 1089 | /// The following demonstrates a runtime configured to shutdown on |
| 1090 | /// panic. The first spawned task panics and results in the runtime |
| 1091 | /// shutting down. The second spawned task never has a chance to |
| 1092 | /// execute. The call to `block_on` will panic due to the runtime being |
| 1093 | /// forcibly shutdown. |
| 1094 | /// |
| 1095 | /// ```should_panic |
| 1096 | /// use tokio::runtime::{self, UnhandledPanic}; |
| 1097 | /// |
| 1098 | /// # pub fn main() { |
| 1099 | /// let rt = runtime::Builder::new_current_thread() |
| 1100 | /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) |
| 1101 | /// .build() |
| 1102 | /// .unwrap(); |
| 1103 | /// |
| 1104 | /// rt.spawn(async { panic!("boom"); }); |
| 1105 | /// rt.spawn(async { |
| 1106 | /// // This task never completes. |
| 1107 | /// }); |
| 1108 | /// |
| 1109 | /// rt.block_on(async { |
| 1110 | /// // Do some work |
| 1111 | /// # loop { tokio::task::yield_now().await; } |
| 1112 | /// }) |
| 1113 | /// # } |
| 1114 | /// ``` |
| 1115 | /// |
| 1116 | /// [`JoinHandle`]: struct@crate::task::JoinHandle |
| 1117 | /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516 |
| 1118 | pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self { |
| 1119 | if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) { |
| 1120 | panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime" ); |
| 1121 | } |
| 1122 | |
| 1123 | self.unhandled_panic = behavior; |
| 1124 | self |
| 1125 | } |
| 1126 | |
| 1127 | /// Disables the LIFO task scheduler heuristic. |
| 1128 | /// |
| 1129 | /// The multi-threaded scheduler includes a heuristic for optimizing |
| 1130 | /// message-passing patterns. This heuristic results in the **last** |
| 1131 | /// scheduled task being polled first. |
| 1132 | /// |
| 1133 | /// To implement this heuristic, each worker thread has a slot which |
| 1134 | /// holds the task that should be polled next. However, this slot cannot |
| 1135 | /// be stolen by other worker threads, which can result in lower total |
| 1136 | /// throughput when tasks tend to have longer poll times. |
| 1137 | /// |
| 1138 | /// This configuration option will disable this heuristic resulting in |
| 1139 | /// all scheduled tasks being pushed into the worker-local queue, which |
| 1140 | /// is stealable. |
| 1141 | /// |
| 1142 | /// Consider trying this option when the task "scheduled" time is high |
| 1143 | /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to |
| 1144 | /// collect this data. |
| 1145 | /// |
| 1146 | /// # Unstable |
| 1147 | /// |
| 1148 | /// This configuration option is considered a workaround for the LIFO |
| 1149 | /// slot not being stealable. When the slot becomes stealable, we will |
| 1150 | /// revisit whether or not this option is necessary. See |
| 1151 | /// issue [tokio-rs/tokio#4941]. |
| 1152 | /// |
| 1153 | /// # Examples |
| 1154 | /// |
| 1155 | /// ``` |
| 1156 | /// use tokio::runtime; |
| 1157 | /// |
| 1158 | /// let rt = runtime::Builder::new_multi_thread() |
| 1159 | /// .disable_lifo_slot() |
| 1160 | /// .build() |
| 1161 | /// .unwrap(); |
| 1162 | /// ``` |
| 1163 | /// |
| 1164 | /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics |
| 1165 | /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941 |
| 1166 | pub fn disable_lifo_slot(&mut self) -> &mut Self { |
| 1167 | self.disable_lifo_slot = true; |
| 1168 | self |
| 1169 | } |
| 1170 | |
| 1171 | /// Specifies the random number generation seed to use within all |
| 1172 | /// threads associated with the runtime being built. |
| 1173 | /// |
| 1174 | /// This option is intended to make certain parts of the runtime |
| 1175 | /// deterministic (e.g. the [`tokio::select!`] macro). In the case of |
| 1176 | /// [`tokio::select!`] it will ensure that the order that branches are |
| 1177 | /// polled is deterministic. |
| 1178 | /// |
| 1179 | /// In addition to the code specifying `rng_seed` and interacting with |
| 1180 | /// the runtime, the internals of Tokio and the Rust compiler may affect |
| 1181 | /// the sequences of random numbers. In order to ensure repeatable |
| 1182 | /// results, the version of Tokio, the versions of all other |
| 1183 | /// dependencies that interact with Tokio, and the Rust compiler version |
| 1184 | /// should also all remain constant. |
| 1185 | /// |
| 1186 | /// # Examples |
| 1187 | /// |
| 1188 | /// ``` |
| 1189 | /// # use tokio::runtime::{self, RngSeed}; |
| 1190 | /// # pub fn main() { |
| 1191 | /// let seed = RngSeed::from_bytes(b"place your seed here"); |
| 1192 | /// let rt = runtime::Builder::new_current_thread() |
| 1193 | /// .rng_seed(seed) |
| 1194 | /// .build(); |
| 1195 | /// # } |
| 1196 | /// ``` |
| 1197 | /// |
| 1198 | /// [`tokio::select!`]: crate::select |
| 1199 | pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self { |
| 1200 | self.seed_generator = RngSeedGenerator::new(seed); |
| 1201 | self |
| 1202 | } |
| 1203 | } |
| 1204 | |
| 1205 | cfg_unstable_metrics! { |
| 1206 | /// Enables tracking the distribution of task poll times. |
| 1207 | /// |
| 1208 | /// Task poll times are not instrumented by default as doing so requires |
| 1209 | /// calling [`Instant::now()`] twice per task poll, which could add |
| 1210 | /// measurable overhead. Use the [`Handle::metrics()`] to access the |
| 1211 | /// metrics data. |
| 1212 | /// |
| 1213 | /// The histogram uses fixed bucket sizes. In other words, the histogram |
| 1214 | /// buckets are not dynamic based on input values. Use the |
| 1215 | /// `metrics_poll_time_histogram` builder methods to configure the |
| 1216 | /// histogram details. |
| 1217 | /// |
| 1218 | /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. |
| 1219 | /// This has an extremely low memory footprint, but may not provide enough granularity. For |
| 1220 | /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`] |
| 1221 | /// to select [`LogHistogram`] instead. |
| 1222 | /// |
| 1223 | /// # Examples |
| 1224 | /// |
| 1225 | /// ``` |
| 1226 | /// use tokio::runtime; |
| 1227 | /// |
| 1228 | /// let rt = runtime::Builder::new_multi_thread() |
| 1229 | /// .enable_metrics_poll_time_histogram() |
| 1230 | /// .build() |
| 1231 | /// .unwrap(); |
| 1232 | /// # // Test default values here |
| 1233 | /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) } |
| 1234 | /// # let m = rt.handle().metrics(); |
| 1235 | /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10); |
| 1236 | /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100)); |
| 1237 | /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200)); |
| 1238 | /// ``` |
| 1239 | /// |
| 1240 | /// [`Handle::metrics()`]: crate::runtime::Handle::metrics |
| 1241 | /// [`Instant::now()`]: std::time::Instant::now |
| 1242 | /// [`LogHistogram`]: crate::runtime::LogHistogram |
| 1243 | /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration |
| 1244 | pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self { |
| 1245 | self.metrics_poll_count_histogram_enable = true; |
| 1246 | self |
| 1247 | } |
| 1248 | |
| 1249 | /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead. |
| 1250 | /// |
| 1251 | /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram |
| 1252 | #[deprecated (note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality." )] |
| 1253 | #[doc (hidden)] |
| 1254 | pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self { |
| 1255 | self.enable_metrics_poll_time_histogram() |
| 1256 | } |
| 1257 | |
| 1258 | /// Sets the histogram scale for tracking the distribution of task poll |
| 1259 | /// times. |
| 1260 | /// |
| 1261 | /// Tracking the distribution of task poll times can be done using a |
| 1262 | /// linear or log scale. When using linear scale, each histogram bucket |
| 1263 | /// will represent the same range of poll times. When using log scale, |
| 1264 | /// each histogram bucket will cover a range twice as big as the |
| 1265 | /// previous bucket. |
| 1266 | /// |
| 1267 | /// **Default:** linear scale. |
| 1268 | /// |
| 1269 | /// # Examples |
| 1270 | /// |
| 1271 | /// ``` |
| 1272 | /// use tokio::runtime::{self, HistogramScale}; |
| 1273 | /// |
| 1274 | /// # #[allow(deprecated)] |
| 1275 | /// let rt = runtime::Builder::new_multi_thread() |
| 1276 | /// .enable_metrics_poll_time_histogram() |
| 1277 | /// .metrics_poll_count_histogram_scale(HistogramScale::Log) |
| 1278 | /// .build() |
| 1279 | /// .unwrap(); |
| 1280 | /// ``` |
| 1281 | #[deprecated (note = "use `metrics_poll_time_histogram_configuration`" )] |
| 1282 | pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self { |
| 1283 | self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale); |
| 1284 | self |
| 1285 | } |
| 1286 | |
| 1287 | /// Configure the histogram for tracking poll times |
| 1288 | /// |
| 1289 | /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. |
| 1290 | /// This has an extremely low memory footprint, but may not provide enough granularity. For |
| 1291 | /// better granularity with low memory usage, use [`LogHistogram`] instead. |
| 1292 | /// |
| 1293 | /// # Examples |
| 1294 | /// Configure a [`LogHistogram`] with [default configuration]: |
| 1295 | /// ``` |
| 1296 | /// use tokio::runtime; |
| 1297 | /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; |
| 1298 | /// |
| 1299 | /// let rt = runtime::Builder::new_multi_thread() |
| 1300 | /// .enable_metrics_poll_time_histogram() |
| 1301 | /// .metrics_poll_time_histogram_configuration( |
| 1302 | /// HistogramConfiguration::log(LogHistogram::default()) |
| 1303 | /// ) |
| 1304 | /// .build() |
| 1305 | /// .unwrap(); |
| 1306 | /// ``` |
| 1307 | /// |
| 1308 | /// Configure a linear histogram with 100 buckets, each 10μs wide |
| 1309 | /// ``` |
| 1310 | /// use tokio::runtime; |
| 1311 | /// use std::time::Duration; |
| 1312 | /// use tokio::runtime::HistogramConfiguration; |
| 1313 | /// |
| 1314 | /// let rt = runtime::Builder::new_multi_thread() |
| 1315 | /// .enable_metrics_poll_time_histogram() |
| 1316 | /// .metrics_poll_time_histogram_configuration( |
| 1317 | /// HistogramConfiguration::linear(Duration::from_micros(10), 100) |
| 1318 | /// ) |
| 1319 | /// .build() |
| 1320 | /// .unwrap(); |
| 1321 | /// ``` |
| 1322 | /// |
| 1323 | /// Configure a [`LogHistogram`] with the following settings: |
| 1324 | /// - Measure times from 100ns to 120s |
| 1325 | /// - Max error of 0.1 |
| 1326 | /// - No more than 1024 buckets |
| 1327 | /// ``` |
| 1328 | /// use std::time::Duration; |
| 1329 | /// use tokio::runtime; |
| 1330 | /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; |
| 1331 | /// |
| 1332 | /// let rt = runtime::Builder::new_multi_thread() |
| 1333 | /// .enable_metrics_poll_time_histogram() |
| 1334 | /// .metrics_poll_time_histogram_configuration( |
| 1335 | /// HistogramConfiguration::log(LogHistogram::builder() |
| 1336 | /// .max_value(Duration::from_secs(120)) |
| 1337 | /// .min_value(Duration::from_nanos(100)) |
| 1338 | /// .max_error(0.1) |
| 1339 | /// .max_buckets(1024) |
| 1340 | /// .expect("configuration uses 488 buckets") |
| 1341 | /// ) |
| 1342 | /// ) |
| 1343 | /// .build() |
| 1344 | /// .unwrap(); |
| 1345 | /// ``` |
| 1346 | /// |
| 1347 | /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting |
| 1348 | /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram |
| 1349 | /// where each bucket is twice the size of the previous bucket. |
| 1350 | /// ```rust |
| 1351 | /// use std::time::Duration; |
| 1352 | /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; |
| 1353 | /// let rt = tokio::runtime::Builder::new_current_thread() |
| 1354 | /// .enable_all() |
| 1355 | /// .enable_metrics_poll_time_histogram() |
| 1356 | /// .metrics_poll_time_histogram_configuration(HistogramConfiguration::log( |
| 1357 | /// LogHistogram::builder() |
| 1358 | /// .min_value(Duration::from_micros(20)) |
| 1359 | /// .max_value(Duration::from_millis(4)) |
| 1360 | /// // Set `precision_exact` to `0` to match `HistogramScale::Log` |
| 1361 | /// .precision_exact(0) |
| 1362 | /// .max_buckets(10) |
| 1363 | /// .unwrap(), |
| 1364 | /// )) |
| 1365 | /// .build() |
| 1366 | /// .unwrap(); |
| 1367 | /// ``` |
| 1368 | /// |
| 1369 | /// [`LogHistogram`]: crate::runtime::LogHistogram |
| 1370 | /// [default configuration]: crate::runtime::LogHistogramBuilder |
| 1371 | /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log |
| 1372 | pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self { |
| 1373 | self.metrics_poll_count_histogram.histogram_type = configuration.inner; |
| 1374 | self |
| 1375 | } |
| 1376 | |
| 1377 | /// Sets the histogram resolution for tracking the distribution of task |
| 1378 | /// poll times. |
| 1379 | /// |
| 1380 | /// The resolution is the histogram's first bucket's range. When using a |
| 1381 | /// linear histogram scale, each bucket will cover the same range. When |
| 1382 | /// using a log scale, each bucket will cover a range twice as big as |
| 1383 | /// the previous bucket. In the log case, the resolution represents the |
| 1384 | /// smallest bucket range. |
| 1385 | /// |
| 1386 | /// Note that, when using log scale, the resolution is rounded up to the |
| 1387 | /// nearest power of 2 in nanoseconds. |
| 1388 | /// |
| 1389 | /// **Default:** 100 microseconds. |
| 1390 | /// |
| 1391 | /// # Examples |
| 1392 | /// |
| 1393 | /// ``` |
| 1394 | /// use tokio::runtime; |
| 1395 | /// use std::time::Duration; |
| 1396 | /// |
| 1397 | /// # #[allow(deprecated)] |
| 1398 | /// let rt = runtime::Builder::new_multi_thread() |
| 1399 | /// .enable_metrics_poll_time_histogram() |
| 1400 | /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100)) |
| 1401 | /// .build() |
| 1402 | /// .unwrap(); |
| 1403 | /// ``` |
| 1404 | #[deprecated (note = "use `metrics_poll_time_histogram_configuration`" )] |
| 1405 | pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self { |
| 1406 | assert!(resolution > Duration::from_secs(0)); |
| 1407 | // Sanity check the argument and also make the cast below safe. |
| 1408 | assert!(resolution <= Duration::from_secs(1)); |
| 1409 | |
| 1410 | let resolution = resolution.as_nanos() as u64; |
| 1411 | |
| 1412 | self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution); |
| 1413 | self |
| 1414 | } |
| 1415 | |
| 1416 | /// Sets the number of buckets for the histogram tracking the |
| 1417 | /// distribution of task poll times. |
| 1418 | /// |
| 1419 | /// The last bucket tracks all greater values that fall out of other |
| 1420 | /// ranges. So, configuring the histogram using a linear scale, |
| 1421 | /// resolution of 50ms, and 10 buckets, the 10th bucket will track task |
| 1422 | /// polls that take more than 450ms to complete. |
| 1423 | /// |
| 1424 | /// **Default:** 10 |
| 1425 | /// |
| 1426 | /// # Examples |
| 1427 | /// |
| 1428 | /// ``` |
| 1429 | /// use tokio::runtime; |
| 1430 | /// |
| 1431 | /// # #[allow(deprecated)] |
| 1432 | /// let rt = runtime::Builder::new_multi_thread() |
| 1433 | /// .enable_metrics_poll_time_histogram() |
| 1434 | /// .metrics_poll_count_histogram_buckets(15) |
| 1435 | /// .build() |
| 1436 | /// .unwrap(); |
| 1437 | /// ``` |
| 1438 | #[deprecated (note = "use `metrics_poll_time_histogram_configuration`" )] |
| 1439 | pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self { |
| 1440 | self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets); |
| 1441 | self |
| 1442 | } |
| 1443 | } |
| 1444 | |
| 1445 | cfg_loom! { |
| 1446 | pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self { |
| 1447 | assert!(value.is_power_of_two()); |
| 1448 | self.local_queue_capacity = value; |
| 1449 | self |
| 1450 | } |
| 1451 | } |
| 1452 | |
| 1453 | fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> { |
| 1454 | use crate::runtime::runtime::Scheduler; |
| 1455 | |
| 1456 | let (scheduler, handle, blocking_pool) = |
| 1457 | self.build_current_thread_runtime_components(None)?; |
| 1458 | |
| 1459 | Ok(Runtime::from_parts( |
| 1460 | Scheduler::CurrentThread(scheduler), |
| 1461 | handle, |
| 1462 | blocking_pool, |
| 1463 | )) |
| 1464 | } |
| 1465 | |
#[cfg(tokio_unstable)]
fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
    use crate::runtime::local_runtime::LocalRuntimeScheduler;

    // Pass the id of the thread performing the build down to the scheduler.
    let owner = std::thread::current().id();
    let parts = self.build_current_thread_runtime_components(Some(owner))?;
    let (scheduler, handle, blocking_pool) = parts;

    let scheduler = LocalRuntimeScheduler::CurrentThread(scheduler);
    Ok(LocalRuntime::from_parts(scheduler, handle, blocking_pool))
}
| 1481 | |
| 1482 | fn build_current_thread_runtime_components( |
| 1483 | &mut self, |
| 1484 | local_tid: Option<ThreadId>, |
| 1485 | ) -> io::Result<(CurrentThread, Handle, BlockingPool)> { |
| 1486 | use crate::runtime::scheduler; |
| 1487 | use crate::runtime::Config; |
| 1488 | |
| 1489 | let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?; |
| 1490 | |
| 1491 | // Blocking pool |
| 1492 | let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); |
| 1493 | let blocking_spawner = blocking_pool.spawner().clone(); |
| 1494 | |
| 1495 | // Generate a rng seed for this runtime. |
| 1496 | let seed_generator_1 = self.seed_generator.next_generator(); |
| 1497 | let seed_generator_2 = self.seed_generator.next_generator(); |
| 1498 | |
| 1499 | // And now put a single-threaded scheduler on top of the timer. When |
| 1500 | // there are no futures ready to do something, it'll let the timer or |
| 1501 | // the reactor to generate some new stimuli for the futures to continue |
| 1502 | // in their life. |
| 1503 | let (scheduler, handle) = CurrentThread::new( |
| 1504 | driver, |
| 1505 | driver_handle, |
| 1506 | blocking_spawner, |
| 1507 | seed_generator_2, |
| 1508 | Config { |
| 1509 | before_park: self.before_park.clone(), |
| 1510 | after_unpark: self.after_unpark.clone(), |
| 1511 | before_spawn: self.before_spawn.clone(), |
| 1512 | #[cfg (tokio_unstable)] |
| 1513 | before_poll: self.before_poll.clone(), |
| 1514 | #[cfg (tokio_unstable)] |
| 1515 | after_poll: self.after_poll.clone(), |
| 1516 | after_termination: self.after_termination.clone(), |
| 1517 | global_queue_interval: self.global_queue_interval, |
| 1518 | event_interval: self.event_interval, |
| 1519 | local_queue_capacity: self.local_queue_capacity, |
| 1520 | #[cfg (tokio_unstable)] |
| 1521 | unhandled_panic: self.unhandled_panic.clone(), |
| 1522 | disable_lifo_slot: self.disable_lifo_slot, |
| 1523 | seed_generator: seed_generator_1, |
| 1524 | metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), |
| 1525 | }, |
| 1526 | local_tid, |
| 1527 | ); |
| 1528 | |
| 1529 | let handle = Handle { |
| 1530 | inner: scheduler::Handle::CurrentThread(handle), |
| 1531 | }; |
| 1532 | |
| 1533 | Ok((scheduler, handle, blocking_pool)) |
| 1534 | } |
| 1535 | |
| 1536 | fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> { |
| 1537 | if self.metrics_poll_count_histogram_enable { |
| 1538 | Some(self.metrics_poll_count_histogram.clone()) |
| 1539 | } else { |
| 1540 | None |
| 1541 | } |
| 1542 | } |
| 1543 | } |
| 1544 | |
| 1545 | cfg_io_driver! { |
| 1546 | impl Builder { |
| 1547 | /// Enables the I/O driver. |
| 1548 | /// |
| 1549 | /// Doing this enables using net, process, signal, and some I/O types on |
| 1550 | /// the runtime. |
| 1551 | /// |
| 1552 | /// # Examples |
| 1553 | /// |
| 1554 | /// ``` |
| 1555 | /// use tokio::runtime; |
| 1556 | /// |
| 1557 | /// let rt = runtime::Builder::new_multi_thread() |
| 1558 | /// .enable_io() |
| 1559 | /// .build() |
| 1560 | /// .unwrap(); |
| 1561 | /// ``` |
| 1562 | pub fn enable_io(&mut self) -> &mut Self { |
| 1563 | self.enable_io = true; |
| 1564 | self |
| 1565 | } |
| 1566 | |
| 1567 | /// Enables the I/O driver and configures the max number of events to be |
| 1568 | /// processed per tick. |
| 1569 | /// |
| 1570 | /// # Examples |
| 1571 | /// |
| 1572 | /// ``` |
| 1573 | /// use tokio::runtime; |
| 1574 | /// |
| 1575 | /// let rt = runtime::Builder::new_current_thread() |
| 1576 | /// .enable_io() |
| 1577 | /// .max_io_events_per_tick(1024) |
| 1578 | /// .build() |
| 1579 | /// .unwrap(); |
| 1580 | /// ``` |
| 1581 | pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self { |
| 1582 | self.nevents = capacity; |
| 1583 | self |
| 1584 | } |
| 1585 | } |
| 1586 | } |
| 1587 | |
| 1588 | cfg_time! { |
| 1589 | impl Builder { |
| 1590 | /// Enables the time driver. |
| 1591 | /// |
| 1592 | /// Doing this enables using `tokio::time` on the runtime. |
| 1593 | /// |
| 1594 | /// # Examples |
| 1595 | /// |
| 1596 | /// ``` |
| 1597 | /// use tokio::runtime; |
| 1598 | /// |
| 1599 | /// let rt = runtime::Builder::new_multi_thread() |
| 1600 | /// .enable_time() |
| 1601 | /// .build() |
| 1602 | /// .unwrap(); |
| 1603 | /// ``` |
| 1604 | pub fn enable_time(&mut self) -> &mut Self { |
| 1605 | self.enable_time = true; |
| 1606 | self |
| 1607 | } |
| 1608 | } |
| 1609 | } |
| 1610 | |
| 1611 | cfg_test_util! { |
| 1612 | impl Builder { |
| 1613 | /// Controls if the runtime's clock starts paused or advancing. |
| 1614 | /// |
| 1615 | /// Pausing time requires the current-thread runtime; construction of |
| 1616 | /// the runtime will panic otherwise. |
| 1617 | /// |
| 1618 | /// # Examples |
| 1619 | /// |
| 1620 | /// ``` |
| 1621 | /// use tokio::runtime; |
| 1622 | /// |
| 1623 | /// let rt = runtime::Builder::new_current_thread() |
| 1624 | /// .enable_time() |
| 1625 | /// .start_paused(true) |
| 1626 | /// .build() |
| 1627 | /// .unwrap(); |
| 1628 | /// ``` |
| 1629 | pub fn start_paused(&mut self, start_paused: bool) -> &mut Self { |
| 1630 | self.start_paused = start_paused; |
| 1631 | self |
| 1632 | } |
| 1633 | } |
| 1634 | } |
| 1635 | |
| 1636 | cfg_rt_multi_thread! { |
| 1637 | impl Builder { |
| 1638 | fn build_threaded_runtime(&mut self) -> io::Result<Runtime> { |
| 1639 | use crate::loom::sys::num_cpus; |
| 1640 | use crate::runtime::{Config, runtime::Scheduler}; |
| 1641 | use crate::runtime::scheduler::{self, MultiThread}; |
| 1642 | |
| 1643 | let worker_threads = self.worker_threads.unwrap_or_else(num_cpus); |
| 1644 | |
| 1645 | let (driver, driver_handle) = driver::Driver::new(self.get_cfg(worker_threads))?; |
| 1646 | |
| 1647 | // Create the blocking pool |
| 1648 | let blocking_pool = |
| 1649 | blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads); |
| 1650 | let blocking_spawner = blocking_pool.spawner().clone(); |
| 1651 | |
| 1652 | // Generate a rng seed for this runtime. |
| 1653 | let seed_generator_1 = self.seed_generator.next_generator(); |
| 1654 | let seed_generator_2 = self.seed_generator.next_generator(); |
| 1655 | |
| 1656 | let (scheduler, handle, launch) = MultiThread::new( |
| 1657 | worker_threads, |
| 1658 | driver, |
| 1659 | driver_handle, |
| 1660 | blocking_spawner, |
| 1661 | seed_generator_2, |
| 1662 | Config { |
| 1663 | before_park: self.before_park.clone(), |
| 1664 | after_unpark: self.after_unpark.clone(), |
| 1665 | before_spawn: self.before_spawn.clone(), |
| 1666 | #[cfg (tokio_unstable)] |
| 1667 | before_poll: self.before_poll.clone(), |
| 1668 | #[cfg (tokio_unstable)] |
| 1669 | after_poll: self.after_poll.clone(), |
| 1670 | after_termination: self.after_termination.clone(), |
| 1671 | global_queue_interval: self.global_queue_interval, |
| 1672 | event_interval: self.event_interval, |
| 1673 | local_queue_capacity: self.local_queue_capacity, |
| 1674 | #[cfg (tokio_unstable)] |
| 1675 | unhandled_panic: self.unhandled_panic.clone(), |
| 1676 | disable_lifo_slot: self.disable_lifo_slot, |
| 1677 | seed_generator: seed_generator_1, |
| 1678 | metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), |
| 1679 | }, |
| 1680 | ); |
| 1681 | |
| 1682 | let handle = Handle { inner: scheduler::Handle::MultiThread(handle) }; |
| 1683 | |
| 1684 | // Spawn the thread pool workers |
| 1685 | let _enter = handle.enter(); |
| 1686 | launch.launch(); |
| 1687 | |
| 1688 | Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool)) |
| 1689 | } |
| 1690 | |
| 1691 | cfg_unstable! { |
| 1692 | fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> { |
| 1693 | use crate::loom::sys::num_cpus; |
| 1694 | use crate::runtime::{Config, runtime::Scheduler}; |
| 1695 | use crate::runtime::scheduler::MultiThreadAlt; |
| 1696 | |
| 1697 | let worker_threads = self.worker_threads.unwrap_or_else(num_cpus); |
| 1698 | let (driver, driver_handle) = driver::Driver::new(self.get_cfg(worker_threads))?; |
| 1699 | |
| 1700 | // Create the blocking pool |
| 1701 | let blocking_pool = |
| 1702 | blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads); |
| 1703 | let blocking_spawner = blocking_pool.spawner().clone(); |
| 1704 | |
| 1705 | // Generate a rng seed for this runtime. |
| 1706 | let seed_generator_1 = self.seed_generator.next_generator(); |
| 1707 | let seed_generator_2 = self.seed_generator.next_generator(); |
| 1708 | |
| 1709 | let (scheduler, handle) = MultiThreadAlt::new( |
| 1710 | worker_threads, |
| 1711 | driver, |
| 1712 | driver_handle, |
| 1713 | blocking_spawner, |
| 1714 | seed_generator_2, |
| 1715 | Config { |
| 1716 | before_park: self.before_park.clone(), |
| 1717 | after_unpark: self.after_unpark.clone(), |
| 1718 | before_spawn: self.before_spawn.clone(), |
| 1719 | after_termination: self.after_termination.clone(), |
| 1720 | #[cfg (tokio_unstable)] |
| 1721 | before_poll: self.before_poll.clone(), |
| 1722 | #[cfg (tokio_unstable)] |
| 1723 | after_poll: self.after_poll.clone(), |
| 1724 | global_queue_interval: self.global_queue_interval, |
| 1725 | event_interval: self.event_interval, |
| 1726 | local_queue_capacity: self.local_queue_capacity, |
| 1727 | #[cfg (tokio_unstable)] |
| 1728 | unhandled_panic: self.unhandled_panic.clone(), |
| 1729 | disable_lifo_slot: self.disable_lifo_slot, |
| 1730 | seed_generator: seed_generator_1, |
| 1731 | metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), |
| 1732 | }, |
| 1733 | ); |
| 1734 | |
| 1735 | Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool)) |
| 1736 | } |
| 1737 | } |
| 1738 | } |
| 1739 | } |
| 1740 | |
| 1741 | impl fmt::Debug for Builder { |
| 1742 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 1743 | fmt&mut DebugStruct<'_, '_>.debug_struct("Builder" ) |
| 1744 | .field("worker_threads" , &self.worker_threads) |
| 1745 | .field("max_blocking_threads" , &self.max_blocking_threads) |
| 1746 | .field( |
| 1747 | "thread_name" , |
| 1748 | &"<dyn Fn() -> String + Send + Sync + 'static>" , |
| 1749 | ) |
| 1750 | .field("thread_stack_size" , &self.thread_stack_size) |
| 1751 | .field("after_start" , &self.after_start.as_ref().map(|_| "..." )) |
| 1752 | .field("before_stop" , &self.before_stop.as_ref().map(|_| "..." )) |
| 1753 | .field("before_park" , &self.before_park.as_ref().map(|_| "..." )) |
| 1754 | .field(name:"after_unpark" , &self.after_unpark.as_ref().map(|_| "..." )) |
| 1755 | .finish() |
| 1756 | } |
| 1757 | } |
| 1758 | |