1use crate::pin::Pin;
2use crate::sync::atomic::AtomicU32;
3use crate::sync::atomic::Ordering::{Acquire, Release};
4use crate::sys::futex::{futex_wait, futex_wake};
5use crate::time::Duration;
6
// The three values the parker's state word can hold. They are chosen so that
// the single wrapping `fetch_sub(1)` in park()/park_timeout() maps
// NOTIFIED (1) => EMPTY (0) and EMPTY (0) => PARKED (u32::MAX).

/// The owning thread is blocked, or about to block, waiting for a token.
const PARKED: u32 = u32::MAX;
/// No token is stored and the owning thread is not waiting.
const EMPTY: u32 = 0;
/// unpark() has stored a token that has not been consumed yet.
const NOTIFIED: u32 = 1;
10
/// Thread parker built on a single futex word.
///
/// All coordination between park() and unpark() goes through `state`.
pub struct Parker {
    // Always one of EMPTY, NOTIFIED or PARKED.
    state: AtomicU32,
}
14
15// Notes about memory ordering:
16//
17// Memory ordering is only relevant for the relative ordering of operations
18// between different variables. Even Ordering::Relaxed guarantees a
19// monotonic/consistent order when looking at just a single atomic variable.
20//
21// So, since this parker is just a single atomic variable, we only need to look
22// at the ordering guarantees we need to provide to the 'outside world'.
23//
24// The only memory ordering guarantee that parking and unparking provide, is
25// that things which happened before unpark() are visible on the thread
26// returning from park() afterwards. Otherwise, it was effectively unparked
27// before unpark() was called while still consuming the 'token'.
28//
29// In other words, unpark() needs to synchronize with the part of park() that
30// consumes the token and returns.
31//
32// This is done with a release-acquire synchronization, by using
33// Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using
34// Ordering::Acquire when checking for this state in park().
35impl Parker {
36 /// Construct the futex parker. The UNIX parker implementation
37 /// requires this to happen in-place.
38 pub unsafe fn new_in_place(parker: *mut Parker) {
39 parker.write(Self { state: AtomicU32::new(EMPTY) });
40 }
41
42 // Assumes this is only called by the thread that owns the Parker,
43 // which means that `self.state != PARKED`.
44 pub unsafe fn park(self: Pin<&Self>) {
45 // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
46 // first case.
47 if self.state.fetch_sub(1, Acquire) == NOTIFIED {
48 return;
49 }
50 loop {
51 // Wait for something to happen, assuming it's still set to PARKED.
52 futex_wait(&self.state, PARKED, None);
53 // Change NOTIFIED=>EMPTY and return in that case.
54 if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Acquire).is_ok() {
55 return;
56 } else {
57 // Spurious wake up. We loop to try again.
58 }
59 }
60 }
61
62 // Assumes this is only called by the thread that owns the Parker,
63 // which means that `self.state != PARKED`. This implementation doesn't
64 // require `Pin`, but other implementations do.
65 pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) {
66 // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
67 // first case.
68 if self.state.fetch_sub(1, Acquire) == NOTIFIED {
69 return;
70 }
71 // Wait for something to happen, assuming it's still set to PARKED.
72 futex_wait(&self.state, PARKED, Some(timeout));
73 // This is not just a store, because we need to establish a
74 // release-acquire ordering with unpark().
75 if self.state.swap(EMPTY, Acquire) == NOTIFIED {
76 // Woke up because of unpark().
77 } else {
78 // Timeout or spurious wake up.
79 // We return either way, because we can't easily tell if it was the
80 // timeout or not.
81 }
82 }
83
84 // This implementation doesn't require `Pin`, but other implementations do.
85 #[inline]
86 pub fn unpark(self: Pin<&Self>) {
87 // Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
88 // wake the thread in the first case.
89 //
90 // Note that even NOTIFIED=>NOTIFIED results in a write. This is on
91 // purpose, to make sure every unpark() has a release-acquire ordering
92 // with park().
93 if self.state.swap(NOTIFIED, Release) == PARKED {
94 futex_wake(&self.state);
95 }
96 }
97}
98