1use crate::latch::Latch;
2use crate::tlv;
3use crate::tlv::Tlv;
4use crate::unwind;
5use crossbeam_deque::{Injector, Steal};
6use std::any::Any;
7use std::cell::UnsafeCell;
8use std::mem;
9use std::sync::Arc;
10
11pub(super) enum JobResult<T> {
12 None,
13 Ok(T),
14 Panic(Box<dyn Any + Send>),
15}
16
17/// A `Job` is used to advertise work for other threads that they may
18/// want to steal. In accordance with time honored tradition, jobs are
19/// arranged in a deque, so that thieves can take from the top of the
20/// deque while the main worker manages the bottom of the deque. This
21/// deque is managed by the `thread_pool` module.
22pub(super) trait Job {
23 /// Unsafe: this may be called from a different thread than the one
24 /// which scheduled the job, so the implementer must ensure the
25 /// appropriate traits are met, whether `Send`, `Sync`, or both.
26 unsafe fn execute(this: *const ());
27}
28
29/// Effectively a Job trait object. Each JobRef **must** be executed
30/// exactly once, or else data may leak.
31///
32/// Internally, we store the job's data in a `*const ()` pointer. The
33/// true type is something like `*const StackJob<...>`, but we hide
34/// it. We also carry the "execute fn" from the `Job` trait.
35pub(super) struct JobRef {
36 pointer: *const (),
37 execute_fn: unsafe fn(*const ()),
38}
39
40unsafe impl Send for JobRef {}
41unsafe impl Sync for JobRef {}
42
43impl JobRef {
44 /// Unsafe: caller asserts that `data` will remain valid until the
45 /// job is executed.
46 pub(super) unsafe fn new<T>(data: *const T) -> JobRef
47 where
48 T: Job,
49 {
50 // erase types:
51 JobRef {
52 pointer: data as *const (),
53 execute_fn: <T as Job>::execute,
54 }
55 }
56
57 /// Returns an opaque handle that can be saved and compared,
58 /// without making `JobRef` itself `Copy + Eq`.
59 #[inline]
60 pub(super) fn id(&self) -> impl Eq {
61 (self.pointer, self.execute_fn)
62 }
63
64 #[inline]
65 pub(super) unsafe fn execute(self) {
66 (self.execute_fn)(self.pointer)
67 }
68}
69
70/// A job that will be owned by a stack slot. This means that when it
71/// executes it need not free any heap data, the cleanup occurs when
72/// the stack frame is later popped. The function parameter indicates
73/// `true` if the job was stolen -- executed on a different thread.
74pub(super) struct StackJob<L, F, R>
75where
76 L: Latch + Sync,
77 F: FnOnce(bool) -> R + Send,
78 R: Send,
79{
80 pub(super) latch: L,
81 func: UnsafeCell<Option<F>>,
82 result: UnsafeCell<JobResult<R>>,
83 tlv: Tlv,
84}
85
86impl<L, F, R> StackJob<L, F, R>
87where
88 L: Latch + Sync,
89 F: FnOnce(bool) -> R + Send,
90 R: Send,
91{
92 pub(super) fn new(tlv: Tlv, func: F, latch: L) -> StackJob<L, F, R> {
93 StackJob {
94 latch,
95 func: UnsafeCell::new(Some(func)),
96 result: UnsafeCell::new(JobResult::None),
97 tlv,
98 }
99 }
100
101 pub(super) unsafe fn as_job_ref(&self) -> JobRef {
102 JobRef::new(self)
103 }
104
105 pub(super) unsafe fn run_inline(self, stolen: bool) -> R {
106 self.func.into_inner().unwrap()(stolen)
107 }
108
109 pub(super) unsafe fn into_result(self) -> R {
110 self.result.into_inner().into_return_value()
111 }
112}
113
114impl<L, F, R> Job for StackJob<L, F, R>
115where
116 L: Latch + Sync,
117 F: FnOnce(bool) -> R + Send,
118 R: Send,
119{
120 unsafe fn execute(this: *const ()) {
121 let this: &StackJob = &*(this as *const Self);
122 tlv::set(this.tlv);
123 let abort: AbortIfPanic = unwind::AbortIfPanic;
124 let func: F = (*this.func.get()).take().unwrap();
125 (*this.result.get()) = JobResult::call(func);
126 Latch::set(&this.latch);
127 mem::forget(abort);
128 }
129}
130
131/// Represents a job stored in the heap. Used to implement
132/// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply
133/// invokes a closure, which then triggers the appropriate logic to
134/// signal that the job executed.
135///
136/// (Probably `StackJob` should be refactored in a similar fashion.)
137pub(super) struct HeapJob<BODY>
138where
139 BODY: FnOnce() + Send,
140{
141 job: BODY,
142 tlv: Tlv,
143}
144
145impl<BODY> HeapJob<BODY>
146where
147 BODY: FnOnce() + Send,
148{
149 pub(super) fn new(tlv: Tlv, job: BODY) -> Box<Self> {
150 Box::new(HeapJob { job, tlv })
151 }
152
153 /// Creates a `JobRef` from this job -- note that this hides all
154 /// lifetimes, so it is up to you to ensure that this JobRef
155 /// doesn't outlive any data that it closes over.
156 pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
157 JobRef::new(data:Box::into_raw(self))
158 }
159
160 /// Creates a static `JobRef` from this job.
161 pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef
162 where
163 BODY: 'static,
164 {
165 unsafe { self.into_job_ref() }
166 }
167}
168
169impl<BODY> Job for HeapJob<BODY>
170where
171 BODY: FnOnce() + Send,
172{
173 unsafe fn execute(this: *const ()) {
174 let this: Box> = Box::from_raw(this as *mut Self);
175 tlv::set(this.tlv);
176 (this.job)();
177 }
178}
179
180/// Represents a job stored in an `Arc` -- like `HeapJob`, but may
181/// be turned into multiple `JobRef`s and called multiple times.
182pub(super) struct ArcJob<BODY>
183where
184 BODY: Fn() + Send + Sync,
185{
186 job: BODY,
187}
188
189impl<BODY> ArcJob<BODY>
190where
191 BODY: Fn() + Send + Sync,
192{
193 pub(super) fn new(job: BODY) -> Arc<Self> {
194 Arc::new(data:ArcJob { job })
195 }
196
197 /// Creates a `JobRef` from this job -- note that this hides all
198 /// lifetimes, so it is up to you to ensure that this JobRef
199 /// doesn't outlive any data that it closes over.
200 pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef {
201 JobRef::new(data:Arc::into_raw(this:Arc::clone(self:this)))
202 }
203
204 /// Creates a static `JobRef` from this job.
205 pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef
206 where
207 BODY: 'static,
208 {
209 unsafe { Self::as_job_ref(this) }
210 }
211}
212
213impl<BODY> Job for ArcJob<BODY>
214where
215 BODY: Fn() + Send + Sync,
216{
217 unsafe fn execute(this: *const ()) {
218 let this: Arc> = Arc::from_raw(ptr:this as *mut Self);
219 (this.job)();
220 }
221}
222
223impl<T> JobResult<T> {
224 fn call(func: impl FnOnce(bool) -> T) -> Self {
225 match unwind::halt_unwinding(|| func(true)) {
226 Ok(x: T) => JobResult::Ok(x),
227 Err(x: Box) => JobResult::Panic(x),
228 }
229 }
230
231 /// Convert the `JobResult` for a job that has finished (and hence
232 /// its JobResult is populated) into its return value.
233 ///
234 /// NB. This will panic if the job panicked.
235 pub(super) fn into_return_value(self) -> T {
236 match self {
237 JobResult::None => unreachable!(),
238 JobResult::Ok(x: T) => x,
239 JobResult::Panic(x: Box) => unwind::resume_unwinding(payload:x),
240 }
241 }
242}
243
244/// Indirect queue to provide FIFO job priority.
245pub(super) struct JobFifo {
246 inner: Injector<JobRef>,
247}
248
249impl JobFifo {
250 pub(super) fn new() -> Self {
251 JobFifo {
252 inner: Injector::new(),
253 }
254 }
255
256 pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef {
257 // A little indirection ensures that spawns are always prioritized in FIFO order. The
258 // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
259 // (FIFO), but either way they will end up popping from the front of this queue.
260 self.inner.push(task:job_ref);
261 JobRef::new(self)
262 }
263}
264
265impl Job for JobFifo {
266 unsafe fn execute(this: *const ()) {
267 // We "execute" a queue by executing its first job, FIFO.
268 let this: &JobFifo = &*(this as *const Self);
269 loop {
270 match this.inner.steal() {
271 Steal::Success(job_ref: JobRef) => break job_ref.execute(),
272 Steal::Empty => panic!("FIFO is empty"),
273 Steal::Retry => {}
274 }
275 }
276 }
277}
278