//! Methods for custom fork-join scopes, created by the [`scope()`]
//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`].
//!
//! [`scope()`]: fn.scope.html
//! [`in_place_scope()`]: fn.in_place_scope.html
//! [`join()`]: ../fn.join.html
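//!
//! A minimal sketch of spawning two tasks that borrow local data (see [`scope()`]
//! for the full details):
//!
//! ```rust
//! # use rayon_core as rayon;
//! let mut left = 0;
//! let mut right = 0;
//! rayon::scope(|s| {
//!     s.spawn(|_| left = 1);
//!     s.spawn(|_| right = 2);
//! });
//! assert_eq!((left, right), (1, 2));
//! ```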

use crate::broadcast::BroadcastContext;
use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
use crate::latch::{CountLatch, CountLockLatch, Latch};
use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
use crate::tlv::{self, Tlv};
use crate::unwind;
use std::any::Any;
use std::fmt;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;

#[cfg(test)]
mod test;

/// Represents a fork-join scope which can be used to spawn any number of tasks.
/// See [`scope()`] for more information.
///
/// [`scope()`]: fn.scope.html
pub struct Scope<'scope> {
    base: ScopeBase<'scope>,
}

/// Represents a fork-join scope which can be used to spawn any number of tasks.
/// Those spawned from the same thread are prioritized in relative FIFO order.
/// See [`scope_fifo()`] for more information.
///
/// [`scope_fifo()`]: fn.scope_fifo.html
pub struct ScopeFifo<'scope> {
    base: ScopeBase<'scope>,
    fifos: Vec<JobFifo>,
}

pub(super) enum ScopeLatch {
    /// A latch for scopes created on a rayon thread which will participate in work-
    /// stealing while it waits for completion. This thread is not necessarily part
    /// of the same registry as the scope itself!
    Stealing {
        latch: CountLatch,
        /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
        /// with registry B, when a job completes in a thread of registry B, we may
        /// need to call `latch.set_and_tickle_one()` to wake the thread in registry A.
        /// That means we need a reference to registry A (since at that point we will
        /// only have a reference to registry B), so we stash it here.
        registry: Arc<Registry>,
        /// The index of the worker to wake in `registry`
        worker_index: usize,
    },

    /// A latch for scopes created on a non-rayon thread which will block to wait.
    Blocking { latch: CountLockLatch },
}

struct ScopeBase<'scope> {
    /// thread registry where `scope()` was executed or where `in_place_scope()`
    /// should spawn jobs.
    registry: Arc<Registry>,

    /// if some job panicked, the error is stored here; it will be
    /// propagated to the one who created the scope
    panic: AtomicPtr<Box<dyn Any + Send + 'static>>,

    /// latch to track job counts
    job_completed_latch: ScopeLatch,

    /// You can think of a scope as containing a list of closures to execute,
    /// all of which outlive `'scope`. They're not actually required to be
    /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
    /// the closures are only *moved* across threads to be executed.
    marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,

    /// The TLV at the scope's creation. Used to set the TLV for spawned jobs.
    tlv: Tlv,
}

/// Creates a "fork-join" scope `s` and invokes the closure with a
/// reference to `s`. This closure can then spawn asynchronous tasks
/// into `s`. Those tasks may run asynchronously with respect to the
/// closure; they may themselves spawn additional tasks into `s`. When
/// the closure returns, it will block until all tasks that have been
/// spawned into `s` complete.
///
/// `scope()` is a more flexible building block compared to `join()`,
/// since a loop can be used to spawn any number of tasks without
/// recursing. However, that flexibility comes at a performance price:
/// tasks spawned using `scope()` must be allocated onto the heap,
/// whereas `join()` can make exclusive use of the stack. **Prefer
/// `join()` (or, even better, parallel iterators) where possible.**
///
/// # Example
///
/// The Rayon `join()` function launches two closures and waits for them
/// to stop. One could implement `join()` using a scope like so, although
/// it would be less efficient than the real implementation:
///
/// ```rust
/// # use rayon_core as rayon;
/// pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
///     where A: FnOnce() -> RA + Send,
///           B: FnOnce() -> RB + Send,
///           RA: Send,
///           RB: Send,
/// {
///     let mut result_a: Option<RA> = None;
///     let mut result_b: Option<RB> = None;
///     rayon::scope(|s| {
///         s.spawn(|_| result_a = Some(oper_a()));
///         s.spawn(|_| result_b = Some(oper_b()));
///     });
///     (result_a.unwrap(), result_b.unwrap())
/// }
/// ```
///
/// # A note on threading
///
/// The closure given to `scope()` executes in the Rayon thread-pool,
/// as do those given to `spawn()`. This means that you can't access
/// thread-local variables (well, you can, but they may have
/// unexpected values).
///
/// # Task execution
///
/// Task execution potentially starts as soon as `spawn()` is called.
/// The task will end sometime before `scope()` returns. Note that the
/// *closure* given to scope may return much earlier. In general
/// the lifetime of a scope created like `scope(body)` goes something like this:
///
/// - Scope begins when `scope(body)` is called
/// - Scope body `body()` is invoked
///     - Scope tasks may be spawned
///     - Scope body returns
/// - Scope tasks execute, possibly spawning more tasks
/// - Once all tasks are done, scope ends and `scope()` returns
///
/// To see how and when tasks are joined, consider this example:
///
/// ```rust
/// # use rayon_core as rayon;
/// // point start
/// rayon::scope(|s| {
///     s.spawn(|s| { // task s.1
///         s.spawn(|s| { // task s.1.1
///             rayon::scope(|t| {
///                 t.spawn(|_| ()); // task t.1
///                 t.spawn(|_| ()); // task t.2
///             });
///         });
///     });
///     s.spawn(|s| { // task s.2
///     });
///     // point mid
/// });
/// // point end
/// ```
///
/// The various tasks that are run will execute roughly like so:
///
/// ```notrust
/// | (start)
/// |
/// | (scope `s` created)
/// +-----------------------------------------------+ (task s.2)
/// +-------+ (task s.1)                             |
/// |       |                                        |
/// |       +---+ (task s.1.1)                       |
/// |       |   |                                    |
/// |       |   | (scope `t` created)                |
/// |       |   +----------------+ (task t.2)        |
/// |       |   +---+ (task t.1) |                   |
/// | (mid) |   |   |            |                   |
/// :       |   + <-+------------+ (scope `t` ends)  |
/// :       |   |                                    |
/// |<------+---+-----------------------------------+ (scope `s` ends)
/// |
/// | (end)
/// ```
///
/// The point here is that everything spawned into scope `s` will
/// terminate (at latest) at the same point -- right before the
/// original call to `rayon::scope` returns. This includes new
/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
/// scope is created (such as `t`), the things spawned into that scope
/// will be joined before that scope returns, which in turn occurs
/// before the creating task (task `s.1.1` in this case) finishes.
///
/// There is no guaranteed order of execution for spawns in a scope,
/// given that other threads may steal tasks at any time. However, they
/// are generally prioritized in a LIFO order on the thread from which
/// they were spawned. So in this example, absent any stealing, we can
/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
/// threads always steal from the other end of the deque, like FIFO
/// order. The idea is that "recent" tasks are most likely to be fresh
/// in the local CPU's cache, while other threads can steal older
/// "stale" tasks. For an alternate approach, consider
/// [`scope_fifo()`] instead.
///
/// [`scope_fifo()`]: fn.scope_fifo.html
///
/// # Accessing stack data
///
/// In general, spawned tasks may access stack data in place that
/// outlives the scope itself. Other data must be fully owned by the
/// spawned task.
///
/// ```rust
/// # use rayon_core as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     s.spawn(|_| {
///         // We can access `ok` because it outlives the scope `s`.
///         println!("ok: {:?}", ok);
///
///         // If we just try to use `bad` here, the closure will borrow `bad`
///         // (because we are just printing it out, and that only requires a
///         // borrow), which will result in a compilation error. Read on
///         // for options.
///         // println!("bad: {:?}", bad);
///     });
/// });
/// ```
///
/// As the comments in the example above suggest, to reference `bad` we must
/// take ownership of it. One way to do this is to detach the closure
/// from the surrounding stack frame, using the `move` keyword. This
/// will cause it to take ownership of *all* the variables it touches,
/// in this case including both `ok` *and* `bad`:
///
/// ```rust
/// # use rayon_core as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     s.spawn(move |_| {
///         println!("ok: {:?}", ok);
///         println!("bad: {:?}", bad);
///     });
///
///     // That closure is fine, but now we can't use `ok` anywhere else,
///     // since it is owned by the previous task:
///     // s.spawn(|_| println!("ok: {:?}", ok));
/// });
/// ```
///
/// While this works, it could be a problem if we want to use `ok` elsewhere.
/// There are two choices. We can keep the closure as a `move` closure, but
/// instead of referencing the variable `ok`, we create a shadowed variable that
/// is a borrow of `ok` and capture *that*:
///
/// ```rust
/// # use rayon_core as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     let ok: &Vec<i32> = &ok; // shadow the original `ok`
///     s.spawn(move |_| {
///         println!("ok: {:?}", ok); // captures the shadowed version
///         println!("bad: {:?}", bad);
///     });
///
///     // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
///     // can be shared freely. Note that we need a `move` closure here though,
///     // because otherwise we'd be trying to borrow the shadowed `ok`,
///     // and that doesn't outlive `scope`.
///     s.spawn(move |_| println!("ok: {:?}", ok));
/// });
/// ```
///
/// Another option is not to use the `move` keyword but instead to take ownership
/// of individual variables:
///
/// ```rust
/// # use rayon_core as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     s.spawn(|_| {
///         // Transfer ownership of `bad` into a local variable (also named `bad`).
///         // This will force the closure to take ownership of `bad` from the environment.
///         let bad = bad;
///         println!("ok: {:?}", ok); // `ok` is only borrowed.
///         println!("bad: {:?}", bad); // refers to our local variable, above.
///     });
///
///     s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
/// });
/// ```
///
/// # Panics
///
/// If a panic occurs, either in the closure given to `scope()` or in
/// any of the spawned jobs, that panic will be propagated and the
/// call to `scope()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn()`, it will
/// execute, even if the spawning task should later panic. `scope()`
/// returns once all spawned jobs have completed, and any panics are
/// propagated at that point.
pub fn scope<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&Scope<'scope>) -> R + Send,
    R: Send,
{
    in_worker(|owner_thread, _| {
        let scope = Scope::<'scope>::new(Some(owner_thread), None);
        scope.base.complete(Some(owner_thread), || op(&scope))
    })
}

/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
/// closure with a reference to `s`. This closure can then spawn
/// asynchronous tasks into `s`. Those tasks may run asynchronously with
/// respect to the closure; they may themselves spawn additional tasks
/// into `s`. When the closure returns, it will block until all tasks
/// that have been spawned into `s` complete.
///
/// # Task execution
///
/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
/// difference in the order of execution. Consider a similar example:
///
/// [`scope()`]: fn.scope.html
///
/// ```rust
/// # use rayon_core as rayon;
/// // point start
/// rayon::scope_fifo(|s| {
///     s.spawn_fifo(|s| { // task s.1
///         s.spawn_fifo(|s| { // task s.1.1
///             rayon::scope_fifo(|t| {
///                 t.spawn_fifo(|_| ()); // task t.1
///                 t.spawn_fifo(|_| ()); // task t.2
///             });
///         });
///     });
///     s.spawn_fifo(|s| { // task s.2
///     });
///     // point mid
/// });
/// // point end
/// ```
///
/// The various tasks that are run will execute roughly like so:
///
/// ```notrust
/// | (start)
/// |
/// | (FIFO scope `s` created)
/// +--------------------+ (task s.1)
/// +-------+ (task s.2) |
/// |       |            +---+ (task s.1.1)
/// |       |            |   |
/// |       |            |   | (FIFO scope `t` created)
/// |       |            |   +----------------+ (task t.1)
/// |       |            |   +---+ (task t.2) |
/// | (mid) |            |   |   |            |
/// :       |            |   + <-+------------+ (scope `t` ends)
/// :       |            |   |
/// |<------+------------+---+ (scope `s` ends)
/// |
/// | (end)
/// ```
///
/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
/// the thread from which they were spawned, as opposed to `scope()`'s
/// LIFO. So in this example, we can expect `s.1` to execute before
/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
/// FIFO order, as usual. Overall, this has roughly the same order as
/// the now-deprecated [`breadth_first`] option, except the effect is
/// isolated to a particular scope. If spawns are intermingled from any
/// combination of `scope()` and `scope_fifo()`, or from different
/// threads, their order is only specified with respect to spawns in the
/// same scope and thread.
///
/// For more details on this design, see Rayon [RFC #1].
///
/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
///
/// # Panics
///
/// If a panic occurs, either in the closure given to `scope_fifo()` or
/// in any of the spawned jobs, that panic will be propagated and the
/// call to `scope_fifo()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
/// will execute, even if the spawning task should later panic.
/// `scope_fifo()` returns once all spawned jobs have completed, and any
/// panics are propagated at that point.
pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
    R: Send,
{
    in_worker(|owner_thread, _| {
        let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None);
        scope.base.complete(Some(owner_thread), || op(&scope))
    })
}

/// Creates a "fork-join" scope `s` and invokes the closure with a
/// reference to `s`. This closure can then spawn asynchronous tasks
/// into `s`. Those tasks may run asynchronously with respect to the
/// closure; they may themselves spawn additional tasks into `s`. When
/// the closure returns, it will block until all tasks that have been
/// spawned into `s` complete.
///
/// This is just like `scope()` except the closure runs on the same thread
/// that calls `in_place_scope()`. Only work that it spawns runs in the
/// thread pool.
///
/// # Panics
///
/// If a panic occurs, either in the closure given to `in_place_scope()` or in
/// any of the spawned jobs, that panic will be propagated and the
/// call to `in_place_scope()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn()`, it will
/// execute, even if the spawning task should later panic. `in_place_scope()`
/// returns once all spawned jobs have completed, and any panics are
/// propagated at that point.
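///
/// # Example
///
/// A minimal sketch: the closure itself runs on the calling thread, while each
/// spawned job is executed by the pool. The spawned jobs may borrow data declared
/// before the scope, just as with `scope()`:
///
/// ```rust
/// # use rayon_core as rayon;
/// let mut results = vec![0; 4];
/// rayon::in_place_scope(|s| {
///     for (i, slot) in results.iter_mut().enumerate() {
///         // each spawned job runs on a pool thread...
///         s.spawn(move |_| *slot = i * 10);
///     }
///     // ...while this closure body keeps running on the current thread
/// });
/// assert_eq!(results, [0, 10, 20, 30]);
/// ```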
pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&Scope<'scope>) -> R,
{
    do_in_place_scope(None, op)
}

pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
where
    OP: FnOnce(&Scope<'scope>) -> R,
{
    let thread = unsafe { WorkerThread::current().as_ref() };
    let scope = Scope::<'scope>::new(thread, registry);
    scope.base.complete(thread, || op(&scope))
}

/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
/// closure with a reference to `s`. This closure can then spawn
/// asynchronous tasks into `s`. Those tasks may run asynchronously with
/// respect to the closure; they may themselves spawn additional tasks
/// into `s`. When the closure returns, it will block until all tasks
/// that have been spawned into `s` complete.
///
/// This is just like `scope_fifo()` except the closure runs on the same thread
/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the
/// thread pool.
///
/// # Panics
///
/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in
/// any of the spawned jobs, that panic will be propagated and the
/// call to `in_place_scope_fifo()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will
/// execute, even if the spawning task should later panic. `in_place_scope_fifo()`
/// returns once all spawned jobs have completed, and any panics are
/// propagated at that point.
pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&ScopeFifo<'scope>) -> R,
{
    do_in_place_scope_fifo(None, op)
}

pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
where
    OP: FnOnce(&ScopeFifo<'scope>) -> R,
{
    let thread = unsafe { WorkerThread::current().as_ref() };
    let scope = ScopeFifo::<'scope>::new(thread, registry);
    scope.base.complete(thread, || op(&scope))
}

impl<'scope> Scope<'scope> {
    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
        let base = ScopeBase::new(owner, registry);
        Scope { base }
    }

    /// Spawns a job into the fork-join scope `self`. This job will
    /// execute sometime before the fork-join scope completes. The
    /// job is specified as a closure, and this closure receives its
    /// own reference to the scope `self` as argument. This can be
    /// used to inject new jobs into `self`.
    ///
    /// # Returns
    ///
    /// Nothing. The spawned closures cannot pass back values to the
    /// caller directly, though they can write to local variables on
    /// the stack (if those variables outlive the scope) or
    /// communicate through shared channels.
    ///
    /// (The intention is to eventually integrate with Rust futures to
    /// support spawns of functions that compute a value.)
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// let mut value_a = None;
    /// let mut value_b = None;
    /// let mut value_c = None;
    /// rayon::scope(|s| {
    ///     s.spawn(|s1| {
    ///         // ^ this is the same scope as `s`; this handle `s1`
    ///         //   is intended for use by the spawned task,
    ///         //   since scope handles cannot cross thread boundaries.
    ///
    ///         value_a = Some(22);
    ///
    ///         // the scope `s` will not end until all these tasks are done
    ///         s1.spawn(|_| {
    ///             value_b = Some(44);
    ///         });
    ///     });
    ///
    ///     s.spawn(|_| {
    ///         value_c = Some(66);
    ///     });
    /// });
    /// assert_eq!(value_a, Some(22));
    /// assert_eq!(value_b, Some(44));
    /// assert_eq!(value_c, Some(66));
    /// ```
    ///
    /// # See also
    ///
    /// The [`scope` function] has more extensive documentation about
    /// task spawning.
    ///
    /// [`scope` function]: fn.scope.html
    pub fn spawn<BODY>(&self, body: BODY)
    where
        BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
    {
        let scope_ptr = ScopePtr(self);
        let job = HeapJob::new(self.base.tlv, move || unsafe {
            // SAFETY: this job will execute before the scope ends.
            let scope = scope_ptr.as_ref();
            ScopeBase::execute_job(&scope.base, move || body(scope))
        });
        let job_ref = self.base.heap_job_ref(job);

        // Since `Scope` implements `Sync`, we can't be sure that we're still in a
        // thread of this pool, so we can't just push to the local worker thread.
        // Also, this might be an in-place scope.
        self.base.registry.inject_or_push(job_ref);
    }

    /// Spawns a job into every thread of the fork-join scope `self`. This job will
    /// execute on each thread sometime before the fork-join scope completes. The
    /// job is specified as a closure, and this closure receives its own reference
    /// to the scope `self` as argument, as well as a `BroadcastContext`.
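    ///
    /// # Example
    ///
    /// A minimal sketch: count how many worker threads run the broadcast job.
    /// Each thread in the scope's pool executes the closure exactly once.
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let count = AtomicUsize::new(0);
    /// rayon::scope(|s| {
    ///     s.spawn_broadcast(|_, ctx| {
    ///         // `ctx.index()` identifies the worker thread running this copy of the job
    ///         let _worker = ctx.index();
    ///         count.fetch_add(1, Ordering::Relaxed);
    ///     });
    /// });
    /// assert_eq!(count.into_inner(), rayon::current_num_threads());
    /// ```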
    pub fn spawn_broadcast<BODY>(&self, body: BODY)
    where
        BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
    {
        let scope_ptr = ScopePtr(self);
        let job = ArcJob::new(move || unsafe {
            // SAFETY: this job will execute before the scope ends.
            let scope = scope_ptr.as_ref();
            let body = &body;
            let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
            ScopeBase::execute_job(&scope.base, func)
        });
        self.base.inject_broadcast(job)
    }
}

impl<'scope> ScopeFifo<'scope> {
    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
        let base = ScopeBase::new(owner, registry);
        let num_threads = base.registry.num_threads();
        let fifos = (0..num_threads).map(|_| JobFifo::new()).collect();
        ScopeFifo { base, fifos }
    }

    /// Spawns a job into the fork-join scope `self`. This job will
    /// execute sometime before the fork-join scope completes. The
    /// job is specified as a closure, and this closure receives its
    /// own reference to the scope `self` as argument. This can be
    /// used to inject new jobs into `self`.
    ///
    /// # See also
    ///
    /// This method is akin to [`Scope::spawn()`], but with a FIFO
    /// priority. The [`scope_fifo` function] has more details about
    /// this distinction.
    ///
    /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
    /// [`scope_fifo` function]: fn.scope_fifo.html
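    ///
    /// # Example
    ///
    /// A minimal sketch, analogous to [`Scope::spawn()`]: the spawned closure may
    /// write to stack data that outlives the scope.
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// let mut value = None;
    /// rayon::scope_fifo(|s| {
    ///     s.spawn_fifo(|_| {
    ///         value = Some(22);
    ///     });
    /// });
    /// assert_eq!(value, Some(22));
    /// ```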
    pub fn spawn_fifo<BODY>(&self, body: BODY)
    where
        BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
    {
        let scope_ptr = ScopePtr(self);
        let job = HeapJob::new(self.base.tlv, move || unsafe {
            // SAFETY: this job will execute before the scope ends.
            let scope = scope_ptr.as_ref();
            ScopeBase::execute_job(&scope.base, move || body(scope))
        });
        let job_ref = self.base.heap_job_ref(job);

        // If we're in the pool, use our scope's private fifo for this thread to execute
        // in a locally-FIFO order. Otherwise, just use the pool's global injector.
        match self.base.registry.current_thread() {
            Some(worker) => {
                let fifo = &self.fifos[worker.index()];
                // SAFETY: this job will execute before the scope ends.
                unsafe { worker.push(fifo.push(job_ref)) };
            }
            None => self.base.registry.inject(job_ref),
        }
    }

    /// Spawns a job into every thread of the fork-join scope `self`. This job will
    /// execute on each thread sometime before the fork-join scope completes. The
    /// job is specified as a closure, and this closure receives its own reference
    /// to the scope `self` as argument, as well as a `BroadcastContext`.
    pub fn spawn_broadcast<BODY>(&self, body: BODY)
    where
        BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
    {
        let scope_ptr = ScopePtr(self);
        let job = ArcJob::new(move || unsafe {
            // SAFETY: this job will execute before the scope ends.
            let scope = scope_ptr.as_ref();
            let body = &body;
            let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
            ScopeBase::execute_job(&scope.base, func)
        });
        self.base.inject_broadcast(job)
    }
}

impl<'scope> ScopeBase<'scope> {
    /// Creates the base of a new scope for the given registry
    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
        let registry = registry.unwrap_or_else(|| match owner {
            Some(owner) => owner.registry(),
            None => global_registry(),
        });

        ScopeBase {
            registry: Arc::clone(registry),
            panic: AtomicPtr::new(ptr::null_mut()),
            job_completed_latch: ScopeLatch::new(owner),
            marker: PhantomData,
            tlv: tlv::get(),
        }
    }

    fn increment(&self) {
        self.job_completed_latch.increment();
    }

    fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
    where
        FUNC: FnOnce() + Send + 'scope,
    {
        unsafe {
            self.increment();
            job.into_job_ref()
        }
    }

    fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
    where
        FUNC: Fn() + Send + Sync + 'scope,
    {
        let n_threads = self.registry.num_threads();
        let job_refs = (0..n_threads).map(|_| unsafe {
            self.increment();
            ArcJob::as_job_ref(&job)
        });

        self.registry.inject_broadcast(job_refs);
    }

    /// Executes `func` as a job, either aborting or executing as
    /// appropriate.
    fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
    where
        FUNC: FnOnce() -> R,
    {
        let result = unsafe { Self::execute_job_closure(self, func) };
        self.job_completed_latch.wait(owner);

        // Restore the TLV if we ran some jobs while waiting
        tlv::set(self.tlv);

        self.maybe_propagate_panic();
        result.unwrap() // only None if `op` panicked, and that would have been propagated
    }

    /// Executes `func` as a job, either aborting or executing as
    /// appropriate.
    unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
    where
        FUNC: FnOnce(),
    {
        let _: Option<()> = Self::execute_job_closure(this, func);
    }

    /// Executes `func` as a job in scope. Adjusts the "job completed"
    /// counters and also catches any panic and stores it into
    /// `scope`.
    unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
    where
        FUNC: FnOnce() -> R,
    {
        match unwind::halt_unwinding(func) {
            Ok(r) => {
                Latch::set(&(*this).job_completed_latch);
                Some(r)
            }
            Err(err) => {
                (*this).job_panicked(err);
                Latch::set(&(*this).job_completed_latch);
                None
            }
        }
    }

    fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
        // capture the first error we see, free the rest
        if self.panic.load(Ordering::Relaxed).is_null() {
            let nil = ptr::null_mut();
            let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
            let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
            if self
                .panic
                .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
                .is_ok()
            {
                // ownership now transferred into self.panic
            } else {
                // another panic raced in ahead of us, so drop ours
                let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
            }
        }
    }

    fn maybe_propagate_panic(&self) {
        // propagate panic, if any occurred; at this point, all
        // outstanding jobs have completed, so we can use a relaxed
        // ordering:
        let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
        if !panic.is_null() {
            let value = unsafe { Box::from_raw(panic) };

            // Restore the TLV if we ran some jobs while waiting
            tlv::set(self.tlv);

            unwind::resume_unwinding(*value);
        }
    }
}

impl ScopeLatch {
    fn new(owner: Option<&WorkerThread>) -> Self {
        Self::with_count(1, owner)
    }

    pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
        match owner {
            Some(owner) => ScopeLatch::Stealing {
                latch: CountLatch::with_count(count),
                registry: Arc::clone(owner.registry()),
                worker_index: owner.index(),
            },
            None => ScopeLatch::Blocking {
                latch: CountLockLatch::with_count(count),
            },
        }
    }

    fn increment(&self) {
        match self {
            ScopeLatch::Stealing { latch, .. } => latch.increment(),
            ScopeLatch::Blocking { latch } => latch.increment(),
        }
    }

    pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
        match self {
            ScopeLatch::Stealing {
                latch,
                registry,
                worker_index,
            } => unsafe {
                let owner = owner.expect("owner thread");
                debug_assert_eq!(registry.id(), owner.registry().id());
                debug_assert_eq!(*worker_index, owner.index());
                owner.wait_until(latch);
            },
            ScopeLatch::Blocking { latch } => latch.wait(),
        }
    }
}

impl Latch for ScopeLatch {
    unsafe fn set(this: *const Self) {
        match &*this {
            ScopeLatch::Stealing {
                latch,
                registry,
                worker_index,
            } => CountLatch::set_and_tickle_one(latch, registry, *worker_index),
            ScopeLatch::Blocking { latch } => Latch::set(latch),
        }
    }
}

impl<'scope> fmt::Debug for Scope<'scope> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Scope")
            .field("pool_id", &self.base.registry.id())
            .field("panic", &self.base.panic)
            .field("job_completed_latch", &self.base.job_completed_latch)
            .finish()
    }
}

impl<'scope> fmt::Debug for ScopeFifo<'scope> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("ScopeFifo")
            .field("num_fifos", &self.fifos.len())
            .field("pool_id", &self.base.registry.id())
            .field("panic", &self.base.panic)
            .field("job_completed_latch", &self.base.job_completed_latch)
            .finish()
    }
}

impl fmt::Debug for ScopeLatch {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ScopeLatch::Stealing { latch, .. } => fmt
                .debug_tuple("ScopeLatch::Stealing")
                .field(latch)
                .finish(),
            ScopeLatch::Blocking { latch } => fmt
                .debug_tuple("ScopeLatch::Blocking")
                .field(latch)
                .finish(),
        }
    }
}

/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime.
///
/// Unsafe code is still required to dereference the pointer, but that's fine in
/// scope jobs that are guaranteed to execute before the scope ends.
struct ScopePtr<T>(*const T);

// SAFETY: !Send for raw pointers is not for safety, just as a lint
unsafe impl<T: Sync> Send for ScopePtr<T> {}

// SAFETY: !Sync for raw pointers is not for safety, just as a lint
unsafe impl<T: Sync> Sync for ScopePtr<T> {}

impl<T> ScopePtr<T> {
    // Helper to avoid disjoint captures of `scope_ptr.0`
    unsafe fn as_ref(&self) -> &T {
        &*self.0
    }
}