//! Contains support for user-managed thread pools, represented by
//! the [`ThreadPool`] type (see that struct for details).
| 3 | //! |
| 4 | //! [`ThreadPool`]: struct.ThreadPool.html |
| 5 | |
| 6 | use crate::broadcast::{self, BroadcastContext}; |
| 7 | use crate::join; |
| 8 | use crate::registry::{Registry, ThreadSpawn, WorkerThread}; |
| 9 | use crate::scope::{do_in_place_scope, do_in_place_scope_fifo}; |
| 10 | use crate::spawn; |
| 11 | use crate::{scope, Scope}; |
| 12 | use crate::{scope_fifo, ScopeFifo}; |
| 13 | use crate::{ThreadPoolBuildError, ThreadPoolBuilder}; |
| 14 | use std::error::Error; |
| 15 | use std::fmt; |
| 16 | use std::sync::Arc; |
| 17 | |
| 18 | mod test; |
| 19 | |
| 20 | /// Represents a user created [thread-pool]. |
| 21 | /// |
| 22 | /// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads |
| 23 | /// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then |
| 24 | /// execute functions explicitly within this [`ThreadPool`] using |
| 25 | /// [`ThreadPool::install()`]. By contrast, top level rayon functions |
| 26 | /// (like `join()`) will execute implicitly within the current thread-pool. |
| 27 | /// |
| 28 | /// |
| 29 | /// ## Creating a ThreadPool |
| 30 | /// |
| 31 | /// ```rust |
| 32 | /// # use rayon_core as rayon; |
| 33 | /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap(); |
| 34 | /// ``` |
| 35 | /// |
| 36 | /// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s |
| 37 | /// threads. In addition, any other rayon operations called inside of `install()` will also |
| 38 | /// execute in the context of the `ThreadPool`. |
| 39 | /// |
| 40 | /// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate, |
| 41 | /// they will complete executing any remaining work that you have spawned, and automatically |
| 42 | /// terminate. |
| 43 | /// |
| 44 | /// |
| 45 | /// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool |
| 46 | /// [`ThreadPool`]: struct.ThreadPool.html |
| 47 | /// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new |
| 48 | /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html |
| 49 | /// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build |
| 50 | /// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install |
| 51 | pub struct ThreadPool { |
| 52 | registry: Arc<Registry>, |
| 53 | } |
| 54 | |
| 55 | impl ThreadPool { |
| 56 | #[deprecated (note = "Use `ThreadPoolBuilder::build`" )] |
| 57 | #[allow (deprecated)] |
| 58 | /// Deprecated in favor of `ThreadPoolBuilder::build`. |
| 59 | pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> { |
| 60 | Self::build(configuration.into_builder()).map_err(Box::from) |
| 61 | } |
| 62 | |
| 63 | pub(super) fn build<S>( |
| 64 | builder: ThreadPoolBuilder<S>, |
| 65 | ) -> Result<ThreadPool, ThreadPoolBuildError> |
| 66 | where |
| 67 | S: ThreadSpawn, |
| 68 | { |
| 69 | let registry = Registry::new(builder)?; |
| 70 | Ok(ThreadPool { registry }) |
| 71 | } |
| 72 | |
| 73 | /// Executes `op` within the threadpool. Any attempts to use |
| 74 | /// `join`, `scope`, or parallel iterators will then operate |
| 75 | /// within that threadpool. |
| 76 | /// |
| 77 | /// # Warning: thread-local data |
| 78 | /// |
| 79 | /// Because `op` is executing within the Rayon thread-pool, |
| 80 | /// thread-local data from the current thread will not be |
| 81 | /// accessible. |
| 82 | /// |
| 83 | /// # Panics |
| 84 | /// |
| 85 | /// If `op` should panic, that panic will be propagated. |
| 86 | /// |
| 87 | /// ## Using `install()` |
| 88 | /// |
| 89 | /// ```rust |
| 90 | /// # use rayon_core as rayon; |
| 91 | /// fn main() { |
| 92 | /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap(); |
| 93 | /// let n = pool.install(|| fib(20)); |
| 94 | /// println!("{}" , n); |
| 95 | /// } |
| 96 | /// |
| 97 | /// fn fib(n: usize) -> usize { |
| 98 | /// if n == 0 || n == 1 { |
| 99 | /// return n; |
| 100 | /// } |
| 101 | /// let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool` |
| 102 | /// return a + b; |
| 103 | /// } |
| 104 | /// ``` |
| 105 | pub fn install<OP, R>(&self, op: OP) -> R |
| 106 | where |
| 107 | OP: FnOnce() -> R + Send, |
| 108 | R: Send, |
| 109 | { |
| 110 | self.registry.in_worker(|_, _| op()) |
| 111 | } |
| 112 | |
| 113 | /// Executes `op` within every thread in the threadpool. Any attempts to use |
| 114 | /// `join`, `scope`, or parallel iterators will then operate within that |
| 115 | /// threadpool. |
| 116 | /// |
| 117 | /// Broadcasts are executed on each thread after they have exhausted their |
| 118 | /// local work queue, before they attempt work-stealing from other threads. |
| 119 | /// The goal of that strategy is to run everywhere in a timely manner |
| 120 | /// *without* being too disruptive to current work. There may be alternative |
| 121 | /// broadcast styles added in the future for more or less aggressive |
| 122 | /// injection, if the need arises. |
| 123 | /// |
| 124 | /// # Warning: thread-local data |
| 125 | /// |
| 126 | /// Because `op` is executing within the Rayon thread-pool, |
| 127 | /// thread-local data from the current thread will not be |
| 128 | /// accessible. |
| 129 | /// |
| 130 | /// # Panics |
| 131 | /// |
| 132 | /// If `op` should panic on one or more threads, exactly one panic |
| 133 | /// will be propagated, only after all threads have completed |
| 134 | /// (or panicked) their own `op`. |
| 135 | /// |
| 136 | /// # Examples |
| 137 | /// |
| 138 | /// ``` |
| 139 | /// # use rayon_core as rayon; |
| 140 | /// use std::sync::atomic::{AtomicUsize, Ordering}; |
| 141 | /// |
| 142 | /// fn main() { |
| 143 | /// let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap(); |
| 144 | /// |
| 145 | /// // The argument gives context, including the index of each thread. |
| 146 | /// let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index()); |
| 147 | /// assert_eq!(v, &[0, 1, 4, 9, 16]); |
| 148 | /// |
| 149 | /// // The closure can reference the local stack |
| 150 | /// let count = AtomicUsize::new(0); |
| 151 | /// pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed)); |
| 152 | /// assert_eq!(count.into_inner(), 5); |
| 153 | /// } |
| 154 | /// ``` |
| 155 | pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R> |
| 156 | where |
| 157 | OP: Fn(BroadcastContext<'_>) -> R + Sync, |
| 158 | R: Send, |
| 159 | { |
| 160 | // We assert that `self.registry` has not terminated. |
| 161 | unsafe { broadcast::broadcast_in(op, &self.registry) } |
| 162 | } |
| 163 | |
| 164 | /// Returns the (current) number of threads in the thread pool. |
| 165 | /// |
| 166 | /// # Future compatibility note |
| 167 | /// |
| 168 | /// Note that unless this thread-pool was created with a |
| 169 | /// [`ThreadPoolBuilder`] that specifies the number of threads, |
| 170 | /// then this number may vary over time in future versions (see [the |
| 171 | /// `num_threads()` method for details][snt]). |
| 172 | /// |
| 173 | /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads |
| 174 | /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html |
| 175 | #[inline ] |
| 176 | pub fn current_num_threads(&self) -> usize { |
| 177 | self.registry.num_threads() |
| 178 | } |
| 179 | |
| 180 | /// If called from a Rayon worker thread in this thread-pool, |
| 181 | /// returns the index of that thread; if not called from a Rayon |
| 182 | /// thread, or called from a Rayon thread that belongs to a |
| 183 | /// different thread-pool, returns `None`. |
| 184 | /// |
| 185 | /// The index for a given thread will not change over the thread's |
| 186 | /// lifetime. However, multiple threads may share the same index if |
| 187 | /// they are in distinct thread-pools. |
| 188 | /// |
| 189 | /// # Future compatibility note |
| 190 | /// |
| 191 | /// Currently, every thread-pool (including the global |
| 192 | /// thread-pool) has a fixed number of threads, but this may |
| 193 | /// change in future Rayon versions (see [the `num_threads()` method |
| 194 | /// for details][snt]). In that case, the index for a |
| 195 | /// thread would not change during its lifetime, but thread |
| 196 | /// indices may wind up being reused if threads are terminated and |
| 197 | /// restarted. |
| 198 | /// |
| 199 | /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads |
| 200 | #[inline ] |
| 201 | pub fn current_thread_index(&self) -> Option<usize> { |
| 202 | let curr = self.registry.current_thread()?; |
| 203 | Some(curr.index()) |
| 204 | } |
| 205 | |
| 206 | /// Returns true if the current worker thread currently has "local |
| 207 | /// tasks" pending. This can be useful as part of a heuristic for |
| 208 | /// deciding whether to spawn a new task or execute code on the |
| 209 | /// current thread, particularly in breadth-first |
| 210 | /// schedulers. However, keep in mind that this is an inherently |
| 211 | /// racy check, as other worker threads may be actively "stealing" |
| 212 | /// tasks from our local deque. |
| 213 | /// |
| 214 | /// **Background:** Rayon's uses a [work-stealing] scheduler. The |
| 215 | /// key idea is that each thread has its own [deque] of |
| 216 | /// tasks. Whenever a new task is spawned -- whether through |
| 217 | /// `join()`, `Scope::spawn()`, or some other means -- that new |
| 218 | /// task is pushed onto the thread's *local* deque. Worker threads |
| 219 | /// have a preference for executing their own tasks; if however |
| 220 | /// they run out of tasks, they will go try to "steal" tasks from |
| 221 | /// other threads. This function therefore has an inherent race |
| 222 | /// with other active worker threads, which may be removing items |
| 223 | /// from the local deque. |
| 224 | /// |
| 225 | /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing |
| 226 | /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue |
| 227 | #[inline ] |
| 228 | pub fn current_thread_has_pending_tasks(&self) -> Option<bool> { |
| 229 | let curr = self.registry.current_thread()?; |
| 230 | Some(!curr.local_deque_is_empty()) |
| 231 | } |
| 232 | |
| 233 | /// Execute `oper_a` and `oper_b` in the thread-pool and return |
| 234 | /// the results. Equivalent to `self.install(|| join(oper_a, |
| 235 | /// oper_b))`. |
| 236 | pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB) |
| 237 | where |
| 238 | A: FnOnce() -> RA + Send, |
| 239 | B: FnOnce() -> RB + Send, |
| 240 | RA: Send, |
| 241 | RB: Send, |
| 242 | { |
| 243 | self.install(|| join(oper_a, oper_b)) |
| 244 | } |
| 245 | |
| 246 | /// Creates a scope that executes within this thread-pool. |
| 247 | /// Equivalent to `self.install(|| scope(...))`. |
| 248 | /// |
| 249 | /// See also: [the `scope()` function][scope]. |
| 250 | /// |
| 251 | /// [scope]: fn.scope.html |
| 252 | pub fn scope<'scope, OP, R>(&self, op: OP) -> R |
| 253 | where |
| 254 | OP: FnOnce(&Scope<'scope>) -> R + Send, |
| 255 | R: Send, |
| 256 | { |
| 257 | self.install(|| scope(op)) |
| 258 | } |
| 259 | |
| 260 | /// Creates a scope that executes within this thread-pool. |
| 261 | /// Spawns from the same thread are prioritized in relative FIFO order. |
| 262 | /// Equivalent to `self.install(|| scope_fifo(...))`. |
| 263 | /// |
| 264 | /// See also: [the `scope_fifo()` function][scope_fifo]. |
| 265 | /// |
| 266 | /// [scope_fifo]: fn.scope_fifo.html |
| 267 | pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R |
| 268 | where |
| 269 | OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, |
| 270 | R: Send, |
| 271 | { |
| 272 | self.install(|| scope_fifo(op)) |
| 273 | } |
| 274 | |
| 275 | /// Creates a scope that spawns work into this thread-pool. |
| 276 | /// |
| 277 | /// See also: [the `in_place_scope()` function][in_place_scope]. |
| 278 | /// |
| 279 | /// [in_place_scope]: fn.in_place_scope.html |
| 280 | pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R |
| 281 | where |
| 282 | OP: FnOnce(&Scope<'scope>) -> R, |
| 283 | { |
| 284 | do_in_place_scope(Some(&self.registry), op) |
| 285 | } |
| 286 | |
| 287 | /// Creates a scope that spawns work into this thread-pool in FIFO order. |
| 288 | /// |
| 289 | /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo]. |
| 290 | /// |
| 291 | /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html |
| 292 | pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R |
| 293 | where |
| 294 | OP: FnOnce(&ScopeFifo<'scope>) -> R, |
| 295 | { |
| 296 | do_in_place_scope_fifo(Some(&self.registry), op) |
| 297 | } |
| 298 | |
| 299 | /// Spawns an asynchronous task in this thread-pool. This task will |
| 300 | /// run in the implicit, global scope, which means that it may outlast |
| 301 | /// the current stack frame -- therefore, it cannot capture any references |
| 302 | /// onto the stack (you will likely need a `move` closure). |
| 303 | /// |
| 304 | /// See also: [the `spawn()` function defined on scopes][spawn]. |
| 305 | /// |
| 306 | /// [spawn]: struct.Scope.html#method.spawn |
| 307 | pub fn spawn<OP>(&self, op: OP) |
| 308 | where |
| 309 | OP: FnOnce() + Send + 'static, |
| 310 | { |
| 311 | // We assert that `self.registry` has not terminated. |
| 312 | unsafe { spawn::spawn_in(op, &self.registry) } |
| 313 | } |
| 314 | |
| 315 | /// Spawns an asynchronous task in this thread-pool. This task will |
| 316 | /// run in the implicit, global scope, which means that it may outlast |
| 317 | /// the current stack frame -- therefore, it cannot capture any references |
| 318 | /// onto the stack (you will likely need a `move` closure). |
| 319 | /// |
| 320 | /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo]. |
| 321 | /// |
| 322 | /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo |
| 323 | pub fn spawn_fifo<OP>(&self, op: OP) |
| 324 | where |
| 325 | OP: FnOnce() + Send + 'static, |
| 326 | { |
| 327 | // We assert that `self.registry` has not terminated. |
| 328 | unsafe { spawn::spawn_fifo_in(op, &self.registry) } |
| 329 | } |
| 330 | |
| 331 | /// Spawns an asynchronous task on every thread in this thread-pool. This task |
| 332 | /// will run in the implicit, global scope, which means that it may outlast the |
| 333 | /// current stack frame -- therefore, it cannot capture any references onto the |
| 334 | /// stack (you will likely need a `move` closure). |
| 335 | pub fn spawn_broadcast<OP>(&self, op: OP) |
| 336 | where |
| 337 | OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static, |
| 338 | { |
| 339 | // We assert that `self.registry` has not terminated. |
| 340 | unsafe { broadcast::spawn_broadcast_in(op, &self.registry) } |
| 341 | } |
| 342 | |
| 343 | /// Cooperatively yields execution to Rayon. |
| 344 | /// |
| 345 | /// This is similar to the general [`yield_now()`], but only if the current |
| 346 | /// thread is part of *this* thread pool. |
| 347 | /// |
| 348 | /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if |
| 349 | /// nothing was available, or `None` if the current thread is not part this pool. |
| 350 | pub fn yield_now(&self) -> Option<Yield> { |
| 351 | let curr = self.registry.current_thread()?; |
| 352 | Some(curr.yield_now()) |
| 353 | } |
| 354 | |
| 355 | /// Cooperatively yields execution to local Rayon work. |
| 356 | /// |
| 357 | /// This is similar to the general [`yield_local()`], but only if the current |
| 358 | /// thread is part of *this* thread pool. |
| 359 | /// |
| 360 | /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if |
| 361 | /// nothing was available, or `None` if the current thread is not part this pool. |
| 362 | pub fn yield_local(&self) -> Option<Yield> { |
| 363 | let curr = self.registry.current_thread()?; |
| 364 | Some(curr.yield_local()) |
| 365 | } |
| 366 | |
| 367 | pub(crate) fn wait_until_stopped(self) { |
| 368 | let registry = self.registry.clone(); |
| 369 | drop(self); |
| 370 | registry.wait_until_stopped(); |
| 371 | } |
| 372 | } |
| 373 | |
| 374 | impl Drop for ThreadPool { |
| 375 | fn drop(&mut self) { |
| 376 | self.registry.terminate(); |
| 377 | } |
| 378 | } |
| 379 | |
| 380 | impl fmt::Debug for ThreadPool { |
| 381 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 382 | fmt&mut DebugStruct<'_, '_>.debug_struct("ThreadPool" ) |
| 383 | .field("num_threads" , &self.current_num_threads()) |
| 384 | .field(name:"id" , &self.registry.id()) |
| 385 | .finish() |
| 386 | } |
| 387 | } |
| 388 | |
| 389 | /// If called from a Rayon worker thread, returns the index of that |
| 390 | /// thread within its current pool; if not called from a Rayon thread, |
| 391 | /// returns `None`. |
| 392 | /// |
| 393 | /// The index for a given thread will not change over the thread's |
| 394 | /// lifetime. However, multiple threads may share the same index if |
| 395 | /// they are in distinct thread-pools. |
| 396 | /// |
| 397 | /// See also: [the `ThreadPool::current_thread_index()` method]. |
| 398 | /// |
| 399 | /// [m]: struct.ThreadPool.html#method.current_thread_index |
| 400 | /// |
| 401 | /// # Future compatibility note |
| 402 | /// |
| 403 | /// Currently, every thread-pool (including the global |
| 404 | /// thread-pool) has a fixed number of threads, but this may |
| 405 | /// change in future Rayon versions (see [the `num_threads()` method |
| 406 | /// for details][snt]). In that case, the index for a |
| 407 | /// thread would not change during its lifetime, but thread |
| 408 | /// indices may wind up being reused if threads are terminated and |
| 409 | /// restarted. |
| 410 | /// |
| 411 | /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads |
| 412 | #[inline ] |
| 413 | pub fn current_thread_index() -> Option<usize> { |
| 414 | unsafe { |
| 415 | let curr: &WorkerThread = WorkerThread::current().as_ref()?; |
| 416 | Some(curr.index()) |
| 417 | } |
| 418 | } |
| 419 | |
| 420 | /// If called from a Rayon worker thread, indicates whether that |
| 421 | /// thread's local deque still has pending tasks. Otherwise, returns |
| 422 | /// `None`. For more information, see [the |
| 423 | /// `ThreadPool::current_thread_has_pending_tasks()` method][m]. |
| 424 | /// |
| 425 | /// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks |
| 426 | #[inline ] |
| 427 | pub fn current_thread_has_pending_tasks() -> Option<bool> { |
| 428 | unsafe { |
| 429 | let curr: &WorkerThread = WorkerThread::current().as_ref()?; |
| 430 | Some(!curr.local_deque_is_empty()) |
| 431 | } |
| 432 | } |
| 433 | |
| 434 | /// Cooperatively yields execution to Rayon. |
| 435 | /// |
| 436 | /// If the current thread is part of a rayon thread pool, this looks for a |
| 437 | /// single unit of pending work in the pool, then executes it. Completion of |
| 438 | /// that work might include nested work or further work stealing. |
| 439 | /// |
| 440 | /// This is similar to [`std::thread::yield_now()`], but does not literally make |
| 441 | /// that call. If you are implementing a polling loop, you may want to also |
| 442 | /// yield to the OS scheduler yourself if no Rayon work was found. |
| 443 | /// |
| 444 | /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if |
| 445 | /// nothing was available, or `None` if this thread is not part of any pool at all. |
| 446 | pub fn yield_now() -> Option<Yield> { |
| 447 | unsafe { |
| 448 | let thread: &WorkerThread = WorkerThread::current().as_ref()?; |
| 449 | Some(thread.yield_now()) |
| 450 | } |
| 451 | } |
| 452 | |
| 453 | /// Cooperatively yields execution to local Rayon work. |
| 454 | /// |
| 455 | /// If the current thread is part of a rayon thread pool, this looks for a |
| 456 | /// single unit of pending work in this thread's queue, then executes it. |
| 457 | /// Completion of that work might include nested work or further work stealing. |
| 458 | /// |
| 459 | /// This is similar to [`yield_now()`], but does not steal from other threads. |
| 460 | /// |
| 461 | /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if |
| 462 | /// nothing was available, or `None` if this thread is not part of any pool at all. |
| 463 | pub fn yield_local() -> Option<Yield> { |
| 464 | unsafe { |
| 465 | let thread: &WorkerThread = WorkerThread::current().as_ref()?; |
| 466 | Some(thread.yield_local()) |
| 467 | } |
| 468 | } |
| 469 | |
/// Outcome reported by [`yield_now()`] or [`yield_local()`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Yield {
    /// Some work was found and it was executed.
    Executed,
    /// There was no available work to run.
    Idle,
}
| 478 | |