1 | use crate::latch::Latch; |
2 | use crate::unwind; |
3 | use crossbeam_deque::{Injector, Steal}; |
4 | use std::any::Any; |
5 | use std::cell::UnsafeCell; |
6 | use std::mem; |
7 | use std::sync::Arc; |
8 | |
/// Outcome slot for a job.
///
/// A freshly created `StackJob` starts with `None`; `JobResult::call`
/// produces either `Ok` or `Panic`.
pub(super) enum JobResult<T> {
    /// No outcome has been recorded yet.
    None,
    /// The job's closure returned this value.
    Ok(T),
    /// The `Err` payload reported by `unwind::halt_unwinding`; it is handed
    /// to `unwind::resume_unwinding` by `into_return_value`.
    Panic(Box<dyn Any + Send>),
}
14 | |
/// A `Job` is used to advertise work for other threads that they may
/// want to steal. In accordance with time honored tradition, jobs are
/// arranged in a deque, so that thieves can take from the top of the
/// deque while the main worker manages the bottom of the deque. This
/// deque is managed by the `thread_pool` module.
pub(super) trait Job {
    /// Runs the job whose type-erased data lives at `this`.
    ///
    /// # Safety
    ///
    /// This may be called from a different thread than the one which
    /// scheduled the job, so the implementer must ensure the appropriate
    /// traits are met, whether `Send`, `Sync`, or both.
    unsafe fn execute(this: *const ());
}
26 | |
/// Effectively a Job trait object. Each JobRef **must** be executed
/// exactly once, or else data may leak.
///
/// Internally, we store the job's data in a `*const ()` pointer. The
/// true type is something like `*const StackJob<...>`, but we hide
/// it. We also carry the "execute fn" from the `Job` trait.
pub(super) struct JobRef {
    /// Type-erased pointer to the job's data, as passed to `JobRef::new`.
    pointer: *const (),
    /// The `Job::execute` function of the erased type; it is called with
    /// `pointer` as its argument.
    execute_fn: unsafe fn(*const ()),
}
37 | |
// A `JobRef` is only a raw pointer plus a function pointer, so these impls
// are asserted by hand rather than derived by the compiler.
// NOTE(review): soundness presumably rests on the `Job::execute` contract
// (implementers guarantee the needed `Send`/`Sync` properties) and on the
// `JobRef::new` contract (the data outlives the job) -- confirm against callers.
unsafe impl Send for JobRef {}
unsafe impl Sync for JobRef {}
40 | |
41 | impl JobRef { |
42 | /// Unsafe: caller asserts that `data` will remain valid until the |
43 | /// job is executed. |
44 | pub(super) unsafe fn new<T>(data: *const T) -> JobRef |
45 | where |
46 | T: Job, |
47 | { |
48 | // erase types: |
49 | JobRef { |
50 | pointer: data as *const (), |
51 | execute_fn: <T as Job>::execute, |
52 | } |
53 | } |
54 | |
55 | /// Returns an opaque handle that can be saved and compared, |
56 | /// without making `JobRef` itself `Copy + Eq`. |
57 | #[inline ] |
58 | pub(super) fn id(&self) -> impl Eq { |
59 | (self.pointer, self.execute_fn) |
60 | } |
61 | |
62 | #[inline ] |
63 | pub(super) unsafe fn execute(self) { |
64 | (self.execute_fn)(self.pointer) |
65 | } |
66 | } |
67 | |
/// A job that will be owned by a stack slot. This means that when it
/// executes it need not free any heap data, the cleanup occurs when
/// the stack frame is later popped. The function parameter indicates
/// `true` if the job was stolen -- executed on a different thread.
pub(super) struct StackJob<L, F, R>
where
    L: Latch + Sync,
    F: FnOnce(bool) -> R + Send,
    R: Send,
{
    /// Set by `Job::execute` after the result has been stored.
    pub(super) latch: L,
    /// The closure to run; starts as `Some` and is taken out by
    /// `Job::execute` or consumed by `run_inline`.
    func: UnsafeCell<Option<F>>,
    /// Starts as `JobResult::None`; written by `Job::execute` and read back
    /// by `into_result`.
    result: UnsafeCell<JobResult<R>>,
}
82 | |
83 | impl<L, F, R> StackJob<L, F, R> |
84 | where |
85 | L: Latch + Sync, |
86 | F: FnOnce(bool) -> R + Send, |
87 | R: Send, |
88 | { |
89 | pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> { |
90 | StackJob { |
91 | latch, |
92 | func: UnsafeCell::new(Some(func)), |
93 | result: UnsafeCell::new(JobResult::None), |
94 | } |
95 | } |
96 | |
97 | pub(super) unsafe fn as_job_ref(&self) -> JobRef { |
98 | JobRef::new(self) |
99 | } |
100 | |
101 | pub(super) unsafe fn run_inline(self, stolen: bool) -> R { |
102 | self.func.into_inner().unwrap()(stolen) |
103 | } |
104 | |
105 | pub(super) unsafe fn into_result(self) -> R { |
106 | self.result.into_inner().into_return_value() |
107 | } |
108 | } |
109 | |
impl<L, F, R> Job for StackJob<L, F, R>
where
    L: Latch + Sync,
    F: FnOnce(bool) -> R + Send,
    R: Send,
{
    /// Takes the stored closure, runs it through `JobResult::call`, stores
    /// the outcome in `result`, and then sets the latch.
    ///
    /// # Safety
    ///
    /// `this` must point to a live `StackJob<L, F, R>`. The closure must
    /// not have been taken already, otherwise the `unwrap` below panics.
    unsafe fn execute(this: *const ()) {
        let this = &*(this as *const Self);
        // NOTE(review): presumably this guard aborts the process if it is
        // dropped while a panic unwinds through this function -- confirm in
        // `unwind`. It is defused with `mem::forget` on the normal path.
        let abort = unwind::AbortIfPanic;
        // Move the closure out of its cell so it can be called by value.
        let func = (*this.func.get()).take().unwrap();
        // `JobResult::call` turns an `Err` from `unwind::halt_unwinding`
        // into `JobResult::Panic` instead of propagating it.
        (*this.result.get()) = JobResult::call(func);
        // The latch is set only after the result has been written.
        Latch::set(&this.latch);
        mem::forget(abort);
    }
}
125 | |
/// Represents a job stored in the heap. Used to implement
/// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply
/// invokes a closure, which then triggers the appropriate logic to
/// signal that the job executed.
///
/// (Probably `StackJob` should be refactored in a similar fashion.)
pub(super) struct HeapJob<BODY>
where
    BODY: FnOnce() + Send,
{
    /// The closure invoked (by value) when the job is executed.
    job: BODY,
}
138 | |
139 | impl<BODY> HeapJob<BODY> |
140 | where |
141 | BODY: FnOnce() + Send, |
142 | { |
143 | pub(super) fn new(job: BODY) -> Box<Self> { |
144 | Box::new(HeapJob { job }) |
145 | } |
146 | |
147 | /// Creates a `JobRef` from this job -- note that this hides all |
148 | /// lifetimes, so it is up to you to ensure that this JobRef |
149 | /// doesn't outlive any data that it closes over. |
150 | pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef { |
151 | JobRef::new(Box::into_raw(self)) |
152 | } |
153 | |
154 | /// Creates a static `JobRef` from this job. |
155 | pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef |
156 | where |
157 | BODY: 'static, |
158 | { |
159 | unsafe { self.into_job_ref() } |
160 | } |
161 | } |
162 | |
163 | impl<BODY> Job for HeapJob<BODY> |
164 | where |
165 | BODY: FnOnce() + Send, |
166 | { |
167 | unsafe fn execute(this: *const ()) { |
168 | let this = Box::from_raw(this as *mut Self); |
169 | (this.job)(); |
170 | } |
171 | } |
172 | |
/// Represents a job stored in an `Arc` -- like `HeapJob`, but may
/// be turned into multiple `JobRef`s and called multiple times.
pub(super) struct ArcJob<BODY>
where
    BODY: Fn() + Send + Sync,
{
    /// The closure invoked (by shared reference) each time a `JobRef`
    /// created from this job is executed.
    job: BODY,
}
181 | |
182 | impl<BODY> ArcJob<BODY> |
183 | where |
184 | BODY: Fn() + Send + Sync, |
185 | { |
186 | pub(super) fn new(job: BODY) -> Arc<Self> { |
187 | Arc::new(ArcJob { job }) |
188 | } |
189 | |
190 | /// Creates a `JobRef` from this job -- note that this hides all |
191 | /// lifetimes, so it is up to you to ensure that this JobRef |
192 | /// doesn't outlive any data that it closes over. |
193 | pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef { |
194 | JobRef::new(Arc::into_raw(Arc::clone(this))) |
195 | } |
196 | |
197 | /// Creates a static `JobRef` from this job. |
198 | pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef |
199 | where |
200 | BODY: 'static, |
201 | { |
202 | unsafe { Self::as_job_ref(this) } |
203 | } |
204 | } |
205 | |
206 | impl<BODY> Job for ArcJob<BODY> |
207 | where |
208 | BODY: Fn() + Send + Sync, |
209 | { |
210 | unsafe fn execute(this: *const ()) { |
211 | let this = Arc::from_raw(this as *mut Self); |
212 | (this.job)(); |
213 | } |
214 | } |
215 | |
216 | impl<T> JobResult<T> { |
217 | fn call(func: impl FnOnce(bool) -> T) -> Self { |
218 | match unwind::halt_unwinding(|| func(true)) { |
219 | Ok(x) => JobResult::Ok(x), |
220 | Err(x) => JobResult::Panic(x), |
221 | } |
222 | } |
223 | |
224 | /// Convert the `JobResult` for a job that has finished (and hence |
225 | /// its JobResult is populated) into its return value. |
226 | /// |
227 | /// NB. This will panic if the job panicked. |
228 | pub(super) fn into_return_value(self) -> T { |
229 | match self { |
230 | JobResult::None => unreachable!(), |
231 | JobResult::Ok(x) => x, |
232 | JobResult::Panic(x) => unwind::resume_unwinding(x), |
233 | } |
234 | } |
235 | } |
236 | |
/// Indirect queue to provide FIFO job priority.
pub(super) struct JobFifo {
    /// Queue of the real jobs: `push` adds to it, and executing the
    /// `JobFifo` steals one job from it and runs that job.
    inner: Injector<JobRef>,
}
241 | |
242 | impl JobFifo { |
243 | pub(super) fn new() -> Self { |
244 | JobFifo { |
245 | inner: Injector::new(), |
246 | } |
247 | } |
248 | |
249 | pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef { |
250 | // A little indirection ensures that spawns are always prioritized in FIFO order. The |
251 | // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front |
252 | // (FIFO), but either way they will end up popping from the front of this queue. |
253 | self.inner.push(job_ref); |
254 | JobRef::new(self) |
255 | } |
256 | } |
257 | |
258 | impl Job for JobFifo { |
259 | unsafe fn execute(this: *const ()) { |
260 | // We "execute" a queue by executing its first job, FIFO. |
261 | let this = &*(this as *const Self); |
262 | loop { |
263 | match this.inner.steal() { |
264 | Steal::Success(job_ref) => break job_ref.execute(), |
265 | Steal::Empty => panic!("FIFO is empty" ), |
266 | Steal::Retry => {} |
267 | } |
268 | } |
269 | } |
270 | } |
271 | |