1use crate::runtime::handle::Handle;
2use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime};
3use crate::util::rand::{RngSeed, RngSeedGenerator};
4
5use std::fmt;
6use std::io;
7use std::time::Duration;
8
9/// Builds Tokio Runtime with custom configuration values.
10///
11/// Methods can be chained in order to set the configuration values. The
12/// Runtime is constructed by calling [`build`].
13///
14/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
15/// or [`Builder::new_current_thread`].
16///
17/// See function level documentation for details on the various configuration
18/// settings.
19///
20/// [`build`]: method@Self::build
21/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
22/// [`Builder::new_current_thread`]: method@Self::new_current_thread
23///
24/// # Examples
25///
26/// ```
27/// use tokio::runtime::Builder;
28///
29/// fn main() {
30/// // build runtime
31/// let runtime = Builder::new_multi_thread()
32/// .worker_threads(4)
33/// .thread_name("my-custom-name")
34/// .thread_stack_size(3 * 1024 * 1024)
35/// .build()
36/// .unwrap();
37///
38/// // use runtime ...
39/// }
40/// ```
41pub struct Builder {
42 /// Runtime type
43 kind: Kind,
44
45 /// Whether or not to enable the I/O driver
46 enable_io: bool,
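    /// Maximum number of I/O events to process per tick when the I/O driver
    /// is enabled (configured via `max_io_events_per_tick`).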
47 nevents: usize,
48
49 /// Whether or not to enable the time driver
50 enable_time: bool,
51
52 /// Whether or not the clock should start paused.
53 start_paused: bool,
54
55 /// The number of worker threads, used by Runtime.
56 ///
57 /// Only used when not using the current-thread executor.
58 worker_threads: Option<usize>,
59
60 /// Cap on thread usage.
61 max_blocking_threads: usize,
62
63 /// Name fn used for threads spawned by the runtime.
64 pub(super) thread_name: ThreadNameFn,
65
66 /// Stack size used for threads spawned by the runtime.
67 pub(super) thread_stack_size: Option<usize>,
68
69 /// Callback to run after each thread starts.
70 pub(super) after_start: Option<Callback>,
71
72 /// To run before each worker thread stops
73 pub(super) before_stop: Option<Callback>,
74
75 /// To run before each worker thread is parked.
76 pub(super) before_park: Option<Callback>,
77
78 /// To run after each thread is unparked.
79 pub(super) after_unpark: Option<Callback>,
80
81 /// Customizable keep alive timeout for BlockingPool
82 pub(super) keep_alive: Option<Duration>,
83
84 /// How many ticks before pulling a task from the global/remote queue?
85 ///
86 /// When `None`, the value is unspecified and behavior details are left to
87 /// the scheduler. Each scheduler flavor could choose to either pick its own
88 /// default value or use some other strategy to decide when to poll from the
89 /// global queue. For example, the multi-threaded scheduler uses a
90 /// self-tuning strategy based on mean task poll times.
91 pub(super) global_queue_interval: Option<u32>,
92
93 /// How many ticks before yielding to the driver for timer and I/O events?
94 pub(super) event_interval: u32,
95
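    /// Capacity of each worker's local run queue.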
96 pub(super) local_queue_capacity: usize,
97
    /// When true, the multi-threaded scheduler LIFO slot should not be used.
99 ///
100 /// This option should only be exposed as unstable.
101 pub(super) disable_lifo_slot: bool,
102
103 /// Specify a random number generator seed to provide deterministic results
104 pub(super) seed_generator: RngSeedGenerator,
105
106 /// When true, enables task poll count histogram instrumentation.
107 pub(super) metrics_poll_count_histogram_enable: bool,
108
109 /// Configures the task poll count histogram
110 pub(super) metrics_poll_count_histogram: HistogramBuilder,
111
112 #[cfg(tokio_unstable)]
113 pub(super) unhandled_panic: UnhandledPanic,
114}
115
116cfg_unstable! {
117 /// How the runtime should respond to unhandled panics.
118 ///
119 /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
120 /// to configure the runtime behavior when a spawned task panics.
121 ///
122 /// See [`Builder::unhandled_panic`] for more details.
123 #[derive(Debug, Clone)]
124 #[non_exhaustive]
125 pub enum UnhandledPanic {
126 /// The runtime should ignore panics on spawned tasks.
127 ///
128 /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
129 /// tasks continue running normally.
130 ///
131 /// This is the default behavior.
132 ///
133 /// # Examples
134 ///
135 /// ```
136 /// use tokio::runtime::{self, UnhandledPanic};
137 ///
138 /// # pub fn main() {
139 /// let rt = runtime::Builder::new_current_thread()
140 /// .unhandled_panic(UnhandledPanic::Ignore)
141 /// .build()
142 /// .unwrap();
143 ///
144 /// let task1 = rt.spawn(async { panic!("boom"); });
145 /// let task2 = rt.spawn(async {
146 /// // This task completes normally
147 /// "done"
148 /// });
149 ///
150 /// rt.block_on(async {
151 /// // The panic on the first task is forwarded to the `JoinHandle`
152 /// assert!(task1.await.is_err());
153 ///
154 /// // The second task completes normally
155 /// assert!(task2.await.is_ok());
156 /// })
157 /// # }
158 /// ```
159 ///
160 /// [`JoinHandle`]: struct@crate::task::JoinHandle
161 Ignore,
162
        /// The runtime should immediately shut down if a spawned task panics.
        ///
        /// The runtime will immediately shut down even if the panicked task's
        /// [`JoinHandle`] is still available. All further spawned tasks will be
        /// immediately dropped and calls to [`Runtime::block_on`] will panic.
168 ///
169 /// # Examples
170 ///
171 /// ```should_panic
172 /// use tokio::runtime::{self, UnhandledPanic};
173 ///
174 /// # pub fn main() {
175 /// let rt = runtime::Builder::new_current_thread()
176 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
177 /// .build()
178 /// .unwrap();
179 ///
180 /// rt.spawn(async { panic!("boom"); });
181 /// rt.spawn(async {
182 /// // This task never completes.
183 /// });
184 ///
185 /// rt.block_on(async {
186 /// // Do some work
187 /// # loop { tokio::task::yield_now().await; }
188 /// })
189 /// # }
190 /// ```
191 ///
192 /// [`JoinHandle`]: struct@crate::task::JoinHandle
193 ShutdownRuntime,
194 }
195}
196
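/// Factory function used to generate names for threads spawned by the runtime.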
197pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
198
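/// The scheduler flavor that the builder will construct.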
199#[derive(Clone, Copy)]
200pub(crate) enum Kind {
201 CurrentThread,
202 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
203 MultiThread,
204 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
205 MultiThreadAlt,
206}
207
208impl Builder {
209 /// Returns a new builder with the current thread scheduler selected.
210 ///
211 /// Configuration methods can be chained on the return value.
212 ///
213 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
214 /// [`LocalSet`].
215 ///
216 /// [`LocalSet`]: crate::task::LocalSet
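    ///
    /// # Examples
    ///
    /// A minimal example that builds a current-thread runtime with the I/O and
    /// time drivers enabled:
    ///
    /// ```
    /// use tokio::runtime::Builder;
    ///
    /// let rt = Builder::new_current_thread()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    ///
    /// rt.block_on(async {
    ///     println!("Hello from a current-thread runtime");
    /// });
    /// ```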
217 pub fn new_current_thread() -> Builder {
218 #[cfg(loom)]
219 const EVENT_INTERVAL: u32 = 4;
220 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
221 #[cfg(not(loom))]
222 const EVENT_INTERVAL: u32 = 61;
223
224 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
225 }
226
227 cfg_not_wasi! {
228 /// Returns a new builder with the multi thread scheduler selected.
229 ///
230 /// Configuration methods can be chained on the return value.
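        ///
        /// # Examples
        ///
        /// A minimal example that builds a multi-thread runtime with default
        /// settings:
        ///
        /// ```
        /// use tokio::runtime::Builder;
        ///
        /// let rt = Builder::new_multi_thread()
        ///     .enable_all()
        ///     .build()
        ///     .unwrap();
        /// ```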
231 #[cfg(feature = "rt-multi-thread")]
232 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
233 pub fn new_multi_thread() -> Builder {
234 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
235 Builder::new(Kind::MultiThread, 61)
236 }
237
238 cfg_unstable! {
239 /// Returns a new builder with the alternate multi thread scheduler
240 /// selected.
241 ///
242 /// The alternate multi threaded scheduler is an in-progress
243 /// candidate to replace the existing multi threaded scheduler. It
244 /// currently does not scale as well to 16+ processors.
245 ///
246 /// This runtime flavor is currently **not considered production
247 /// ready**.
248 ///
249 /// Configuration methods can be chained on the return value.
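            ///
            /// # Examples
            ///
            /// A minimal example that builds the alternate multi-thread
            /// runtime; this requires the `tokio_unstable` cfg flag:
            ///
            /// ```
            /// use tokio::runtime::Builder;
            ///
            /// let rt = Builder::new_multi_thread_alt()
            ///     .enable_all()
            ///     .build()
            ///     .unwrap();
            /// ```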
250 #[cfg(feature = "rt-multi-thread")]
251 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
252 pub fn new_multi_thread_alt() -> Builder {
253 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
254 Builder::new(Kind::MultiThreadAlt, 61)
255 }
256 }
257 }
258
259 /// Returns a new runtime builder initialized with default configuration
260 /// values.
261 ///
262 /// Configuration methods can be chained on the return value.
263 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
264 Builder {
265 kind,
266
267 // I/O defaults to "off"
268 enable_io: false,
269 nevents: 1024,
270
271 // Time defaults to "off"
272 enable_time: false,
273
274 // The clock starts not-paused
275 start_paused: false,
276
277 // Read from environment variable first in multi-threaded mode.
278 // Default to lazy auto-detection (one thread per CPU core)
279 worker_threads: None,
280
281 max_blocking_threads: 512,
282
283 // Default thread name
284 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
285
286 // Do not set a stack size by default
287 thread_stack_size: None,
288
289 // No worker thread callbacks
290 after_start: None,
291 before_stop: None,
292 before_park: None,
293 after_unpark: None,
294
295 keep_alive: None,
296
297 // Defaults for these values depend on the scheduler kind, so we get them
298 // as parameters.
299 global_queue_interval: None,
300 event_interval,
301
302 #[cfg(not(loom))]
303 local_queue_capacity: 256,
304
305 #[cfg(loom)]
306 local_queue_capacity: 4,
307
308 seed_generator: RngSeedGenerator::new(RngSeed::new()),
309
310 #[cfg(tokio_unstable)]
311 unhandled_panic: UnhandledPanic::Ignore,
312
313 metrics_poll_count_histogram_enable: false,
314
315 metrics_poll_count_histogram: HistogramBuilder::default(),
316
317 disable_lifo_slot: false,
318 }
319 }
320
321 /// Enables both I/O and time drivers.
322 ///
323 /// Doing this is a shorthand for calling `enable_io` and `enable_time`
324 /// individually. If additional components are added to Tokio in the future,
325 /// `enable_all` will include these future components.
326 ///
327 /// # Examples
328 ///
329 /// ```
330 /// use tokio::runtime;
331 ///
332 /// let rt = runtime::Builder::new_multi_thread()
333 /// .enable_all()
334 /// .build()
335 /// .unwrap();
336 /// ```
337 pub fn enable_all(&mut self) -> &mut Self {
338 #[cfg(any(
339 feature = "net",
340 all(unix, feature = "process"),
341 all(unix, feature = "signal")
342 ))]
343 self.enable_io();
344 #[cfg(feature = "time")]
345 self.enable_time();
346
347 self
348 }
349
350 /// Sets the number of worker threads the `Runtime` will use.
351 ///
    /// This can be any number above 0, though it is advised to keep this value
353 /// on the smaller side.
354 ///
355 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
356 ///
357 /// # Default
358 ///
359 /// The default value is the number of cores available to the system.
360 ///
361 /// When using the `current_thread` runtime this method has no effect.
362 ///
363 /// # Examples
364 ///
365 /// ## Multi threaded runtime with 4 threads
366 ///
367 /// ```
368 /// use tokio::runtime;
369 ///
370 /// // This will spawn a work-stealing runtime with 4 worker threads.
371 /// let rt = runtime::Builder::new_multi_thread()
372 /// .worker_threads(4)
373 /// .build()
374 /// .unwrap();
375 ///
376 /// rt.spawn(async move {});
377 /// ```
378 ///
379 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
380 ///
381 /// ```
382 /// use tokio::runtime;
383 ///
384 /// // Create a runtime that _must_ be driven from a call
385 /// // to `Runtime::block_on`.
386 /// let rt = runtime::Builder::new_current_thread()
387 /// .build()
388 /// .unwrap();
389 ///
390 /// // This will run the runtime and future on the current thread
391 /// rt.block_on(async move {});
392 /// ```
393 ///
394 /// # Panics
395 ///
396 /// This will panic if `val` is not larger than `0`.
397 #[track_caller]
398 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
399 assert!(val > 0, "Worker threads cannot be set to 0");
400 self.worker_threads = Some(val);
401 self
402 }
403
404 /// Specifies the limit for additional threads spawned by the Runtime.
405 ///
406 /// These threads are used for blocking operations like tasks spawned
407 /// through [`spawn_blocking`], this includes but is not limited to:
408 /// - [`fs`] operations
409 /// - dns resolution through [`ToSocketAddrs`]
410 /// - writing to [`Stdout`] or [`Stderr`]
411 /// - reading from [`Stdin`]
412 ///
    /// Unlike [`worker_threads`], they are not always active and will exit
414 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
415 ///
416 /// It's recommended to not set this limit too low in order to avoid hanging on operations
417 /// requiring [`spawn_blocking`].
418 ///
419 /// The default value is 512.
420 ///
421 /// # Panics
422 ///
423 /// This will panic if `val` is not larger than `0`.
424 ///
425 /// # Upgrading from 0.x
426 ///
427 /// In old versions `max_threads` limited both blocking and worker threads, but the
428 /// current `max_blocking_threads` does not include async worker threads in the count.
429 ///
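    /// # Examples
    ///
    /// For example, capping the blocking pool at 16 threads:
    ///
    /// ```
    /// use tokio::runtime;
    ///
    /// let rt = runtime::Builder::new_multi_thread()
    ///     .max_blocking_threads(16)
    ///     .build()
    ///     .unwrap();
    /// ```
    ///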
430 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
431 /// [`fs`]: mod@crate::fs
432 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
433 /// [`Stdout`]: struct@crate::io::Stdout
434 /// [`Stdin`]: struct@crate::io::Stdin
435 /// [`Stderr`]: struct@crate::io::Stderr
436 /// [`worker_threads`]: Self::worker_threads
437 /// [`thread_keep_alive`]: Self::thread_keep_alive
438 #[track_caller]
439 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
440 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
441 assert!(val > 0, "Max blocking threads cannot be set to 0");
442 self.max_blocking_threads = val;
443 self
444 }
445
    /// Sets the name of threads spawned by the `Runtime`'s thread pool.
447 ///
448 /// The default name is "tokio-runtime-worker".
449 ///
450 /// # Examples
451 ///
452 /// ```
453 /// # use tokio::runtime;
454 ///
455 /// # pub fn main() {
456 /// let rt = runtime::Builder::new_multi_thread()
457 /// .thread_name("my-pool")
458 /// .build();
459 /// # }
460 /// ```
461 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
462 let val = val.into();
463 self.thread_name = std::sync::Arc::new(move || val.clone());
464 self
465 }
466
467 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
468 ///
469 /// The default name fn is `|| "tokio-runtime-worker".into()`.
470 ///
471 /// # Examples
472 ///
473 /// ```
474 /// # use tokio::runtime;
475 /// # use std::sync::atomic::{AtomicUsize, Ordering};
476 /// # pub fn main() {
477 /// let rt = runtime::Builder::new_multi_thread()
478 /// .thread_name_fn(|| {
479 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
480 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
481 /// format!("my-pool-{}", id)
482 /// })
483 /// .build();
484 /// # }
485 /// ```
486 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
487 where
488 F: Fn() -> String + Send + Sync + 'static,
489 {
490 self.thread_name = std::sync::Arc::new(f);
491 self
492 }
493
494 /// Sets the stack size (in bytes) for worker threads.
495 ///
496 /// The actual stack size may be greater than this value if the platform
    /// specifies a minimal stack size.
498 ///
499 /// The default stack size for spawned threads is 2 MiB, though this
500 /// particular stack size is subject to change in the future.
501 ///
502 /// # Examples
503 ///
504 /// ```
505 /// # use tokio::runtime;
506 ///
507 /// # pub fn main() {
508 /// let rt = runtime::Builder::new_multi_thread()
509 /// .thread_stack_size(32 * 1024)
510 /// .build();
511 /// # }
512 /// ```
513 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
514 self.thread_stack_size = Some(val);
515 self
516 }
517
518 /// Executes function `f` after each thread is started but before it starts
519 /// doing work.
520 ///
521 /// This is intended for bookkeeping and monitoring use cases.
522 ///
523 /// # Examples
524 ///
525 /// ```
526 /// # use tokio::runtime;
527 /// # pub fn main() {
528 /// let runtime = runtime::Builder::new_multi_thread()
529 /// .on_thread_start(|| {
530 /// println!("thread started");
531 /// })
532 /// .build();
533 /// # }
534 /// ```
535 #[cfg(not(loom))]
536 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
537 where
538 F: Fn() + Send + Sync + 'static,
539 {
540 self.after_start = Some(std::sync::Arc::new(f));
541 self
542 }
543
544 /// Executes function `f` before each thread stops.
545 ///
546 /// This is intended for bookkeeping and monitoring use cases.
547 ///
548 /// # Examples
549 ///
550 /// ```
551 /// # use tokio::runtime;
552 /// # pub fn main() {
553 /// let runtime = runtime::Builder::new_multi_thread()
554 /// .on_thread_stop(|| {
555 /// println!("thread stopping");
556 /// })
557 /// .build();
558 /// # }
559 /// ```
560 #[cfg(not(loom))]
561 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
562 where
563 F: Fn() + Send + Sync + 'static,
564 {
565 self.before_stop = Some(std::sync::Arc::new(f));
566 self
567 }
568
569 /// Executes function `f` just before a thread is parked (goes idle).
570 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
571 /// can be called, and may result in this thread being unparked immediately.
572 ///
573 /// This can be used to start work only when the executor is idle, or for bookkeeping
574 /// and monitoring purposes.
575 ///
576 /// Note: There can only be one park callback for a runtime; calling this function
577 /// more than once replaces the last callback defined, rather than adding to it.
578 ///
579 /// # Examples
580 ///
581 /// ## Multithreaded executor
582 /// ```
583 /// # use std::sync::Arc;
584 /// # use std::sync::atomic::{AtomicBool, Ordering};
585 /// # use tokio::runtime;
586 /// # use tokio::sync::Barrier;
587 /// # pub fn main() {
588 /// let once = AtomicBool::new(true);
589 /// let barrier = Arc::new(Barrier::new(2));
590 ///
591 /// let runtime = runtime::Builder::new_multi_thread()
592 /// .worker_threads(1)
593 /// .on_thread_park({
594 /// let barrier = barrier.clone();
595 /// move || {
596 /// let barrier = barrier.clone();
597 /// if once.swap(false, Ordering::Relaxed) {
598 /// tokio::spawn(async move { barrier.wait().await; });
599 /// }
600 /// }
601 /// })
602 /// .build()
603 /// .unwrap();
604 ///
605 /// runtime.block_on(async {
606 /// barrier.wait().await;
607 /// })
608 /// # }
609 /// ```
610 /// ## Current thread executor
611 /// ```
612 /// # use std::sync::Arc;
613 /// # use std::sync::atomic::{AtomicBool, Ordering};
614 /// # use tokio::runtime;
615 /// # use tokio::sync::Barrier;
616 /// # pub fn main() {
617 /// let once = AtomicBool::new(true);
618 /// let barrier = Arc::new(Barrier::new(2));
619 ///
620 /// let runtime = runtime::Builder::new_current_thread()
621 /// .on_thread_park({
622 /// let barrier = barrier.clone();
623 /// move || {
624 /// let barrier = barrier.clone();
625 /// if once.swap(false, Ordering::Relaxed) {
626 /// tokio::spawn(async move { barrier.wait().await; });
627 /// }
628 /// }
629 /// })
630 /// .build()
631 /// .unwrap();
632 ///
633 /// runtime.block_on(async {
634 /// barrier.wait().await;
635 /// })
636 /// # }
637 /// ```
638 #[cfg(not(loom))]
639 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
640 where
641 F: Fn() + Send + Sync + 'static,
642 {
643 self.before_park = Some(std::sync::Arc::new(f));
644 self
645 }
646
647 /// Executes function `f` just after a thread unparks (starts executing tasks).
648 ///
649 /// This is intended for bookkeeping and monitoring use cases; note that work
650 /// in this callback will increase latencies when the application has allowed one or
651 /// more runtime threads to go idle.
652 ///
653 /// Note: There can only be one unpark callback for a runtime; calling this function
654 /// more than once replaces the last callback defined, rather than adding to it.
655 ///
656 /// # Examples
657 ///
658 /// ```
659 /// # use tokio::runtime;
660 /// # pub fn main() {
661 /// let runtime = runtime::Builder::new_multi_thread()
662 /// .on_thread_unpark(|| {
663 /// println!("thread unparking");
664 /// })
665 /// .build();
666 ///
667 /// runtime.unwrap().block_on(async {
668 /// tokio::task::yield_now().await;
669 /// println!("Hello from Tokio!");
670 /// })
671 /// # }
672 /// ```
673 #[cfg(not(loom))]
674 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
675 where
676 F: Fn() + Send + Sync + 'static,
677 {
678 self.after_unpark = Some(std::sync::Arc::new(f));
679 self
680 }
681
682 /// Creates the configured `Runtime`.
683 ///
684 /// The returned `Runtime` instance is ready to spawn tasks.
685 ///
686 /// # Examples
687 ///
688 /// ```
689 /// use tokio::runtime::Builder;
690 ///
691 /// let rt = Builder::new_multi_thread().build().unwrap();
692 ///
693 /// rt.block_on(async {
694 /// println!("Hello from the Tokio runtime");
695 /// });
696 /// ```
697 pub fn build(&mut self) -> io::Result<Runtime> {
698 match &self.kind {
699 Kind::CurrentThread => self.build_current_thread_runtime(),
700 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
701 Kind::MultiThread => self.build_threaded_runtime(),
702 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
703 Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
704 }
705 }
706
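    /// Returns the driver configuration implied by the current builder settings.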
707 fn get_cfg(&self) -> driver::Cfg {
708 driver::Cfg {
709 enable_pause_time: match self.kind {
710 Kind::CurrentThread => true,
711 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
712 Kind::MultiThread => false,
713 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
714 Kind::MultiThreadAlt => false,
715 },
716 enable_io: self.enable_io,
717 enable_time: self.enable_time,
718 start_paused: self.start_paused,
719 nevents: self.nevents,
720 }
721 }
722
723 /// Sets a custom timeout for a thread in the blocking pool.
724 ///
    /// By default, the timeout for a thread is set to 10 seconds; this method
    /// overrides that value.
727 ///
728 /// # Example
729 ///
730 /// ```
731 /// # use tokio::runtime;
732 /// # use std::time::Duration;
733 /// # pub fn main() {
734 /// let rt = runtime::Builder::new_multi_thread()
735 /// .thread_keep_alive(Duration::from_millis(100))
736 /// .build();
737 /// # }
738 /// ```
739 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
740 self.keep_alive = Some(duration);
741 self
742 }
743
744 /// Sets the number of scheduler ticks after which the scheduler will poll the global
745 /// task queue.
746 ///
747 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
748 ///
749 /// By default the global queue interval is 31 for the current-thread scheduler. Please see
750 /// [the module documentation] for the default behavior of the multi-thread scheduler.
751 ///
752 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
753 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
754 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
755 /// getting started on new work, especially if tasks frequently yield rather than complete
756 /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
757 /// is a good choice when most tasks quickly complete polling.
758 ///
759 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
760 ///
761 /// # Examples
762 ///
763 /// ```
764 /// # use tokio::runtime;
765 /// # pub fn main() {
766 /// let rt = runtime::Builder::new_multi_thread()
767 /// .global_queue_interval(31)
768 /// .build();
769 /// # }
770 /// ```
771 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
772 self.global_queue_interval = Some(val);
773 self
774 }
775
776 /// Sets the number of scheduler ticks after which the scheduler will poll for
777 /// external events (timers, I/O, and so on).
778 ///
779 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
780 ///
781 /// By default, the event interval is `61` for all scheduler types.
782 ///
783 /// Setting the event interval determines the effective "priority" of delivering
784 /// these external events (which may wake up additional tasks), compared to
785 /// executing tasks that are currently ready to run. A smaller value is useful
786 /// when tasks frequently spend a long time in polling, or frequently yield,
787 /// which can result in overly long delays picking up I/O events. Conversely,
788 /// picking up new events requires extra synchronization and syscall overhead,
789 /// so if tasks generally complete their polling quickly, a higher event interval
790 /// will minimize that overhead while still keeping the scheduler responsive to
791 /// events.
792 ///
793 /// # Examples
794 ///
795 /// ```
796 /// # use tokio::runtime;
797 /// # pub fn main() {
798 /// let rt = runtime::Builder::new_multi_thread()
799 /// .event_interval(31)
800 /// .build();
801 /// # }
802 /// ```
803 pub fn event_interval(&mut self, val: u32) -> &mut Self {
804 self.event_interval = val;
805 self
806 }
807
808 cfg_unstable! {
809 /// Configure how the runtime responds to an unhandled panic on a
810 /// spawned task.
811 ///
812 /// By default, an unhandled panic (i.e. a panic not caught by
813 /// [`std::panic::catch_unwind`]) has no impact on the runtime's
        /// execution. The panic error value is forwarded to the task's
815 /// [`JoinHandle`] and all other spawned tasks continue running.
816 ///
817 /// The `unhandled_panic` option enables configuring this behavior.
818 ///
819 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
820 /// spawned tasks have no impact on the runtime's execution.
821 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
822 /// shutdown immediately when a spawned task panics even if that
823 /// task's `JoinHandle` has not been dropped. All other spawned tasks
824 /// will immediately terminate and further calls to
825 /// [`Runtime::block_on`] will panic.
826 ///
827 /// # Unstable
828 ///
829 /// This option is currently unstable and its implementation is
830 /// incomplete. The API may change or be removed in the future. See
831 /// tokio-rs/tokio#4516 for more details.
832 ///
833 /// # Examples
834 ///
835 /// The following demonstrates a runtime configured to shutdown on
836 /// panic. The first spawned task panics and results in the runtime
837 /// shutting down. The second spawned task never has a chance to
838 /// execute. The call to `block_on` will panic due to the runtime being
839 /// forcibly shutdown.
840 ///
841 /// ```should_panic
842 /// use tokio::runtime::{self, UnhandledPanic};
843 ///
844 /// # pub fn main() {
845 /// let rt = runtime::Builder::new_current_thread()
846 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
847 /// .build()
848 /// .unwrap();
849 ///
850 /// rt.spawn(async { panic!("boom"); });
851 /// rt.spawn(async {
852 /// // This task never completes.
853 /// });
854 ///
855 /// rt.block_on(async {
856 /// // Do some work
857 /// # loop { tokio::task::yield_now().await; }
858 /// })
859 /// # }
860 /// ```
861 ///
862 /// [`JoinHandle`]: struct@crate::task::JoinHandle
863 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
864 self.unhandled_panic = behavior;
865 self
866 }
867
868 /// Disables the LIFO task scheduler heuristic.
869 ///
870 /// The multi-threaded scheduler includes a heuristic for optimizing
871 /// message-passing patterns. This heuristic results in the **last**
872 /// scheduled task being polled first.
873 ///
874 /// To implement this heuristic, each worker thread has a slot which
875 /// holds the task that should be polled next. However, this slot cannot
876 /// be stolen by other worker threads, which can result in lower total
877 /// throughput when tasks tend to have longer poll times.
878 ///
879 /// This configuration option will disable this heuristic resulting in
880 /// all scheduled tasks being pushed into the worker-local queue, which
881 /// is stealable.
882 ///
883 /// Consider trying this option when the task "scheduled" time is high
884 /// but the runtime is underutilized. Use tokio-rs/tokio-metrics to
885 /// collect this data.
886 ///
887 /// # Unstable
888 ///
889 /// This configuration option is considered a workaround for the LIFO
890 /// slot not being stealable. When the slot becomes stealable, we will
891 /// revisit whether or not this option is necessary. See
892 /// tokio-rs/tokio#4941.
893 ///
894 /// # Examples
895 ///
896 /// ```
897 /// use tokio::runtime;
898 ///
899 /// let rt = runtime::Builder::new_multi_thread()
900 /// .disable_lifo_slot()
901 /// .build()
902 /// .unwrap();
903 /// ```
904 pub fn disable_lifo_slot(&mut self) -> &mut Self {
905 self.disable_lifo_slot = true;
906 self
907 }
908
909 /// Specifies the random number generation seed to use within all
910 /// threads associated with the runtime being built.
911 ///
912 /// This option is intended to make certain parts of the runtime
913 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
914 /// [`tokio::select!`] it will ensure that the order that branches are
915 /// polled is deterministic.
916 ///
917 /// In addition to the code specifying `rng_seed` and interacting with
918 /// the runtime, the internals of Tokio and the Rust compiler may affect
919 /// the sequences of random numbers. In order to ensure repeatable
920 /// results, the version of Tokio, the versions of all other
921 /// dependencies that interact with Tokio, and the Rust compiler version
922 /// should also all remain constant.
923 ///
924 /// # Examples
925 ///
926 /// ```
927 /// # use tokio::runtime::{self, RngSeed};
928 /// # pub fn main() {
929 /// let seed = RngSeed::from_bytes(b"place your seed here");
930 /// let rt = runtime::Builder::new_current_thread()
931 /// .rng_seed(seed)
932 /// .build();
933 /// # }
934 /// ```
935 ///
936 /// [`tokio::select!`]: crate::select
937 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
938 self.seed_generator = RngSeedGenerator::new(seed);
939 self
940 }
941 }
942
943 cfg_metrics! {
944 /// Enables tracking the distribution of task poll times.
945 ///
946 /// Task poll times are not instrumented by default as doing so requires
947 /// calling [`Instant::now()`] twice per task poll, which could add
        /// measurable overhead. Use [`Handle::metrics()`] to access the
949 /// metrics data.
950 ///
951 /// The histogram uses fixed bucket sizes. In other words, the histogram
952 /// buckets are not dynamic based on input values. Use the
953 /// `metrics_poll_count_histogram_` builder methods to configure the
954 /// histogram details.
955 ///
956 /// # Examples
957 ///
958 /// ```
959 /// use tokio::runtime;
960 ///
961 /// let rt = runtime::Builder::new_multi_thread()
962 /// .enable_metrics_poll_count_histogram()
963 /// .build()
964 /// .unwrap();
965 /// # // Test default values here
966 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
967 /// # let m = rt.handle().metrics();
968 /// # assert_eq!(m.poll_count_histogram_num_buckets(), 10);
969 /// # assert_eq!(m.poll_count_histogram_bucket_range(0), us(0)..us(100));
970 /// # assert_eq!(m.poll_count_histogram_bucket_range(1), us(100)..us(200));
971 /// ```
972 ///
973 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
974 /// [`Instant::now()`]: std::time::Instant::now
975 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
976 self.metrics_poll_count_histogram_enable = true;
977 self
978 }
979
980 /// Sets the histogram scale for tracking the distribution of task poll
981 /// times.
982 ///
983 /// Tracking the distribution of task poll times can be done using a
984 /// linear or log scale. When using linear scale, each histogram bucket
985 /// will represent the same range of poll times. When using log scale,
986 /// each histogram bucket will cover a range twice as big as the
987 /// previous bucket.
988 ///
989 /// **Default:** linear scale.
990 ///
991 /// # Examples
992 ///
993 /// ```
994 /// use tokio::runtime::{self, HistogramScale};
995 ///
996 /// let rt = runtime::Builder::new_multi_thread()
997 /// .enable_metrics_poll_count_histogram()
998 /// .metrics_poll_count_histogram_scale(HistogramScale::Log)
999 /// .build()
1000 /// .unwrap();
1001 /// ```
1002 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1003 self.metrics_poll_count_histogram.scale = histogram_scale;
1004 self
1005 }
1006
1007 /// Sets the histogram resolution for tracking the distribution of task
1008 /// poll times.
1009 ///
1010 /// The resolution is the histogram's first bucket's range. When using a
1011 /// linear histogram scale, each bucket will cover the same range. When
1012 /// using a log scale, each bucket will cover a range twice as big as
1013 /// the previous bucket. In the log case, the resolution represents the
1014 /// smallest bucket range.
1015 ///
1016 /// Note that, when using log scale, the resolution is rounded up to the
1017 /// nearest power of 2 in nanoseconds.
1018 ///
1019 /// **Default:** 100 microseconds.
1020 ///
1021 /// # Examples
1022 ///
1023 /// ```
1024 /// use tokio::runtime;
1025 /// use std::time::Duration;
1026 ///
1027 /// let rt = runtime::Builder::new_multi_thread()
1028 /// .enable_metrics_poll_count_histogram()
1029 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1030 /// .build()
1031 /// .unwrap();
1032 /// ```
1033 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1034 assert!(resolution > Duration::from_secs(0));
1035 // Sanity check the argument and also make the cast below safe.
1036 assert!(resolution <= Duration::from_secs(1));
1037
1038 let resolution = resolution.as_nanos() as u64;
1039 self.metrics_poll_count_histogram.resolution = resolution;
1040 self
1041 }
1042
1043 /// Sets the number of buckets for the histogram tracking the
1044 /// distribution of task poll times.
1045 ///
1046 /// The last bucket tracks all greater values that fall out of other
1047 /// ranges. So, configuring the histogram using a linear scale,
1048 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1049 /// polls that take more than 450ms to complete.
1050 ///
1051 /// **Default:** 10
1052 ///
1053 /// # Examples
1054 ///
1055 /// ```
1056 /// use tokio::runtime;
1057 ///
1058 /// let rt = runtime::Builder::new_multi_thread()
1059 /// .enable_metrics_poll_count_histogram()
1060 /// .metrics_poll_count_histogram_buckets(15)
1061 /// .build()
1062 /// .unwrap();
1063 /// ```
1064 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1065 self.metrics_poll_count_histogram.num_buckets = buckets;
1066 self
1067 }
1068 }
1069
1070 cfg_loom! {
1071 pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self {
1072 assert!(value.is_power_of_two());
1073 self.local_queue_capacity = value;
1074 self
1075 }
1076 }
1077
1078 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1079 use crate::runtime::scheduler::{self, CurrentThread};
1080 use crate::runtime::{runtime::Scheduler, Config};
1081
1082 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1083
1084 // Blocking pool
1085 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1086 let blocking_spawner = blocking_pool.spawner().clone();
1087
1088 // Generate a rng seed for this runtime.
1089 let seed_generator_1 = self.seed_generator.next_generator();
1090 let seed_generator_2 = self.seed_generator.next_generator();
1091
        // And now put a single-threaded scheduler on top of the timer. When
        // there are no futures ready to run, it lets the timer or the reactor
        // generate new stimuli so the futures can make progress.
1096 let (scheduler, handle) = CurrentThread::new(
1097 driver,
1098 driver_handle,
1099 blocking_spawner,
1100 seed_generator_2,
1101 Config {
1102 before_park: self.before_park.clone(),
1103 after_unpark: self.after_unpark.clone(),
1104 global_queue_interval: self.global_queue_interval,
1105 event_interval: self.event_interval,
1106 local_queue_capacity: self.local_queue_capacity,
1107 #[cfg(tokio_unstable)]
1108 unhandled_panic: self.unhandled_panic.clone(),
1109 disable_lifo_slot: self.disable_lifo_slot,
1110 seed_generator: seed_generator_1,
1111 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1112 },
1113 );
1114
1115 let handle = Handle {
1116 inner: scheduler::Handle::CurrentThread(handle),
1117 };
1118
1119 Ok(Runtime::from_parts(
1120 Scheduler::CurrentThread(scheduler),
1121 handle,
1122 blocking_pool,
1123 ))
1124 }
1125
1126 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1127 if self.metrics_poll_count_histogram_enable {
1128 Some(self.metrics_poll_count_histogram.clone())
1129 } else {
1130 None
1131 }
1132 }
1133}
1134
1135cfg_io_driver! {
1136 impl Builder {
1137 /// Enables the I/O driver.
1138 ///
1139 /// Doing this enables using net, process, signal, and some I/O types on
1140 /// the runtime.
1141 ///
1142 /// # Examples
1143 ///
1144 /// ```
1145 /// use tokio::runtime;
1146 ///
1147 /// let rt = runtime::Builder::new_multi_thread()
1148 /// .enable_io()
1149 /// .build()
1150 /// .unwrap();
1151 /// ```
1152 pub fn enable_io(&mut self) -> &mut Self {
1153 self.enable_io = true;
1154 self
1155 }
1156
1157 /// Enables the I/O driver and configures the max number of events to be
1158 /// processed per tick.
1159 ///
1160 /// # Examples
1161 ///
1162 /// ```
1163 /// use tokio::runtime;
1164 ///
1165 /// let rt = runtime::Builder::new_current_thread()
1166 /// .enable_io()
1167 /// .max_io_events_per_tick(1024)
1168 /// .build()
1169 /// .unwrap();
1170 /// ```
1171 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1172 self.nevents = capacity;
1173 self
1174 }
1175 }
1176}
1177
1178cfg_time! {
1179 impl Builder {
1180 /// Enables the time driver.
1181 ///
1182 /// Doing this enables using `tokio::time` on the runtime.
1183 ///
1184 /// # Examples
1185 ///
1186 /// ```
1187 /// use tokio::runtime;
1188 ///
1189 /// let rt = runtime::Builder::new_multi_thread()
1190 /// .enable_time()
1191 /// .build()
1192 /// .unwrap();
1193 /// ```
1194 pub fn enable_time(&mut self) -> &mut Self {
1195 self.enable_time = true;
1196 self
1197 }
1198 }
1199}
1200
1201cfg_test_util! {
1202 impl Builder {
1203 /// Controls if the runtime's clock starts paused or advancing.
1204 ///
1205 /// Pausing time requires the current-thread runtime; construction of
1206 /// the runtime will panic otherwise.
1207 ///
1208 /// # Examples
1209 ///
1210 /// ```
1211 /// use tokio::runtime;
1212 ///
1213 /// let rt = runtime::Builder::new_current_thread()
1214 /// .enable_time()
1215 /// .start_paused(true)
1216 /// .build()
1217 /// .unwrap();
1218 /// ```
1219 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1220 self.start_paused = start_paused;
1221 self
1222 }
1223 }
1224}
1225
1226cfg_rt_multi_thread! {
1227 impl Builder {
1228 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1229 use crate::loom::sys::num_cpus;
1230 use crate::runtime::{Config, runtime::Scheduler};
1231 use crate::runtime::scheduler::{self, MultiThread};
1232
1233 let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
1234
1235 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1236
1237 // Create the blocking pool
1238 let blocking_pool =
1239 blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
1240 let blocking_spawner = blocking_pool.spawner().clone();
1241
1242 // Generate a rng seed for this runtime.
1243 let seed_generator_1 = self.seed_generator.next_generator();
1244 let seed_generator_2 = self.seed_generator.next_generator();
1245
1246 let (scheduler, handle, launch) = MultiThread::new(
1247 core_threads,
1248 driver,
1249 driver_handle,
1250 blocking_spawner,
1251 seed_generator_2,
1252 Config {
1253 before_park: self.before_park.clone(),
1254 after_unpark: self.after_unpark.clone(),
1255 global_queue_interval: self.global_queue_interval,
1256 event_interval: self.event_interval,
1257 local_queue_capacity: self.local_queue_capacity,
1258 #[cfg(tokio_unstable)]
1259 unhandled_panic: self.unhandled_panic.clone(),
1260 disable_lifo_slot: self.disable_lifo_slot,
1261 seed_generator: seed_generator_1,
1262 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1263 },
1264 );
1265
1266 let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1267
1268 // Spawn the thread pool workers
1269 let _enter = handle.enter();
1270 launch.launch();
1271
1272 Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1273 }
1274
1275 cfg_unstable! {
1276 fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
1277 use crate::loom::sys::num_cpus;
1278 use crate::runtime::{Config, runtime::Scheduler};
1279 use crate::runtime::scheduler::MultiThreadAlt;
1280
1281 let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
1282 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1283
1284 // Create the blocking pool
1285 let blocking_pool =
1286 blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
1287 let blocking_spawner = blocking_pool.spawner().clone();
1288
1289 // Generate a rng seed for this runtime.
1290 let seed_generator_1 = self.seed_generator.next_generator();
1291 let seed_generator_2 = self.seed_generator.next_generator();
1292
1293 let (scheduler, handle) = MultiThreadAlt::new(
1294 core_threads,
1295 driver,
1296 driver_handle,
1297 blocking_spawner,
1298 seed_generator_2,
1299 Config {
1300 before_park: self.before_park.clone(),
1301 after_unpark: self.after_unpark.clone(),
1302 global_queue_interval: self.global_queue_interval,
1303 event_interval: self.event_interval,
1304 local_queue_capacity: self.local_queue_capacity,
1305 #[cfg(tokio_unstable)]
1306 unhandled_panic: self.unhandled_panic.clone(),
1307 disable_lifo_slot: self.disable_lifo_slot,
1308 seed_generator: seed_generator_1,
1309 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1310 },
1311 );
1312
1313 Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
1314 }
1315 }
1316 }
1317}
1318
1319impl fmt::Debug for Builder {
1320 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1321 fmt.debug_struct("Builder")
1322 .field("worker_threads", &self.worker_threads)
1323 .field("max_blocking_threads", &self.max_blocking_threads)
1324 .field(
1325 "thread_name",
1326 &"<dyn Fn() -> String + Send + Sync + 'static>",
1327 )
1328 .field("thread_stack_size", &self.thread_stack_size)
1329 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1330 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1331 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1332 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1333 .finish()
1334 }
1335}
1336