1use crate::latch::Latch;
2use crate::unwind;
3use crossbeam_deque::{Injector, Steal};
4use std::any::Any;
5use std::cell::UnsafeCell;
6use std::mem;
7use std::sync::Arc;
8
9pub(super) enum JobResult<T> {
10 None,
11 Ok(T),
12 Panic(Box<dyn Any + Send>),
13}
14
15/// A `Job` is used to advertise work for other threads that they may
16/// want to steal. In accordance with time honored tradition, jobs are
17/// arranged in a deque, so that thieves can take from the top of the
18/// deque while the main worker manages the bottom of the deque. This
19/// deque is managed by the `thread_pool` module.
20pub(super) trait Job {
21 /// Unsafe: this may be called from a different thread than the one
22 /// which scheduled the job, so the implementer must ensure the
23 /// appropriate traits are met, whether `Send`, `Sync`, or both.
24 unsafe fn execute(this: *const ());
25}
26
27/// Effectively a Job trait object. Each JobRef **must** be executed
28/// exactly once, or else data may leak.
29///
30/// Internally, we store the job's data in a `*const ()` pointer. The
31/// true type is something like `*const StackJob<...>`, but we hide
32/// it. We also carry the "execute fn" from the `Job` trait.
33pub(super) struct JobRef {
34 pointer: *const (),
35 execute_fn: unsafe fn(*const ()),
36}
37
38unsafe impl Send for JobRef {}
39unsafe impl Sync for JobRef {}
40
41impl JobRef {
42 /// Unsafe: caller asserts that `data` will remain valid until the
43 /// job is executed.
44 pub(super) unsafe fn new<T>(data: *const T) -> JobRef
45 where
46 T: Job,
47 {
48 // erase types:
49 JobRef {
50 pointer: data as *const (),
51 execute_fn: <T as Job>::execute,
52 }
53 }
54
55 /// Returns an opaque handle that can be saved and compared,
56 /// without making `JobRef` itself `Copy + Eq`.
57 #[inline]
58 pub(super) fn id(&self) -> impl Eq {
59 (self.pointer, self.execute_fn)
60 }
61
62 #[inline]
63 pub(super) unsafe fn execute(self) {
64 (self.execute_fn)(self.pointer)
65 }
66}
67
68/// A job that will be owned by a stack slot. This means that when it
69/// executes it need not free any heap data, the cleanup occurs when
70/// the stack frame is later popped. The function parameter indicates
71/// `true` if the job was stolen -- executed on a different thread.
72pub(super) struct StackJob<L, F, R>
73where
74 L: Latch + Sync,
75 F: FnOnce(bool) -> R + Send,
76 R: Send,
77{
78 pub(super) latch: L,
79 func: UnsafeCell<Option<F>>,
80 result: UnsafeCell<JobResult<R>>,
81}
82
83impl<L, F, R> StackJob<L, F, R>
84where
85 L: Latch + Sync,
86 F: FnOnce(bool) -> R + Send,
87 R: Send,
88{
89 pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> {
90 StackJob {
91 latch,
92 func: UnsafeCell::new(Some(func)),
93 result: UnsafeCell::new(JobResult::None),
94 }
95 }
96
97 pub(super) unsafe fn as_job_ref(&self) -> JobRef {
98 JobRef::new(self)
99 }
100
101 pub(super) unsafe fn run_inline(self, stolen: bool) -> R {
102 self.func.into_inner().unwrap()(stolen)
103 }
104
105 pub(super) unsafe fn into_result(self) -> R {
106 self.result.into_inner().into_return_value()
107 }
108}
109
110impl<L, F, R> Job for StackJob<L, F, R>
111where
112 L: Latch + Sync,
113 F: FnOnce(bool) -> R + Send,
114 R: Send,
115{
116 unsafe fn execute(this: *const ()) {
117 let this: &StackJob = &*(this as *const Self);
118 let abort: AbortIfPanic = unwind::AbortIfPanic;
119 let func: F = (*this.func.get()).take().unwrap();
120 (*this.result.get()) = JobResult::call(func);
121 Latch::set(&this.latch);
122 mem::forget(abort);
123 }
124}
125
126/// Represents a job stored in the heap. Used to implement
127/// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply
128/// invokes a closure, which then triggers the appropriate logic to
129/// signal that the job executed.
130///
131/// (Probably `StackJob` should be refactored in a similar fashion.)
132pub(super) struct HeapJob<BODY>
133where
134 BODY: FnOnce() + Send,
135{
136 job: BODY,
137}
138
139impl<BODY> HeapJob<BODY>
140where
141 BODY: FnOnce() + Send,
142{
143 pub(super) fn new(job: BODY) -> Box<Self> {
144 Box::new(HeapJob { job })
145 }
146
147 /// Creates a `JobRef` from this job -- note that this hides all
148 /// lifetimes, so it is up to you to ensure that this JobRef
149 /// doesn't outlive any data that it closes over.
150 pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
151 JobRef::new(data:Box::into_raw(self))
152 }
153
154 /// Creates a static `JobRef` from this job.
155 pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef
156 where
157 BODY: 'static,
158 {
159 unsafe { self.into_job_ref() }
160 }
161}
162
163impl<BODY> Job for HeapJob<BODY>
164where
165 BODY: FnOnce() + Send,
166{
167 unsafe fn execute(this: *const ()) {
168 let this: Box> = Box::from_raw(this as *mut Self);
169 (this.job)();
170 }
171}
172
173/// Represents a job stored in an `Arc` -- like `HeapJob`, but may
174/// be turned into multiple `JobRef`s and called multiple times.
175pub(super) struct ArcJob<BODY>
176where
177 BODY: Fn() + Send + Sync,
178{
179 job: BODY,
180}
181
182impl<BODY> ArcJob<BODY>
183where
184 BODY: Fn() + Send + Sync,
185{
186 pub(super) fn new(job: BODY) -> Arc<Self> {
187 Arc::new(data:ArcJob { job })
188 }
189
190 /// Creates a `JobRef` from this job -- note that this hides all
191 /// lifetimes, so it is up to you to ensure that this JobRef
192 /// doesn't outlive any data that it closes over.
193 pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef {
194 JobRef::new(data:Arc::into_raw(this:Arc::clone(self:this)))
195 }
196
197 /// Creates a static `JobRef` from this job.
198 pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef
199 where
200 BODY: 'static,
201 {
202 unsafe { Self::as_job_ref(this) }
203 }
204}
205
206impl<BODY> Job for ArcJob<BODY>
207where
208 BODY: Fn() + Send + Sync,
209{
210 unsafe fn execute(this: *const ()) {
211 let this: Arc> = Arc::from_raw(ptr:this as *mut Self);
212 (this.job)();
213 }
214}
215
216impl<T> JobResult<T> {
217 fn call(func: impl FnOnce(bool) -> T) -> Self {
218 match unwind::halt_unwinding(|| func(true)) {
219 Ok(x: T) => JobResult::Ok(x),
220 Err(x: Box) => JobResult::Panic(x),
221 }
222 }
223
224 /// Convert the `JobResult` for a job that has finished (and hence
225 /// its JobResult is populated) into its return value.
226 ///
227 /// NB. This will panic if the job panicked.
228 pub(super) fn into_return_value(self) -> T {
229 match self {
230 JobResult::None => unreachable!(),
231 JobResult::Ok(x: T) => x,
232 JobResult::Panic(x: Box) => unwind::resume_unwinding(payload:x),
233 }
234 }
235}
236
237/// Indirect queue to provide FIFO job priority.
238pub(super) struct JobFifo {
239 inner: Injector<JobRef>,
240}
241
242impl JobFifo {
243 pub(super) fn new() -> Self {
244 JobFifo {
245 inner: Injector::new(),
246 }
247 }
248
249 pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef {
250 // A little indirection ensures that spawns are always prioritized in FIFO order. The
251 // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
252 // (FIFO), but either way they will end up popping from the front of this queue.
253 self.inner.push(task:job_ref);
254 JobRef::new(self)
255 }
256}
257
258impl Job for JobFifo {
259 unsafe fn execute(this: *const ()) {
260 // We "execute" a queue by executing its first job, FIFO.
261 let this: &JobFifo = &*(this as *const Self);
262 loop {
263 match this.inner.steal() {
264 Steal::Success(job_ref: JobRef) => break job_ref.execute(),
265 Steal::Empty => panic!("FIFO is empty"),
266 Steal::Retry => {}
267 }
268 }
269 }
270}
271