use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::usize;

use crate::registry::{Registry, WorkerThread};

/// We define various kinds of latches, which are all primitive signaling
/// mechanisms. A latch starts as false. Eventually someone calls `set()` and
/// it becomes true. You can test if it has been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// deallocated and so forth. One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    ///
    /// This function operates on `*const Self` instead of `&self` to allow it
    /// to become dangling during this call. The caller must ensure that the
    /// pointer is valid upon entry, and not invalidated during the call by any
    /// actions other than `set` itself.
    unsafe fn set(this: *const Self);
}
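// Illustrative sketch (not part of the original source): the intended
// one-shot protocol. Some other thread calls `set` exactly once, while the
// owner polls `probe` on latch types that expose it. `latch_ptr` and `latch`
// below are hypothetical handles to the same latch:
//
//     // producer side -- `latch_ptr: *const SomeLatch` must still be valid here
//     unsafe { Latch::set(latch_ptr) };
//
//     // owner side -- once `probe()` returns true, everything written
//     // before `set` is visible to this thread
//     while !latch.probe() {
//         // spin, or go to sleep via the registry's sleep protocol
//     }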

pub(super) trait AsCoreLatch {
    fn as_core_latch(&self) -> &CoreLatch;
}

/// Latch is not set, owning thread is awake
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep).
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken.
const SLEEPING: usize = 2;

/// Latch is set.
const SET: usize = 3;

/// A `CoreLatch` is the shared state machine used by latches owned by a
/// worker thread. Besides recording whether the latch has been set, it
/// tracks whether the owning thread is awake, about to sleep, or asleep,
/// so that `set()` knows whether a wakeup is required.
#[derive(Debug)]
pub(super) struct CoreLatch {
    state: AtomicUsize,
}

impl CoreLatch {
    #[inline]
    fn new() -> Self {
        Self {
            state: AtomicUsize::new(UNSET),
        }
    }

    /// Returns the address of this core latch as an integer. Used
    /// for logging.
    #[inline]
    pub(super) fn addr(&self) -> usize {
        self as *const CoreLatch as usize
    }

    /// Invoked by owning thread as it prepares to sleep. Returns true
    /// if the owning thread may proceed to fall asleep, false if the
    /// latch was set in the meantime.
    #[inline]
    pub(super) fn get_sleepy(&self) -> bool {
        self.state
            .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by the owning thread as it falls asleep. Returns
    /// true if the owning thread should block, or false if the latch
    /// was set in the meantime.
    #[inline]
    pub(super) fn fall_asleep(&self) -> bool {
        self.state
            .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by the owning thread once it wakes up. Unless the latch
    /// was set in the meantime, this transitions the state from `SLEEPING`
    /// back to `UNSET`, marking the thread as awake again.
    #[inline]
    pub(super) fn wake_up(&self) {
        if !self.probe() {
            let _ =
                self.state
                    .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
        }
    }

    /// Set the latch. If this returns true, the owning thread was sleeping
    /// and must be awoken.
    ///
    /// This is private because, typically, setting a latch involves
    /// doing some wakeups; those are encapsulated in the surrounding
    /// latch code.
    #[inline]
    unsafe fn set(this: *const Self) -> bool {
        let old_state = (*this).state.swap(SET, Ordering::AcqRel);
        old_state == SLEEPING
    }

    /// Test if this latch has been set.
    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.state.load(Ordering::Acquire) == SET
    }
}
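// Illustrative sketch (not part of the original source): the owning worker
// walks the latch through UNSET -> SLEEPY -> SLEEPING before blocking, and
// backs out at any step where `set()` has raced ahead of it (`latch` here is
// a hypothetical `&CoreLatch`):
//
//     if latch.get_sleepy() {              // UNSET   -> SLEEPY
//         // announce sleepiness, re-check the registry for work, ...
//         if latch.fall_asleep() {         // SLEEPY  -> SLEEPING
//             // block; a later `CoreLatch::set` observes SLEEPING, returns
//             // true, and the setter then wakes this worker up
//         }
//     }
//     latch.wake_up();                     // SLEEPING -> UNSET (unless SET)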

/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
pub(super) struct SpinLatch<'r> {
    core_latch: CoreLatch,
    registry: &'r Arc<Registry>,
    target_worker_index: usize,
    cross: bool,
}

impl<'r> SpinLatch<'r> {
    /// Creates a new spin latch that is owned by `thread`. This means
    /// that `thread` is the only thread that should be blocking on
    /// this latch -- it also means that when the latch is set, we
    /// will wake `thread` if it is sleeping.
    #[inline]
    pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            core_latch: CoreLatch::new(),
            registry: thread.registry(),
            target_worker_index: thread.index(),
            cross: false,
        }
    }

    /// Creates a new spin latch for cross-threadpool blocking. Notably, we
    /// need to make sure the registry is kept alive after setting, so we can
    /// safely call the notification.
    #[inline]
    pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            cross: true,
            ..SpinLatch::new(thread)
        }
    }

    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.core_latch.probe()
    }
}

impl<'r> AsCoreLatch for SpinLatch<'r> {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

impl<'r> Latch for SpinLatch<'r> {
    #[inline]
    unsafe fn set(this: *const Self) {
        let cross_registry;

        let registry: &Registry = if (*this).cross {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            cross_registry = Arc::clone((*this).registry);
            &cross_registry
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive. However, that doesn't
            // include this *particular* `Arc` handle if the waiting
            // thread then exits, so we must completely dereference it.
            (*this).registry
        };
        let target_worker_index = (*this).target_worker_index;

        // NOTE: Once we `set`, the target may proceed and invalidate `this`!
        if CoreLatch::set(&(*this).core_latch) {
            // Subtle: at this point, we can no longer read from
            // `self`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
#[derive(Debug)]
pub(super) struct LockLatch {
    m: Mutex<bool>,
    v: Condvar,
}

impl LockLatch {
    #[inline]
    pub(super) fn new() -> LockLatch {
        LockLatch {
            m: Mutex::new(false),
            v: Condvar::new(),
        }
    }

    /// Block until the latch is set, then reset it so the latch can be reused.
    pub(super) fn wait_and_reset(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
        *guard = false;
    }

    /// Block until latch is set.
    pub(super) fn wait(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
    }
}

impl Latch for LockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        let mut guard = (*this).m.lock().unwrap();
        *guard = true;
        (*this).v.notify_all();
    }
}
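// Illustrative sketch (not part of the original source): because `LockLatch`
// is built from a `Mutex` and a `Condvar`, it can be shared by plain
// reference and blocked on, e.g. with `std::thread::scope`:
//
//     let latch = LockLatch::new();
//     std::thread::scope(|s| {
//         s.spawn(|| unsafe { LockLatch::set(&latch) });
//         latch.wait(); // blocks until the spawned thread sets the latch
//     });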

/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
///
/// Note: like a `SpinLatch`, count latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the count-latch, and that would create a cycle. So a
/// `CountLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct CountLatch {
    core_latch: CoreLatch,
    counter: AtomicUsize,
}

impl CountLatch {
    #[inline]
    pub(super) fn new() -> CountLatch {
        Self::with_count(1)
    }

    #[inline]
    pub(super) fn with_count(n: usize) -> CountLatch {
        CountLatch {
            core_latch: CoreLatch::new(),
            counter: AtomicUsize::new(n),
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        debug_assert!(!self.core_latch.probe());
        self.counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrements the latch counter by one. If this is the final
    /// count, then the latch is **set**, and calls to `probe()` will
    /// return true. Returns whether the latch was set.
    #[inline]
    pub(super) unsafe fn set(this: *const Self) -> bool {
        if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            CoreLatch::set(&(*this).core_latch);
            true
        } else {
            false
        }
    }

    /// Decrements the latch counter by one and possibly sets it. If
    /// the latch is set, then the specific worker thread is tickled,
    /// which should be the one that owns this latch.
    #[inline]
    pub(super) unsafe fn set_and_tickle_one(
        this: *const Self,
        registry: &Registry,
        target_worker_index: usize,
    ) {
        if Self::set(this) {
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

impl AsCoreLatch for CountLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}
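// Illustrative sketch (not part of the original source): with a count of two,
// the first `set` only decrements the counter, while the second one actually
// sets the core latch and reports that a wakeup is needed:
//
//     let latch = CountLatch::with_count(2);
//     assert!(!unsafe { CountLatch::set(&latch) }); // 2 -> 1, not yet set
//     assert!(unsafe { CountLatch::set(&latch) });  // 1 -> 0, latch is now set
//     assert!(latch.as_core_latch().probe());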

#[derive(Debug)]
pub(super) struct CountLockLatch {
    lock_latch: LockLatch,
    counter: AtomicUsize,
}

impl CountLockLatch {
    #[inline]
    pub(super) fn with_count(n: usize) -> CountLockLatch {
        CountLockLatch {
            lock_latch: LockLatch::new(),
            counter: AtomicUsize::new(n),
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
        debug_assert!(old_counter != 0);
    }

    pub(super) fn wait(&self) {
        self.lock_latch.wait();
    }
}

impl Latch for CountLockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            LockLatch::set(&(*this).lock_latch);
        }
    }
}
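// Illustrative sketch (not part of the original source): `CountLockLatch`
// combines the counting behaviour above with a blocking `wait`, so a caller
// can sleep until every outstanding `set` has arrived:
//
//     let latch = CountLockLatch::with_count(1);
//     latch.increment();                         // now expecting two `set` calls
//     unsafe { CountLockLatch::set(&latch) };    // 2 -> 1
//     unsafe { CountLockLatch::set(&latch) };    // 1 -> 0, wakes any waiter
//     latch.wait();                              // returns immediately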

/// `&L` without any implication of `dereferenceable` for `Latch::set`
pub(super) struct LatchRef<'a, L> {
    inner: *const L,
    marker: PhantomData<&'a L>,
}

impl<L> LatchRef<'_, L> {
    pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
        LatchRef {
            inner,
            marker: PhantomData,
        }
    }
}

unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}

impl<L> Deref for LatchRef<'_, L> {
    type Target = L;

    fn deref(&self) -> &L {
        // SAFETY: if we have &self, the inner latch is still alive
        unsafe { &*self.inner }
    }
}

impl<L: Latch> Latch for LatchRef<'_, L> {
    #[inline]
    unsafe fn set(this: *const Self) {
        L::set((*this).inner);
    }
}
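// Illustrative sketch (not part of the original source): `LatchRef` behaves
// like a `&L` -- it derefs to the latch and forwards `Latch::set` -- but it
// stores a raw pointer, so holding one does not promise the compiler that
// the latch stays dereferenceable for the whole duration of `set`:
//
//     let latch = LockLatch::new();
//     let latch_ref = LatchRef::new(&latch);
//     unsafe { Latch::set(&latch_ref) };  // sets the underlying LockLatch
//     latch.wait();                       // returns immediately; already set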
| 415 | |