//! Contains support for user-managed thread pools, represented by the
//! [`ThreadPool`] type (see that struct for details).
//!
//! [`ThreadPool`]: struct.ThreadPool.html

use crate::broadcast::{self, BroadcastContext};
use crate::join;
use crate::registry::{Registry, ThreadSpawn, WorkerThread};
use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
use crate::spawn;
use crate::{scope, Scope};
use crate::{scope_fifo, ScopeFifo};
use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
use std::error::Error;
use std::fmt;
use std::sync::Arc;

mod test;

/// Represents a user-created [thread-pool].
///
/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads
/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then
/// execute functions explicitly within this [`ThreadPool`] using
/// [`ThreadPool::install()`]. By contrast, top-level rayon functions
/// (like `join()`) will execute implicitly within the current thread-pool.
///
///
/// ## Creating a ThreadPool
///
/// ```rust
/// # use rayon_core as rayon;
/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
/// ```
///
/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s
/// threads. In addition, any other rayon operations called inside of `install()` will also
/// execute in the context of the `ThreadPool`.
///
/// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate:
/// they will complete any remaining work that you have spawned and then
/// terminate automatically.
///
///
/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool
/// [`ThreadPool`]: struct.ThreadPool.html
/// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new
/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build
/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install
pub struct ThreadPool {
    registry: Arc<Registry>,
}

impl ThreadPool {
    #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
    #[allow(deprecated)]
    /// Deprecated in favor of `ThreadPoolBuilder::build`.
    pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> {
        Self::build(configuration.into_builder()).map_err(Box::from)
    }

    pub(super) fn build<S>(
        builder: ThreadPoolBuilder<S>,
    ) -> Result<ThreadPool, ThreadPoolBuildError>
    where
        S: ThreadSpawn,
    {
        let registry = Registry::new(builder)?;
        Ok(ThreadPool { registry })
    }

    /// Executes `op` within the threadpool. Any attempts to use
    /// `join`, `scope`, or parallel iterators will then operate
    /// within that threadpool.
    ///
    /// # Warning: thread-local data
    ///
    /// Because `op` is executing within the Rayon thread-pool,
    /// thread-local data from the current thread will not be
    /// accessible.
    ///
    /// # Warning: execution order
    ///
    /// If the current thread is part of a different thread pool, it will try to
    /// keep busy while the `op` completes in its target pool, similar to
    /// calling [`ThreadPool::yield_now()`] in a loop. Therefore, it may
    /// potentially schedule other tasks to run on the current thread in the
    /// meantime. For example:
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// fn main() {
    ///     rayon::ThreadPoolBuilder::new().num_threads(1).build_global().unwrap();
    ///     let pool = rayon_core::ThreadPoolBuilder::default().build().unwrap();
    ///     let do_it = || {
    ///         print!("one ");
    ///         pool.install(||{});
    ///         print!("two ");
    ///     };
    ///     rayon::join(|| do_it(), || do_it());
    /// }
    /// ```
    ///
    /// Since we configured just one thread in the global pool, one might
    /// expect `do_it()` to run sequentially, producing:
    ///
    /// ```ascii
    /// one two one two
    /// ```
    ///
    /// However, each call to `install()` yields implicitly, allowing rayon to
    /// run multiple instances of `do_it()` concurrently on the single, global
    /// thread. The following output would be equally valid:
    ///
    /// ```ascii
    /// one one two two
    /// ```
    ///
    /// # Panics
    ///
    /// If `op` should panic, that panic will be propagated.
    ///
    /// ## Using `install()`
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// fn main() {
    ///     let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
    ///     let n = pool.install(|| fib(20));
    ///     println!("{}", n);
    /// }
    ///
    /// fn fib(n: usize) -> usize {
    ///     if n == 0 || n == 1 {
    ///         return n;
    ///     }
    ///     let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
    ///     return a + b;
    /// }
    /// ```
    pub fn install<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce() -> R + Send,
        R: Send,
    {
        self.registry.in_worker(|_, _| op())
    }

    /// Executes `op` within every thread in the threadpool. Any attempts to use
    /// `join`, `scope`, or parallel iterators will then operate within that
    /// threadpool.
    ///
    /// Broadcasts are executed on each thread after they have exhausted their
    /// local work queue, before they attempt work-stealing from other threads.
    /// The goal of that strategy is to run everywhere in a timely manner
    /// *without* being too disruptive to current work. There may be alternative
    /// broadcast styles added in the future for more or less aggressive
    /// injection, if the need arises.
    ///
    /// # Warning: thread-local data
    ///
    /// Because `op` is executing within the Rayon thread-pool,
    /// thread-local data from the current thread will not be
    /// accessible.
    ///
    /// # Panics
    ///
    /// If `op` should panic on one or more threads, exactly one panic
    /// will be propagated, only after all threads have completed
    /// (or panicked) their own `op`.
    ///
    /// # Examples
    ///
    /// ```
    /// # use rayon_core as rayon;
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// fn main() {
    ///     let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap();
    ///
    ///     // The argument gives context, including the index of each thread.
    ///     let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index());
    ///     assert_eq!(v, &[0, 1, 4, 9, 16]);
    ///
    ///     // The closure can reference the local stack
    ///     let count = AtomicUsize::new(0);
    ///     pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed));
    ///     assert_eq!(count.into_inner(), 5);
    /// }
    /// ```
    pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
    where
        OP: Fn(BroadcastContext<'_>) -> R + Sync,
        R: Send,
    {
        // We assert that `self.registry` has not terminated.
        unsafe { broadcast::broadcast_in(op, &self.registry) }
    }

    /// Returns the (current) number of threads in the thread pool.
    ///
    /// # Future compatibility note
    ///
    /// Note that unless this thread-pool was created with a
    /// [`ThreadPoolBuilder`] that specifies the number of threads,
    /// this number may vary over time in future versions (see [the
    /// `num_threads()` method for details][snt]).
    ///
    /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
    /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
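    ///
    /// # Examples
    ///
    /// A minimal sketch (the thread count of 4 here is arbitrary):
    ///
    /// ```
    /// # use rayon_core as rayon;
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    /// assert_eq!(pool.current_num_threads(), 4);
    /// ```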
    #[inline]
    pub fn current_num_threads(&self) -> usize {
        self.registry.num_threads()
    }

    /// If called from a Rayon worker thread in this thread-pool,
    /// returns the index of that thread; if not called from a Rayon
    /// thread, or called from a Rayon thread that belongs to a
    /// different thread-pool, returns `None`.
    ///
    /// The index for a given thread will not change over the thread's
    /// lifetime. However, multiple threads may share the same index if
    /// they are in distinct thread-pools.
    ///
    /// # Future compatibility note
    ///
    /// Currently, every thread-pool (including the global
    /// thread-pool) has a fixed number of threads, but this may
    /// change in future Rayon versions (see [the `num_threads()` method
    /// for details][snt]). In that case, the index for a
    /// thread would not change during its lifetime, but thread
    /// indices may wind up being reused if threads are terminated and
    /// restarted.
    ///
    /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
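    ///
    /// # Examples
    ///
    /// A minimal sketch: outside the pool there is no index, inside there is one.
    ///
    /// ```
    /// # use rayon_core as rayon;
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    /// // The calling thread is not a worker of `pool`.
    /// assert_eq!(pool.current_thread_index(), None);
    /// // Inside `install()`, we run on one of the pool's workers.
    /// let idx = pool.install(|| pool.current_thread_index().unwrap());
    /// assert!(idx < 2);
    /// ```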
    #[inline]
    pub fn current_thread_index(&self) -> Option<usize> {
        let curr = self.registry.current_thread()?;
        Some(curr.index())
    }

    /// Returns `Some(true)` if the current worker thread has "local
    /// tasks" pending, `Some(false)` if it does not, and `None` if the
    /// current thread is not a worker in this pool. This can be useful
    /// as part of a heuristic for deciding whether to spawn a new task
    /// or execute code on the current thread, particularly in
    /// breadth-first schedulers. However, keep in mind that this is an
    /// inherently racy check, as other worker threads may be actively
    /// "stealing" tasks from our local deque.
    ///
    /// **Background:** Rayon uses a [work-stealing] scheduler. The
    /// key idea is that each thread has its own [deque] of
    /// tasks. Whenever a new task is spawned -- whether through
    /// `join()`, `Scope::spawn()`, or some other means -- that new
    /// task is pushed onto the thread's *local* deque. Worker threads
    /// have a preference for executing their own tasks; however, if
    /// they run out of tasks, they will try to "steal" tasks from
    /// other threads. This function therefore has an inherent race
    /// with other active worker threads, which may be removing items
    /// from the local deque.
    ///
    /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
    /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
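    ///
    /// # Examples
    ///
    /// A minimal sketch: the answer is only available from inside the pool.
    ///
    /// ```
    /// # use rayon_core as rayon;
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    /// // The calling thread is not a worker of `pool`, so there is no answer.
    /// assert_eq!(pool.current_thread_has_pending_tasks(), None);
    /// // From inside the pool we get `Some(..)`, though the value is inherently racy.
    /// assert!(pool.install(|| pool.current_thread_has_pending_tasks()).is_some());
    /// ```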
    #[inline]
    pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
        let curr = self.registry.current_thread()?;
        Some(!curr.local_deque_is_empty())
    }

    /// Executes `oper_a` and `oper_b` in the thread-pool and returns
    /// the results. Equivalent to `self.install(|| join(oper_a,
    /// oper_b))`.
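    ///
    /// # Examples
    ///
    /// A minimal sketch:
    ///
    /// ```
    /// # use rayon_core as rayon;
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    /// // Both closures run inside `pool`, potentially in parallel.
    /// let (a, b) = pool.join(|| 1 + 1, || 2 + 2);
    /// assert_eq!((a, b), (2, 4));
    /// ```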
    pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
    where
        A: FnOnce() -> RA + Send,
        B: FnOnce() -> RB + Send,
        RA: Send,
        RB: Send,
    {
        self.install(|| join(oper_a, oper_b))
    }

    /// Creates a scope that executes within this thread-pool.
    /// Equivalent to `self.install(|| scope(...))`.
    ///
    /// See also: [the `scope()` function][scope].
    ///
    /// [scope]: fn.scope.html
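    ///
    /// # Examples
    ///
    /// A minimal sketch; the spawned tasks borrow `counter` from the enclosing
    /// scope and all complete before `scope()` returns:
    ///
    /// ```
    /// # use rayon_core as rayon;
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    /// let counter = AtomicUsize::new(0);
    /// pool.scope(|s| {
    ///     for _ in 0..10 {
    ///         s.spawn(|_| { counter.fetch_add(1, Ordering::Relaxed); });
    ///     }
    /// });
    /// assert_eq!(counter.into_inner(), 10);
    /// ```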
    pub fn scope<'scope, OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&Scope<'scope>) -> R + Send,
        R: Send,
    {
        self.install(|| scope(op))
    }

    /// Creates a scope that executes within this thread-pool.
    /// Spawns from the same thread are prioritized in relative FIFO order.
    /// Equivalent to `self.install(|| scope_fifo(...))`.
    ///
    /// See also: [the `scope_fifo()` function][scope_fifo].
    ///
    /// [scope_fifo]: fn.scope_fifo.html
    pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
        R: Send,
    {
        self.install(|| scope_fifo(op))
    }

    /// Creates a scope that spawns work into this thread-pool.
    ///
    /// See also: [the `in_place_scope()` function][in_place_scope].
    ///
    /// [in_place_scope]: fn.in_place_scope.html
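    ///
    /// # Examples
    ///
    /// A minimal sketch: the scope body itself runs on the calling thread
    /// (note that `OP` carries no `Send` bound), while work spawned through
    /// the scope runs inside the pool.
    ///
    /// ```
    /// # use rayon_core as rayon;
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    /// let counter = AtomicUsize::new(0);
    /// let mut local = Vec::new();
    /// pool.in_place_scope(|s| {
    ///     // Runs on a pool thread, borrowing `counter` for the scope's duration.
    ///     s.spawn(|_| { counter.fetch_add(1, Ordering::Relaxed); });
    ///     // The scope body stays on the calling thread and can mutate local state.
    ///     local.push("still on the caller");
    /// });
    /// assert_eq!(counter.into_inner(), 1);
    /// assert_eq!(local, ["still on the caller"]);
    /// ```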
    pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&Scope<'scope>) -> R,
    {
        do_in_place_scope(Some(&self.registry), op)
    }

    /// Creates a scope that spawns work into this thread-pool in FIFO order.
    ///
    /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo].
    ///
    /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html
    pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&ScopeFifo<'scope>) -> R,
    {
        do_in_place_scope_fifo(Some(&self.registry), op)
    }

    /// Spawns an asynchronous task in this thread-pool. This task will
    /// run in the implicit, global scope, which means that it may outlast
    /// the current stack frame -- therefore, it cannot capture any references
    /// onto the stack (you will likely need a `move` closure).
    ///
    /// See also: [the `spawn()` function defined on scopes][spawn].
    ///
    /// [spawn]: struct.Scope.html#method.spawn
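    ///
    /// # Examples
    ///
    /// A minimal sketch; the channel lets the caller observe the spawned task:
    ///
    /// ```
    /// # use rayon_core as rayon;
    /// use std::sync::mpsc::channel;
    ///
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    /// let (tx, rx) = channel();
    /// // The task must be `'static`, so we move the sender into it.
    /// pool.spawn(move || tx.send("done").unwrap());
    /// assert_eq!(rx.recv().unwrap(), "done");
    /// ```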
    pub fn spawn<OP>(&self, op: OP)
    where
        OP: FnOnce() + Send + 'static,
    {
        // We assert that `self.registry` has not terminated.
        unsafe { spawn::spawn_in(op, &self.registry) }
    }

    /// Spawns an asynchronous task in this thread-pool. This task will
    /// run in the implicit, global scope, which means that it may outlast
    /// the current stack frame -- therefore, it cannot capture any references
    /// onto the stack (you will likely need a `move` closure).
    ///
    /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo].
    ///
    /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo
    pub fn spawn_fifo<OP>(&self, op: OP)
    where
        OP: FnOnce() + Send + 'static,
    {
        // We assert that `self.registry` has not terminated.
        unsafe { spawn::spawn_fifo_in(op, &self.registry) }
    }

    /// Spawns an asynchronous task on every thread in this thread-pool. This task
    /// will run in the implicit, global scope, which means that it may outlast the
    /// current stack frame -- therefore, it cannot capture any references onto the
    /// stack (you will likely need a `move` closure).
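    ///
    /// # Examples
    ///
    /// A minimal sketch; the barrier lets the caller wait until every thread
    /// in the pool has started its copy of the task:
    ///
    /// ```
    /// # use rayon_core as rayon;
    /// use std::sync::{Arc, Barrier};
    ///
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    /// // Four pool threads plus the calling thread.
    /// let barrier = Arc::new(Barrier::new(5));
    /// let b = Arc::clone(&barrier);
    /// pool.spawn_broadcast(move |_| { b.wait(); });
    /// // Returns once all four copies of the task are running.
    /// barrier.wait();
    /// ```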
    pub fn spawn_broadcast<OP>(&self, op: OP)
    where
        OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
    {
        // We assert that `self.registry` has not terminated.
        unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
    }

    /// Cooperatively yields execution to Rayon.
    ///
    /// This is similar to the general [`yield_now()`], but only if the current
    /// thread is part of *this* thread pool.
    ///
    /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
    /// nothing was available, or `None` if the current thread is not part of this pool.
    pub fn yield_now(&self) -> Option<Yield> {
        let curr = self.registry.current_thread()?;
        Some(curr.yield_now())
    }

    /// Cooperatively yields execution to local Rayon work.
    ///
    /// This is similar to the general [`yield_local()`], but only if the current
    /// thread is part of *this* thread pool.
    ///
    /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
    /// nothing was available, or `None` if the current thread is not part of this pool.
    pub fn yield_local(&self) -> Option<Yield> {
        let curr = self.registry.current_thread()?;
        Some(curr.yield_local())
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        self.registry.terminate();
    }
}

impl fmt::Debug for ThreadPool {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("ThreadPool")
            .field("num_threads", &self.current_num_threads())
            .field("id", &self.registry.id())
            .finish()
    }
}

/// If called from a Rayon worker thread, returns the index of that
/// thread within its current pool; if not called from a Rayon thread,
/// returns `None`.
///
/// The index for a given thread will not change over the thread's
/// lifetime. However, multiple threads may share the same index if
/// they are in distinct thread-pools.
///
/// See also: [the `ThreadPool::current_thread_index()` method][m].
///
/// [m]: struct.ThreadPool.html#method.current_thread_index
///
/// # Future compatibility note
///
/// Currently, every thread-pool (including the global
/// thread-pool) has a fixed number of threads, but this may
/// change in future Rayon versions (see [the `num_threads()` method
/// for details][snt]). In that case, the index for a
/// thread would not change during its lifetime, but thread
/// indices may wind up being reused if threads are terminated and
/// restarted.
///
/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
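///
/// # Examples
///
/// A minimal sketch (this uses the crate-root re-export of the function):
///
/// ```
/// # use rayon_core as rayon;
/// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
/// // The calling thread is not a Rayon worker.
/// assert_eq!(rayon::current_thread_index(), None);
/// // A worker inside `pool` has an index in `0..2`.
/// let idx = pool.install(|| rayon::current_thread_index().unwrap());
/// assert!(idx < 2);
/// ```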
#[inline]
pub fn current_thread_index() -> Option<usize> {
    unsafe {
        let curr = WorkerThread::current().as_ref()?;
        Some(curr.index())
    }
}

/// If called from a Rayon worker thread, indicates whether that
/// thread's local deque still has pending tasks. Otherwise, returns
/// `None`. For more information, see [the
/// `ThreadPool::current_thread_has_pending_tasks()` method][m].
///
/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks
#[inline]
pub fn current_thread_has_pending_tasks() -> Option<bool> {
    unsafe {
        let curr = WorkerThread::current().as_ref()?;
        Some(!curr.local_deque_is_empty())
    }
}

/// Cooperatively yields execution to Rayon.
///
/// If the current thread is part of a rayon thread pool, this looks for a
/// single unit of pending work in the pool, then executes it. Completion of
/// that work might include nested work or further work stealing.
///
/// This is similar to [`std::thread::yield_now()`], but does not literally make
/// that call. If you are implementing a polling loop, you may want to also
/// yield to the OS scheduler yourself if no Rayon work was found.
///
/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
/// nothing was available, or `None` if this thread is not part of any pool at all.
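///
/// # Examples
///
/// A minimal sketch of such a polling loop: spawn a flag-setting task into the
/// global pool, then help Rayon (or the OS) until it completes.
///
/// ```
/// # use rayon_core as rayon;
/// use std::sync::atomic::{AtomicBool, Ordering};
/// use std::sync::Arc;
///
/// let done = Arc::new(AtomicBool::new(false));
/// let flag = Arc::clone(&done);
/// rayon::spawn(move || flag.store(true, Ordering::Release));
/// while !done.load(Ordering::Acquire) {
///     match rayon::yield_now() {
///         // We executed some Rayon work; check the flag again right away.
///         Some(rayon::Yield::Executed) => {}
///         // Nothing to do (or we are not a worker); give the OS a turn.
///         _ => std::thread::yield_now(),
///     }
/// }
/// ```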
pub fn yield_now() -> Option<Yield> {
    unsafe {
        let thread = WorkerThread::current().as_ref()?;
        Some(thread.yield_now())
    }
}

/// Cooperatively yields execution to local Rayon work.
///
/// If the current thread is part of a rayon thread pool, this looks for a
/// single unit of pending work in this thread's queue, then executes it.
/// Completion of that work might include nested work or further work stealing.
///
/// This is similar to [`yield_now()`], but does not steal from other threads.
///
/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
/// nothing was available, or `None` if this thread is not part of any pool at all.
pub fn yield_local() -> Option<Yield> {
    unsafe {
        let thread = WorkerThread::current().as_ref()?;
        Some(thread.yield_local())
    }
}

/// Result of [`yield_now()`] or [`yield_local()`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Yield {
    /// Work was found and executed.
    Executed,
    /// No available work was found.
    Idle,
}