1//! Parks the runtime.
2//!
3//! A combination of the various resource driver park handles.
4
5use crate::loom::sync::atomic::AtomicUsize;
6use crate::loom::sync::{Arc, Condvar, Mutex};
7use crate::runtime::driver::{self, Driver};
8use crate::util::TryLock;
9
10use std::sync::atomic::Ordering::SeqCst;
11use std::time::Duration;
12
13pub(crate) struct Parker {
14 inner: Arc<Inner>,
15}
16
17pub(crate) struct Unparker {
18 inner: Arc<Inner>,
19}
20
21struct Inner {
22 /// Avoids entering the park if possible
23 state: AtomicUsize,
24
25 /// Used to coordinate access to the driver / condvar
26 mutex: Mutex<()>,
27
28 /// Condvar to block on if the driver is unavailable.
29 condvar: Condvar,
30
31 /// Resource (I/O, time, ...) driver
32 shared: Arc<Shared>,
33}
34
35const EMPTY: usize = 0;
36const PARKED_CONDVAR: usize = 1;
37const PARKED_DRIVER: usize = 2;
38const NOTIFIED: usize = 3;
39
40/// Shared across multiple Parker handles
41struct Shared {
42 /// Shared driver. Only one thread at a time can use this
43 driver: TryLock<Driver>,
44}
45
46impl Parker {
47 pub(crate) fn new(driver: Driver) -> Parker {
48 Parker {
49 inner: Arc::new(Inner {
50 state: AtomicUsize::new(EMPTY),
51 mutex: Mutex::new(()),
52 condvar: Condvar::new(),
53 shared: Arc::new(Shared {
54 driver: TryLock::new(driver),
55 }),
56 }),
57 }
58 }
59
60 pub(crate) fn unpark(&self) -> Unparker {
61 Unparker {
62 inner: self.inner.clone(),
63 }
64 }
65
66 pub(crate) fn park(&mut self, handle: &driver::Handle) {
67 self.inner.park(handle);
68 }
69
70 pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
71 // Only parking with zero is supported...
72 assert_eq!(duration, Duration::from_millis(0));
73
74 if let Some(mut driver) = self.inner.shared.driver.try_lock() {
75 driver.park_timeout(handle, duration);
76 }
77 }
78
79 pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
80 self.inner.shutdown(handle);
81 }
82}
83
84impl Clone for Parker {
85 fn clone(&self) -> Parker {
86 Parker {
87 inner: Arc::new(Inner {
88 state: AtomicUsize::new(EMPTY),
89 mutex: Mutex::new(()),
90 condvar: Condvar::new(),
91 shared: self.inner.shared.clone(),
92 }),
93 }
94 }
95}
96
97impl Unparker {
98 pub(crate) fn unpark(&self, driver: &driver::Handle) {
99 self.inner.unpark(driver);
100 }
101}
102
103impl Inner {
104 /// Parks the current thread for at most `dur`.
105 fn park(&self, handle: &driver::Handle) {
106 // If we were previously notified then we consume this notification and
107 // return quickly.
108 if self
109 .state
110 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
111 .is_ok()
112 {
113 return;
114 }
115
116 if let Some(mut driver) = self.shared.driver.try_lock() {
117 self.park_driver(&mut driver, handle);
118 } else {
119 self.park_condvar();
120 }
121 }
122
123 fn park_condvar(&self) {
124 // Otherwise we need to coordinate going to sleep
125 let mut m = self.mutex.lock();
126
127 match self
128 .state
129 .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
130 {
131 Ok(_) => {}
132 Err(NOTIFIED) => {
133 // We must read here, even though we know it will be `NOTIFIED`.
134 // This is because `unpark` may have been called again since we read
135 // `NOTIFIED` in the `compare_exchange` above. We must perform an
136 // acquire operation that synchronizes with that `unpark` to observe
137 // any writes it made before the call to unpark. To do that we must
138 // read from the write it made to `state`.
139 let old = self.state.swap(EMPTY, SeqCst);
140 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
141
142 return;
143 }
144 Err(actual) => panic!("inconsistent park state; actual = {}", actual),
145 }
146
147 loop {
148 m = self.condvar.wait(m).unwrap();
149
150 if self
151 .state
152 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
153 .is_ok()
154 {
155 // got a notification
156 return;
157 }
158
159 // spurious wakeup, go back to sleep
160 }
161 }
162
163 fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) {
164 match self
165 .state
166 .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
167 {
168 Ok(_) => {}
169 Err(NOTIFIED) => {
170 // We must read here, even though we know it will be `NOTIFIED`.
171 // This is because `unpark` may have been called again since we read
172 // `NOTIFIED` in the `compare_exchange` above. We must perform an
173 // acquire operation that synchronizes with that `unpark` to observe
174 // any writes it made before the call to unpark. To do that we must
175 // read from the write it made to `state`.
176 let old = self.state.swap(EMPTY, SeqCst);
177 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
178
179 return;
180 }
181 Err(actual) => panic!("inconsistent park state; actual = {}", actual),
182 }
183
184 driver.park(handle);
185
186 match self.state.swap(EMPTY, SeqCst) {
187 NOTIFIED => {} // got a notification, hurray!
188 PARKED_DRIVER => {} // no notification, alas
189 n => panic!("inconsistent park_timeout state: {}", n),
190 }
191 }
192
193 fn unpark(&self, driver: &driver::Handle) {
194 // To ensure the unparked thread will observe any writes we made before
195 // this call, we must perform a release operation that `park` can
196 // synchronize with. To do that we must write `NOTIFIED` even if `state`
197 // is already `NOTIFIED`. That is why this must be a swap rather than a
198 // compare-and-swap that returns if it reads `NOTIFIED` on failure.
199 match self.state.swap(NOTIFIED, SeqCst) {
200 EMPTY => {} // no one was waiting
201 NOTIFIED => {} // already unparked
202 PARKED_CONDVAR => self.unpark_condvar(),
203 PARKED_DRIVER => driver.unpark(),
204 actual => panic!("inconsistent state in unpark; actual = {}", actual),
205 }
206 }
207
208 fn unpark_condvar(&self) {
209 // There is a period between when the parked thread sets `state` to
210 // `PARKED` (or last checked `state` in the case of a spurious wake
211 // up) and when it actually waits on `cvar`. If we were to notify
212 // during this period it would be ignored and then when the parked
213 // thread went to sleep it would never wake up. Fortunately, it has
214 // `lock` locked at this stage so we can acquire `lock` to wait until
215 // it is ready to receive the notification.
216 //
217 // Releasing `lock` before the call to `notify_one` means that when the
218 // parked thread wakes it doesn't get woken only to have to wait for us
219 // to release `lock`.
220 drop(self.mutex.lock());
221
222 self.condvar.notify_one();
223 }
224
225 fn shutdown(&self, handle: &driver::Handle) {
226 if let Some(mut driver) = self.shared.driver.try_lock() {
227 driver.shutdown(handle);
228 }
229
230 self.condvar.notify_all();
231 }
232}
233