1use std::marker::PhantomData;
2use std::ops::Deref;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::{Arc, Condvar, Mutex};
5use std::usize;
6
7use crate::registry::{Registry, WorkerThread};
8
/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
/// it becomes true. You can test if it has been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// deallocated and so forth. One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    ///
    /// # Safety
    ///
    /// This function operates on `*const Self` instead of `&self` to allow it
    /// to become dangling during this call. The caller must ensure that the
    /// pointer is valid upon entry, and not invalidated during the call by any
    /// actions other than `set` itself.
    unsafe fn set(this: *const Self);
}
52
/// Implemented by latch types that embed a `CoreLatch` and can hand out a
/// shared reference to it (in this file: `SpinLatch` and `CountLatch`).
pub(super) trait AsCoreLatch {
    /// Returns a reference to the `CoreLatch` stored inside `self`.
    fn as_core_latch(&self) -> &CoreLatch;
}
56
// The values below are the possible contents of `CoreLatch::state`.

/// Latch is not set, owning thread is awake. This is the state that
/// `CoreLatch::get_sleepy` moves away from and that `CoreLatch::wake_up`
/// restores from `SLEEPING`.
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep). Entered via `CoreLatch::get_sleepy`.
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken. Entered via `CoreLatch::fall_asleep`.
const SLEEPING: usize = 2;

/// Latch is set. Written unconditionally by `CoreLatch::set`.
const SET: usize = 3;
70
/// The atomic state shared by the latches in this file that embed it
/// (`SpinLatch` and `CountLatch`).
///
/// It is more than a boolean flag: `state` holds one of `UNSET`,
/// `SLEEPY`, `SLEEPING` or `SET`, so that the thread setting the latch can
/// tell (from the previous state) whether the owning thread was asleep.
#[derive(Debug)]
pub(super) struct CoreLatch {
    /// One of `UNSET`, `SLEEPY`, `SLEEPING`, `SET`.
    state: AtomicUsize,
}
78
79impl CoreLatch {
80 #[inline]
81 fn new() -> Self {
82 Self {
83 state: AtomicUsize::new(0),
84 }
85 }
86
87 /// Returns the address of this core latch as an integer. Used
88 /// for logging.
89 #[inline]
90 pub(super) fn addr(&self) -> usize {
91 self as *const CoreLatch as usize
92 }
93
94 /// Invoked by owning thread as it prepares to sleep. Returns true
95 /// if the owning thread may proceed to fall asleep, false if the
96 /// latch was set in the meantime.
97 #[inline]
98 pub(super) fn get_sleepy(&self) -> bool {
99 self.state
100 .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
101 .is_ok()
102 }
103
104 /// Invoked by owning thread as it falls asleep sleep. Returns
105 /// true if the owning thread should block, or false if the latch
106 /// was set in the meantime.
107 #[inline]
108 pub(super) fn fall_asleep(&self) -> bool {
109 self.state
110 .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
111 .is_ok()
112 }
113
114 /// Invoked by owning thread as it falls asleep sleep. Returns
115 /// true if the owning thread should block, or false if the latch
116 /// was set in the meantime.
117 #[inline]
118 pub(super) fn wake_up(&self) {
119 if !self.probe() {
120 let _ =
121 self.state
122 .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
123 }
124 }
125
126 /// Set the latch. If this returns true, the owning thread was sleeping
127 /// and must be awoken.
128 ///
129 /// This is private because, typically, setting a latch involves
130 /// doing some wakeups; those are encapsulated in the surrounding
131 /// latch code.
132 #[inline]
133 unsafe fn set(this: *const Self) -> bool {
134 let old_state = (*this).state.swap(SET, Ordering::AcqRel);
135 old_state == SLEEPING
136 }
137
138 /// Test if this latch has been set.
139 #[inline]
140 pub(super) fn probe(&self) -> bool {
141 self.state.load(Ordering::Acquire) == SET
142 }
143}
144
/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They wrap a `CoreLatch` that
/// becomes set when `set()` is called, plus what is needed to notify
/// the owning worker.
pub(super) struct SpinLatch<'r> {
    /// The underlying state that `probe()` reads and `set()` writes.
    core_latch: CoreLatch,
    /// Registry whose `notify_worker_latch_is_set` is called from `set()`.
    registry: &'r Arc<Registry>,
    /// Index passed to `notify_worker_latch_is_set` when the latch is set.
    target_worker_index: usize,
    /// When true, `set()` clones the registry `Arc` before setting the
    /// latch so the registry stays alive for the notification.
    cross: bool,
}
154
155impl<'r> SpinLatch<'r> {
156 /// Creates a new spin latch that is owned by `thread`. This means
157 /// that `thread` is the only thread that should be blocking on
158 /// this latch -- it also means that when the latch is set, we
159 /// will wake `thread` if it is sleeping.
160 #[inline]
161 pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
162 SpinLatch {
163 core_latch: CoreLatch::new(),
164 registry: thread.registry(),
165 target_worker_index: thread.index(),
166 cross: false,
167 }
168 }
169
170 /// Creates a new spin latch for cross-threadpool blocking. Notably, we
171 /// need to make sure the registry is kept alive after setting, so we can
172 /// safely call the notification.
173 #[inline]
174 pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
175 SpinLatch {
176 cross: true,
177 ..SpinLatch::new(thread)
178 }
179 }
180
181 #[inline]
182 pub(super) fn probe(&self) -> bool {
183 self.core_latch.probe()
184 }
185}
186
187impl<'r> AsCoreLatch for SpinLatch<'r> {
188 #[inline]
189 fn as_core_latch(&self) -> &CoreLatch {
190 &self.core_latch
191 }
192}
193
impl<'r> Latch for SpinLatch<'r> {
    /// Sets the underlying `CoreLatch` and, if that reports the owning
    /// worker was sleeping, notifies the registry for that worker.
    ///
    /// Every field of `*this` that is needed after the latch is set
    /// (`registry`, `target_worker_index`) is read *before* the call to
    /// `CoreLatch::set`; statement order in this function is load-bearing.
    ///
    /// # Safety
    ///
    /// `this` must be valid on entry; see `Latch::set`.
    #[inline]
    unsafe fn set(this: *const Self) {
        // Declared here, outside the `if`, so that a cloned `Arc` (if any)
        // lives until the end of the function.
        let cross_registry;

        let registry: &Registry = if (*this).cross {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            cross_registry = Arc::clone((*this).registry);
            &cross_registry
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive. However, that doesn't
            // include this *particular* `Arc` handle if the waiting
            // thread then exits, so we must completely dereference it.
            (*this).registry
        };
        let target_worker_index = (*this).target_worker_index;

        // NOTE: Once we `set`, the target may proceed and invalidate `this`!
        if CoreLatch::set(&(*this).core_latch) {
            // Subtle: at this point, we can no longer read from
            // `this`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}
227
/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
#[derive(Debug)]
pub(super) struct LockLatch {
    /// The latch flag: `true` once the latch has been set.
    m: Mutex<bool>,
    /// Condition variable that waiters block on and `set()` notifies.
    v: Condvar,
}
235
236impl LockLatch {
237 #[inline]
238 pub(super) fn new() -> LockLatch {
239 LockLatch {
240 m: Mutex::new(false),
241 v: Condvar::new(),
242 }
243 }
244
245 /// Block until latch is set, then resets this lock latch so it can be reused again.
246 pub(super) fn wait_and_reset(&self) {
247 let mut guard = self.m.lock().unwrap();
248 while !*guard {
249 guard = self.v.wait(guard).unwrap();
250 }
251 *guard = false;
252 }
253
254 /// Block until latch is set.
255 pub(super) fn wait(&self) {
256 let mut guard = self.m.lock().unwrap();
257 while !*guard {
258 guard = self.v.wait(guard).unwrap();
259 }
260 }
261}
262
263impl Latch for LockLatch {
264 #[inline]
265 unsafe fn set(this: *const Self) {
266 let mut guard: MutexGuard<'_, bool> = (*this).m.lock().unwrap();
267 *guard = true;
268 (*this).v.notify_all();
269 }
270}
271
/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
///
/// Note: like a `SpinLatch`, count latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the count-latch, and that would create a cycle. So a
/// `CountLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct CountLatch {
    /// Set once `counter` is decremented from 1 to 0.
    core_latch: CoreLatch,
    /// Number of outstanding `set()` calls before the latch becomes set.
    counter: AtomicUsize,
}
292
293impl CountLatch {
294 #[inline]
295 pub(super) fn new() -> CountLatch {
296 Self::with_count(1)
297 }
298
299 #[inline]
300 pub(super) fn with_count(n: usize) -> CountLatch {
301 CountLatch {
302 core_latch: CoreLatch::new(),
303 counter: AtomicUsize::new(n),
304 }
305 }
306
307 #[inline]
308 pub(super) fn increment(&self) {
309 debug_assert!(!self.core_latch.probe());
310 self.counter.fetch_add(1, Ordering::Relaxed);
311 }
312
313 /// Decrements the latch counter by one. If this is the final
314 /// count, then the latch is **set**, and calls to `probe()` will
315 /// return true. Returns whether the latch was set.
316 #[inline]
317 pub(super) unsafe fn set(this: *const Self) -> bool {
318 if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
319 CoreLatch::set(&(*this).core_latch);
320 true
321 } else {
322 false
323 }
324 }
325
326 /// Decrements the latch counter by one and possibly set it. If
327 /// the latch is set, then the specific worker thread is tickled,
328 /// which should be the one that owns this latch.
329 #[inline]
330 pub(super) unsafe fn set_and_tickle_one(
331 this: *const Self,
332 registry: &Registry,
333 target_worker_index: usize,
334 ) {
335 if Self::set(this) {
336 registry.notify_worker_latch_is_set(target_worker_index);
337 }
338 }
339}
340
341impl AsCoreLatch for CountLatch {
342 #[inline]
343 fn as_core_latch(&self) -> &CoreLatch {
344 &self.core_latch
345 }
346}
347
/// A counting latch built on top of a `LockLatch`.
///
/// Each `set()` decrements `counter`; only the decrement that takes the
/// counter from one to zero sets the underlying `lock_latch`, which is
/// what `wait()` blocks on.
#[derive(Debug)]
pub(super) struct CountLockLatch {
    /// The blocking latch that becomes set when the counter hits zero.
    lock_latch: LockLatch,
    /// Number of outstanding `set()` calls before `lock_latch` is set.
    counter: AtomicUsize,
}
353
354impl CountLockLatch {
355 #[inline]
356 pub(super) fn with_count(n: usize) -> CountLockLatch {
357 CountLockLatch {
358 lock_latch: LockLatch::new(),
359 counter: AtomicUsize::new(n),
360 }
361 }
362
363 #[inline]
364 pub(super) fn increment(&self) {
365 let old_counter: usize = self.counter.fetch_add(val:1, order:Ordering::Relaxed);
366 debug_assert!(old_counter != 0);
367 }
368
369 pub(super) fn wait(&self) {
370 self.lock_latch.wait();
371 }
372}
373
374impl Latch for CountLockLatch {
375 #[inline]
376 unsafe fn set(this: *const Self) {
377 if (*this).counter.fetch_sub(val:1, order:Ordering::SeqCst) == 1 {
378 LockLatch::set(&(*this).lock_latch);
379 }
380 }
381}
382
/// `&L` without any implication of `dereferenceable` for `Latch::set`
///
/// The latch is stored as a raw pointer; the borrow it was created from
/// is tracked only through the `PhantomData` lifetime marker.
pub(super) struct LatchRef<'a, L> {
    /// Raw pointer to the referenced latch.
    inner: *const L,
    /// Ties this value to the lifetime `'a` of the original `&'a L`.
    marker: PhantomData<&'a L>,
}
388
389impl<L> LatchRef<'_, L> {
390 pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
391 LatchRef {
392 inner,
393 marker: PhantomData,
394 }
395 }
396}
397
// SAFETY (review note): within this file a `LatchRef` only hands out shared
// access to the latch -- `Deref` yields `&L` and `Latch::set` forwards the
// `*const L` -- so sharing a `LatchRef` across threads is bounded by
// `L: Sync`, like sharing `&L` would be. The raw pointer field is what
// prevents this impl from being derived automatically.
unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}
399
400impl<L> Deref for LatchRef<'_, L> {
401 type Target = L;
402
403 fn deref(&self) -> &L {
404 // SAFETY: if we have &self, the inner latch is still alive
405 unsafe { &*self.inner }
406 }
407}
408
409impl<L: Latch> Latch for LatchRef<'_, L> {
410 #[inline]
411 unsafe fn set(this: *const Self) {
412 L::set((*this).inner);
413 }
414}
415