//! Contains support for user-managed thread pools, represented by
//! the [`ThreadPool`] type (see that struct for details).
3//!
4//! [`ThreadPool`]: struct.ThreadPool.html
5
6use crate::broadcast::{self, BroadcastContext};
7use crate::join;
8use crate::registry::{Registry, ThreadSpawn, WorkerThread};
9use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
10use crate::spawn;
11use crate::{scope, Scope};
12use crate::{scope_fifo, ScopeFifo};
13use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
14use std::error::Error;
15use std::fmt;
16use std::sync::Arc;
17
18mod test;
19
/// Represents a user-created [thread-pool].
///
/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads
/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then
/// execute functions explicitly within this [`ThreadPool`] using
/// [`ThreadPool::install()`]. By contrast, top-level rayon functions
/// (like `join()`) will execute implicitly within the current thread-pool.
///
/// ## Creating a ThreadPool
///
/// ```rust
/// # use rayon_core as rayon;
/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
/// ```
///
/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s
/// threads. In addition, any other rayon operations called inside of `install()` will also
/// execute in the context of the `ThreadPool`.
///
/// ## Dropping a ThreadPool
///
/// Dropping the `ThreadPool` signals the threads it manages to terminate: they will
/// complete any remaining work that you have spawned, and then terminate automatically.
///
/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool
/// [`ThreadPool`]: struct.ThreadPool.html
/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build
/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install
pub struct ThreadPool {
    /// Shared handle to the registry that this pool's methods delegate to.
    registry: Arc<Registry>,
}
54
55impl ThreadPool {
56 #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
57 #[allow(deprecated)]
58 /// Deprecated in favor of `ThreadPoolBuilder::build`.
59 pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> {
60 Self::build(configuration.into_builder()).map_err(Box::from)
61 }
62
63 pub(super) fn build<S>(
64 builder: ThreadPoolBuilder<S>,
65 ) -> Result<ThreadPool, ThreadPoolBuildError>
66 where
67 S: ThreadSpawn,
68 {
69 let registry = Registry::new(builder)?;
70 Ok(ThreadPool { registry })
71 }
72
73 /// Executes `op` within the threadpool. Any attempts to use
74 /// `join`, `scope`, or parallel iterators will then operate
75 /// within that threadpool.
76 ///
77 /// # Warning: thread-local data
78 ///
79 /// Because `op` is executing within the Rayon thread-pool,
80 /// thread-local data from the current thread will not be
81 /// accessible.
82 ///
83 /// # Warning: execution order
84 ///
85 /// If the current thread is part of a different thread pool, it will try to
86 /// keep busy while the `op` completes in its target pool, similar to
87 /// calling [`ThreadPool::yield_now()`] in a loop. Therefore, it may
88 /// potentially schedule other tasks to run on the current thread in the
89 /// meantime. For example
90 ///
91 /// ```rust
92 /// # use rayon_core as rayon;
93 /// fn main() {
94 /// rayon::ThreadPoolBuilder::new().num_threads(1).build_global().unwrap();
95 /// let pool = rayon_core::ThreadPoolBuilder::default().build().unwrap();
96 /// let do_it = || {
97 /// print!("one ");
98 /// pool.install(||{});
99 /// print!("two ");
100 /// };
101 /// rayon::join(|| do_it(), || do_it());
102 /// }
103 /// ```
104 ///
105 /// Since we configured just one thread in the global pool, one might
106 /// expect `do_it()` to run sequentially, producing:
107 ///
108 /// ```ascii
109 /// one two one two
110 /// ```
111 ///
112 /// However each call to `install()` yields implicitly, allowing rayon to
113 /// run multiple instances of `do_it()` concurrently on the single, global
114 /// thread. The following output would be equally valid:
115 ///
116 /// ```ascii
117 /// one one two two
118 /// ```
119 ///
120 /// # Panics
121 ///
122 /// If `op` should panic, that panic will be propagated.
123 ///
124 /// ## Using `install()`
125 ///
126 /// ```rust
127 /// # use rayon_core as rayon;
128 /// fn main() {
129 /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
130 /// let n = pool.install(|| fib(20));
131 /// println!("{}", n);
132 /// }
133 ///
134 /// fn fib(n: usize) -> usize {
135 /// if n == 0 || n == 1 {
136 /// return n;
137 /// }
138 /// let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
139 /// return a + b;
140 /// }
141 /// ```
142 pub fn install<OP, R>(&self, op: OP) -> R
143 where
144 OP: FnOnce() -> R + Send,
145 R: Send,
146 {
147 self.registry.in_worker(|_, _| op())
148 }
149
150 /// Executes `op` within every thread in the threadpool. Any attempts to use
151 /// `join`, `scope`, or parallel iterators will then operate within that
152 /// threadpool.
153 ///
154 /// Broadcasts are executed on each thread after they have exhausted their
155 /// local work queue, before they attempt work-stealing from other threads.
156 /// The goal of that strategy is to run everywhere in a timely manner
157 /// *without* being too disruptive to current work. There may be alternative
158 /// broadcast styles added in the future for more or less aggressive
159 /// injection, if the need arises.
160 ///
161 /// # Warning: thread-local data
162 ///
163 /// Because `op` is executing within the Rayon thread-pool,
164 /// thread-local data from the current thread will not be
165 /// accessible.
166 ///
167 /// # Panics
168 ///
169 /// If `op` should panic on one or more threads, exactly one panic
170 /// will be propagated, only after all threads have completed
171 /// (or panicked) their own `op`.
172 ///
173 /// # Examples
174 ///
175 /// ```
176 /// # use rayon_core as rayon;
177 /// use std::sync::atomic::{AtomicUsize, Ordering};
178 ///
179 /// fn main() {
180 /// let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap();
181 ///
182 /// // The argument gives context, including the index of each thread.
183 /// let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index());
184 /// assert_eq!(v, &[0, 1, 4, 9, 16]);
185 ///
186 /// // The closure can reference the local stack
187 /// let count = AtomicUsize::new(0);
188 /// pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed));
189 /// assert_eq!(count.into_inner(), 5);
190 /// }
191 /// ```
192 pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
193 where
194 OP: Fn(BroadcastContext<'_>) -> R + Sync,
195 R: Send,
196 {
197 // We assert that `self.registry` has not terminated.
198 unsafe { broadcast::broadcast_in(op, &self.registry) }
199 }
200
201 /// Returns the (current) number of threads in the thread pool.
202 ///
203 /// # Future compatibility note
204 ///
205 /// Note that unless this thread-pool was created with a
206 /// [`ThreadPoolBuilder`] that specifies the number of threads,
207 /// then this number may vary over time in future versions (see [the
208 /// `num_threads()` method for details][snt]).
209 ///
210 /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
211 /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
212 #[inline]
213 pub fn current_num_threads(&self) -> usize {
214 self.registry.num_threads()
215 }
216
217 /// If called from a Rayon worker thread in this thread-pool,
218 /// returns the index of that thread; if not called from a Rayon
219 /// thread, or called from a Rayon thread that belongs to a
220 /// different thread-pool, returns `None`.
221 ///
222 /// The index for a given thread will not change over the thread's
223 /// lifetime. However, multiple threads may share the same index if
224 /// they are in distinct thread-pools.
225 ///
226 /// # Future compatibility note
227 ///
228 /// Currently, every thread-pool (including the global
229 /// thread-pool) has a fixed number of threads, but this may
230 /// change in future Rayon versions (see [the `num_threads()` method
231 /// for details][snt]). In that case, the index for a
232 /// thread would not change during its lifetime, but thread
233 /// indices may wind up being reused if threads are terminated and
234 /// restarted.
235 ///
236 /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
237 #[inline]
238 pub fn current_thread_index(&self) -> Option<usize> {
239 let curr = self.registry.current_thread()?;
240 Some(curr.index())
241 }
242
243 /// Returns true if the current worker thread currently has "local
244 /// tasks" pending. This can be useful as part of a heuristic for
245 /// deciding whether to spawn a new task or execute code on the
246 /// current thread, particularly in breadth-first
247 /// schedulers. However, keep in mind that this is an inherently
248 /// racy check, as other worker threads may be actively "stealing"
249 /// tasks from our local deque.
250 ///
251 /// **Background:** Rayon's uses a [work-stealing] scheduler. The
252 /// key idea is that each thread has its own [deque] of
253 /// tasks. Whenever a new task is spawned -- whether through
254 /// `join()`, `Scope::spawn()`, or some other means -- that new
255 /// task is pushed onto the thread's *local* deque. Worker threads
256 /// have a preference for executing their own tasks; if however
257 /// they run out of tasks, they will go try to "steal" tasks from
258 /// other threads. This function therefore has an inherent race
259 /// with other active worker threads, which may be removing items
260 /// from the local deque.
261 ///
262 /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
263 /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
264 #[inline]
265 pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
266 let curr = self.registry.current_thread()?;
267 Some(!curr.local_deque_is_empty())
268 }
269
270 /// Execute `oper_a` and `oper_b` in the thread-pool and return
271 /// the results. Equivalent to `self.install(|| join(oper_a,
272 /// oper_b))`.
273 pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
274 where
275 A: FnOnce() -> RA + Send,
276 B: FnOnce() -> RB + Send,
277 RA: Send,
278 RB: Send,
279 {
280 self.install(|| join(oper_a, oper_b))
281 }
282
283 /// Creates a scope that executes within this thread-pool.
284 /// Equivalent to `self.install(|| scope(...))`.
285 ///
286 /// See also: [the `scope()` function][scope].
287 ///
288 /// [scope]: fn.scope.html
289 pub fn scope<'scope, OP, R>(&self, op: OP) -> R
290 where
291 OP: FnOnce(&Scope<'scope>) -> R + Send,
292 R: Send,
293 {
294 self.install(|| scope(op))
295 }
296
297 /// Creates a scope that executes within this thread-pool.
298 /// Spawns from the same thread are prioritized in relative FIFO order.
299 /// Equivalent to `self.install(|| scope_fifo(...))`.
300 ///
301 /// See also: [the `scope_fifo()` function][scope_fifo].
302 ///
303 /// [scope_fifo]: fn.scope_fifo.html
304 pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
305 where
306 OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
307 R: Send,
308 {
309 self.install(|| scope_fifo(op))
310 }
311
312 /// Creates a scope that spawns work into this thread-pool.
313 ///
314 /// See also: [the `in_place_scope()` function][in_place_scope].
315 ///
316 /// [in_place_scope]: fn.in_place_scope.html
317 pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R
318 where
319 OP: FnOnce(&Scope<'scope>) -> R,
320 {
321 do_in_place_scope(Some(&self.registry), op)
322 }
323
324 /// Creates a scope that spawns work into this thread-pool in FIFO order.
325 ///
326 /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo].
327 ///
328 /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html
329 pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R
330 where
331 OP: FnOnce(&ScopeFifo<'scope>) -> R,
332 {
333 do_in_place_scope_fifo(Some(&self.registry), op)
334 }
335
336 /// Spawns an asynchronous task in this thread-pool. This task will
337 /// run in the implicit, global scope, which means that it may outlast
338 /// the current stack frame -- therefore, it cannot capture any references
339 /// onto the stack (you will likely need a `move` closure).
340 ///
341 /// See also: [the `spawn()` function defined on scopes][spawn].
342 ///
343 /// [spawn]: struct.Scope.html#method.spawn
344 pub fn spawn<OP>(&self, op: OP)
345 where
346 OP: FnOnce() + Send + 'static,
347 {
348 // We assert that `self.registry` has not terminated.
349 unsafe { spawn::spawn_in(op, &self.registry) }
350 }
351
352 /// Spawns an asynchronous task in this thread-pool. This task will
353 /// run in the implicit, global scope, which means that it may outlast
354 /// the current stack frame -- therefore, it cannot capture any references
355 /// onto the stack (you will likely need a `move` closure).
356 ///
357 /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo].
358 ///
359 /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo
360 pub fn spawn_fifo<OP>(&self, op: OP)
361 where
362 OP: FnOnce() + Send + 'static,
363 {
364 // We assert that `self.registry` has not terminated.
365 unsafe { spawn::spawn_fifo_in(op, &self.registry) }
366 }
367
368 /// Spawns an asynchronous task on every thread in this thread-pool. This task
369 /// will run in the implicit, global scope, which means that it may outlast the
370 /// current stack frame -- therefore, it cannot capture any references onto the
371 /// stack (you will likely need a `move` closure).
372 pub fn spawn_broadcast<OP>(&self, op: OP)
373 where
374 OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
375 {
376 // We assert that `self.registry` has not terminated.
377 unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
378 }
379
380 /// Cooperatively yields execution to Rayon.
381 ///
382 /// This is similar to the general [`yield_now()`], but only if the current
383 /// thread is part of *this* thread pool.
384 ///
385 /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
386 /// nothing was available, or `None` if the current thread is not part this pool.
387 pub fn yield_now(&self) -> Option<Yield> {
388 let curr = self.registry.current_thread()?;
389 Some(curr.yield_now())
390 }
391
392 /// Cooperatively yields execution to local Rayon work.
393 ///
394 /// This is similar to the general [`yield_local()`], but only if the current
395 /// thread is part of *this* thread pool.
396 ///
397 /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
398 /// nothing was available, or `None` if the current thread is not part this pool.
399 pub fn yield_local(&self) -> Option<Yield> {
400 let curr = self.registry.current_thread()?;
401 Some(curr.yield_local())
402 }
403}
404
405impl Drop for ThreadPool {
406 fn drop(&mut self) {
407 self.registry.terminate();
408 }
409}
410
411impl fmt::Debug for ThreadPool {
412 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
413 fmt&mut DebugStruct<'_, '_>.debug_struct("ThreadPool")
414 .field("num_threads", &self.current_num_threads())
415 .field(name:"id", &self.registry.id())
416 .finish()
417 }
418}
419
420/// If called from a Rayon worker thread, returns the index of that
421/// thread within its current pool; if not called from a Rayon thread,
422/// returns `None`.
423///
424/// The index for a given thread will not change over the thread's
425/// lifetime. However, multiple threads may share the same index if
426/// they are in distinct thread-pools.
427///
428/// See also: [the `ThreadPool::current_thread_index()` method].
429///
430/// [m]: struct.ThreadPool.html#method.current_thread_index
431///
432/// # Future compatibility note
433///
434/// Currently, every thread-pool (including the global
435/// thread-pool) has a fixed number of threads, but this may
436/// change in future Rayon versions (see [the `num_threads()` method
437/// for details][snt]). In that case, the index for a
438/// thread would not change during its lifetime, but thread
439/// indices may wind up being reused if threads are terminated and
440/// restarted.
441///
442/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
443#[inline]
444pub fn current_thread_index() -> Option<usize> {
445 unsafe {
446 let curr: &WorkerThread = WorkerThread::current().as_ref()?;
447 Some(curr.index())
448 }
449}
450
451/// If called from a Rayon worker thread, indicates whether that
452/// thread's local deque still has pending tasks. Otherwise, returns
453/// `None`. For more information, see [the
454/// `ThreadPool::current_thread_has_pending_tasks()` method][m].
455///
456/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks
457#[inline]
458pub fn current_thread_has_pending_tasks() -> Option<bool> {
459 unsafe {
460 let curr: &WorkerThread = WorkerThread::current().as_ref()?;
461 Some(!curr.local_deque_is_empty())
462 }
463}
464
465/// Cooperatively yields execution to Rayon.
466///
467/// If the current thread is part of a rayon thread pool, this looks for a
468/// single unit of pending work in the pool, then executes it. Completion of
469/// that work might include nested work or further work stealing.
470///
471/// This is similar to [`std::thread::yield_now()`], but does not literally make
472/// that call. If you are implementing a polling loop, you may want to also
473/// yield to the OS scheduler yourself if no Rayon work was found.
474///
475/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
476/// nothing was available, or `None` if this thread is not part of any pool at all.
477pub fn yield_now() -> Option<Yield> {
478 unsafe {
479 let thread: &WorkerThread = WorkerThread::current().as_ref()?;
480 Some(thread.yield_now())
481 }
482}
483
484/// Cooperatively yields execution to local Rayon work.
485///
486/// If the current thread is part of a rayon thread pool, this looks for a
487/// single unit of pending work in this thread's queue, then executes it.
488/// Completion of that work might include nested work or further work stealing.
489///
490/// This is similar to [`yield_now()`], but does not steal from other threads.
491///
492/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
493/// nothing was available, or `None` if this thread is not part of any pool at all.
494pub fn yield_local() -> Option<Yield> {
495 unsafe {
496 let thread: &WorkerThread = WorkerThread::current().as_ref()?;
497 Some(thread.yield_local())
498 }
499}
500
/// Outcome reported by [`yield_now()`] or [`yield_local()`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Yield {
    /// Some pending work was found and has been executed.
    Executed,
    /// There was no work available to execute.
    Idle,
}
509