| 1 | use crate::latch::Latch; |
| 2 | use crate::tlv; |
| 3 | use crate::tlv::Tlv; |
| 4 | use crate::unwind; |
| 5 | use crossbeam_deque::{Injector, Steal}; |
| 6 | use std::any::Any; |
| 7 | use std::cell::UnsafeCell; |
| 8 | use std::mem; |
| 9 | use std::sync::Arc; |
| 10 | |
| 11 | pub(super) enum JobResult<T> { |
| 12 | None, |
| 13 | Ok(T), |
| 14 | Panic(Box<dyn Any + Send>), |
| 15 | } |
| 16 | |
| 17 | /// A `Job` is used to advertise work for other threads that they may |
| 18 | /// want to steal. In accordance with time honored tradition, jobs are |
| 19 | /// arranged in a deque, so that thieves can take from the top of the |
| 20 | /// deque while the main worker manages the bottom of the deque. This |
| 21 | /// deque is managed by the `thread_pool` module. |
| 22 | pub(super) trait Job { |
| 23 | /// Unsafe: this may be called from a different thread than the one |
| 24 | /// which scheduled the job, so the implementer must ensure the |
| 25 | /// appropriate traits are met, whether `Send`, `Sync`, or both. |
| 26 | unsafe fn execute(this: *const ()); |
| 27 | } |
| 28 | |
| 29 | /// Effectively a Job trait object. Each JobRef **must** be executed |
| 30 | /// exactly once, or else data may leak. |
| 31 | /// |
| 32 | /// Internally, we store the job's data in a `*const ()` pointer. The |
| 33 | /// true type is something like `*const StackJob<...>`, but we hide |
| 34 | /// it. We also carry the "execute fn" from the `Job` trait. |
| 35 | pub(super) struct JobRef { |
| 36 | pointer: *const (), |
| 37 | execute_fn: unsafe fn(*const ()), |
| 38 | } |
| 39 | |
| 40 | unsafe impl Send for JobRef {} |
| 41 | unsafe impl Sync for JobRef {} |
| 42 | |
| 43 | impl JobRef { |
| 44 | /// Unsafe: caller asserts that `data` will remain valid until the |
| 45 | /// job is executed. |
| 46 | pub(super) unsafe fn new<T>(data: *const T) -> JobRef |
| 47 | where |
| 48 | T: Job, |
| 49 | { |
| 50 | // erase types: |
| 51 | JobRef { |
| 52 | pointer: data as *const (), |
| 53 | execute_fn: <T as Job>::execute, |
| 54 | } |
| 55 | } |
| 56 | |
| 57 | /// Returns an opaque handle that can be saved and compared, |
| 58 | /// without making `JobRef` itself `Copy + Eq`. |
| 59 | #[inline ] |
| 60 | pub(super) fn id(&self) -> impl Eq { |
| 61 | (self.pointer, self.execute_fn) |
| 62 | } |
| 63 | |
| 64 | #[inline ] |
| 65 | pub(super) unsafe fn execute(self) { |
| 66 | (self.execute_fn)(self.pointer) |
| 67 | } |
| 68 | } |
| 69 | |
| 70 | /// A job that will be owned by a stack slot. This means that when it |
| 71 | /// executes it need not free any heap data, the cleanup occurs when |
| 72 | /// the stack frame is later popped. The function parameter indicates |
| 73 | /// `true` if the job was stolen -- executed on a different thread. |
| 74 | pub(super) struct StackJob<L, F, R> |
| 75 | where |
| 76 | L: Latch + Sync, |
| 77 | F: FnOnce(bool) -> R + Send, |
| 78 | R: Send, |
| 79 | { |
| 80 | pub(super) latch: L, |
| 81 | func: UnsafeCell<Option<F>>, |
| 82 | result: UnsafeCell<JobResult<R>>, |
| 83 | tlv: Tlv, |
| 84 | } |
| 85 | |
| 86 | impl<L, F, R> StackJob<L, F, R> |
| 87 | where |
| 88 | L: Latch + Sync, |
| 89 | F: FnOnce(bool) -> R + Send, |
| 90 | R: Send, |
| 91 | { |
| 92 | pub(super) fn new(tlv: Tlv, func: F, latch: L) -> StackJob<L, F, R> { |
| 93 | StackJob { |
| 94 | latch, |
| 95 | func: UnsafeCell::new(Some(func)), |
| 96 | result: UnsafeCell::new(JobResult::None), |
| 97 | tlv, |
| 98 | } |
| 99 | } |
| 100 | |
| 101 | pub(super) unsafe fn as_job_ref(&self) -> JobRef { |
| 102 | JobRef::new(self) |
| 103 | } |
| 104 | |
| 105 | pub(super) unsafe fn run_inline(self, stolen: bool) -> R { |
| 106 | self.func.into_inner().unwrap()(stolen) |
| 107 | } |
| 108 | |
| 109 | pub(super) unsafe fn into_result(self) -> R { |
| 110 | self.result.into_inner().into_return_value() |
| 111 | } |
| 112 | } |
| 113 | |
| 114 | impl<L, F, R> Job for StackJob<L, F, R> |
| 115 | where |
| 116 | L: Latch + Sync, |
| 117 | F: FnOnce(bool) -> R + Send, |
| 118 | R: Send, |
| 119 | { |
| 120 | unsafe fn execute(this: *const ()) { |
| 121 | let this: &StackJob = &*(this as *const Self); |
| 122 | tlv::set(this.tlv); |
| 123 | let abort: AbortIfPanic = unwind::AbortIfPanic; |
| 124 | let func: F = (*this.func.get()).take().unwrap(); |
| 125 | (*this.result.get()) = JobResult::call(func); |
| 126 | Latch::set(&this.latch); |
| 127 | mem::forget(abort); |
| 128 | } |
| 129 | } |
| 130 | |
| 131 | /// Represents a job stored in the heap. Used to implement |
| 132 | /// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply |
| 133 | /// invokes a closure, which then triggers the appropriate logic to |
| 134 | /// signal that the job executed. |
| 135 | /// |
| 136 | /// (Probably `StackJob` should be refactored in a similar fashion.) |
| 137 | pub(super) struct HeapJob<BODY> |
| 138 | where |
| 139 | BODY: FnOnce() + Send, |
| 140 | { |
| 141 | job: BODY, |
| 142 | tlv: Tlv, |
| 143 | } |
| 144 | |
| 145 | impl<BODY> HeapJob<BODY> |
| 146 | where |
| 147 | BODY: FnOnce() + Send, |
| 148 | { |
| 149 | pub(super) fn new(tlv: Tlv, job: BODY) -> Box<Self> { |
| 150 | Box::new(HeapJob { job, tlv }) |
| 151 | } |
| 152 | |
| 153 | /// Creates a `JobRef` from this job -- note that this hides all |
| 154 | /// lifetimes, so it is up to you to ensure that this JobRef |
| 155 | /// doesn't outlive any data that it closes over. |
| 156 | pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef { |
| 157 | JobRef::new(data:Box::into_raw(self)) |
| 158 | } |
| 159 | |
| 160 | /// Creates a static `JobRef` from this job. |
| 161 | pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef |
| 162 | where |
| 163 | BODY: 'static, |
| 164 | { |
| 165 | unsafe { self.into_job_ref() } |
| 166 | } |
| 167 | } |
| 168 | |
| 169 | impl<BODY> Job for HeapJob<BODY> |
| 170 | where |
| 171 | BODY: FnOnce() + Send, |
| 172 | { |
| 173 | unsafe fn execute(this: *const ()) { |
| 174 | let this: Box> = Box::from_raw(this as *mut Self); |
| 175 | tlv::set(this.tlv); |
| 176 | (this.job)(); |
| 177 | } |
| 178 | } |
| 179 | |
| 180 | /// Represents a job stored in an `Arc` -- like `HeapJob`, but may |
| 181 | /// be turned into multiple `JobRef`s and called multiple times. |
| 182 | pub(super) struct ArcJob<BODY> |
| 183 | where |
| 184 | BODY: Fn() + Send + Sync, |
| 185 | { |
| 186 | job: BODY, |
| 187 | } |
| 188 | |
| 189 | impl<BODY> ArcJob<BODY> |
| 190 | where |
| 191 | BODY: Fn() + Send + Sync, |
| 192 | { |
| 193 | pub(super) fn new(job: BODY) -> Arc<Self> { |
| 194 | Arc::new(data:ArcJob { job }) |
| 195 | } |
| 196 | |
| 197 | /// Creates a `JobRef` from this job -- note that this hides all |
| 198 | /// lifetimes, so it is up to you to ensure that this JobRef |
| 199 | /// doesn't outlive any data that it closes over. |
| 200 | pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef { |
| 201 | JobRef::new(data:Arc::into_raw(this:Arc::clone(self:this))) |
| 202 | } |
| 203 | |
| 204 | /// Creates a static `JobRef` from this job. |
| 205 | pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef |
| 206 | where |
| 207 | BODY: 'static, |
| 208 | { |
| 209 | unsafe { Self::as_job_ref(this) } |
| 210 | } |
| 211 | } |
| 212 | |
| 213 | impl<BODY> Job for ArcJob<BODY> |
| 214 | where |
| 215 | BODY: Fn() + Send + Sync, |
| 216 | { |
| 217 | unsafe fn execute(this: *const ()) { |
| 218 | let this: Arc> = Arc::from_raw(ptr:this as *mut Self); |
| 219 | (this.job)(); |
| 220 | } |
| 221 | } |
| 222 | |
| 223 | impl<T> JobResult<T> { |
| 224 | fn call(func: impl FnOnce(bool) -> T) -> Self { |
| 225 | match unwind::halt_unwinding(|| func(true)) { |
| 226 | Ok(x: T) => JobResult::Ok(x), |
| 227 | Err(x: Box) => JobResult::Panic(x), |
| 228 | } |
| 229 | } |
| 230 | |
| 231 | /// Convert the `JobResult` for a job that has finished (and hence |
| 232 | /// its JobResult is populated) into its return value. |
| 233 | /// |
| 234 | /// NB. This will panic if the job panicked. |
| 235 | pub(super) fn into_return_value(self) -> T { |
| 236 | match self { |
| 237 | JobResult::None => unreachable!(), |
| 238 | JobResult::Ok(x: T) => x, |
| 239 | JobResult::Panic(x: Box) => unwind::resume_unwinding(payload:x), |
| 240 | } |
| 241 | } |
| 242 | } |
| 243 | |
| 244 | /// Indirect queue to provide FIFO job priority. |
| 245 | pub(super) struct JobFifo { |
| 246 | inner: Injector<JobRef>, |
| 247 | } |
| 248 | |
| 249 | impl JobFifo { |
| 250 | pub(super) fn new() -> Self { |
| 251 | JobFifo { |
| 252 | inner: Injector::new(), |
| 253 | } |
| 254 | } |
| 255 | |
| 256 | pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef { |
| 257 | // A little indirection ensures that spawns are always prioritized in FIFO order. The |
| 258 | // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front |
| 259 | // (FIFO), but either way they will end up popping from the front of this queue. |
| 260 | self.inner.push(task:job_ref); |
| 261 | JobRef::new(self) |
| 262 | } |
| 263 | } |
| 264 | |
| 265 | impl Job for JobFifo { |
| 266 | unsafe fn execute(this: *const ()) { |
| 267 | // We "execute" a queue by executing its first job, FIFO. |
| 268 | let this: &JobFifo = &*(this as *const Self); |
| 269 | loop { |
| 270 | match this.inner.steal() { |
| 271 | Steal::Success(job_ref: JobRef) => break job_ref.execute(), |
| 272 | Steal::Empty => panic!("FIFO is empty" ), |
| 273 | Steal::Retry => {} |
| 274 | } |
| 275 | } |
| 276 | } |
| 277 | } |
| 278 | |