1//! Rayon-core houses the core stable APIs of Rayon.
2//!
3//! These APIs have been mirrored in the Rayon crate and it is recommended to use these from there.
4//!
5//! [`join`] is used to take two closures and potentially run them in parallel.
6//! - It will run in parallel if task B gets stolen before task A can finish.
7//! - It will run sequentially if task A finishes before task B is stolen and can continue on task B.
8//!
9//! [`scope`] creates a scope in which you can run any number of parallel tasks.
10//! These tasks can spawn nested tasks and scopes, but given the nature of work stealing, the order of execution can not be guaranteed.
11//! The scope will exist until all tasks spawned within the scope have been completed.
12//!
13//! [`spawn`] add a task into the 'static' or 'global' scope, or a local scope created by the [`scope()`] function.
14//!
15//! [`ThreadPool`] can be used to create your own thread pools (using [`ThreadPoolBuilder`]) or to customize the global one.
16//! Tasks spawned within the pool (using [`install()`], [`join()`], etc.) will be added to a deque,
17//! where it becomes available for work stealing from other threads in the local threadpool.
18//!
19//! [`join`]: fn.join.html
20//! [`scope`]: fn.scope.html
21//! [`scope()`]: fn.scope.html
22//! [`spawn`]: fn.spawn.html
23//! [`ThreadPool`]: struct.threadpool.html
24//! [`install()`]: struct.ThreadPool.html#method.install
25//! [`spawn()`]: struct.ThreadPool.html#method.spawn
26//! [`join()`]: struct.ThreadPool.html#method.join
27//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
28//!
29//! # Global fallback when threading is unsupported
30//!
31//! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that
32//! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi`
33//! targets are notable examples of this. Rather than panicking on the unsupported error when
34//! creating the implicit global threadpool, Rayon configures a fallback mode instead.
35//!
36//! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting
37//! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since
38//! there is no other thread to share the work. However, since the pool is not running independent
39//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower-
40//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate
41//! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local`
42//! can also volunteer execution time.
43//!
44//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
45//!
46//! # Restricting multiple versions
47//!
48//! In order to ensure proper coordination between threadpools, and especially
49//! to make sure there's only one global threadpool, `rayon-core` is actively
50//! restricted from building multiple versions of itself into a single target.
51//! You may see a build error like this in violation:
52//!
53//! ```text
54//! error: native library `rayon-core` is being linked to by more
55//! than one package, and can only be linked to by one package
56//! ```
57//!
58//! While we strive to keep `rayon-core` semver-compatible, it's still
59//! possible to arrive at this situation if different crates have overly
60//! restrictive tilde or inequality requirements for `rayon-core`. The
61//! conflicting requirements will need to be resolved before the build will
62//! succeed.
63
64#![deny(missing_debug_implementations)]
65#![deny(missing_docs)]
66#![deny(unreachable_pub)]
67#![warn(rust_2018_idioms)]
68
69use std::any::Any;
70use std::env;
71use std::error::Error;
72use std::fmt;
73use std::io;
74use std::marker::PhantomData;
75use std::str::FromStr;
76use std::thread;
77
78#[macro_use]
79mod private;
80
81mod broadcast;
82mod job;
83mod join;
84mod latch;
85mod registry;
86mod scope;
87mod sleep;
88mod spawn;
89mod thread_pool;
90mod unwind;
91
92mod compile_fail;
93mod test;
94
95pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext};
96pub use self::join::{join, join_context};
97pub use self::registry::ThreadBuilder;
98pub use self::scope::{in_place_scope, scope, Scope};
99pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
100pub use self::spawn::{spawn, spawn_fifo};
101pub use self::thread_pool::current_thread_has_pending_tasks;
102pub use self::thread_pool::current_thread_index;
103pub use self::thread_pool::ThreadPool;
104pub use self::thread_pool::{yield_local, yield_now, Yield};
105
106use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
107
108/// Returns the maximum number of threads that Rayon supports in a single thread-pool.
109///
110/// If a higher thread count is requested by calling `ThreadPoolBuilder::num_threads` or by setting
111/// the `RAYON_NUM_THREADS` environment variable, then it will be reduced to this maximum.
112///
113/// The value may vary between different targets, and is subject to change in new Rayon versions.
114pub fn max_num_threads() -> usize {
115 // We are limited by the bits available in the sleep counter's `AtomicUsize`.
116 crate::sleep::THREADS_MAX
117}
118
119/// Returns the number of threads in the current registry. If this
120/// code is executing within a Rayon thread-pool, then this will be
121/// the number of threads for the thread-pool of the current
122/// thread. Otherwise, it will be the number of threads for the global
123/// thread-pool.
124///
125/// This can be useful when trying to judge how many times to split
126/// parallel work (the parallel iterator traits use this value
127/// internally for this purpose).
128///
129/// # Future compatibility note
130///
131/// Note that unless this thread-pool was created with a
132/// builder that specifies the number of threads, then this
133/// number may vary over time in future versions (see [the
134/// `num_threads()` method for details][snt]).
135///
136/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
137pub fn current_num_threads() -> usize {
138 crate::registry::Registry::current_num_threads()
139}
140
141/// Error when initializing a thread pool.
142#[derive(Debug)]
143pub struct ThreadPoolBuildError {
144 kind: ErrorKind,
145}
146
147#[derive(Debug)]
148enum ErrorKind {
149 GlobalPoolAlreadyInitialized,
150 CurrentThreadAlreadyInPool,
151 IOError(io::Error),
152}
153
154/// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool.
155/// ## Creating a ThreadPool
156/// The following creates a thread pool with 22 threads.
157///
158/// ```rust
159/// # use rayon_core as rayon;
160/// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap();
161/// ```
162///
163/// To instead configure the global thread pool, use [`build_global()`]:
164///
165/// ```rust
166/// # use rayon_core as rayon;
167/// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
168/// ```
169///
170/// [`ThreadPool`]: struct.ThreadPool.html
171/// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global
172pub struct ThreadPoolBuilder<S = DefaultSpawn> {
173 /// The number of threads in the rayon thread pool.
174 /// If zero will use the RAYON_NUM_THREADS environment variable.
175 /// If RAYON_NUM_THREADS is invalid or zero will use the default.
176 num_threads: usize,
177
178 /// The thread we're building *from* will also be part of the pool.
179 use_current_thread: bool,
180
181 /// Custom closure, if any, to handle a panic that we cannot propagate
182 /// anywhere else.
183 panic_handler: Option<Box<PanicHandler>>,
184
185 /// Closure to compute the name of a thread.
186 get_thread_name: Option<Box<dyn FnMut(usize) -> String>>,
187
188 /// The stack size for the created worker threads
189 stack_size: Option<usize>,
190
191 /// Closure invoked on worker thread start.
192 start_handler: Option<Box<StartHandler>>,
193
194 /// Closure invoked on worker thread exit.
195 exit_handler: Option<Box<ExitHandler>>,
196
197 /// Closure invoked to spawn threads.
198 spawn_handler: S,
199
200 /// If false, worker threads will execute spawned jobs in a
201 /// "depth-first" fashion. If true, they will do a "breadth-first"
202 /// fashion. Depth-first is the default.
203 breadth_first: bool,
204}
205
206/// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead.
207///
208/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
209#[deprecated(note = "Use `ThreadPoolBuilder`")]
210#[derive(Default)]
211pub struct Configuration {
212 builder: ThreadPoolBuilder,
213}
214
215/// The type for a panic handling closure. Note that this same closure
216/// may be invoked multiple times in parallel.
217type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
218
219/// The type for a closure that gets invoked when a thread starts. The
220/// closure is passed the index of the thread on which it is invoked.
221/// Note that this same closure may be invoked multiple times in parallel.
222type StartHandler = dyn Fn(usize) + Send + Sync;
223
224/// The type for a closure that gets invoked when a thread exits. The
225/// closure is passed the index of the thread on which is is invoked.
226/// Note that this same closure may be invoked multiple times in parallel.
227type ExitHandler = dyn Fn(usize) + Send + Sync;
228
229// NB: We can't `#[derive(Default)]` because `S` is left ambiguous.
230impl Default for ThreadPoolBuilder {
231 fn default() -> Self {
232 ThreadPoolBuilder {
233 num_threads: 0,
234 use_current_thread: false,
235 panic_handler: None,
236 get_thread_name: None,
237 stack_size: None,
238 start_handler: None,
239 exit_handler: None,
240 spawn_handler: DefaultSpawn,
241 breadth_first: false,
242 }
243 }
244}
245
246impl ThreadPoolBuilder {
247 /// Creates and returns a valid rayon thread pool builder, but does not initialize it.
248 pub fn new() -> Self {
249 Self::default()
250 }
251}
252
253/// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the
254/// default spawn and those set by [`spawn_handler`](#method.spawn_handler).
255impl<S> ThreadPoolBuilder<S>
256where
257 S: ThreadSpawn,
258{
259 /// Creates a new `ThreadPool` initialized using this configuration.
260 pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
261 ThreadPool::build(self)
262 }
263
264 /// Initializes the global thread pool. This initialization is
265 /// **optional**. If you do not call this function, the thread pool
266 /// will be automatically initialized with the default
267 /// configuration. Calling `build_global` is not recommended, except
268 /// in two scenarios:
269 ///
270 /// - You wish to change the default configuration.
271 /// - You are running a benchmark, in which case initializing may
272 /// yield slightly more consistent results, since the worker threads
273 /// will already be ready to go even in the first iteration. But
274 /// this cost is minimal.
275 ///
276 /// Initialization of the global thread pool happens exactly
277 /// once. Once started, the configuration cannot be
278 /// changed. Therefore, if you call `build_global` a second time, it
279 /// will return an error. An `Ok` result indicates that this
280 /// is the first initialization of the thread pool.
281 pub fn build_global(self) -> Result<(), ThreadPoolBuildError> {
282 let registry = registry::init_global_registry(self)?;
283 registry.wait_until_primed();
284 Ok(())
285 }
286}
287
288impl ThreadPoolBuilder {
289 /// Creates a scoped `ThreadPool` initialized using this configuration.
290 ///
291 /// This is a convenience function for building a pool using [`std::thread::scope`]
292 /// to spawn threads in a [`spawn_handler`](#method.spawn_handler).
293 /// The threads in this pool will start by calling `wrapper`, which should
294 /// do initialization and continue by calling `ThreadBuilder::run()`.
295 ///
296 /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
297 ///
298 /// # Examples
299 ///
300 /// A scoped pool may be useful in combination with scoped thread-local variables.
301 ///
302 /// ```
303 /// # use rayon_core as rayon;
304 ///
305 /// scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>);
306 ///
307 /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
308 /// let pool_data = vec![1, 2, 3];
309 ///
310 /// // We haven't assigned any TLS data yet.
311 /// assert!(!POOL_DATA.is_set());
312 ///
313 /// rayon::ThreadPoolBuilder::new()
314 /// .build_scoped(
315 /// // Borrow `pool_data` in TLS for each thread.
316 /// |thread| POOL_DATA.set(&pool_data, || thread.run()),
317 /// // Do some work that needs the TLS data.
318 /// |pool| pool.install(|| assert!(POOL_DATA.is_set())),
319 /// )?;
320 ///
321 /// // Once we've returned, `pool_data` is no longer borrowed.
322 /// drop(pool_data);
323 /// Ok(())
324 /// }
325 /// ```
326 pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError>
327 where
328 W: Fn(ThreadBuilder) + Sync, // expected to call `run()`
329 F: FnOnce(&ThreadPool) -> R,
330 {
331 std::thread::scope(|scope| {
332 let pool = self
333 .spawn_handler(|thread| {
334 let mut builder = std::thread::Builder::new();
335 if let Some(name) = thread.name() {
336 builder = builder.name(name.to_string());
337 }
338 if let Some(size) = thread.stack_size() {
339 builder = builder.stack_size(size);
340 }
341 builder.spawn_scoped(scope, || wrapper(thread))?;
342 Ok(())
343 })
344 .build()?;
345 Ok(with_pool(&pool))
346 })
347 }
348}
349
350impl<S> ThreadPoolBuilder<S> {
351 /// Sets a custom function for spawning threads.
352 ///
353 /// Note that the threads will not exit until after the pool is dropped. It
354 /// is up to the caller to wait for thread termination if that is important
355 /// for any invariants. For instance, threads created in [`std::thread::scope`]
356 /// will be joined before that scope returns, and this will block indefinitely
357 /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
358 /// until the entire process exits!
359 ///
360 /// # Examples
361 ///
362 /// A minimal spawn handler just needs to call `run()` from an independent thread.
363 ///
364 /// ```
365 /// # use rayon_core as rayon;
366 /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
367 /// let pool = rayon::ThreadPoolBuilder::new()
368 /// .spawn_handler(|thread| {
369 /// std::thread::spawn(|| thread.run());
370 /// Ok(())
371 /// })
372 /// .build()?;
373 ///
374 /// pool.install(|| println!("Hello from my custom thread!"));
375 /// Ok(())
376 /// }
377 /// ```
378 ///
379 /// The default spawn handler sets the name and stack size if given, and propagates
380 /// any errors from the thread builder.
381 ///
382 /// ```
383 /// # use rayon_core as rayon;
384 /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
385 /// let pool = rayon::ThreadPoolBuilder::new()
386 /// .spawn_handler(|thread| {
387 /// let mut b = std::thread::Builder::new();
388 /// if let Some(name) = thread.name() {
389 /// b = b.name(name.to_owned());
390 /// }
391 /// if let Some(stack_size) = thread.stack_size() {
392 /// b = b.stack_size(stack_size);
393 /// }
394 /// b.spawn(|| thread.run())?;
395 /// Ok(())
396 /// })
397 /// .build()?;
398 ///
399 /// pool.install(|| println!("Hello from my fully custom thread!"));
400 /// Ok(())
401 /// }
402 /// ```
403 ///
404 /// This can also be used for a pool of scoped threads like [`crossbeam::scope`],
405 /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in
406 /// [`build_scoped`](#method.build_scoped).
407 ///
408 /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
409 /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
410 ///
411 /// ```
412 /// # use rayon_core as rayon;
413 /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
414 /// std::thread::scope(|scope| {
415 /// let pool = rayon::ThreadPoolBuilder::new()
416 /// .spawn_handler(|thread| {
417 /// let mut builder = std::thread::Builder::new();
418 /// if let Some(name) = thread.name() {
419 /// builder = builder.name(name.to_string());
420 /// }
421 /// if let Some(size) = thread.stack_size() {
422 /// builder = builder.stack_size(size);
423 /// }
424 /// builder.spawn_scoped(scope, || {
425 /// // Add any scoped initialization here, then run!
426 /// thread.run()
427 /// })?;
428 /// Ok(())
429 /// })
430 /// .build()?;
431 ///
432 /// pool.install(|| println!("Hello from my custom scoped thread!"));
433 /// Ok(())
434 /// })
435 /// }
436 /// ```
437 pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
438 where
439 F: FnMut(ThreadBuilder) -> io::Result<()>,
440 {
441 ThreadPoolBuilder {
442 spawn_handler: CustomSpawn::new(spawn),
443 // ..self
444 num_threads: self.num_threads,
445 use_current_thread: self.use_current_thread,
446 panic_handler: self.panic_handler,
447 get_thread_name: self.get_thread_name,
448 stack_size: self.stack_size,
449 start_handler: self.start_handler,
450 exit_handler: self.exit_handler,
451 breadth_first: self.breadth_first,
452 }
453 }
454
455 /// Returns a reference to the current spawn handler.
456 fn get_spawn_handler(&mut self) -> &mut S {
457 &mut self.spawn_handler
458 }
459
460 /// Get the number of threads that will be used for the thread
461 /// pool. See `num_threads()` for more information.
462 fn get_num_threads(&self) -> usize {
463 if self.num_threads > 0 {
464 self.num_threads
465 } else {
466 let default = || {
467 thread::available_parallelism()
468 .map(|n| n.get())
469 .unwrap_or(1)
470 };
471
472 match env::var("RAYON_NUM_THREADS")
473 .ok()
474 .and_then(|s| usize::from_str(&s).ok())
475 {
476 Some(x @ 1..) => return x,
477 Some(0) => return default(),
478 _ => {}
479 }
480
481 // Support for deprecated `RAYON_RS_NUM_CPUS`.
482 match env::var("RAYON_RS_NUM_CPUS")
483 .ok()
484 .and_then(|s| usize::from_str(&s).ok())
485 {
486 Some(x @ 1..) => x,
487 _ => default(),
488 }
489 }
490 }
491
492 /// Get the thread name for the thread with the given index.
493 fn get_thread_name(&mut self, index: usize) -> Option<String> {
494 let f = self.get_thread_name.as_mut()?;
495 Some(f(index))
496 }
497
498 /// Sets a closure which takes a thread index and returns
499 /// the thread's name.
500 pub fn thread_name<F>(mut self, closure: F) -> Self
501 where
502 F: FnMut(usize) -> String + 'static,
503 {
504 self.get_thread_name = Some(Box::new(closure));
505 self
506 }
507
508 /// Sets the number of threads to be used in the rayon threadpool.
509 ///
510 /// If you specify a non-zero number of threads using this
511 /// function, then the resulting thread-pools are guaranteed to
512 /// start at most this number of threads.
513 ///
514 /// If `num_threads` is 0, or you do not call this function, then
515 /// the Rayon runtime will select the number of threads
516 /// automatically. At present, this is based on the
517 /// `RAYON_NUM_THREADS` environment variable (if set),
518 /// or the number of logical CPUs (otherwise).
519 /// In the future, however, the default behavior may
520 /// change to dynamically add or remove threads as needed.
521 ///
522 /// **Future compatibility warning:** Given the default behavior
523 /// may change in the future, if you wish to rely on a fixed
524 /// number of threads, you should use this function to specify
525 /// that number. To reproduce the current default behavior, you
526 /// may wish to use [`std::thread::available_parallelism`]
527 /// to query the number of CPUs dynamically.
528 ///
529 /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one
530 /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
531 /// variable. If both variables are specified, `RAYON_NUM_THREADS` will
532 /// be preferred.
533 pub fn num_threads(mut self, num_threads: usize) -> Self {
534 self.num_threads = num_threads;
535 self
536 }
537
538 /// Use the current thread as one of the threads in the pool.
539 ///
540 /// The current thread is guaranteed to be at index 0, and since the thread is not managed by
541 /// rayon, the spawn and exit handlers do not run for that thread.
542 ///
543 /// Note that the current thread won't run the main work-stealing loop, so jobs spawned into
544 /// the thread-pool will generally not be picked up automatically by this thread unless you
545 /// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], or [`scope()`].
546 ///
547 /// # Local thread-pools
548 ///
549 /// Using this in a local thread-pool means the registry will be leaked. In future versions
550 /// there might be a way of cleaning up the current-thread state.
551 pub fn use_current_thread(mut self) -> Self {
552 self.use_current_thread = true;
553 self
554 }
555
556 /// Returns a copy of the current panic handler.
557 fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> {
558 self.panic_handler.take()
559 }
560
561 /// Normally, whenever Rayon catches a panic, it tries to
562 /// propagate it to someplace sensible, to try and reflect the
563 /// semantics of sequential execution. But in some cases,
564 /// particularly with the `spawn()` APIs, there is no
565 /// obvious place where we should propagate the panic to.
566 /// In that case, this panic handler is invoked.
567 ///
568 /// If no panic handler is set, the default is to abort the
569 /// process, under the principle that panics should not go
570 /// unobserved.
571 ///
572 /// If the panic handler itself panics, this will abort the
573 /// process. To prevent this, wrap the body of your panic handler
574 /// in a call to `std::panic::catch_unwind()`.
575 pub fn panic_handler<H>(mut self, panic_handler: H) -> Self
576 where
577 H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
578 {
579 self.panic_handler = Some(Box::new(panic_handler));
580 self
581 }
582
583 /// Get the stack size of the worker threads
584 fn get_stack_size(&self) -> Option<usize> {
585 self.stack_size
586 }
587
588 /// Sets the stack size of the worker threads
589 pub fn stack_size(mut self, stack_size: usize) -> Self {
590 self.stack_size = Some(stack_size);
591 self
592 }
593
594 /// **(DEPRECATED)** Suggest to worker threads that they execute
595 /// spawned jobs in a "breadth-first" fashion.
596 ///
597 /// Typically, when a worker thread is idle or blocked, it will
598 /// attempt to execute the job from the *top* of its local deque of
599 /// work (i.e., the job most recently spawned). If this flag is set
600 /// to true, however, workers will prefer to execute in a
601 /// *breadth-first* fashion -- that is, they will search for jobs at
602 /// the *bottom* of their local deque. (At present, workers *always*
603 /// steal from the bottom of other workers' deques, regardless of
604 /// the setting of this flag.)
605 ///
606 /// If you think of the tasks as a tree, where a parent task
607 /// spawns its children in the tree, then this flag loosely
608 /// corresponds to doing a breadth-first traversal of the tree,
609 /// whereas the default would be to do a depth-first traversal.
610 ///
611 /// **Note that this is an "execution hint".** Rayon's task
612 /// execution is highly dynamic and the precise order in which
613 /// independent tasks are executed is not intended to be
614 /// guaranteed.
615 ///
616 /// This `breadth_first()` method is now deprecated per [RFC #1],
617 /// and in the future its effect may be removed. Consider using
618 /// [`scope_fifo()`] for a similar effect.
619 ///
620 /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
621 /// [`scope_fifo()`]: fn.scope_fifo.html
622 #[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")]
623 pub fn breadth_first(mut self) -> Self {
624 self.breadth_first = true;
625 self
626 }
627
628 fn get_breadth_first(&self) -> bool {
629 self.breadth_first
630 }
631
632 /// Takes the current thread start callback, leaving `None`.
633 fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
634 self.start_handler.take()
635 }
636
637 /// Sets a callback to be invoked on thread start.
638 ///
639 /// The closure is passed the index of the thread on which it is invoked.
640 /// Note that this same closure may be invoked multiple times in parallel.
641 /// If this closure panics, the panic will be passed to the panic handler.
642 /// If that handler returns, then startup will continue normally.
643 pub fn start_handler<H>(mut self, start_handler: H) -> Self
644 where
645 H: Fn(usize) + Send + Sync + 'static,
646 {
647 self.start_handler = Some(Box::new(start_handler));
648 self
649 }
650
651 /// Returns a current thread exit callback, leaving `None`.
652 fn take_exit_handler(&mut self) -> Option<Box<ExitHandler>> {
653 self.exit_handler.take()
654 }
655
656 /// Sets a callback to be invoked on thread exit.
657 ///
658 /// The closure is passed the index of the thread on which it is invoked.
659 /// Note that this same closure may be invoked multiple times in parallel.
660 /// If this closure panics, the panic will be passed to the panic handler.
661 /// If that handler returns, then the thread will exit normally.
662 pub fn exit_handler<H>(mut self, exit_handler: H) -> Self
663 where
664 H: Fn(usize) + Send + Sync + 'static,
665 {
666 self.exit_handler = Some(Box::new(exit_handler));
667 self
668 }
669}
670
671#[allow(deprecated)]
672impl Configuration {
673 /// Creates and return a valid rayon thread pool configuration, but does not initialize it.
674 pub fn new() -> Configuration {
675 Configuration {
676 builder: ThreadPoolBuilder::new(),
677 }
678 }
679
680 /// Deprecated in favor of `ThreadPoolBuilder::build`.
681 pub fn build(self) -> Result<ThreadPool, Box<dyn Error + 'static>> {
682 self.builder.build().map_err(Box::from)
683 }
684
685 /// Deprecated in favor of `ThreadPoolBuilder::thread_name`.
686 pub fn thread_name<F>(mut self, closure: F) -> Self
687 where
688 F: FnMut(usize) -> String + 'static,
689 {
690 self.builder = self.builder.thread_name(closure);
691 self
692 }
693
694 /// Deprecated in favor of `ThreadPoolBuilder::num_threads`.
695 pub fn num_threads(mut self, num_threads: usize) -> Configuration {
696 self.builder = self.builder.num_threads(num_threads);
697 self
698 }
699
700 /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`.
701 pub fn panic_handler<H>(mut self, panic_handler: H) -> Configuration
702 where
703 H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
704 {
705 self.builder = self.builder.panic_handler(panic_handler);
706 self
707 }
708
709 /// Deprecated in favor of `ThreadPoolBuilder::stack_size`.
710 pub fn stack_size(mut self, stack_size: usize) -> Self {
711 self.builder = self.builder.stack_size(stack_size);
712 self
713 }
714
715 /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`.
716 pub fn breadth_first(mut self) -> Self {
717 self.builder = self.builder.breadth_first();
718 self
719 }
720
721 /// Deprecated in favor of `ThreadPoolBuilder::start_handler`.
722 pub fn start_handler<H>(mut self, start_handler: H) -> Configuration
723 where
724 H: Fn(usize) + Send + Sync + 'static,
725 {
726 self.builder = self.builder.start_handler(start_handler);
727 self
728 }
729
730 /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`.
731 pub fn exit_handler<H>(mut self, exit_handler: H) -> Configuration
732 where
733 H: Fn(usize) + Send + Sync + 'static,
734 {
735 self.builder = self.builder.exit_handler(exit_handler);
736 self
737 }
738
739 /// Returns a ThreadPoolBuilder with identical parameters.
740 fn into_builder(self) -> ThreadPoolBuilder {
741 self.builder
742 }
743}
744
745impl ThreadPoolBuildError {
746 fn new(kind: ErrorKind) -> ThreadPoolBuildError {
747 ThreadPoolBuildError { kind }
748 }
749
750 fn is_unsupported(&self) -> bool {
751 matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported)
752 }
753}
754
755const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
756 "The global thread pool has already been initialized.";
757
758const CURRENT_THREAD_ALREADY_IN_POOL: &str =
759 "The current thread is already part of another thread pool.";
760
761impl Error for ThreadPoolBuildError {
762 #[allow(deprecated)]
763 fn description(&self) -> &str {
764 match self.kind {
765 ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED,
766 ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL,
767 ErrorKind::IOError(ref e) => e.description(),
768 }
769 }
770
771 fn source(&self) -> Option<&(dyn Error + 'static)> {
772 match &self.kind {
773 ErrorKind::GlobalPoolAlreadyInitialized | ErrorKind::CurrentThreadAlreadyInPool => None,
774 ErrorKind::IOError(e) => Some(e),
775 }
776 }
777}
778
779impl fmt::Display for ThreadPoolBuildError {
780 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
781 match &self.kind {
782 ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL.fmt(f),
783 ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f),
784 ErrorKind::IOError(e) => e.fmt(f),
785 }
786 }
787}
788
789/// Deprecated in favor of `ThreadPoolBuilder::build_global`.
790#[deprecated(note = "use `ThreadPoolBuilder::build_global`")]
791#[allow(deprecated)]
792pub fn initialize(config: Configuration) -> Result<(), Box<dyn Error>> {
793 config.into_builder().build_global().map_err(Box::from)
794}
795
796impl<S> fmt::Debug for ThreadPoolBuilder<S> {
797 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
798 let ThreadPoolBuilder {
799 ref num_threads,
800 ref use_current_thread,
801 ref get_thread_name,
802 ref panic_handler,
803 ref stack_size,
804 ref start_handler,
805 ref exit_handler,
806 spawn_handler: _,
807 ref breadth_first,
808 } = *self;
809
810 // Just print `Some(<closure>)` or `None` to the debug
811 // output.
812 struct ClosurePlaceholder;
813 impl fmt::Debug for ClosurePlaceholder {
814 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
815 f.write_str("<closure>")
816 }
817 }
818 let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
819 let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
820 let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
821 let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
822
823 f.debug_struct("ThreadPoolBuilder")
824 .field("num_threads", num_threads)
825 .field("use_current_thread", use_current_thread)
826 .field("get_thread_name", &get_thread_name)
827 .field("panic_handler", &panic_handler)
828 .field("stack_size", &stack_size)
829 .field("start_handler", &start_handler)
830 .field("exit_handler", &exit_handler)
831 .field("breadth_first", &breadth_first)
832 .finish()
833 }
834}
835
836#[allow(deprecated)]
837impl fmt::Debug for Configuration {
838 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
839 self.builder.fmt(f)
840 }
841}
842
843/// Provides the calling context to a closure called by `join_context`.
844#[derive(Debug)]
845pub struct FnContext {
846 migrated: bool,
847
848 /// disable `Send` and `Sync`, just for a little future-proofing.
849 _marker: PhantomData<*mut ()>,
850}
851
852impl FnContext {
853 #[inline]
854 fn new(migrated: bool) -> Self {
855 FnContext {
856 migrated,
857 _marker: PhantomData,
858 }
859 }
860}
861
862impl FnContext {
863 /// Returns `true` if the closure was called from a different thread
864 /// than it was provided from.
865 #[inline]
866 pub fn migrated(&self) -> bool {
867 self.migrated
868 }
869}
870