1 | use std::marker::PhantomData; |
2 | use std::ops::Deref; |
3 | use std::sync::atomic::{AtomicUsize, Ordering}; |
4 | use std::sync::{Arc, Condvar, Mutex}; |
5 | use std::usize; |
6 | |
7 | use crate::registry::{Registry, WorkerThread}; |
8 | |
/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
/// it becomes true. You can test if it has been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// deallocated and so forth. One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    ///
    /// # Safety
    ///
    /// This function operates on `*const Self` instead of `&self` to allow it
    /// to become dangling during this call. The caller must ensure that the
    /// pointer is valid upon entry, and not invalidated during the call by any
    /// actions other than `set` itself.
    unsafe fn set(this: *const Self);
}
52 | |
/// Implemented by latch types that are built around a `CoreLatch`, so that
/// code holding such a latch can get at the inner `CoreLatch`.
pub(super) trait AsCoreLatch {
    /// Returns a shared reference to the underlying `CoreLatch`.
    fn as_core_latch(&self) -> &CoreLatch;
}
56 | |
// The four values stored in `CoreLatch::state`.

/// Latch is not set, owning thread is awake.
/// This is the state that `wake_up()` restores from `SLEEPING`.
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep). Entered from `UNSET` by `get_sleepy()`.
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken. Entered from `SLEEPY` by `fall_asleep()`.
const SLEEPING: usize = 2;

/// Latch is set. Written unconditionally by `CoreLatch::set`.
const SET: usize = 3;
70 | |
/// The atomic state machine at the heart of the latches in this file that
/// cooperate with a sleeping owner thread.
///
/// `state` holds one of `UNSET`, `SLEEPY`, `SLEEPING` or `SET`. A core latch
/// has no `wait()` operation of its own; it is observed through `probe()`
/// and moved between states by `get_sleepy()`, `fall_asleep()`, `wake_up()`
/// and `set()`.
#[derive(Debug)]
pub(super) struct CoreLatch {
    /// Current state; one of the `UNSET`/`SLEEPY`/`SLEEPING`/`SET` constants.
    state: AtomicUsize,
}
78 | |
79 | impl CoreLatch { |
80 | #[inline ] |
81 | fn new() -> Self { |
82 | Self { |
83 | state: AtomicUsize::new(0), |
84 | } |
85 | } |
86 | |
87 | /// Invoked by owning thread as it prepares to sleep. Returns true |
88 | /// if the owning thread may proceed to fall asleep, false if the |
89 | /// latch was set in the meantime. |
90 | #[inline ] |
91 | pub(super) fn get_sleepy(&self) -> bool { |
92 | self.state |
93 | .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed) |
94 | .is_ok() |
95 | } |
96 | |
97 | /// Invoked by owning thread as it falls asleep sleep. Returns |
98 | /// true if the owning thread should block, or false if the latch |
99 | /// was set in the meantime. |
100 | #[inline ] |
101 | pub(super) fn fall_asleep(&self) -> bool { |
102 | self.state |
103 | .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed) |
104 | .is_ok() |
105 | } |
106 | |
107 | /// Invoked by owning thread as it falls asleep sleep. Returns |
108 | /// true if the owning thread should block, or false if the latch |
109 | /// was set in the meantime. |
110 | #[inline ] |
111 | pub(super) fn wake_up(&self) { |
112 | if !self.probe() { |
113 | let _ = |
114 | self.state |
115 | .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed); |
116 | } |
117 | } |
118 | |
119 | /// Set the latch. If this returns true, the owning thread was sleeping |
120 | /// and must be awoken. |
121 | /// |
122 | /// This is private because, typically, setting a latch involves |
123 | /// doing some wakeups; those are encapsulated in the surrounding |
124 | /// latch code. |
125 | #[inline ] |
126 | unsafe fn set(this: *const Self) -> bool { |
127 | let old_state = (*this).state.swap(SET, Ordering::AcqRel); |
128 | old_state == SLEEPING |
129 | } |
130 | |
131 | /// Test if this latch has been set. |
132 | #[inline ] |
133 | pub(super) fn probe(&self) -> bool { |
134 | self.state.load(Ordering::Acquire) == SET |
135 | } |
136 | } |
137 | |
/// A `CoreLatch` is trivially its own core latch.
impl AsCoreLatch for CoreLatch {
    /// Returns `self` unchanged.
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        self
    }
}
144 | |
/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
pub(super) struct SpinLatch<'r> {
    /// The underlying flag that `set()` flips and `probe()` reads.
    core_latch: CoreLatch,
    /// Registry that is notified when `set()` finds the owner asleep.
    registry: &'r Arc<Registry>,
    /// Index of the owning worker, passed to the registry notification.
    target_worker_index: usize,
    /// When true, `set()` clones the registry `Arc` before setting so the
    /// registry stays alive for the notification.
    cross: bool,
}
154 | |
155 | impl<'r> SpinLatch<'r> { |
156 | /// Creates a new spin latch that is owned by `thread`. This means |
157 | /// that `thread` is the only thread that should be blocking on |
158 | /// this latch -- it also means that when the latch is set, we |
159 | /// will wake `thread` if it is sleeping. |
160 | #[inline ] |
161 | pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> { |
162 | SpinLatch { |
163 | core_latch: CoreLatch::new(), |
164 | registry: thread.registry(), |
165 | target_worker_index: thread.index(), |
166 | cross: false, |
167 | } |
168 | } |
169 | |
170 | /// Creates a new spin latch for cross-threadpool blocking. Notably, we |
171 | /// need to make sure the registry is kept alive after setting, so we can |
172 | /// safely call the notification. |
173 | #[inline ] |
174 | pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> { |
175 | SpinLatch { |
176 | cross: true, |
177 | ..SpinLatch::new(thread) |
178 | } |
179 | } |
180 | |
181 | #[inline ] |
182 | pub(super) fn probe(&self) -> bool { |
183 | self.core_latch.probe() |
184 | } |
185 | } |
186 | |
/// Exposes the `CoreLatch` embedded in a `SpinLatch`.
impl<'r> AsCoreLatch for SpinLatch<'r> {
    /// Returns a reference to the `core_latch` field.
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}
193 | |
impl<'r> Latch for SpinLatch<'r> {
    /// Sets the core latch and, if the owning worker was asleep on it,
    /// asks the registry to wake that worker.
    ///
    /// Every field that is needed after the set (`registry`,
    /// `target_worker_index`) is read *before* the core latch is set.
    ///
    /// # Safety
    ///
    /// `this` must point to a valid `SpinLatch` when the call begins; it is
    /// dereferenced up to and including the call to `CoreLatch::set`.
    #[inline]
    unsafe fn set(this: *const Self) {
        // Owned handle used only by the cross-registry branch; declared
        // here so it outlives the `registry` borrow taken from it.
        let cross_registry;

        let registry: &Registry = if (*this).cross {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            cross_registry = Arc::clone((*this).registry);
            &cross_registry
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive. However, that doesn't
            // include this *particular* `Arc` handle if the waiting
            // thread then exits, so we must completely dereference it.
            (*this).registry
        };
        // Copy the index out now; `this` may be gone after the set below.
        let target_worker_index = (*this).target_worker_index;

        // NOTE: Once we `set`, the target may proceed and invalidate `this`!
        if CoreLatch::set(&(*this).core_latch) {
            // Subtle: at this point, we can no longer read from
            // `this`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}
227 | |
/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
#[derive(Debug)]
pub(super) struct LockLatch {
    /// The latch flag: `false` until `set()` stores `true`.
    m: Mutex<bool>,
    /// Condition variable that waiters block on and `set()` notifies.
    v: Condvar,
}
235 | |
236 | impl LockLatch { |
237 | #[inline ] |
238 | pub(super) fn new() -> LockLatch { |
239 | LockLatch { |
240 | m: Mutex::new(false), |
241 | v: Condvar::new(), |
242 | } |
243 | } |
244 | |
245 | /// Block until latch is set, then resets this lock latch so it can be reused again. |
246 | pub(super) fn wait_and_reset(&self) { |
247 | let mut guard = self.m.lock().unwrap(); |
248 | while !*guard { |
249 | guard = self.v.wait(guard).unwrap(); |
250 | } |
251 | *guard = false; |
252 | } |
253 | |
254 | /// Block until latch is set. |
255 | pub(super) fn wait(&self) { |
256 | let mut guard = self.m.lock().unwrap(); |
257 | while !*guard { |
258 | guard = self.v.wait(guard).unwrap(); |
259 | } |
260 | } |
261 | } |
262 | |
263 | impl Latch for LockLatch { |
264 | #[inline ] |
265 | unsafe fn set(this: *const Self) { |
266 | let mut guard = (*this).m.lock().unwrap(); |
267 | *guard = true; |
268 | (*this).v.notify_all(); |
269 | } |
270 | } |
271 | |
/// Once latches are used to implement one-time blocking, primarily
/// for the termination flag of the threads in the pool.
///
/// Note: like a `SpinLatch`, once-latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the once-latch, and that would create a cycle. So a
/// `OnceLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct OnceLatch {
    /// The underlying flag; the only state a `OnceLatch` carries.
    core_latch: CoreLatch,
}
288 | |
289 | impl OnceLatch { |
290 | #[inline ] |
291 | pub(super) fn new() -> OnceLatch { |
292 | Self { |
293 | core_latch: CoreLatch::new(), |
294 | } |
295 | } |
296 | |
297 | /// Set the latch, then tickle the specific worker thread, |
298 | /// which should be the one that owns this latch. |
299 | #[inline ] |
300 | pub(super) unsafe fn set_and_tickle_one( |
301 | this: *const Self, |
302 | registry: &Registry, |
303 | target_worker_index: usize, |
304 | ) { |
305 | if CoreLatch::set(&(*this).core_latch) { |
306 | registry.notify_worker_latch_is_set(target_worker_index); |
307 | } |
308 | } |
309 | } |
310 | |
/// Exposes the `CoreLatch` embedded in a `OnceLatch`.
impl AsCoreLatch for OnceLatch {
    /// Returns a reference to the `core_latch` field.
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}
317 | |
/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
#[derive(Debug)]
pub(super) struct CountLatch {
    /// Outstanding count; each `set()` decrements it, `increment()` adds one.
    counter: AtomicUsize,
    /// The inner latch that is actually set when the counter hits zero,
    /// together with whatever is needed to wake its waiter.
    kind: CountLatchKind,
}
328 | |
/// How the owner of a `CountLatch` waits for it, and therefore which inner
/// latch is set when the counter reaches zero.
enum CountLatchKind {
    /// A latch for scopes created on a rayon thread which will participate in work-
    /// stealing while it waits for completion. This thread is not necessarily part
    /// of the same registry as the scope itself!
    Stealing {
        /// The flag the owning worker waits on.
        latch: CoreLatch,
        /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
        /// with registry B, when a job completes in a thread of registry B, we may
        /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
        /// That means we need a reference to registry A (since at that point we will
        /// only have a reference to registry B), so we stash it here.
        registry: Arc<Registry>,
        /// The index of the worker to wake in `registry`
        worker_index: usize,
    },

    /// A latch for scopes created on a non-rayon thread which will block to wait.
    Blocking {
        /// The mutex/condvar latch the owner blocks on.
        latch: LockLatch,
    },
}
348 | |
349 | impl std::fmt::Debug for CountLatchKind { |
350 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
351 | match self { |
352 | CountLatchKind::Stealing { latch, .. } => { |
353 | f.debug_tuple("Stealing" ).field(latch).finish() |
354 | } |
355 | CountLatchKind::Blocking { latch, .. } => { |
356 | f.debug_tuple("Blocking" ).field(latch).finish() |
357 | } |
358 | } |
359 | } |
360 | } |
361 | |
362 | impl CountLatch { |
363 | pub(super) fn new(owner: Option<&WorkerThread>) -> Self { |
364 | Self::with_count(1, owner) |
365 | } |
366 | |
367 | pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self { |
368 | Self { |
369 | counter: AtomicUsize::new(count), |
370 | kind: match owner { |
371 | Some(owner) => CountLatchKind::Stealing { |
372 | latch: CoreLatch::new(), |
373 | registry: Arc::clone(owner.registry()), |
374 | worker_index: owner.index(), |
375 | }, |
376 | None => CountLatchKind::Blocking { |
377 | latch: LockLatch::new(), |
378 | }, |
379 | }, |
380 | } |
381 | } |
382 | |
383 | #[inline ] |
384 | pub(super) fn increment(&self) { |
385 | let old_counter = self.counter.fetch_add(1, Ordering::Relaxed); |
386 | debug_assert!(old_counter != 0); |
387 | } |
388 | |
389 | pub(super) fn wait(&self, owner: Option<&WorkerThread>) { |
390 | match &self.kind { |
391 | CountLatchKind::Stealing { |
392 | latch, |
393 | registry, |
394 | worker_index, |
395 | } => unsafe { |
396 | let owner = owner.expect("owner thread" ); |
397 | debug_assert_eq!(registry.id(), owner.registry().id()); |
398 | debug_assert_eq!(*worker_index, owner.index()); |
399 | owner.wait_until(latch); |
400 | }, |
401 | CountLatchKind::Blocking { latch } => latch.wait(), |
402 | } |
403 | } |
404 | } |
405 | |
impl Latch for CountLatch {
    /// Decrements the counter; the call that takes it from one to zero
    /// also sets the inner latch and wakes its waiter.
    ///
    /// # Safety
    ///
    /// `this` must point to a valid `CountLatch` when the call begins.
    #[inline]
    unsafe fn set(this: *const Self) {
        // `fetch_sub` returns the previous value, so `== 1` means this call
        // brought the counter down to zero.
        if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            // NOTE: Once we call `set` on the internal `latch`,
            // the target may proceed and invalidate `this`!
            match (*this).kind {
                CountLatchKind::Stealing {
                    ref latch,
                    ref registry,
                    worker_index,
                } => {
                    // Take our own handle to the registry *before* setting,
                    // so the notification below does not read through `this`.
                    let registry = Arc::clone(registry);
                    if CoreLatch::set(latch) {
                        registry.notify_worker_latch_is_set(worker_index);
                    }
                }
                CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
            }
        }
    }
}
428 | |
/// `&L` without any implication of `dereferenceable` for `Latch::set`
pub(super) struct LatchRef<'a, L> {
    /// Raw pointer to the referenced latch.
    inner: *const L,
    /// Ties the wrapper to the lifetime of the original `&'a L` borrow.
    marker: PhantomData<&'a L>,
}
434 | |
impl<L> LatchRef<'_, L> {
    /// Wraps a borrowed latch. The reference is stored as a raw pointer,
    /// while the returned value keeps the lifetime of the borrow.
    pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
        LatchRef {
            inner,
            marker: PhantomData,
        }
    }
}
443 | |
// SAFETY: the raw-pointer field makes `LatchRef` `!Sync` by default. It stands
// in for a `&L`, which is `Sync` exactly when `L: Sync`, so the same bound is
// applied here.
unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}
445 | |
/// Lets a `LatchRef` be used wherever a `&L` is expected.
impl<L> Deref for LatchRef<'_, L> {
    type Target = L;

    /// Reborrows the wrapped latch as `&L`.
    fn deref(&self) -> &L {
        // SAFETY: if we have &self, the inner latch is still alive
        unsafe { &*self.inner }
    }
}
454 | |
/// Setting a `LatchRef` sets the latch it points to.
impl<L: Latch> Latch for LatchRef<'_, L> {
    /// Reads the stored pointer and forwards it to `L::set`.
    ///
    /// # Safety
    ///
    /// `this` must point to a valid `LatchRef` when the call begins, and the
    /// pointer it stores must satisfy the requirements of `L::set`.
    #[inline]
    unsafe fn set(this: *const Self) {
        L::set((*this).inner);
    }
}
461 | |