1use crate::runtime::handle::Handle;
2use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime};
3use crate::util::rand::{RngSeed, RngSeedGenerator};
4
5use std::fmt;
6use std::io;
7use std::time::Duration;
8
/// Builds Tokio Runtime with custom configuration values.
///
/// Methods can be chained in order to set the configuration values. The
/// Runtime is constructed by calling [`build`].
///
/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
/// or [`Builder::new_current_thread`].
///
/// See function level documentation for details on the various configuration
/// settings.
///
/// [`build`]: method@Self::build
/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
/// [`Builder::new_current_thread`]: method@Self::new_current_thread
///
/// # Examples
///
/// ```
/// use tokio::runtime::Builder;
///
/// fn main() {
///     // build runtime
///     let runtime = Builder::new_multi_thread()
///         .worker_threads(4)
///         .thread_name("my-custom-name")
///         .thread_stack_size(3 * 1024 * 1024)
///         .build()
///         .unwrap();
///
///     // use runtime ...
/// }
/// ```
pub struct Builder {
    /// Runtime type
    kind: Kind,

    /// Whether or not to enable the I/O driver
    enable_io: bool,
    /// I/O driver event capacity; forwarded to the driver configuration by
    /// `get_cfg` (exact semantics are defined by the driver).
    nevents: usize,

    /// Whether or not to enable the time driver
    enable_time: bool,

    /// Whether or not the clock should start paused.
    start_paused: bool,

    /// The number of worker threads, used by Runtime.
    ///
    /// Only used when not using the current-thread executor.
    worker_threads: Option<usize>,

    /// Cap on thread usage.
    max_blocking_threads: usize,

    /// Name fn used for threads spawned by the runtime.
    pub(super) thread_name: ThreadNameFn,

    /// Stack size used for threads spawned by the runtime.
    pub(super) thread_stack_size: Option<usize>,

    /// Callback to run after each thread starts.
    pub(super) after_start: Option<Callback>,

    /// To run before each worker thread stops
    pub(super) before_stop: Option<Callback>,

    /// To run before each worker thread is parked.
    pub(super) before_park: Option<Callback>,

    /// To run after each thread is unparked.
    pub(super) after_unpark: Option<Callback>,

    /// Customizable keep alive timeout for BlockingPool
    pub(super) keep_alive: Option<Duration>,

    /// How many ticks before pulling a task from the global/remote queue?
    ///
    /// When `None`, the value is unspecified and behavior details are left to
    /// the scheduler. Each scheduler flavor could choose to either pick its own
    /// default value or use some other strategy to decide when to poll from the
    /// global queue. For example, the multi-threaded scheduler uses a
    /// self-tuning strategy based on mean task poll times.
    pub(super) global_queue_interval: Option<u32>,

    /// How many ticks before yielding to the driver for timer and I/O events?
    pub(super) event_interval: u32,

    /// When true, the multi-threaded scheduler LIFO slot should not be used.
    ///
    /// This option should only be exposed as unstable.
    pub(super) disable_lifo_slot: bool,

    /// Specify a random number generator seed to provide deterministic results
    pub(super) seed_generator: RngSeedGenerator,

    /// When true, enables task poll count histogram instrumentation.
    pub(super) metrics_poll_count_histogram_enable: bool,

    /// Configures the task poll count histogram
    pub(super) metrics_poll_count_histogram: HistogramBuilder,

    /// How the runtime responds to an unhandled panic on a spawned task;
    /// only present on unstable builds.
    #[cfg(tokio_unstable)]
    pub(super) unhandled_panic: UnhandledPanic,
}
113
cfg_unstable! {
    /// How the runtime should respond to unhandled panics.
    ///
    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
    /// to configure the runtime behavior when a spawned task panics.
    ///
    /// See [`Builder::unhandled_panic`] for more details.
    #[derive(Debug, Clone)]
    #[non_exhaustive]
    pub enum UnhandledPanic {
        /// The runtime should ignore panics on spawned tasks.
        ///
        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
        /// tasks continue running normally.
        ///
        /// This is the default behavior.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::Ignore)
        ///     .build()
        ///     .unwrap();
        ///
        /// let task1 = rt.spawn(async { panic!("boom"); });
        /// let task2 = rt.spawn(async {
        ///     // This task completes normally
        ///     "done"
        /// });
        ///
        /// rt.block_on(async {
        ///     // The panic on the first task is forwarded to the `JoinHandle`
        ///     assert!(task1.await.is_err());
        ///
        ///     // The second task completes normally
        ///     assert!(task2.await.is_ok());
        /// })
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        Ignore,

        /// The runtime should immediately shutdown if a spawned task panics.
        ///
        /// The runtime will immediately shutdown even if the panicked task's
        /// [`JoinHandle`] is still available. All further spawned tasks will be
        /// immediately dropped and calls to [`Runtime::block_on`] will panic.
        ///
        /// # Examples
        ///
        /// ```should_panic
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.spawn(async { panic!("boom"); });
        /// rt.spawn(async {
        ///     // This task never completes.
        /// });
        ///
        /// rt.block_on(async {
        ///     // Do some work
        /// #   loop { tokio::task::yield_now().await; }
        /// })
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        ShutdownRuntime,
    }
}
194
/// Shared factory used to generate names for threads spawned by the runtime.
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
196
/// Which scheduler flavor the builder will construct.
#[derive(Clone, Copy)]
pub(crate) enum Kind {
    /// Single-threaded scheduler driven from `Runtime::block_on`.
    CurrentThread,
    /// Multi-threaded, work-stealing scheduler.
    #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
    MultiThread,
}
203
204impl Builder {
205 /// Returns a new builder with the current thread scheduler selected.
206 ///
207 /// Configuration methods can be chained on the return value.
208 ///
209 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
210 /// [`LocalSet`].
211 ///
212 /// [`LocalSet`]: crate::task::LocalSet
213 pub fn new_current_thread() -> Builder {
214 #[cfg(loom)]
215 const EVENT_INTERVAL: u32 = 4;
216 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
217 #[cfg(not(loom))]
218 const EVENT_INTERVAL: u32 = 61;
219
220 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
221 }
222
223 cfg_not_wasi! {
224 /// Returns a new builder with the multi thread scheduler selected.
225 ///
226 /// Configuration methods can be chained on the return value.
227 #[cfg(feature = "rt-multi-thread")]
228 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
229 pub fn new_multi_thread() -> Builder {
230 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
231 Builder::new(Kind::MultiThread, 61)
232 }
233 }
234
235 /// Returns a new runtime builder initialized with default configuration
236 /// values.
237 ///
238 /// Configuration methods can be chained on the return value.
239 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
240 Builder {
241 kind,
242
243 // I/O defaults to "off"
244 enable_io: false,
245 nevents: 1024,
246
247 // Time defaults to "off"
248 enable_time: false,
249
250 // The clock starts not-paused
251 start_paused: false,
252
253 // Read from environment variable first in multi-threaded mode.
254 // Default to lazy auto-detection (one thread per CPU core)
255 worker_threads: None,
256
257 max_blocking_threads: 512,
258
259 // Default thread name
260 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
261
262 // Do not set a stack size by default
263 thread_stack_size: None,
264
265 // No worker thread callbacks
266 after_start: None,
267 before_stop: None,
268 before_park: None,
269 after_unpark: None,
270
271 keep_alive: None,
272
273 // Defaults for these values depend on the scheduler kind, so we get them
274 // as parameters.
275 global_queue_interval: None,
276 event_interval,
277
278 seed_generator: RngSeedGenerator::new(RngSeed::new()),
279
280 #[cfg(tokio_unstable)]
281 unhandled_panic: UnhandledPanic::Ignore,
282
283 metrics_poll_count_histogram_enable: false,
284
285 metrics_poll_count_histogram: Default::default(),
286
287 disable_lifo_slot: false,
288 }
289 }
290
    /// Enables both I/O and time drivers.
    ///
    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
    /// individually. If additional components are added to Tokio in the future,
    /// `enable_all` will include these future components.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::runtime;
    ///
    /// let rt = runtime::Builder::new_multi_thread()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    /// ```
    pub fn enable_all(&mut self) -> &mut Self {
        // NOTE(review): this gate presumably mirrors the cfg on `enable_io`
        // itself (only compiled when a feature consuming the I/O driver is
        // enabled) — confirm against `enable_io`'s definition.
        #[cfg(any(
            feature = "net",
            all(unix, feature = "process"),
            all(unix, feature = "signal")
        ))]
        self.enable_io();
        #[cfg(feature = "time")]
        self.enable_time();

        self
    }
319
320 /// Sets the number of worker threads the `Runtime` will use.
321 ///
322 /// This can be any number above 0 though it is advised to keep this value
323 /// on the smaller side.
324 ///
325 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
326 ///
327 /// # Default
328 ///
329 /// The default value is the number of cores available to the system.
330 ///
331 /// When using the `current_thread` runtime this method has no effect.
332 ///
333 /// # Examples
334 ///
335 /// ## Multi threaded runtime with 4 threads
336 ///
337 /// ```
338 /// use tokio::runtime;
339 ///
340 /// // This will spawn a work-stealing runtime with 4 worker threads.
341 /// let rt = runtime::Builder::new_multi_thread()
342 /// .worker_threads(4)
343 /// .build()
344 /// .unwrap();
345 ///
346 /// rt.spawn(async move {});
347 /// ```
348 ///
349 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
350 ///
351 /// ```
352 /// use tokio::runtime;
353 ///
354 /// // Create a runtime that _must_ be driven from a call
355 /// // to `Runtime::block_on`.
356 /// let rt = runtime::Builder::new_current_thread()
357 /// .build()
358 /// .unwrap();
359 ///
360 /// // This will run the runtime and future on the current thread
361 /// rt.block_on(async move {});
362 /// ```
363 ///
364 /// # Panics
365 ///
366 /// This will panic if `val` is not larger than `0`.
367 #[track_caller]
368 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
369 assert!(val > 0, "Worker threads cannot be set to 0");
370 self.worker_threads = Some(val);
371 self
372 }
373
374 /// Specifies the limit for additional threads spawned by the Runtime.
375 ///
376 /// These threads are used for blocking operations like tasks spawned
377 /// through [`spawn_blocking`]. Unlike the [`worker_threads`], they are not
378 /// always active and will exit if left idle for too long. You can change
379 /// this timeout duration with [`thread_keep_alive`].
380 ///
381 /// The default value is 512.
382 ///
383 /// # Panics
384 ///
385 /// This will panic if `val` is not larger than `0`.
386 ///
387 /// # Upgrading from 0.x
388 ///
389 /// In old versions `max_threads` limited both blocking and worker threads, but the
390 /// current `max_blocking_threads` does not include async worker threads in the count.
391 ///
392 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
393 /// [`worker_threads`]: Self::worker_threads
394 /// [`thread_keep_alive`]: Self::thread_keep_alive
395 #[track_caller]
396 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
397 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
398 assert!(val > 0, "Max blocking threads cannot be set to 0");
399 self.max_blocking_threads = val;
400 self
401 }
402
403 /// Sets name of threads spawned by the `Runtime`'s thread pool.
404 ///
405 /// The default name is "tokio-runtime-worker".
406 ///
407 /// # Examples
408 ///
409 /// ```
410 /// # use tokio::runtime;
411 ///
412 /// # pub fn main() {
413 /// let rt = runtime::Builder::new_multi_thread()
414 /// .thread_name("my-pool")
415 /// .build();
416 /// # }
417 /// ```
418 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
419 let val = val.into();
420 self.thread_name = std::sync::Arc::new(move || val.clone());
421 self
422 }
423
424 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
425 ///
426 /// The default name fn is `|| "tokio-runtime-worker".into()`.
427 ///
428 /// # Examples
429 ///
430 /// ```
431 /// # use tokio::runtime;
432 /// # use std::sync::atomic::{AtomicUsize, Ordering};
433 /// # pub fn main() {
434 /// let rt = runtime::Builder::new_multi_thread()
435 /// .thread_name_fn(|| {
436 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
437 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
438 /// format!("my-pool-{}", id)
439 /// })
440 /// .build();
441 /// # }
442 /// ```
443 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
444 where
445 F: Fn() -> String + Send + Sync + 'static,
446 {
447 self.thread_name = std::sync::Arc::new(f);
448 self
449 }
450
451 /// Sets the stack size (in bytes) for worker threads.
452 ///
453 /// The actual stack size may be greater than this value if the platform
454 /// specifies minimal stack size.
455 ///
456 /// The default stack size for spawned threads is 2 MiB, though this
457 /// particular stack size is subject to change in the future.
458 ///
459 /// # Examples
460 ///
461 /// ```
462 /// # use tokio::runtime;
463 ///
464 /// # pub fn main() {
465 /// let rt = runtime::Builder::new_multi_thread()
466 /// .thread_stack_size(32 * 1024)
467 /// .build();
468 /// # }
469 /// ```
470 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
471 self.thread_stack_size = Some(val);
472 self
473 }
474
475 /// Executes function `f` after each thread is started but before it starts
476 /// doing work.
477 ///
478 /// This is intended for bookkeeping and monitoring use cases.
479 ///
480 /// # Examples
481 ///
482 /// ```
483 /// # use tokio::runtime;
484 /// # pub fn main() {
485 /// let runtime = runtime::Builder::new_multi_thread()
486 /// .on_thread_start(|| {
487 /// println!("thread started");
488 /// })
489 /// .build();
490 /// # }
491 /// ```
492 #[cfg(not(loom))]
493 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
494 where
495 F: Fn() + Send + Sync + 'static,
496 {
497 self.after_start = Some(std::sync::Arc::new(f));
498 self
499 }
500
501 /// Executes function `f` before each thread stops.
502 ///
503 /// This is intended for bookkeeping and monitoring use cases.
504 ///
505 /// # Examples
506 ///
507 /// ```
508 /// # use tokio::runtime;
509 /// # pub fn main() {
510 /// let runtime = runtime::Builder::new_multi_thread()
511 /// .on_thread_stop(|| {
512 /// println!("thread stopping");
513 /// })
514 /// .build();
515 /// # }
516 /// ```
517 #[cfg(not(loom))]
518 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
519 where
520 F: Fn() + Send + Sync + 'static,
521 {
522 self.before_stop = Some(std::sync::Arc::new(f));
523 self
524 }
525
526 /// Executes function `f` just before a thread is parked (goes idle).
527 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
528 /// can be called, and may result in this thread being unparked immediately.
529 ///
530 /// This can be used to start work only when the executor is idle, or for bookkeeping
531 /// and monitoring purposes.
532 ///
533 /// Note: There can only be one park callback for a runtime; calling this function
534 /// more than once replaces the last callback defined, rather than adding to it.
535 ///
536 /// # Examples
537 ///
538 /// ## Multithreaded executor
539 /// ```
540 /// # use std::sync::Arc;
541 /// # use std::sync::atomic::{AtomicBool, Ordering};
542 /// # use tokio::runtime;
543 /// # use tokio::sync::Barrier;
544 /// # pub fn main() {
545 /// let once = AtomicBool::new(true);
546 /// let barrier = Arc::new(Barrier::new(2));
547 ///
548 /// let runtime = runtime::Builder::new_multi_thread()
549 /// .worker_threads(1)
550 /// .on_thread_park({
551 /// let barrier = barrier.clone();
552 /// move || {
553 /// let barrier = barrier.clone();
554 /// if once.swap(false, Ordering::Relaxed) {
555 /// tokio::spawn(async move { barrier.wait().await; });
556 /// }
557 /// }
558 /// })
559 /// .build()
560 /// .unwrap();
561 ///
562 /// runtime.block_on(async {
563 /// barrier.wait().await;
564 /// })
565 /// # }
566 /// ```
567 /// ## Current thread executor
568 /// ```
569 /// # use std::sync::Arc;
570 /// # use std::sync::atomic::{AtomicBool, Ordering};
571 /// # use tokio::runtime;
572 /// # use tokio::sync::Barrier;
573 /// # pub fn main() {
574 /// let once = AtomicBool::new(true);
575 /// let barrier = Arc::new(Barrier::new(2));
576 ///
577 /// let runtime = runtime::Builder::new_current_thread()
578 /// .on_thread_park({
579 /// let barrier = barrier.clone();
580 /// move || {
581 /// let barrier = barrier.clone();
582 /// if once.swap(false, Ordering::Relaxed) {
583 /// tokio::spawn(async move { barrier.wait().await; });
584 /// }
585 /// }
586 /// })
587 /// .build()
588 /// .unwrap();
589 ///
590 /// runtime.block_on(async {
591 /// barrier.wait().await;
592 /// })
593 /// # }
594 /// ```
595 #[cfg(not(loom))]
596 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
597 where
598 F: Fn() + Send + Sync + 'static,
599 {
600 self.before_park = Some(std::sync::Arc::new(f));
601 self
602 }
603
604 /// Executes function `f` just after a thread unparks (starts executing tasks).
605 ///
606 /// This is intended for bookkeeping and monitoring use cases; note that work
607 /// in this callback will increase latencies when the application has allowed one or
608 /// more runtime threads to go idle.
609 ///
610 /// Note: There can only be one unpark callback for a runtime; calling this function
611 /// more than once replaces the last callback defined, rather than adding to it.
612 ///
613 /// # Examples
614 ///
615 /// ```
616 /// # use tokio::runtime;
617 /// # pub fn main() {
618 /// let runtime = runtime::Builder::new_multi_thread()
619 /// .on_thread_unpark(|| {
620 /// println!("thread unparking");
621 /// })
622 /// .build();
623 ///
624 /// runtime.unwrap().block_on(async {
625 /// tokio::task::yield_now().await;
626 /// println!("Hello from Tokio!");
627 /// })
628 /// # }
629 /// ```
630 #[cfg(not(loom))]
631 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
632 where
633 F: Fn() + Send + Sync + 'static,
634 {
635 self.after_unpark = Some(std::sync::Arc::new(f));
636 self
637 }
638
639 /// Creates the configured `Runtime`.
640 ///
641 /// The returned `Runtime` instance is ready to spawn tasks.
642 ///
643 /// # Examples
644 ///
645 /// ```
646 /// use tokio::runtime::Builder;
647 ///
648 /// let rt = Builder::new_multi_thread().build().unwrap();
649 ///
650 /// rt.block_on(async {
651 /// println!("Hello from the Tokio runtime");
652 /// });
653 /// ```
654 pub fn build(&mut self) -> io::Result<Runtime> {
655 match &self.kind {
656 Kind::CurrentThread => self.build_current_thread_runtime(),
657 #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
658 Kind::MultiThread => self.build_threaded_runtime(),
659 }
660 }
661
662 fn get_cfg(&self) -> driver::Cfg {
663 driver::Cfg {
664 enable_pause_time: match self.kind {
665 Kind::CurrentThread => true,
666 #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
667 Kind::MultiThread => false,
668 },
669 enable_io: self.enable_io,
670 enable_time: self.enable_time,
671 start_paused: self.start_paused,
672 nevents: self.nevents,
673 }
674 }
675
676 /// Sets a custom timeout for a thread in the blocking pool.
677 ///
678 /// By default, the timeout for a thread is set to 10 seconds. This can
679 /// be overridden using .thread_keep_alive().
680 ///
681 /// # Example
682 ///
683 /// ```
684 /// # use tokio::runtime;
685 /// # use std::time::Duration;
686 /// # pub fn main() {
687 /// let rt = runtime::Builder::new_multi_thread()
688 /// .thread_keep_alive(Duration::from_millis(100))
689 /// .build();
690 /// # }
691 /// ```
692 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
693 self.keep_alive = Some(duration);
694 self
695 }
696
697 /// Sets the number of scheduler ticks after which the scheduler will poll the global
698 /// task queue.
699 ///
700 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
701 ///
702 /// By default the global queue interval is:
703 ///
704 /// * `31` for the current-thread scheduler.
705 /// * `61` for the multithreaded scheduler.
706 ///
707 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
708 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
709 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
710 /// getting started on new work, especially if tasks frequently yield rather than complete
711 /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
712 /// is a good choice when most tasks quickly complete polling.
713 ///
714 /// # Examples
715 ///
716 /// ```
717 /// # use tokio::runtime;
718 /// # pub fn main() {
719 /// let rt = runtime::Builder::new_multi_thread()
720 /// .global_queue_interval(31)
721 /// .build();
722 /// # }
723 /// ```
724 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
725 self.global_queue_interval = Some(val);
726 self
727 }
728
729 /// Sets the number of scheduler ticks after which the scheduler will poll for
730 /// external events (timers, I/O, and so on).
731 ///
732 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
733 ///
734 /// By default, the event interval is `61` for all scheduler types.
735 ///
736 /// Setting the event interval determines the effective "priority" of delivering
737 /// these external events (which may wake up additional tasks), compared to
738 /// executing tasks that are currently ready to run. A smaller value is useful
739 /// when tasks frequently spend a long time in polling, or frequently yield,
740 /// which can result in overly long delays picking up I/O events. Conversely,
741 /// picking up new events requires extra synchronization and syscall overhead,
742 /// so if tasks generally complete their polling quickly, a higher event interval
743 /// will minimize that overhead while still keeping the scheduler responsive to
744 /// events.
745 ///
746 /// # Examples
747 ///
748 /// ```
749 /// # use tokio::runtime;
750 /// # pub fn main() {
751 /// let rt = runtime::Builder::new_multi_thread()
752 /// .event_interval(31)
753 /// .build();
754 /// # }
755 /// ```
756 pub fn event_interval(&mut self, val: u32) -> &mut Self {
757 self.event_interval = val;
758 self
759 }
760
    cfg_unstable! {
        /// Configure how the runtime responds to an unhandled panic on a
        /// spawned task.
        ///
        /// By default, an unhandled panic (i.e. a panic not caught by
        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
        /// execution. The panic error value is forwarded to the task's
        /// [`JoinHandle`] and all other spawned tasks continue running.
        ///
        /// The `unhandled_panic` option enables configuring this behavior.
        ///
        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
        ///   spawned tasks have no impact on the runtime's execution.
        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
        ///   shutdown immediately when a spawned task panics even if that
        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
        ///   will immediately terminate and further calls to
        ///   [`Runtime::block_on`] will panic.
        ///
        /// # Unstable
        ///
        /// This option is currently unstable and its implementation is
        /// incomplete. The API may change or be removed in the future. See
        /// tokio-rs/tokio#4516 for more details.
        ///
        /// # Examples
        ///
        /// The following demonstrates a runtime configured to shutdown on
        /// panic. The first spawned task panics and results in the runtime
        /// shutting down. The second spawned task never has a chance to
        /// execute. The call to `block_on` will panic due to the runtime being
        /// forcibly shutdown.
        ///
        /// ```should_panic
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.spawn(async { panic!("boom"); });
        /// rt.spawn(async {
        ///     // This task never completes.
        /// });
        ///
        /// rt.block_on(async {
        ///     // Do some work
        /// #   loop { tokio::task::yield_now().await; }
        /// })
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
            self.unhandled_panic = behavior;
            self
        }

        /// Disables the LIFO task scheduler heuristic.
        ///
        /// The multi-threaded scheduler includes a heuristic for optimizing
        /// message-passing patterns. This heuristic results in the **last**
        /// scheduled task being polled first.
        ///
        /// To implement this heuristic, each worker thread has a slot which
        /// holds the task that should be polled next. However, this slot cannot
        /// be stolen by other worker threads, which can result in lower total
        /// throughput when tasks tend to have longer poll times.
        ///
        /// This configuration option will disable this heuristic resulting in
        /// all scheduled tasks being pushed into the worker-local queue, which
        /// is stealable.
        ///
        /// Consider trying this option when the task "scheduled" time is high
        /// but the runtime is underutilized. Use tokio-rs/tokio-metrics to
        /// collect this data.
        ///
        /// # Unstable
        ///
        /// This configuration option is considered a workaround for the LIFO
        /// slot not being stealable. When the slot becomes stealable, we will
        /// revisit whether or not this option is necessary. See
        /// tokio-rs/tokio#4941.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .disable_lifo_slot()
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn disable_lifo_slot(&mut self) -> &mut Self {
            self.disable_lifo_slot = true;
            self
        }

        /// Specifies the random number generation seed to use within all
        /// threads associated with the runtime being built.
        ///
        /// This option is intended to make certain parts of the runtime
        /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
        /// [`tokio::select!`] it will ensure that the order that branches are
        /// polled is deterministic.
        ///
        /// In addition to the code specifying `rng_seed` and interacting with
        /// the runtime, the internals of Tokio and the Rust compiler may affect
        /// the sequences of random numbers. In order to ensure repeatable
        /// results, the version of Tokio, the versions of all other
        /// dependencies that interact with Tokio, and the Rust compiler version
        /// should also all remain constant.
        ///
        /// # Examples
        ///
        /// ```
        /// # use tokio::runtime::{self, RngSeed};
        /// # pub fn main() {
        /// let seed = RngSeed::from_bytes(b"place your seed here");
        /// let rt = runtime::Builder::new_current_thread()
        ///     .rng_seed(seed)
        ///     .build();
        /// # }
        /// ```
        ///
        /// [`tokio::select!`]: crate::select
        pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
            self.seed_generator = RngSeedGenerator::new(seed);
            self
        }
    }
895
    cfg_metrics! {
        /// Enables tracking the distribution of task poll times.
        ///
        /// Task poll times are not instrumented by default as doing so requires
        /// calling [`Instant::now()`] twice per task poll, which could add
        /// measurable overhead. Use the [`Handle::metrics()`] to access the
        /// metrics data.
        ///
        /// The histogram uses fixed bucket sizes. In other words, the histogram
        /// buckets are not dynamic based on input values. Use the
        /// `metrics_poll_count_histogram_` builder methods to configure the
        /// histogram details.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_metrics_poll_count_histogram()
        ///     .build()
        ///     .unwrap();
        /// # // Test default values here
        /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
        /// # let m = rt.handle().metrics();
        /// # assert_eq!(m.poll_count_histogram_num_buckets(), 10);
        /// # assert_eq!(m.poll_count_histogram_bucket_range(0), us(0)..us(100));
        /// # assert_eq!(m.poll_count_histogram_bucket_range(1), us(100)..us(200));
        /// ```
        ///
        /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
        /// [`Instant::now()`]: std::time::Instant::now
        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
            self.metrics_poll_count_histogram_enable = true;
            self
        }

        /// Sets the histogram scale for tracking the distribution of task poll
        /// times.
        ///
        /// Tracking the distribution of task poll times can be done using a
        /// linear or log scale. When using linear scale, each histogram bucket
        /// will represent the same range of poll times. When using log scale,
        /// each histogram bucket will cover a range twice as big as the
        /// previous bucket.
        ///
        /// **Default:** linear scale.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime::{self, HistogramScale};
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_metrics_poll_count_histogram()
        ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
            self.metrics_poll_count_histogram.scale = histogram_scale;
            self
        }

        /// Sets the histogram resolution for tracking the distribution of task
        /// poll times.
        ///
        /// The resolution is the histogram's first bucket's range. When using a
        /// linear histogram scale, each bucket will cover the same range. When
        /// using a log scale, each bucket will cover a range twice as big as
        /// the previous bucket. In the log case, the resolution represents the
        /// smallest bucket range.
        ///
        /// Note that, when using log scale, the resolution is rounded up to the
        /// nearest power of 2 in nanoseconds.
        ///
        /// **Default:** 100 microseconds.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        /// use std::time::Duration;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_metrics_poll_count_histogram()
        ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
            // Sanity-check the argument: a zero-width bucket is meaningless,
            // and the 1-second upper bound keeps the nanosecond count well
            // within `u64`, making the `as u64` cast below lossless.
            assert!(resolution > Duration::from_secs(0));
            assert!(resolution <= Duration::from_secs(1));

            let resolution = resolution.as_nanos() as u64;
            self.metrics_poll_count_histogram.resolution = resolution;
            self
        }

        /// Sets the number of buckets for the histogram tracking the
        /// distribution of task poll times.
        ///
        /// The last bucket tracks all greater values that fall out of other
        /// ranges. So, configuring the histogram using a linear scale,
        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
        /// polls that take more than 450ms to complete.
        ///
        /// **Default:** 10
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_metrics_poll_count_histogram()
        ///     .metrics_poll_count_histogram_buckets(15)
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
            self.metrics_poll_count_histogram.num_buckets = buckets;
            self
        }
    }
1022
    /// Builds a runtime that uses the single-threaded ("current thread")
    /// scheduler, wiring together the driver, blocking pool, and scheduler
    /// from the configuration accumulated on this builder.
    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
        use crate::runtime::scheduler::{self, CurrentThread};
        use crate::runtime::{runtime::Scheduler, Config};

        // Build the driver (I/O / time) from the builder's configuration.
        let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

        // Blocking pool
        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
        let blocking_spawner = blocking_pool.spawner().clone();

        // Generate a rng seed for this runtime.
        // Two independent generators are derived: one travels in `Config`,
        // the other is handed to the scheduler directly.
        let seed_generator_1 = self.seed_generator.next_generator();
        let seed_generator_2 = self.seed_generator.next_generator();

        // And now put a single-threaded scheduler on top of the timer. When
        // there are no futures ready to do something, it'll let the timer or
        // the reactor to generate some new stimuli for the futures to continue
        // in their life.
        let (scheduler, handle) = CurrentThread::new(
            driver,
            driver_handle,
            blocking_spawner,
            seed_generator_2,
            Config {
                before_park: self.before_park.clone(),
                after_unpark: self.after_unpark.clone(),
                global_queue_interval: self.global_queue_interval,
                event_interval: self.event_interval,
                // Only present when compiled with `--cfg tokio_unstable`.
                #[cfg(tokio_unstable)]
                unhandled_panic: self.unhandled_panic.clone(),
                disable_lifo_slot: self.disable_lifo_slot,
                seed_generator: seed_generator_1,
                // `None` unless poll-time histogram metrics were enabled.
                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
            },
        );

        // Wrap the scheduler-specific handle in the public runtime handle.
        let handle = Handle {
            inner: scheduler::Handle::CurrentThread(handle),
        };

        Ok(Runtime::from_parts(
            Scheduler::CurrentThread(scheduler),
            handle,
            blocking_pool,
        ))
    }
1069
1070 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1071 if self.metrics_poll_count_histogram_enable {
1072 Some(self.metrics_poll_count_histogram.clone())
1073 } else {
1074 None
1075 }
1076 }
1077}
1078
cfg_io_driver! {
    impl Builder {
        /// Enables the I/O driver.
        ///
        /// Doing this enables using net, process, signal, and some I/O types on
        /// the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_io()
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn enable_io(&mut self) -> &mut Self {
            // Only records the flag; the driver itself is constructed later,
            // when the runtime is built.
            self.enable_io = true;
            self
        }

        /// Enables the I/O driver and configures the max number of events to be
        /// processed per tick.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_current_thread()
        ///     .enable_io()
        ///     .max_io_events_per_tick(1024)
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
            // Stored as `nevents`; presumably consumed via `get_cfg()` when
            // the driver is built — confirm against `driver::Driver::new`.
            self.nevents = capacity;
            self
        }
    }
}
1121
cfg_time! {
    impl Builder {
        /// Enables the time driver.
        ///
        /// Doing this enables using `tokio::time` on the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_time()
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn enable_time(&mut self) -> &mut Self {
            // Only records the flag; the time driver is constructed later,
            // when the runtime is built.
            self.enable_time = true;
            self
        }
    }
}
1144
cfg_test_util! {
    impl Builder {
        /// Controls if the runtime's clock starts paused or advancing.
        ///
        /// Pausing time requires the current-thread runtime; construction of
        /// the runtime will panic otherwise.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_current_thread()
        ///     .enable_time()
        ///     .start_paused(true)
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
            // Only records the flag here; per the docs above, the
            // current-thread requirement is enforced during `build`.
            self.start_paused = start_paused;
            self
        }
    }
}
1169
cfg_rt_multi_thread! {
    impl Builder {
        /// Builds a runtime that uses the multi-threaded scheduler, wiring
        /// together the driver, blocking pool, and worker threads from the
        /// configuration accumulated on this builder.
        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
            use crate::loom::sys::num_cpus;
            use crate::runtime::{Config, runtime::Scheduler};
            use crate::runtime::scheduler::{self, MultiThread};

            // Worker count defaults to the number of CPUs when not set
            // explicitly via `worker_threads`.
            let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

            // Build the driver (I/O / time) from the builder's configuration.
            let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

            // Create the blocking pool
            // Sized to accommodate the worker threads in addition to the
            // `max_blocking_threads` cap.
            let blocking_pool =
                blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
            let blocking_spawner = blocking_pool.spawner().clone();

            // Generate a rng seed for this runtime.
            // Two independent generators are derived: one travels in
            // `Config`, the other is handed to the scheduler directly.
            let seed_generator_1 = self.seed_generator.next_generator();
            let seed_generator_2 = self.seed_generator.next_generator();

            let (scheduler, handle, launch) = MultiThread::new(
                core_threads,
                driver,
                driver_handle,
                blocking_spawner,
                seed_generator_2,
                Config {
                    before_park: self.before_park.clone(),
                    after_unpark: self.after_unpark.clone(),
                    global_queue_interval: self.global_queue_interval,
                    event_interval: self.event_interval,
                    // Only present when compiled with `--cfg tokio_unstable`.
                    #[cfg(tokio_unstable)]
                    unhandled_panic: self.unhandled_panic.clone(),
                    disable_lifo_slot: self.disable_lifo_slot,
                    seed_generator: seed_generator_1,
                    // `None` unless poll-time histogram metrics were enabled.
                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
                },
            );

            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };

            // Spawn the thread pool workers
            // The handle's context is entered first so the workers are
            // launched from within this runtime's context.
            let _enter = handle.enter();
            launch.launch();

            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
        }
    }
}
1219
1220impl fmt::Debug for Builder {
1221 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1222 fmt&mut DebugStruct<'_, '_>.debug_struct("Builder")
1223 .field("worker_threads", &self.worker_threads)
1224 .field("max_blocking_threads", &self.max_blocking_threads)
1225 .field(
1226 "thread_name",
1227 &"<dyn Fn() -> String + Send + Sync + 'static>",
1228 )
1229 .field("thread_stack_size", &self.thread_stack_size)
1230 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1231 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1232 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1233 .field(name:"after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1234 .finish()
1235 }
1236}
1237