1 | //! Rayon-core houses the core stable APIs of Rayon. |
2 | //! |
3 | //! These APIs have been mirrored in the Rayon crate and it is recommended to use these from there. |
4 | //! |
5 | //! [`join`] is used to take two closures and potentially run them in parallel. |
6 | //! - It will run in parallel if task B gets stolen before task A can finish. |
7 | //! - It will run sequentially if task A finishes before task B is stolen and can continue on task B. |
8 | //! |
9 | //! [`scope`] creates a scope in which you can run any number of parallel tasks. |
10 | //! These tasks can spawn nested tasks and scopes, but given the nature of work stealing, the order of execution can not be guaranteed. |
11 | //! The scope will exist until all tasks spawned within the scope have been completed. |
12 | //! |
13 | //! [`spawn`] add a task into the 'static' or 'global' scope, or a local scope created by the [`scope()`] function. |
14 | //! |
15 | //! [`ThreadPool`] can be used to create your own thread pools (using [`ThreadPoolBuilder`]) or to customize the global one. |
16 | //! Tasks spawned within the pool (using [`install()`], [`join()`], etc.) will be added to a deque, |
17 | //! where it becomes available for work stealing from other threads in the local threadpool. |
18 | //! |
19 | //! [`join`]: fn.join.html |
20 | //! [`scope`]: fn.scope.html |
21 | //! [`scope()`]: fn.scope.html |
22 | //! [`spawn`]: fn.spawn.html |
23 | //! [`ThreadPool`]: struct.threadpool.html |
24 | //! [`install()`]: struct.ThreadPool.html#method.install |
25 | //! [`spawn()`]: struct.ThreadPool.html#method.spawn |
26 | //! [`join()`]: struct.ThreadPool.html#method.join |
27 | //! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html |
28 | //! |
29 | //! # Global fallback when threading is unsupported |
30 | //! |
31 | //! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that |
32 | //! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi` |
33 | //! targets are notable examples of this. Rather than panicking on the unsupported error when |
34 | //! creating the implicit global threadpool, Rayon configures a fallback mode instead. |
35 | //! |
36 | //! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting |
37 | //! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since |
38 | //! there is no other thread to share the work. However, since the pool is not running independent |
39 | //! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower- |
40 | //! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate |
41 | //! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local` |
42 | //! can also volunteer execution time. |
43 | //! |
44 | //! Explicit `ThreadPoolBuilder` methods always report their error without any fallback. |
45 | //! |
46 | //! # Restricting multiple versions |
47 | //! |
48 | //! In order to ensure proper coordination between threadpools, and especially |
49 | //! to make sure there's only one global threadpool, `rayon-core` is actively |
50 | //! restricted from building multiple versions of itself into a single target. |
51 | //! You may see a build error like this in violation: |
52 | //! |
53 | //! ```text |
54 | //! error: native library `rayon-core` is being linked to by more |
55 | //! than one package, and can only be linked to by one package |
56 | //! ``` |
57 | //! |
58 | //! While we strive to keep `rayon-core` semver-compatible, it's still |
59 | //! possible to arrive at this situation if different crates have overly |
60 | //! restrictive tilde or inequality requirements for `rayon-core`. The |
61 | //! conflicting requirements will need to be resolved before the build will |
62 | //! succeed. |
63 | |
64 | #![deny (missing_debug_implementations)] |
65 | #![deny (missing_docs)] |
66 | #![deny (unreachable_pub)] |
67 | #![warn (rust_2018_idioms)] |
68 | |
69 | use std::any::Any; |
70 | use std::env; |
71 | use std::error::Error; |
72 | use std::fmt; |
73 | use std::io; |
74 | use std::marker::PhantomData; |
75 | use std::str::FromStr; |
76 | use std::thread; |
77 | |
78 | #[macro_use ] |
79 | mod private; |
80 | |
81 | mod broadcast; |
82 | mod job; |
83 | mod join; |
84 | mod latch; |
85 | mod registry; |
86 | mod scope; |
87 | mod sleep; |
88 | mod spawn; |
89 | mod thread_pool; |
90 | mod unwind; |
91 | |
92 | mod compile_fail; |
93 | mod test; |
94 | |
95 | pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext}; |
96 | pub use self::join::{join, join_context}; |
97 | pub use self::registry::ThreadBuilder; |
98 | pub use self::scope::{in_place_scope, scope, Scope}; |
99 | pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo}; |
100 | pub use self::spawn::{spawn, spawn_fifo}; |
101 | pub use self::thread_pool::current_thread_has_pending_tasks; |
102 | pub use self::thread_pool::current_thread_index; |
103 | pub use self::thread_pool::ThreadPool; |
104 | pub use self::thread_pool::{yield_local, yield_now, Yield}; |
105 | |
106 | #[cfg (not(feature = "web_spin_lock" ))] |
107 | use std::sync; |
108 | |
109 | #[cfg (feature = "web_spin_lock" )] |
110 | use wasm_sync as sync; |
111 | |
112 | use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn}; |
113 | |
114 | /// Returns the maximum number of threads that Rayon supports in a single thread-pool. |
115 | /// |
116 | /// If a higher thread count is requested by calling `ThreadPoolBuilder::num_threads` or by setting |
117 | /// the `RAYON_NUM_THREADS` environment variable, then it will be reduced to this maximum. |
118 | /// |
119 | /// The value may vary between different targets, and is subject to change in new Rayon versions. |
120 | pub fn max_num_threads() -> usize { |
121 | // We are limited by the bits available in the sleep counter's `AtomicUsize`. |
122 | crate::sleep::THREADS_MAX |
123 | } |
124 | |
125 | /// Returns the number of threads in the current registry. If this |
126 | /// code is executing within a Rayon thread-pool, then this will be |
127 | /// the number of threads for the thread-pool of the current |
128 | /// thread. Otherwise, it will be the number of threads for the global |
129 | /// thread-pool. |
130 | /// |
131 | /// This can be useful when trying to judge how many times to split |
132 | /// parallel work (the parallel iterator traits use this value |
133 | /// internally for this purpose). |
134 | /// |
135 | /// # Future compatibility note |
136 | /// |
137 | /// Note that unless this thread-pool was created with a |
138 | /// builder that specifies the number of threads, then this |
139 | /// number may vary over time in future versions (see [the |
140 | /// `num_threads()` method for details][snt]). |
141 | /// |
142 | /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads |
143 | pub fn current_num_threads() -> usize { |
144 | crate::registry::Registry::current_num_threads() |
145 | } |
146 | |
147 | /// Error when initializing a thread pool. |
148 | #[derive (Debug)] |
149 | pub struct ThreadPoolBuildError { |
150 | kind: ErrorKind, |
151 | } |
152 | |
153 | #[derive (Debug)] |
154 | enum ErrorKind { |
155 | GlobalPoolAlreadyInitialized, |
156 | CurrentThreadAlreadyInPool, |
157 | IOError(io::Error), |
158 | } |
159 | |
160 | /// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool. |
161 | /// ## Creating a ThreadPool |
162 | /// The following creates a thread pool with 22 threads. |
163 | /// |
164 | /// ```rust |
165 | /// # use rayon_core as rayon; |
166 | /// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap(); |
167 | /// ``` |
168 | /// |
169 | /// To instead configure the global thread pool, use [`build_global()`]: |
170 | /// |
171 | /// ```rust |
172 | /// # use rayon_core as rayon; |
173 | /// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap(); |
174 | /// ``` |
175 | /// |
176 | /// [`ThreadPool`]: struct.ThreadPool.html |
177 | /// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global |
178 | pub struct ThreadPoolBuilder<S = DefaultSpawn> { |
179 | /// The number of threads in the rayon thread pool. |
180 | /// If zero will use the RAYON_NUM_THREADS environment variable. |
181 | /// If RAYON_NUM_THREADS is invalid or zero will use the default. |
182 | num_threads: usize, |
183 | |
184 | /// The thread we're building *from* will also be part of the pool. |
185 | use_current_thread: bool, |
186 | |
187 | /// Custom closure, if any, to handle a panic that we cannot propagate |
188 | /// anywhere else. |
189 | panic_handler: Option<Box<PanicHandler>>, |
190 | |
191 | /// Closure to compute the name of a thread. |
192 | get_thread_name: Option<Box<dyn FnMut(usize) -> String>>, |
193 | |
194 | /// The stack size for the created worker threads |
195 | stack_size: Option<usize>, |
196 | |
197 | /// Closure invoked on worker thread start. |
198 | start_handler: Option<Box<StartHandler>>, |
199 | |
200 | /// Closure invoked on worker thread exit. |
201 | exit_handler: Option<Box<ExitHandler>>, |
202 | |
203 | /// Closure invoked to spawn threads. |
204 | spawn_handler: S, |
205 | |
206 | /// If false, worker threads will execute spawned jobs in a |
207 | /// "depth-first" fashion. If true, they will do a "breadth-first" |
208 | /// fashion. Depth-first is the default. |
209 | breadth_first: bool, |
210 | } |
211 | |
212 | /// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead. |
213 | /// |
214 | /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html |
215 | #[deprecated (note = "Use `ThreadPoolBuilder`" )] |
216 | #[derive (Default)] |
217 | pub struct Configuration { |
218 | builder: ThreadPoolBuilder, |
219 | } |
220 | |
221 | /// The type for a panic handling closure. Note that this same closure |
222 | /// may be invoked multiple times in parallel. |
223 | type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync; |
224 | |
225 | /// The type for a closure that gets invoked when a thread starts. The |
226 | /// closure is passed the index of the thread on which it is invoked. |
227 | /// Note that this same closure may be invoked multiple times in parallel. |
228 | type StartHandler = dyn Fn(usize) + Send + Sync; |
229 | |
230 | /// The type for a closure that gets invoked when a thread exits. The |
231 | /// closure is passed the index of the thread on which is is invoked. |
232 | /// Note that this same closure may be invoked multiple times in parallel. |
233 | type ExitHandler = dyn Fn(usize) + Send + Sync; |
234 | |
235 | // NB: We can't `#[derive(Default)]` because `S` is left ambiguous. |
236 | impl Default for ThreadPoolBuilder { |
237 | fn default() -> Self { |
238 | ThreadPoolBuilder { |
239 | num_threads: 0, |
240 | use_current_thread: false, |
241 | panic_handler: None, |
242 | get_thread_name: None, |
243 | stack_size: None, |
244 | start_handler: None, |
245 | exit_handler: None, |
246 | spawn_handler: DefaultSpawn, |
247 | breadth_first: false, |
248 | } |
249 | } |
250 | } |
251 | |
252 | impl ThreadPoolBuilder { |
253 | /// Creates and returns a valid rayon thread pool builder, but does not initialize it. |
254 | pub fn new() -> Self { |
255 | Self::default() |
256 | } |
257 | } |
258 | |
259 | /// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the |
260 | /// default spawn and those set by [`spawn_handler`](#method.spawn_handler). |
261 | impl<S> ThreadPoolBuilder<S> |
262 | where |
263 | S: ThreadSpawn, |
264 | { |
265 | /// Creates a new `ThreadPool` initialized using this configuration. |
266 | pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> { |
267 | ThreadPool::build(self) |
268 | } |
269 | |
270 | /// Initializes the global thread pool. This initialization is |
271 | /// **optional**. If you do not call this function, the thread pool |
272 | /// will be automatically initialized with the default |
273 | /// configuration. Calling `build_global` is not recommended, except |
274 | /// in two scenarios: |
275 | /// |
276 | /// - You wish to change the default configuration. |
277 | /// - You are running a benchmark, in which case initializing may |
278 | /// yield slightly more consistent results, since the worker threads |
279 | /// will already be ready to go even in the first iteration. But |
280 | /// this cost is minimal. |
281 | /// |
282 | /// Initialization of the global thread pool happens exactly |
283 | /// once. Once started, the configuration cannot be |
284 | /// changed. Therefore, if you call `build_global` a second time, it |
285 | /// will return an error. An `Ok` result indicates that this |
286 | /// is the first initialization of the thread pool. |
287 | pub fn build_global(self) -> Result<(), ThreadPoolBuildError> { |
288 | let registry = registry::init_global_registry(self)?; |
289 | registry.wait_until_primed(); |
290 | Ok(()) |
291 | } |
292 | } |
293 | |
294 | impl ThreadPoolBuilder { |
295 | /// Creates a scoped `ThreadPool` initialized using this configuration. |
296 | /// |
297 | /// This is a convenience function for building a pool using [`std::thread::scope`] |
298 | /// to spawn threads in a [`spawn_handler`](#method.spawn_handler). |
299 | /// The threads in this pool will start by calling `wrapper`, which should |
300 | /// do initialization and continue by calling `ThreadBuilder::run()`. |
301 | /// |
302 | /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html |
303 | /// |
304 | /// # Examples |
305 | /// |
306 | /// A scoped pool may be useful in combination with scoped thread-local variables. |
307 | /// |
308 | /// ``` |
309 | /// # use rayon_core as rayon; |
310 | /// |
311 | /// scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>); |
312 | /// |
313 | /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { |
314 | /// let pool_data = vec![1, 2, 3]; |
315 | /// |
316 | /// // We haven't assigned any TLS data yet. |
317 | /// assert!(!POOL_DATA.is_set()); |
318 | /// |
319 | /// rayon::ThreadPoolBuilder::new() |
320 | /// .build_scoped( |
321 | /// // Borrow `pool_data` in TLS for each thread. |
322 | /// |thread| POOL_DATA.set(&pool_data, || thread.run()), |
323 | /// // Do some work that needs the TLS data. |
324 | /// |pool| pool.install(|| assert!(POOL_DATA.is_set())), |
325 | /// )?; |
326 | /// |
327 | /// // Once we've returned, `pool_data` is no longer borrowed. |
328 | /// drop(pool_data); |
329 | /// Ok(()) |
330 | /// } |
331 | /// ``` |
332 | pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError> |
333 | where |
334 | W: Fn(ThreadBuilder) + Sync, // expected to call `run()` |
335 | F: FnOnce(&ThreadPool) -> R, |
336 | { |
337 | std::thread::scope(|scope| { |
338 | let pool = self |
339 | .spawn_handler(|thread| { |
340 | let mut builder = std::thread::Builder::new(); |
341 | if let Some(name) = thread.name() { |
342 | builder = builder.name(name.to_string()); |
343 | } |
344 | if let Some(size) = thread.stack_size() { |
345 | builder = builder.stack_size(size); |
346 | } |
347 | builder.spawn_scoped(scope, || wrapper(thread))?; |
348 | Ok(()) |
349 | }) |
350 | .build()?; |
351 | Ok(with_pool(&pool)) |
352 | }) |
353 | } |
354 | } |
355 | |
356 | impl<S> ThreadPoolBuilder<S> { |
357 | /// Sets a custom function for spawning threads. |
358 | /// |
359 | /// Note that the threads will not exit until after the pool is dropped. It |
360 | /// is up to the caller to wait for thread termination if that is important |
361 | /// for any invariants. For instance, threads created in [`std::thread::scope`] |
362 | /// will be joined before that scope returns, and this will block indefinitely |
363 | /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate |
364 | /// until the entire process exits! |
365 | /// |
366 | /// # Examples |
367 | /// |
368 | /// A minimal spawn handler just needs to call `run()` from an independent thread. |
369 | /// |
370 | /// ``` |
371 | /// # use rayon_core as rayon; |
372 | /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { |
373 | /// let pool = rayon::ThreadPoolBuilder::new() |
374 | /// .spawn_handler(|thread| { |
375 | /// std::thread::spawn(|| thread.run()); |
376 | /// Ok(()) |
377 | /// }) |
378 | /// .build()?; |
379 | /// |
380 | /// pool.install(|| println!("Hello from my custom thread!" )); |
381 | /// Ok(()) |
382 | /// } |
383 | /// ``` |
384 | /// |
385 | /// The default spawn handler sets the name and stack size if given, and propagates |
386 | /// any errors from the thread builder. |
387 | /// |
388 | /// ``` |
389 | /// # use rayon_core as rayon; |
390 | /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { |
391 | /// let pool = rayon::ThreadPoolBuilder::new() |
392 | /// .spawn_handler(|thread| { |
393 | /// let mut b = std::thread::Builder::new(); |
394 | /// if let Some(name) = thread.name() { |
395 | /// b = b.name(name.to_owned()); |
396 | /// } |
397 | /// if let Some(stack_size) = thread.stack_size() { |
398 | /// b = b.stack_size(stack_size); |
399 | /// } |
400 | /// b.spawn(|| thread.run())?; |
401 | /// Ok(()) |
402 | /// }) |
403 | /// .build()?; |
404 | /// |
405 | /// pool.install(|| println!("Hello from my fully custom thread!" )); |
406 | /// Ok(()) |
407 | /// } |
408 | /// ``` |
409 | /// |
410 | /// This can also be used for a pool of scoped threads like [`crossbeam::scope`], |
411 | /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in |
412 | /// [`build_scoped`](#method.build_scoped). |
413 | /// |
414 | /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html |
415 | /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html |
416 | /// |
417 | /// ``` |
418 | /// # use rayon_core as rayon; |
419 | /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { |
420 | /// std::thread::scope(|scope| { |
421 | /// let pool = rayon::ThreadPoolBuilder::new() |
422 | /// .spawn_handler(|thread| { |
423 | /// let mut builder = std::thread::Builder::new(); |
424 | /// if let Some(name) = thread.name() { |
425 | /// builder = builder.name(name.to_string()); |
426 | /// } |
427 | /// if let Some(size) = thread.stack_size() { |
428 | /// builder = builder.stack_size(size); |
429 | /// } |
430 | /// builder.spawn_scoped(scope, || { |
431 | /// // Add any scoped initialization here, then run! |
432 | /// thread.run() |
433 | /// })?; |
434 | /// Ok(()) |
435 | /// }) |
436 | /// .build()?; |
437 | /// |
438 | /// pool.install(|| println!("Hello from my custom scoped thread!" )); |
439 | /// Ok(()) |
440 | /// }) |
441 | /// } |
442 | /// ``` |
443 | pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>> |
444 | where |
445 | F: FnMut(ThreadBuilder) -> io::Result<()>, |
446 | { |
447 | ThreadPoolBuilder { |
448 | spawn_handler: CustomSpawn::new(spawn), |
449 | // ..self |
450 | num_threads: self.num_threads, |
451 | use_current_thread: self.use_current_thread, |
452 | panic_handler: self.panic_handler, |
453 | get_thread_name: self.get_thread_name, |
454 | stack_size: self.stack_size, |
455 | start_handler: self.start_handler, |
456 | exit_handler: self.exit_handler, |
457 | breadth_first: self.breadth_first, |
458 | } |
459 | } |
460 | |
461 | /// Returns a reference to the current spawn handler. |
462 | fn get_spawn_handler(&mut self) -> &mut S { |
463 | &mut self.spawn_handler |
464 | } |
465 | |
466 | /// Get the number of threads that will be used for the thread |
467 | /// pool. See `num_threads()` for more information. |
468 | fn get_num_threads(&self) -> usize { |
469 | if self.num_threads > 0 { |
470 | self.num_threads |
471 | } else { |
472 | let default = || { |
473 | thread::available_parallelism() |
474 | .map(|n| n.get()) |
475 | .unwrap_or(1) |
476 | }; |
477 | |
478 | match env::var("RAYON_NUM_THREADS" ) |
479 | .ok() |
480 | .and_then(|s| usize::from_str(&s).ok()) |
481 | { |
482 | Some(x @ 1..) => return x, |
483 | Some(0) => return default(), |
484 | _ => {} |
485 | } |
486 | |
487 | // Support for deprecated `RAYON_RS_NUM_CPUS`. |
488 | match env::var("RAYON_RS_NUM_CPUS" ) |
489 | .ok() |
490 | .and_then(|s| usize::from_str(&s).ok()) |
491 | { |
492 | Some(x @ 1..) => x, |
493 | _ => default(), |
494 | } |
495 | } |
496 | } |
497 | |
498 | /// Get the thread name for the thread with the given index. |
499 | fn get_thread_name(&mut self, index: usize) -> Option<String> { |
500 | let f = self.get_thread_name.as_mut()?; |
501 | Some(f(index)) |
502 | } |
503 | |
504 | /// Sets a closure which takes a thread index and returns |
505 | /// the thread's name. |
506 | pub fn thread_name<F>(mut self, closure: F) -> Self |
507 | where |
508 | F: FnMut(usize) -> String + 'static, |
509 | { |
510 | self.get_thread_name = Some(Box::new(closure)); |
511 | self |
512 | } |
513 | |
514 | /// Sets the number of threads to be used in the rayon threadpool. |
515 | /// |
516 | /// If you specify a non-zero number of threads using this |
517 | /// function, then the resulting thread-pools are guaranteed to |
518 | /// start at most this number of threads. |
519 | /// |
520 | /// If `num_threads` is 0, or you do not call this function, then |
521 | /// the Rayon runtime will select the number of threads |
522 | /// automatically. At present, this is based on the |
523 | /// `RAYON_NUM_THREADS` environment variable (if set), |
524 | /// or the number of logical CPUs (otherwise). |
525 | /// In the future, however, the default behavior may |
526 | /// change to dynamically add or remove threads as needed. |
527 | /// |
528 | /// **Future compatibility warning:** Given the default behavior |
529 | /// may change in the future, if you wish to rely on a fixed |
530 | /// number of threads, you should use this function to specify |
531 | /// that number. To reproduce the current default behavior, you |
532 | /// may wish to use [`std::thread::available_parallelism`] |
533 | /// to query the number of CPUs dynamically. |
534 | /// |
535 | /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one |
536 | /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment |
537 | /// variable. If both variables are specified, `RAYON_NUM_THREADS` will |
538 | /// be preferred. |
539 | pub fn num_threads(mut self, num_threads: usize) -> Self { |
540 | self.num_threads = num_threads; |
541 | self |
542 | } |
543 | |
544 | /// Use the current thread as one of the threads in the pool. |
545 | /// |
546 | /// The current thread is guaranteed to be at index 0, and since the thread is not managed by |
547 | /// rayon, the spawn and exit handlers do not run for that thread. |
548 | /// |
549 | /// Note that the current thread won't run the main work-stealing loop, so jobs spawned into |
550 | /// the thread-pool will generally not be picked up automatically by this thread unless you |
551 | /// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], or [`scope()`]. |
552 | /// |
553 | /// # Local thread-pools |
554 | /// |
555 | /// Using this in a local thread-pool means the registry will be leaked. In future versions |
556 | /// there might be a way of cleaning up the current-thread state. |
557 | pub fn use_current_thread(mut self) -> Self { |
558 | self.use_current_thread = true; |
559 | self |
560 | } |
561 | |
562 | /// Returns a copy of the current panic handler. |
563 | fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> { |
564 | self.panic_handler.take() |
565 | } |
566 | |
567 | /// Normally, whenever Rayon catches a panic, it tries to |
568 | /// propagate it to someplace sensible, to try and reflect the |
569 | /// semantics of sequential execution. But in some cases, |
570 | /// particularly with the `spawn()` APIs, there is no |
571 | /// obvious place where we should propagate the panic to. |
572 | /// In that case, this panic handler is invoked. |
573 | /// |
574 | /// If no panic handler is set, the default is to abort the |
575 | /// process, under the principle that panics should not go |
576 | /// unobserved. |
577 | /// |
578 | /// If the panic handler itself panics, this will abort the |
579 | /// process. To prevent this, wrap the body of your panic handler |
580 | /// in a call to `std::panic::catch_unwind()`. |
581 | pub fn panic_handler<H>(mut self, panic_handler: H) -> Self |
582 | where |
583 | H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static, |
584 | { |
585 | self.panic_handler = Some(Box::new(panic_handler)); |
586 | self |
587 | } |
588 | |
589 | /// Get the stack size of the worker threads |
590 | fn get_stack_size(&self) -> Option<usize> { |
591 | self.stack_size |
592 | } |
593 | |
594 | /// Sets the stack size of the worker threads |
595 | pub fn stack_size(mut self, stack_size: usize) -> Self { |
596 | self.stack_size = Some(stack_size); |
597 | self |
598 | } |
599 | |
600 | /// **(DEPRECATED)** Suggest to worker threads that they execute |
601 | /// spawned jobs in a "breadth-first" fashion. |
602 | /// |
603 | /// Typically, when a worker thread is idle or blocked, it will |
604 | /// attempt to execute the job from the *top* of its local deque of |
605 | /// work (i.e., the job most recently spawned). If this flag is set |
606 | /// to true, however, workers will prefer to execute in a |
607 | /// *breadth-first* fashion -- that is, they will search for jobs at |
608 | /// the *bottom* of their local deque. (At present, workers *always* |
609 | /// steal from the bottom of other workers' deques, regardless of |
610 | /// the setting of this flag.) |
611 | /// |
612 | /// If you think of the tasks as a tree, where a parent task |
613 | /// spawns its children in the tree, then this flag loosely |
614 | /// corresponds to doing a breadth-first traversal of the tree, |
615 | /// whereas the default would be to do a depth-first traversal. |
616 | /// |
617 | /// **Note that this is an "execution hint".** Rayon's task |
618 | /// execution is highly dynamic and the precise order in which |
619 | /// independent tasks are executed is not intended to be |
620 | /// guaranteed. |
621 | /// |
622 | /// This `breadth_first()` method is now deprecated per [RFC #1], |
623 | /// and in the future its effect may be removed. Consider using |
624 | /// [`scope_fifo()`] for a similar effect. |
625 | /// |
626 | /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md |
627 | /// [`scope_fifo()`]: fn.scope_fifo.html |
628 | #[deprecated (note = "use `scope_fifo` and `spawn_fifo` for similar effect" )] |
629 | pub fn breadth_first(mut self) -> Self { |
630 | self.breadth_first = true; |
631 | self |
632 | } |
633 | |
634 | fn get_breadth_first(&self) -> bool { |
635 | self.breadth_first |
636 | } |
637 | |
638 | /// Takes the current thread start callback, leaving `None`. |
639 | fn take_start_handler(&mut self) -> Option<Box<StartHandler>> { |
640 | self.start_handler.take() |
641 | } |
642 | |
643 | /// Sets a callback to be invoked on thread start. |
644 | /// |
645 | /// The closure is passed the index of the thread on which it is invoked. |
646 | /// Note that this same closure may be invoked multiple times in parallel. |
647 | /// If this closure panics, the panic will be passed to the panic handler. |
648 | /// If that handler returns, then startup will continue normally. |
649 | pub fn start_handler<H>(mut self, start_handler: H) -> Self |
650 | where |
651 | H: Fn(usize) + Send + Sync + 'static, |
652 | { |
653 | self.start_handler = Some(Box::new(start_handler)); |
654 | self |
655 | } |
656 | |
657 | /// Returns a current thread exit callback, leaving `None`. |
658 | fn take_exit_handler(&mut self) -> Option<Box<ExitHandler>> { |
659 | self.exit_handler.take() |
660 | } |
661 | |
662 | /// Sets a callback to be invoked on thread exit. |
663 | /// |
664 | /// The closure is passed the index of the thread on which it is invoked. |
665 | /// Note that this same closure may be invoked multiple times in parallel. |
666 | /// If this closure panics, the panic will be passed to the panic handler. |
667 | /// If that handler returns, then the thread will exit normally. |
668 | pub fn exit_handler<H>(mut self, exit_handler: H) -> Self |
669 | where |
670 | H: Fn(usize) + Send + Sync + 'static, |
671 | { |
672 | self.exit_handler = Some(Box::new(exit_handler)); |
673 | self |
674 | } |
675 | } |
676 | |
677 | #[allow (deprecated)] |
678 | impl Configuration { |
679 | /// Creates and return a valid rayon thread pool configuration, but does not initialize it. |
680 | pub fn new() -> Configuration { |
681 | Configuration { |
682 | builder: ThreadPoolBuilder::new(), |
683 | } |
684 | } |
685 | |
686 | /// Deprecated in favor of `ThreadPoolBuilder::build`. |
687 | pub fn build(self) -> Result<ThreadPool, Box<dyn Error + 'static>> { |
688 | self.builder.build().map_err(Box::from) |
689 | } |
690 | |
691 | /// Deprecated in favor of `ThreadPoolBuilder::thread_name`. |
692 | pub fn thread_name<F>(mut self, closure: F) -> Self |
693 | where |
694 | F: FnMut(usize) -> String + 'static, |
695 | { |
696 | self.builder = self.builder.thread_name(closure); |
697 | self |
698 | } |
699 | |
700 | /// Deprecated in favor of `ThreadPoolBuilder::num_threads`. |
701 | pub fn num_threads(mut self, num_threads: usize) -> Configuration { |
702 | self.builder = self.builder.num_threads(num_threads); |
703 | self |
704 | } |
705 | |
706 | /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`. |
707 | pub fn panic_handler<H>(mut self, panic_handler: H) -> Configuration |
708 | where |
709 | H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static, |
710 | { |
711 | self.builder = self.builder.panic_handler(panic_handler); |
712 | self |
713 | } |
714 | |
715 | /// Deprecated in favor of `ThreadPoolBuilder::stack_size`. |
716 | pub fn stack_size(mut self, stack_size: usize) -> Self { |
717 | self.builder = self.builder.stack_size(stack_size); |
718 | self |
719 | } |
720 | |
721 | /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`. |
722 | pub fn breadth_first(mut self) -> Self { |
723 | self.builder = self.builder.breadth_first(); |
724 | self |
725 | } |
726 | |
727 | /// Deprecated in favor of `ThreadPoolBuilder::start_handler`. |
728 | pub fn start_handler<H>(mut self, start_handler: H) -> Configuration |
729 | where |
730 | H: Fn(usize) + Send + Sync + 'static, |
731 | { |
732 | self.builder = self.builder.start_handler(start_handler); |
733 | self |
734 | } |
735 | |
736 | /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`. |
737 | pub fn exit_handler<H>(mut self, exit_handler: H) -> Configuration |
738 | where |
739 | H: Fn(usize) + Send + Sync + 'static, |
740 | { |
741 | self.builder = self.builder.exit_handler(exit_handler); |
742 | self |
743 | } |
744 | |
745 | /// Returns a ThreadPoolBuilder with identical parameters. |
746 | fn into_builder(self) -> ThreadPoolBuilder { |
747 | self.builder |
748 | } |
749 | } |
750 | |
751 | impl ThreadPoolBuildError { |
752 | fn new(kind: ErrorKind) -> ThreadPoolBuildError { |
753 | ThreadPoolBuildError { kind } |
754 | } |
755 | |
756 | fn is_unsupported(&self) -> bool { |
757 | matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported) |
758 | } |
759 | } |
760 | |
761 | const GLOBAL_POOL_ALREADY_INITIALIZED: &str = |
762 | "The global thread pool has already been initialized." ; |
763 | |
764 | const CURRENT_THREAD_ALREADY_IN_POOL: &str = |
765 | "The current thread is already part of another thread pool." ; |
766 | |
767 | impl Error for ThreadPoolBuildError { |
768 | #[allow (deprecated)] |
769 | fn description(&self) -> &str { |
770 | match self.kind { |
771 | ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED, |
772 | ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL, |
773 | ErrorKind::IOError(ref e: &Error) => e.description(), |
774 | } |
775 | } |
776 | |
777 | fn source(&self) -> Option<&(dyn Error + 'static)> { |
778 | match &self.kind { |
779 | ErrorKind::GlobalPoolAlreadyInitialized | ErrorKind::CurrentThreadAlreadyInPool => None, |
780 | ErrorKind::IOError(e: &Error) => Some(e), |
781 | } |
782 | } |
783 | } |
784 | |
785 | impl fmt::Display for ThreadPoolBuildError { |
786 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
787 | match &self.kind { |
788 | ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL.fmt(f), |
789 | ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f), |
790 | ErrorKind::IOError(e: &Error) => e.fmt(f), |
791 | } |
792 | } |
793 | } |
794 | |
795 | /// Deprecated in favor of `ThreadPoolBuilder::build_global`. |
796 | #[deprecated (note = "use `ThreadPoolBuilder::build_global`" )] |
797 | #[allow (deprecated)] |
798 | pub fn initialize(config: Configuration) -> Result<(), Box<dyn Error>> { |
799 | config.into_builder().build_global().map_err(op:Box::from) |
800 | } |
801 | |
802 | impl<S> fmt::Debug for ThreadPoolBuilder<S> { |
803 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
804 | let ThreadPoolBuilder { |
805 | ref num_threads, |
806 | ref use_current_thread, |
807 | ref get_thread_name, |
808 | ref panic_handler, |
809 | ref stack_size, |
810 | ref start_handler, |
811 | ref exit_handler, |
812 | spawn_handler: _, |
813 | ref breadth_first, |
814 | } = *self; |
815 | |
816 | // Just print `Some(<closure>)` or `None` to the debug |
817 | // output. |
818 | struct ClosurePlaceholder; |
819 | impl fmt::Debug for ClosurePlaceholder { |
820 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
821 | f.write_str("<closure>" ) |
822 | } |
823 | } |
824 | let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder); |
825 | let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder); |
826 | let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder); |
827 | let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder); |
828 | |
829 | f.debug_struct("ThreadPoolBuilder" ) |
830 | .field("num_threads" , num_threads) |
831 | .field("use_current_thread" , use_current_thread) |
832 | .field("get_thread_name" , &get_thread_name) |
833 | .field("panic_handler" , &panic_handler) |
834 | .field("stack_size" , &stack_size) |
835 | .field("start_handler" , &start_handler) |
836 | .field("exit_handler" , &exit_handler) |
837 | .field("breadth_first" , &breadth_first) |
838 | .finish() |
839 | } |
840 | } |
841 | |
842 | #[allow (deprecated)] |
843 | impl fmt::Debug for Configuration { |
844 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
845 | self.builder.fmt(f) |
846 | } |
847 | } |
848 | |
849 | /// Provides the calling context to a closure called by `join_context`. |
850 | #[derive (Debug)] |
851 | pub struct FnContext { |
852 | migrated: bool, |
853 | |
854 | /// disable `Send` and `Sync`, just for a little future-proofing. |
855 | _marker: PhantomData<*mut ()>, |
856 | } |
857 | |
858 | impl FnContext { |
859 | #[inline ] |
860 | fn new(migrated: bool) -> Self { |
861 | FnContext { |
862 | migrated, |
863 | _marker: PhantomData, |
864 | } |
865 | } |
866 | } |
867 | |
868 | impl FnContext { |
869 | /// Returns `true` if the closure was called from a different thread |
870 | /// than it was provided from. |
871 | #[inline ] |
872 | pub fn migrated(&self) -> bool { |
873 | self.migrated |
874 | } |
875 | } |
876 | |