1 | //! Thread parking and unparking. |
2 | //! |
3 | //! A parker is in either notified or unnotified state. Method [`park()`][`Parker::park()`] blocks |
4 | //! the current thread until the parker becomes notified and then puts it back into unnotified |
5 | //! state. Method [`unpark()`][`Unparker::unpark()`] puts it into notified state. |
6 | //! |
7 | //! # Examples |
8 | //! |
9 | //! ``` |
10 | //! use std::thread; |
11 | //! use std::time::Duration; |
12 | //! use parking::Parker; |
13 | //! |
14 | //! let p = Parker::new(); |
15 | //! let u = p.unparker(); |
16 | //! |
17 | //! // Notify the parker. |
18 | //! u.unpark(); |
19 | //! |
20 | //! // Wakes up immediately because the parker is notified. |
21 | //! p.park(); |
22 | //! |
23 | //! thread::spawn(move || { |
24 | //! thread::sleep(Duration::from_millis(500)); |
25 | //! u.unpark(); |
26 | //! }); |
27 | //! |
28 | //! // Wakes up when `u.unpark()` notifies and then goes back into unnotified state. |
29 | //! p.park(); |
30 | //! ``` |
31 | |
32 | #![forbid (unsafe_code)] |
33 | #![warn (missing_docs, missing_debug_implementations, rust_2018_idioms)] |
34 | #![doc ( |
35 | html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
36 | )] |
37 | #![doc ( |
38 | html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
39 | )] |
40 | |
41 | #[cfg (not(all(loom, feature = "loom" )))] |
42 | use std::sync; |
43 | |
44 | #[cfg (all(loom, feature = "loom" ))] |
45 | use loom::sync; |
46 | |
47 | use std::cell::Cell; |
48 | use std::fmt; |
49 | use std::marker::PhantomData; |
50 | use std::sync::Arc; |
51 | use std::task::{Wake, Waker}; |
52 | use std::time::Duration; |
53 | |
54 | #[cfg (not(all(loom, feature = "loom" )))] |
55 | use std::time::Instant; |
56 | |
57 | use sync::atomic::AtomicUsize; |
58 | use sync::atomic::Ordering::SeqCst; |
59 | use sync::{Condvar, Mutex}; |
60 | |
61 | /// Creates a parker and an associated unparker. |
62 | /// |
63 | /// # Examples |
64 | /// |
65 | /// ``` |
66 | /// let (p, u) = parking::pair(); |
67 | /// ``` |
68 | pub fn pair() -> (Parker, Unparker) { |
69 | let p: Parker = Parker::new(); |
70 | let u: Unparker = p.unparker(); |
71 | (p, u) |
72 | } |
73 | |
/// Waits for a notification.
///
/// Blocking (`park*`) and notifying (`unpark`) both go through the shared state reachable from
/// the embedded [`Unparker`].
pub struct Parker {
    // Handle to the shared state; `unparker()` clones it to hand out notifier handles.
    unparker: Unparker,
    // `Cell<()>` is not `Sync`, so this zero-sized marker keeps `Parker` from being `Sync`.
    _marker: PhantomData<Cell<()>>,
}
79 | |
80 | impl Parker { |
81 | /// Creates a new parker. |
82 | /// |
83 | /// # Examples |
84 | /// |
85 | /// ``` |
86 | /// use parking::Parker; |
87 | /// |
88 | /// let p = Parker::new(); |
89 | /// ``` |
90 | /// |
91 | pub fn new() -> Parker { |
92 | Parker { |
93 | unparker: Unparker { |
94 | inner: Arc::new(Inner { |
95 | state: AtomicUsize::new(EMPTY), |
96 | lock: Mutex::new(()), |
97 | cvar: Condvar::new(), |
98 | }), |
99 | }, |
100 | _marker: PhantomData, |
101 | } |
102 | } |
103 | |
104 | /// Blocks until notified and then goes back into unnotified state. |
105 | /// |
106 | /// # Examples |
107 | /// |
108 | /// ``` |
109 | /// use parking::Parker; |
110 | /// |
111 | /// let p = Parker::new(); |
112 | /// let u = p.unparker(); |
113 | /// |
114 | /// // Notify the parker. |
115 | /// u.unpark(); |
116 | /// |
117 | /// // Wakes up immediately because the parker is notified. |
118 | /// p.park(); |
119 | /// ``` |
120 | pub fn park(&self) { |
121 | self.unparker.inner.park(None); |
122 | } |
123 | |
124 | /// Blocks until notified and then goes back into unnotified state, or times out after |
125 | /// `duration`. |
126 | /// |
127 | /// Returns `true` if notified before the timeout. |
128 | /// |
129 | /// # Examples |
130 | /// |
131 | /// ``` |
132 | /// use std::time::Duration; |
133 | /// use parking::Parker; |
134 | /// |
135 | /// let p = Parker::new(); |
136 | /// |
137 | /// // Wait for a notification, or time out after 500 ms. |
138 | /// p.park_timeout(Duration::from_millis(500)); |
139 | /// ``` |
140 | #[cfg (not(loom))] |
141 | pub fn park_timeout(&self, duration: Duration) -> bool { |
142 | self.unparker.inner.park(Some(duration)) |
143 | } |
144 | |
145 | /// Blocks until notified and then goes back into unnotified state, or times out at `instant`. |
146 | /// |
147 | /// Returns `true` if notified before the deadline. |
148 | /// |
149 | /// # Examples |
150 | /// |
151 | /// ``` |
152 | /// use std::time::{Duration, Instant}; |
153 | /// use parking::Parker; |
154 | /// |
155 | /// let p = Parker::new(); |
156 | /// |
157 | /// // Wait for a notification, or time out after 500 ms. |
158 | /// p.park_deadline(Instant::now() + Duration::from_millis(500)); |
159 | /// ``` |
160 | #[cfg (not(loom))] |
161 | pub fn park_deadline(&self, instant: Instant) -> bool { |
162 | self.unparker |
163 | .inner |
164 | .park(Some(instant.saturating_duration_since(Instant::now()))) |
165 | } |
166 | |
167 | /// Notifies the parker. |
168 | /// |
169 | /// Returns `true` if this call is the first to notify the parker, or `false` if the parker |
170 | /// was already notified. |
171 | /// |
172 | /// # Examples |
173 | /// |
174 | /// ``` |
175 | /// use std::thread; |
176 | /// use std::time::Duration; |
177 | /// use parking::Parker; |
178 | /// |
179 | /// let p = Parker::new(); |
180 | /// |
181 | /// assert_eq!(p.unpark(), true); |
182 | /// assert_eq!(p.unpark(), false); |
183 | /// |
184 | /// // Wakes up immediately. |
185 | /// p.park(); |
186 | /// ``` |
187 | pub fn unpark(&self) -> bool { |
188 | self.unparker.unpark() |
189 | } |
190 | |
191 | /// Returns a handle for unparking. |
192 | /// |
193 | /// The returned [`Unparker`] can be cloned and shared among threads. |
194 | /// |
195 | /// # Examples |
196 | /// |
197 | /// ``` |
198 | /// use parking::Parker; |
199 | /// |
200 | /// let p = Parker::new(); |
201 | /// let u = p.unparker(); |
202 | /// |
203 | /// // Notify the parker. |
204 | /// u.unpark(); |
205 | /// |
206 | /// // Wakes up immediately because the parker is notified. |
207 | /// p.park(); |
208 | /// ``` |
209 | pub fn unparker(&self) -> Unparker { |
210 | self.unparker.clone() |
211 | } |
212 | } |
213 | |
214 | impl Default for Parker { |
215 | fn default() -> Parker { |
216 | Parker::new() |
217 | } |
218 | } |
219 | |
220 | impl fmt::Debug for Parker { |
221 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
222 | f.pad("Parker { .. }" ) |
223 | } |
224 | } |
225 | |
/// Notifies a parker.
///
/// Cloning an `Unparker` yields another handle to the same shared state.
pub struct Unparker {
    // Reference-counted state shared with the `Parker` and every clone of this handle.
    inner: Arc<Inner>,
}
230 | |
231 | impl Unparker { |
232 | /// Notifies the associated parker. |
233 | /// |
234 | /// Returns `true` if this call is the first to notify the parker, or `false` if the parker |
235 | /// was already notified. |
236 | /// |
237 | /// # Examples |
238 | /// |
239 | /// ``` |
240 | /// use std::thread; |
241 | /// use std::time::Duration; |
242 | /// use parking::Parker; |
243 | /// |
244 | /// let p = Parker::new(); |
245 | /// let u = p.unparker(); |
246 | /// |
247 | /// thread::spawn(move || { |
248 | /// thread::sleep(Duration::from_millis(500)); |
249 | /// u.unpark(); |
250 | /// }); |
251 | /// |
252 | /// // Wakes up when `u.unpark()` notifies and then goes back into unnotified state. |
253 | /// p.park(); |
254 | /// ``` |
255 | pub fn unpark(&self) -> bool { |
256 | self.inner.unpark() |
257 | } |
258 | |
259 | /// Indicates whether this unparker will unpark the associated parker. |
260 | /// |
261 | /// This can be used to avoid unnecessary work before calling `unpark()`. |
262 | /// |
263 | /// # Examples |
264 | /// |
265 | /// ``` |
266 | /// use parking::Parker; |
267 | /// |
268 | /// let p = Parker::new(); |
269 | /// let u = p.unparker(); |
270 | /// |
271 | /// assert!(u.will_unpark(&p)); |
272 | /// ``` |
273 | pub fn will_unpark(&self, parker: &Parker) -> bool { |
274 | Arc::ptr_eq(&self.inner, &parker.unparker.inner) |
275 | } |
276 | |
277 | /// Indicates whether two unparkers will unpark the same parker. |
278 | /// |
279 | /// # Examples |
280 | /// |
281 | /// ``` |
282 | /// use parking::Parker; |
283 | /// |
284 | /// let p = Parker::new(); |
285 | /// let u1 = p.unparker(); |
286 | /// let u2 = p.unparker(); |
287 | /// |
288 | /// assert!(u1.same_parker(&u2)); |
289 | /// ``` |
290 | pub fn same_parker(&self, other: &Unparker) -> bool { |
291 | Arc::ptr_eq(&self.inner, &other.inner) |
292 | } |
293 | } |
294 | |
295 | impl fmt::Debug for Unparker { |
296 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
297 | f.pad("Unparker { .. }" ) |
298 | } |
299 | } |
300 | |
301 | impl Clone for Unparker { |
302 | fn clone(&self) -> Unparker { |
303 | Unparker { |
304 | inner: self.inner.clone(), |
305 | } |
306 | } |
307 | } |
308 | |
309 | impl From<Unparker> for Waker { |
310 | fn from(up: Unparker) -> Self { |
311 | Waker::from(up.inner) |
312 | } |
313 | } |
314 | |
// Values stored in `Inner::state`.

// No thread is parked and no notification is pending.
const EMPTY: usize = 0;
// Set by `park` while holding `lock`, just before it starts waiting on `cvar`.
const PARKED: usize = 1;
// Set by `unpark`; stays until `park` consumes it by resetting the state to `EMPTY`.
const NOTIFIED: usize = 2;
318 | |
/// State shared between a `Parker` and all of its `Unparker`s.
struct Inner {
    // One of `EMPTY`, `PARKED`, or `NOTIFIED`.
    state: AtomicUsize,
    // Taken by `park` before it sets `PARKED` and handed to `cvar` while waiting; `unpark`
    // briefly acquires it before notifying. Guards no data of its own.
    lock: Mutex<()>,
    // Condition variable the parking thread blocks on.
    cvar: Condvar,
}
324 | |
325 | impl Inner { |
326 | fn park(&self, timeout: Option<Duration>) -> bool { |
327 | // If we were previously notified then we consume this notification and return quickly. |
328 | if self |
329 | .state |
330 | .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) |
331 | .is_ok() |
332 | { |
333 | return true; |
334 | } |
335 | |
336 | // If the timeout is zero, then there is no need to actually block. |
337 | if let Some(dur) = timeout { |
338 | if dur == Duration::from_millis(0) { |
339 | return false; |
340 | } |
341 | } |
342 | |
343 | // Otherwise we need to coordinate going to sleep. |
344 | let mut m = self.lock.lock().unwrap(); |
345 | |
346 | match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { |
347 | Ok(_) => {} |
348 | // Consume this notification to avoid spurious wakeups in the next park. |
349 | Err(NOTIFIED) => { |
350 | // We must read `state` here, even though we know it will be `NOTIFIED`. This is |
351 | // because `unpark` may have been called again since we read `NOTIFIED` in the |
352 | // `compare_exchange` above. We must perform an acquire operation that synchronizes |
353 | // with that `unpark` to observe any writes it made before the call to `unpark`. To |
354 | // do that we must read from the write it made to `state`. |
355 | let old = self.state.swap(EMPTY, SeqCst); |
356 | assert_eq!(old, NOTIFIED, "park state changed unexpectedly" ); |
357 | return true; |
358 | } |
359 | Err(n) => panic!("inconsistent park_timeout state: {}" , n), |
360 | } |
361 | |
362 | match timeout { |
363 | None => { |
364 | loop { |
365 | // Block the current thread on the conditional variable. |
366 | m = self.cvar.wait(m).unwrap(); |
367 | |
368 | if self |
369 | .state |
370 | .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) |
371 | .is_ok() |
372 | { |
373 | // got a notification |
374 | return true; |
375 | } |
376 | } |
377 | } |
378 | Some(timeout) => { |
379 | #[cfg (not(loom))] |
380 | { |
381 | // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a |
382 | // notification we just want to unconditionally set `state` back to `EMPTY`, either |
383 | // consuming a notification or un-flagging ourselves as parked. |
384 | let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap(); |
385 | |
386 | match self.state.swap(EMPTY, SeqCst) { |
387 | NOTIFIED => true, // got a notification |
388 | PARKED => false, // no notification |
389 | n => panic!("inconsistent park_timeout state: {}" , n), |
390 | } |
391 | } |
392 | |
393 | #[cfg (loom)] |
394 | { |
395 | let _ = timeout; |
396 | panic!("park_timeout is not supported under loom" ); |
397 | } |
398 | } |
399 | } |
400 | } |
401 | |
402 | pub fn unpark(&self) -> bool { |
403 | // To ensure the unparked thread will observe any writes we made before this call, we must |
404 | // perform a release operation that `park` can synchronize with. To do that we must write |
405 | // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather |
406 | // than a compare-and-swap that returns if it reads `NOTIFIED` on failure. |
407 | match self.state.swap(NOTIFIED, SeqCst) { |
408 | EMPTY => return true, // no one was waiting |
409 | NOTIFIED => return false, // already unparked |
410 | PARKED => {} // gotta go wake someone up |
411 | _ => panic!("inconsistent state in unpark" ), |
412 | } |
413 | |
414 | // There is a period between when the parked thread sets `state` to `PARKED` (or last |
415 | // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`. |
416 | // If we were to notify during this period it would be ignored and then when the parked |
417 | // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this |
418 | // stage so we can acquire `lock` to wait until it is ready to receive the notification. |
419 | // |
420 | // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes |
421 | // it doesn't get woken only to have to wait for us to release `lock`. |
422 | drop(self.lock.lock().unwrap()); |
423 | self.cvar.notify_one(); |
424 | true |
425 | } |
426 | } |
427 | |
428 | impl Wake for Inner { |
429 | #[inline ] |
430 | fn wake(self: Arc<Self>) { |
431 | self.unpark(); |
432 | } |
433 | |
434 | #[inline ] |
435 | fn wake_by_ref(self: &Arc<Self>) { |
436 | self.unpark(); |
437 | } |
438 | } |
439 | |