1 | //! Parks the runtime. |
2 | //! |
3 | //! A combination of the various resource driver park handles. |
4 | |
5 | use crate::loom::sync::atomic::AtomicUsize; |
6 | use crate::loom::sync::{Arc, Condvar, Mutex}; |
7 | use crate::runtime::driver::{self, Driver}; |
8 | use crate::util::TryLock; |
9 | |
10 | use std::sync::atomic::Ordering::SeqCst; |
11 | use std::time::Duration; |
12 | |
/// Handle a runtime thread uses to park itself.
///
/// Parking blocks on the shared resource driver when this thread can claim
/// it, and on this handle's own condvar otherwise.
pub(crate) struct Parker {
    /// Park state for this handle. `Parker::unpark` hands out `Unparker`s
    /// that reference the same `Inner`.
    inner: Arc<Inner>,
}
16 | |
/// Handle used to wake the thread parked on the `Parker` it was created from.
pub(crate) struct Unparker {
    /// The same `Inner` as the `Parker` that produced this handle.
    inner: Arc<Inner>,
}
20 | |
/// Park state behind a single `Parker` and the `Unparker`s created from it.
struct Inner {
    /// Avoids entering the park if possible.
    ///
    /// Holds one of `EMPTY`, `PARKED_CONDVAR`, `PARKED_DRIVER` or `NOTIFIED`.
    state: AtomicUsize,

    /// Held while moving `state` to `PARKED_CONDVAR` and while waiting on
    /// `condvar`; `unpark_condvar` acquires it before notifying.
    mutex: Mutex<()>,

    /// Condvar to block on if the driver is unavailable.
    condvar: Condvar,

    /// State shared with every clone of the `Parker`; holds the resource
    /// (I/O, time, ...) driver.
    shared: Arc<Shared>,
}
34 | |
// Values stored in `Inner::state`.

/// Not parked and no notification pending. This is the initial state.
const EMPTY: usize = 0;
/// The thread is parked, or about to park, on the condvar.
const PARKED_CONDVAR: usize = 1;
/// The thread is parked, or about to park, on the driver.
const PARKED_DRIVER: usize = 2;
/// `unpark` has been called; `park` consumes this and resets to `EMPTY`.
const NOTIFIED: usize = 3;
39 | |
/// Shared across multiple `Parker` handles: cloning a `Parker` clones the
/// `Arc` pointing at this value rather than the value itself.
struct Shared {
    /// Shared driver. Only one thread at a time can use this; it is claimed
    /// with `try_lock`, and a thread that fails to claim it when parking
    /// blocks on its condvar instead.
    driver: TryLock<Driver>,
}
45 | |
46 | impl Parker { |
47 | pub(crate) fn new(driver: Driver) -> Parker { |
48 | Parker { |
49 | inner: Arc::new(Inner { |
50 | state: AtomicUsize::new(EMPTY), |
51 | mutex: Mutex::new(()), |
52 | condvar: Condvar::new(), |
53 | shared: Arc::new(Shared { |
54 | driver: TryLock::new(driver), |
55 | }), |
56 | }), |
57 | } |
58 | } |
59 | |
60 | pub(crate) fn unpark(&self) -> Unparker { |
61 | Unparker { |
62 | inner: self.inner.clone(), |
63 | } |
64 | } |
65 | |
66 | pub(crate) fn park(&mut self, handle: &driver::Handle) { |
67 | self.inner.park(handle); |
68 | } |
69 | |
70 | pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { |
71 | // Only parking with zero is supported... |
72 | assert_eq!(duration, Duration::from_millis(0)); |
73 | |
74 | if let Some(mut driver) = self.inner.shared.driver.try_lock() { |
75 | driver.park_timeout(handle, duration); |
76 | } |
77 | } |
78 | |
79 | pub(crate) fn shutdown(&mut self, handle: &driver::Handle) { |
80 | self.inner.shutdown(handle); |
81 | } |
82 | } |
83 | |
84 | impl Clone for Parker { |
85 | fn clone(&self) -> Parker { |
86 | Parker { |
87 | inner: Arc::new(Inner { |
88 | state: AtomicUsize::new(EMPTY), |
89 | mutex: Mutex::new(()), |
90 | condvar: Condvar::new(), |
91 | shared: self.inner.shared.clone(), |
92 | }), |
93 | } |
94 | } |
95 | } |
96 | |
97 | impl Unparker { |
98 | pub(crate) fn unpark(&self, driver: &driver::Handle) { |
99 | self.inner.unpark(driver); |
100 | } |
101 | } |
102 | |
103 | impl Inner { |
104 | /// Parks the current thread for at most `dur`. |
105 | fn park(&self, handle: &driver::Handle) { |
106 | // If we were previously notified then we consume this notification and |
107 | // return quickly. |
108 | if self |
109 | .state |
110 | .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) |
111 | .is_ok() |
112 | { |
113 | return; |
114 | } |
115 | |
116 | if let Some(mut driver) = self.shared.driver.try_lock() { |
117 | self.park_driver(&mut driver, handle); |
118 | } else { |
119 | self.park_condvar(); |
120 | } |
121 | } |
122 | |
123 | fn park_condvar(&self) { |
124 | // Otherwise we need to coordinate going to sleep |
125 | let mut m = self.mutex.lock(); |
126 | |
127 | match self |
128 | .state |
129 | .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst) |
130 | { |
131 | Ok(_) => {} |
132 | Err(NOTIFIED) => { |
133 | // We must read here, even though we know it will be `NOTIFIED`. |
134 | // This is because `unpark` may have been called again since we read |
135 | // `NOTIFIED` in the `compare_exchange` above. We must perform an |
136 | // acquire operation that synchronizes with that `unpark` to observe |
137 | // any writes it made before the call to unpark. To do that we must |
138 | // read from the write it made to `state`. |
139 | let old = self.state.swap(EMPTY, SeqCst); |
140 | debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly" ); |
141 | |
142 | return; |
143 | } |
144 | Err(actual) => panic!("inconsistent park state; actual = {}" , actual), |
145 | } |
146 | |
147 | loop { |
148 | m = self.condvar.wait(m).unwrap(); |
149 | |
150 | if self |
151 | .state |
152 | .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) |
153 | .is_ok() |
154 | { |
155 | // got a notification |
156 | return; |
157 | } |
158 | |
159 | // spurious wakeup, go back to sleep |
160 | } |
161 | } |
162 | |
163 | fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) { |
164 | match self |
165 | .state |
166 | .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) |
167 | { |
168 | Ok(_) => {} |
169 | Err(NOTIFIED) => { |
170 | // We must read here, even though we know it will be `NOTIFIED`. |
171 | // This is because `unpark` may have been called again since we read |
172 | // `NOTIFIED` in the `compare_exchange` above. We must perform an |
173 | // acquire operation that synchronizes with that `unpark` to observe |
174 | // any writes it made before the call to unpark. To do that we must |
175 | // read from the write it made to `state`. |
176 | let old = self.state.swap(EMPTY, SeqCst); |
177 | debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly" ); |
178 | |
179 | return; |
180 | } |
181 | Err(actual) => panic!("inconsistent park state; actual = {}" , actual), |
182 | } |
183 | |
184 | driver.park(handle); |
185 | |
186 | match self.state.swap(EMPTY, SeqCst) { |
187 | NOTIFIED => {} // got a notification, hurray! |
188 | PARKED_DRIVER => {} // no notification, alas |
189 | n => panic!("inconsistent park_timeout state: {}" , n), |
190 | } |
191 | } |
192 | |
193 | fn unpark(&self, driver: &driver::Handle) { |
194 | // To ensure the unparked thread will observe any writes we made before |
195 | // this call, we must perform a release operation that `park` can |
196 | // synchronize with. To do that we must write `NOTIFIED` even if `state` |
197 | // is already `NOTIFIED`. That is why this must be a swap rather than a |
198 | // compare-and-swap that returns if it reads `NOTIFIED` on failure. |
199 | match self.state.swap(NOTIFIED, SeqCst) { |
200 | EMPTY => {} // no one was waiting |
201 | NOTIFIED => {} // already unparked |
202 | PARKED_CONDVAR => self.unpark_condvar(), |
203 | PARKED_DRIVER => driver.unpark(), |
204 | actual => panic!("inconsistent state in unpark; actual = {}" , actual), |
205 | } |
206 | } |
207 | |
208 | fn unpark_condvar(&self) { |
209 | // There is a period between when the parked thread sets `state` to |
210 | // `PARKED` (or last checked `state` in the case of a spurious wake |
211 | // up) and when it actually waits on `cvar`. If we were to notify |
212 | // during this period it would be ignored and then when the parked |
213 | // thread went to sleep it would never wake up. Fortunately, it has |
214 | // `lock` locked at this stage so we can acquire `lock` to wait until |
215 | // it is ready to receive the notification. |
216 | // |
217 | // Releasing `lock` before the call to `notify_one` means that when the |
218 | // parked thread wakes it doesn't get woken only to have to wait for us |
219 | // to release `lock`. |
220 | drop(self.mutex.lock()); |
221 | |
222 | self.condvar.notify_one(); |
223 | } |
224 | |
225 | fn shutdown(&self, handle: &driver::Handle) { |
226 | if let Some(mut driver) = self.shared.driver.try_lock() { |
227 | driver.shutdown(handle); |
228 | } |
229 | |
230 | self.condvar.notify_all(); |
231 | } |
232 | } |
233 | |