1//! Thread parking and unparking.
2//!
3//! A parker is in either notified or unnotified state. Method [`park()`][`Parker::park()`] blocks
4//! the current thread until the parker becomes notified and then puts it back into unnotified
5//! state. Method [`unpark()`][`Unparker::unpark()`] puts it into notified state.
6//!
7//! # Examples
8//!
9//! ```
10//! use std::thread;
11//! use std::time::Duration;
12//! use parking::Parker;
13//!
14//! let p = Parker::new();
15//! let u = p.unparker();
16//!
17//! // Notify the parker.
18//! u.unpark();
19//!
20//! // Wakes up immediately because the parker is notified.
21//! p.park();
22//!
23//! thread::spawn(move || {
24//! thread::sleep(Duration::from_millis(500));
25//! u.unpark();
26//! });
27//!
28//! // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
29//! p.park();
30//! ```
31
32#![forbid(unsafe_code)]
33#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
34#![doc(
35 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
36)]
37#![doc(
38 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
39)]
40
41#[cfg(not(all(loom, feature = "loom")))]
42use std::sync;
43
44#[cfg(all(loom, feature = "loom"))]
45use loom::sync;
46
47use std::cell::Cell;
48use std::fmt;
49use std::marker::PhantomData;
50use std::sync::Arc;
51use std::task::{Wake, Waker};
52use std::time::Duration;
53
54#[cfg(not(all(loom, feature = "loom")))]
55use std::time::Instant;
56
57use sync::atomic::AtomicUsize;
58use sync::atomic::Ordering::SeqCst;
59use sync::{Condvar, Mutex};
60
61/// Creates a parker and an associated unparker.
62///
63/// # Examples
64///
65/// ```
66/// let (p, u) = parking::pair();
67/// ```
68pub fn pair() -> (Parker, Unparker) {
69 let p: Parker = Parker::new();
70 let u: Unparker = p.unparker();
71 (p, u)
72}
73
74/// Waits for a notification.
75pub struct Parker {
76 unparker: Unparker,
77 _marker: PhantomData<Cell<()>>,
78}
79
80impl Parker {
81 /// Creates a new parker.
82 ///
83 /// # Examples
84 ///
85 /// ```
86 /// use parking::Parker;
87 ///
88 /// let p = Parker::new();
89 /// ```
90 ///
91 pub fn new() -> Parker {
92 Parker {
93 unparker: Unparker {
94 inner: Arc::new(Inner {
95 state: AtomicUsize::new(EMPTY),
96 lock: Mutex::new(()),
97 cvar: Condvar::new(),
98 }),
99 },
100 _marker: PhantomData,
101 }
102 }
103
104 /// Blocks until notified and then goes back into unnotified state.
105 ///
106 /// # Examples
107 ///
108 /// ```
109 /// use parking::Parker;
110 ///
111 /// let p = Parker::new();
112 /// let u = p.unparker();
113 ///
114 /// // Notify the parker.
115 /// u.unpark();
116 ///
117 /// // Wakes up immediately because the parker is notified.
118 /// p.park();
119 /// ```
120 pub fn park(&self) {
121 self.unparker.inner.park(None);
122 }
123
124 /// Blocks until notified and then goes back into unnotified state, or times out after
125 /// `duration`.
126 ///
127 /// Returns `true` if notified before the timeout.
128 ///
129 /// # Examples
130 ///
131 /// ```
132 /// use std::time::Duration;
133 /// use parking::Parker;
134 ///
135 /// let p = Parker::new();
136 ///
137 /// // Wait for a notification, or time out after 500 ms.
138 /// p.park_timeout(Duration::from_millis(500));
139 /// ```
140 #[cfg(not(loom))]
141 pub fn park_timeout(&self, duration: Duration) -> bool {
142 self.unparker.inner.park(Some(duration))
143 }
144
145 /// Blocks until notified and then goes back into unnotified state, or times out at `instant`.
146 ///
147 /// Returns `true` if notified before the deadline.
148 ///
149 /// # Examples
150 ///
151 /// ```
152 /// use std::time::{Duration, Instant};
153 /// use parking::Parker;
154 ///
155 /// let p = Parker::new();
156 ///
157 /// // Wait for a notification, or time out after 500 ms.
158 /// p.park_deadline(Instant::now() + Duration::from_millis(500));
159 /// ```
160 #[cfg(not(loom))]
161 pub fn park_deadline(&self, instant: Instant) -> bool {
162 self.unparker
163 .inner
164 .park(Some(instant.saturating_duration_since(Instant::now())))
165 }
166
167 /// Notifies the parker.
168 ///
169 /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
170 /// was already notified.
171 ///
172 /// # Examples
173 ///
174 /// ```
175 /// use std::thread;
176 /// use std::time::Duration;
177 /// use parking::Parker;
178 ///
179 /// let p = Parker::new();
180 ///
181 /// assert_eq!(p.unpark(), true);
182 /// assert_eq!(p.unpark(), false);
183 ///
184 /// // Wakes up immediately.
185 /// p.park();
186 /// ```
187 pub fn unpark(&self) -> bool {
188 self.unparker.unpark()
189 }
190
191 /// Returns a handle for unparking.
192 ///
193 /// The returned [`Unparker`] can be cloned and shared among threads.
194 ///
195 /// # Examples
196 ///
197 /// ```
198 /// use parking::Parker;
199 ///
200 /// let p = Parker::new();
201 /// let u = p.unparker();
202 ///
203 /// // Notify the parker.
204 /// u.unpark();
205 ///
206 /// // Wakes up immediately because the parker is notified.
207 /// p.park();
208 /// ```
209 pub fn unparker(&self) -> Unparker {
210 self.unparker.clone()
211 }
212}
213
214impl Default for Parker {
215 fn default() -> Parker {
216 Parker::new()
217 }
218}
219
220impl fmt::Debug for Parker {
221 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222 f.pad("Parker { .. }")
223 }
224}
225
226/// Notifies a parker.
227pub struct Unparker {
228 inner: Arc<Inner>,
229}
230
231impl Unparker {
232 /// Notifies the associated parker.
233 ///
234 /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
235 /// was already notified.
236 ///
237 /// # Examples
238 ///
239 /// ```
240 /// use std::thread;
241 /// use std::time::Duration;
242 /// use parking::Parker;
243 ///
244 /// let p = Parker::new();
245 /// let u = p.unparker();
246 ///
247 /// thread::spawn(move || {
248 /// thread::sleep(Duration::from_millis(500));
249 /// u.unpark();
250 /// });
251 ///
252 /// // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
253 /// p.park();
254 /// ```
255 pub fn unpark(&self) -> bool {
256 self.inner.unpark()
257 }
258
259 /// Indicates whether this unparker will unpark the associated parker.
260 ///
261 /// This can be used to avoid unnecessary work before calling `unpark()`.
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// use parking::Parker;
267 ///
268 /// let p = Parker::new();
269 /// let u = p.unparker();
270 ///
271 /// assert!(u.will_unpark(&p));
272 /// ```
273 pub fn will_unpark(&self, parker: &Parker) -> bool {
274 Arc::ptr_eq(&self.inner, &parker.unparker.inner)
275 }
276
277 /// Indicates whether two unparkers will unpark the same parker.
278 ///
279 /// # Examples
280 ///
281 /// ```
282 /// use parking::Parker;
283 ///
284 /// let p = Parker::new();
285 /// let u1 = p.unparker();
286 /// let u2 = p.unparker();
287 ///
288 /// assert!(u1.same_parker(&u2));
289 /// ```
290 pub fn same_parker(&self, other: &Unparker) -> bool {
291 Arc::ptr_eq(&self.inner, &other.inner)
292 }
293}
294
295impl fmt::Debug for Unparker {
296 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
297 f.pad("Unparker { .. }")
298 }
299}
300
301impl Clone for Unparker {
302 fn clone(&self) -> Unparker {
303 Unparker {
304 inner: self.inner.clone(),
305 }
306 }
307}
308
309impl From<Unparker> for Waker {
310 fn from(up: Unparker) -> Self {
311 Waker::from(up.inner)
312 }
313}
314
315const EMPTY: usize = 0;
316const PARKED: usize = 1;
317const NOTIFIED: usize = 2;
318
319struct Inner {
320 state: AtomicUsize,
321 lock: Mutex<()>,
322 cvar: Condvar,
323}
324
325impl Inner {
326 fn park(&self, timeout: Option<Duration>) -> bool {
327 // If we were previously notified then we consume this notification and return quickly.
328 if self
329 .state
330 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
331 .is_ok()
332 {
333 return true;
334 }
335
336 // If the timeout is zero, then there is no need to actually block.
337 if let Some(dur) = timeout {
338 if dur == Duration::from_millis(0) {
339 return false;
340 }
341 }
342
343 // Otherwise we need to coordinate going to sleep.
344 let mut m = self.lock.lock().unwrap();
345
346 match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
347 Ok(_) => {}
348 // Consume this notification to avoid spurious wakeups in the next park.
349 Err(NOTIFIED) => {
350 // We must read `state` here, even though we know it will be `NOTIFIED`. This is
351 // because `unpark` may have been called again since we read `NOTIFIED` in the
352 // `compare_exchange` above. We must perform an acquire operation that synchronizes
353 // with that `unpark` to observe any writes it made before the call to `unpark`. To
354 // do that we must read from the write it made to `state`.
355 let old = self.state.swap(EMPTY, SeqCst);
356 assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
357 return true;
358 }
359 Err(n) => panic!("inconsistent park_timeout state: {}", n),
360 }
361
362 match timeout {
363 None => {
364 loop {
365 // Block the current thread on the conditional variable.
366 m = self.cvar.wait(m).unwrap();
367
368 if self
369 .state
370 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
371 .is_ok()
372 {
373 // got a notification
374 return true;
375 }
376 }
377 }
378 Some(timeout) => {
379 #[cfg(not(loom))]
380 {
381 // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
382 // notification we just want to unconditionally set `state` back to `EMPTY`, either
383 // consuming a notification or un-flagging ourselves as parked.
384 let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap();
385
386 match self.state.swap(EMPTY, SeqCst) {
387 NOTIFIED => true, // got a notification
388 PARKED => false, // no notification
389 n => panic!("inconsistent park_timeout state: {}", n),
390 }
391 }
392
393 #[cfg(loom)]
394 {
395 let _ = timeout;
396 panic!("park_timeout is not supported under loom");
397 }
398 }
399 }
400 }
401
402 pub fn unpark(&self) -> bool {
403 // To ensure the unparked thread will observe any writes we made before this call, we must
404 // perform a release operation that `park` can synchronize with. To do that we must write
405 // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
406 // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
407 match self.state.swap(NOTIFIED, SeqCst) {
408 EMPTY => return true, // no one was waiting
409 NOTIFIED => return false, // already unparked
410 PARKED => {} // gotta go wake someone up
411 _ => panic!("inconsistent state in unpark"),
412 }
413
414 // There is a period between when the parked thread sets `state` to `PARKED` (or last
415 // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
416 // If we were to notify during this period it would be ignored and then when the parked
417 // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
418 // stage so we can acquire `lock` to wait until it is ready to receive the notification.
419 //
420 // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
421 // it doesn't get woken only to have to wait for us to release `lock`.
422 drop(self.lock.lock().unwrap());
423 self.cvar.notify_one();
424 true
425 }
426}
427
428impl Wake for Inner {
429 #[inline]
430 fn wake(self: Arc<Self>) {
431 self.unpark();
432 }
433
434 #[inline]
435 fn wake_by_ref(self: &Arc<Self>) {
436 self.unpark();
437 }
438}
439