1 | //! Parks the runtime. |
2 | //! |
3 | //! A combination of the various resource driver park handles. |
4 | |
5 | use crate::loom::sync::atomic::AtomicUsize; |
6 | use crate::loom::sync::{Arc, Condvar, Mutex}; |
7 | use crate::runtime::driver::{self, Driver}; |
8 | use crate::util::TryLock; |
9 | |
10 | use std::sync::atomic::Ordering::SeqCst; |
11 | use std::time::Duration; |
12 | |
13 | #[cfg (loom)] |
14 | use crate::runtime::park::CURRENT_THREAD_PARK_COUNT; |
15 | |
16 | pub(crate) struct Parker { |
17 | inner: Arc<Inner>, |
18 | } |
19 | |
20 | pub(crate) struct Unparker { |
21 | inner: Arc<Inner>, |
22 | } |
23 | |
24 | struct Inner { |
25 | /// Avoids entering the park if possible |
26 | state: AtomicUsize, |
27 | |
28 | /// Used to coordinate access to the driver / `condvar` |
29 | mutex: Mutex<()>, |
30 | |
31 | /// `Condvar` to block on if the driver is unavailable. |
32 | condvar: Condvar, |
33 | |
34 | /// Resource (I/O, time, ...) driver |
35 | shared: Arc<Shared>, |
36 | } |
37 | |
38 | const EMPTY: usize = 0; |
39 | const PARKED_CONDVAR: usize = 1; |
40 | const PARKED_DRIVER: usize = 2; |
41 | const NOTIFIED: usize = 3; |
42 | |
43 | /// Shared across multiple Parker handles |
44 | struct Shared { |
45 | /// Shared driver. Only one thread at a time can use this |
46 | driver: TryLock<Driver>, |
47 | } |
48 | |
49 | impl Parker { |
50 | pub(crate) fn new(driver: Driver) -> Parker { |
51 | Parker { |
52 | inner: Arc::new(Inner { |
53 | state: AtomicUsize::new(EMPTY), |
54 | mutex: Mutex::new(()), |
55 | condvar: Condvar::new(), |
56 | shared: Arc::new(Shared { |
57 | driver: TryLock::new(driver), |
58 | }), |
59 | }), |
60 | } |
61 | } |
62 | |
63 | pub(crate) fn unpark(&self) -> Unparker { |
64 | Unparker { |
65 | inner: self.inner.clone(), |
66 | } |
67 | } |
68 | |
69 | pub(crate) fn park(&mut self, handle: &driver::Handle) { |
70 | self.inner.park(handle); |
71 | } |
72 | |
73 | pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { |
74 | // Only parking with zero is supported... |
75 | assert_eq!(duration, Duration::from_millis(0)); |
76 | |
77 | if let Some(mut driver) = self.inner.shared.driver.try_lock() { |
78 | driver.park_timeout(handle, duration); |
79 | } else { |
80 | // https://github.com/tokio-rs/tokio/issues/6536 |
81 | // Hacky, but it's just for loom tests. The counter gets incremented during |
82 | // `park_timeout`, but we still have to increment the counter if we can't acquire the |
83 | // lock. |
84 | #[cfg (loom)] |
85 | CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst)); |
86 | } |
87 | } |
88 | |
89 | pub(crate) fn shutdown(&mut self, handle: &driver::Handle) { |
90 | self.inner.shutdown(handle); |
91 | } |
92 | } |
93 | |
94 | impl Clone for Parker { |
95 | fn clone(&self) -> Parker { |
96 | Parker { |
97 | inner: Arc::new(data:Inner { |
98 | state: AtomicUsize::new(EMPTY), |
99 | mutex: Mutex::new(()), |
100 | condvar: Condvar::new(), |
101 | shared: self.inner.shared.clone(), |
102 | }), |
103 | } |
104 | } |
105 | } |
106 | |
107 | impl Unparker { |
108 | pub(crate) fn unpark(&self, driver: &driver::Handle) { |
109 | self.inner.unpark(driver); |
110 | } |
111 | } |
112 | |
113 | impl Inner { |
114 | /// Parks the current thread for at most `dur`. |
115 | fn park(&self, handle: &driver::Handle) { |
116 | // If we were previously notified then we consume this notification and |
117 | // return quickly. |
118 | if self |
119 | .state |
120 | .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) |
121 | .is_ok() |
122 | { |
123 | return; |
124 | } |
125 | |
126 | if let Some(mut driver) = self.shared.driver.try_lock() { |
127 | self.park_driver(&mut driver, handle); |
128 | } else { |
129 | self.park_condvar(); |
130 | } |
131 | } |
132 | |
133 | fn park_condvar(&self) { |
134 | // Otherwise we need to coordinate going to sleep |
135 | let mut m = self.mutex.lock(); |
136 | |
137 | match self |
138 | .state |
139 | .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst) |
140 | { |
141 | Ok(_) => {} |
142 | Err(NOTIFIED) => { |
143 | // We must read here, even though we know it will be `NOTIFIED`. |
144 | // This is because `unpark` may have been called again since we read |
145 | // `NOTIFIED` in the `compare_exchange` above. We must perform an |
146 | // acquire operation that synchronizes with that `unpark` to observe |
147 | // any writes it made before the call to unpark. To do that we must |
148 | // read from the write it made to `state`. |
149 | let old = self.state.swap(EMPTY, SeqCst); |
150 | debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly" ); |
151 | |
152 | return; |
153 | } |
154 | Err(actual) => panic!("inconsistent park state; actual = {actual}" ), |
155 | } |
156 | |
157 | loop { |
158 | m = self.condvar.wait(m).unwrap(); |
159 | |
160 | if self |
161 | .state |
162 | .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) |
163 | .is_ok() |
164 | { |
165 | // got a notification |
166 | return; |
167 | } |
168 | |
169 | // spurious wakeup, go back to sleep |
170 | } |
171 | } |
172 | |
173 | fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) { |
174 | match self |
175 | .state |
176 | .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) |
177 | { |
178 | Ok(_) => {} |
179 | Err(NOTIFIED) => { |
180 | // We must read here, even though we know it will be `NOTIFIED`. |
181 | // This is because `unpark` may have been called again since we read |
182 | // `NOTIFIED` in the `compare_exchange` above. We must perform an |
183 | // acquire operation that synchronizes with that `unpark` to observe |
184 | // any writes it made before the call to unpark. To do that we must |
185 | // read from the write it made to `state`. |
186 | let old = self.state.swap(EMPTY, SeqCst); |
187 | debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly" ); |
188 | |
189 | return; |
190 | } |
191 | Err(actual) => panic!("inconsistent park state; actual = {actual}" ), |
192 | } |
193 | |
194 | driver.park(handle); |
195 | |
196 | match self.state.swap(EMPTY, SeqCst) { |
197 | NOTIFIED => {} // got a notification, hurray! |
198 | PARKED_DRIVER => {} // no notification, alas |
199 | n => panic!("inconsistent park_timeout state: {n}" ), |
200 | } |
201 | } |
202 | |
203 | fn unpark(&self, driver: &driver::Handle) { |
204 | // To ensure the unparked thread will observe any writes we made before |
205 | // this call, we must perform a release operation that `park` can |
206 | // synchronize with. To do that we must write `NOTIFIED` even if `state` |
207 | // is already `NOTIFIED`. That is why this must be a swap rather than a |
208 | // compare-and-swap that returns if it reads `NOTIFIED` on failure. |
209 | match self.state.swap(NOTIFIED, SeqCst) { |
210 | EMPTY => {} // no one was waiting |
211 | NOTIFIED => {} // already unparked |
212 | PARKED_CONDVAR => self.unpark_condvar(), |
213 | PARKED_DRIVER => driver.unpark(), |
214 | actual => panic!("inconsistent state in unpark; actual = {actual}" ), |
215 | } |
216 | } |
217 | |
218 | fn unpark_condvar(&self) { |
219 | // There is a period between when the parked thread sets `state` to |
220 | // `PARKED` (or last checked `state` in the case of a spurious wake |
221 | // up) and when it actually waits on `cvar`. If we were to notify |
222 | // during this period it would be ignored and then when the parked |
223 | // thread went to sleep it would never wake up. Fortunately, it has |
224 | // `lock` locked at this stage so we can acquire `lock` to wait until |
225 | // it is ready to receive the notification. |
226 | // |
227 | // Releasing `lock` before the call to `notify_one` means that when the |
228 | // parked thread wakes it doesn't get woken only to have to wait for us |
229 | // to release `lock`. |
230 | drop(self.mutex.lock()); |
231 | |
232 | self.condvar.notify_one(); |
233 | } |
234 | |
235 | fn shutdown(&self, handle: &driver::Handle) { |
236 | if let Some(mut driver) = self.shared.driver.try_lock() { |
237 | driver.shutdown(handle); |
238 | } |
239 | |
240 | self.condvar.notify_all(); |
241 | } |
242 | } |
243 | |