1 | //! Thread parking and unparking. |
2 | //! |
3 | //! A [`Parker`] is in either the notified or unnotified state. The [`park()`][`Parker::park()`] method blocks |
4 | //! the current thread until the [`Parker`] becomes notified and then puts it back into the unnotified |
5 | //! state. The [`unpark()`][`Unparker::unpark()`] method puts it into the notified state. |
6 | //! |
7 | //! This API is similar to [`thread::park()`] and [`Thread::unpark()`] from the standard library. |
8 | //! The difference is that the state "token" managed by those functions is shared across an entire |
9 | //! thread, and anyone can call [`thread::current()`] to access it. If you use `park` and `unpark`, |
10 | //! but you also call a function that uses `park` and `unpark` internally, that function could |
11 | //! cause a deadlock by consuming a wakeup that was intended for you. The [`Parker`] object in this |
12 | //! crate avoids that problem by managing its own state, which isn't shared with unrelated callers. |
13 | //! |
14 | //! [`thread::park()`]: https://doc.rust-lang.org/std/thread/fn.park.html |
15 | //! [`Thread::unpark()`]: https://doc.rust-lang.org/std/thread/struct.Thread.html#method.unpark |
16 | //! [`thread::current()`]: https://doc.rust-lang.org/std/thread/fn.current.html |
17 | //! |
18 | //! # Examples |
19 | //! |
20 | //! ``` |
21 | //! use std::thread; |
22 | //! use std::time::Duration; |
23 | //! use parking::Parker; |
24 | //! |
25 | //! let p = Parker::new(); |
26 | //! let u = p.unparker(); |
27 | //! |
28 | //! // Notify the parker. |
29 | //! u.unpark(); |
30 | //! |
31 | //! // Wakes up immediately because the parker is notified. |
32 | //! p.park(); |
33 | //! |
34 | //! thread::spawn(move || { |
35 | //! thread::sleep(Duration::from_millis(500)); |
36 | //! u.unpark(); |
37 | //! }); |
38 | //! |
39 | //! // Wakes up when `u.unpark()` notifies and then goes back into unnotified state. |
40 | //! p.park(); |
41 | //! ``` |
42 | |
43 | #![forbid (unsafe_code)] |
44 | #![warn (missing_docs, missing_debug_implementations, rust_2018_idioms)] |
45 | #![doc ( |
46 | html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
47 | )] |
48 | #![doc ( |
49 | html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
50 | )] |
51 | |
52 | #[cfg (not(all(loom, feature = "loom" )))] |
53 | use std::sync; |
54 | |
55 | #[cfg (all(loom, feature = "loom" ))] |
56 | use loom::sync; |
57 | |
58 | use std::cell::Cell; |
59 | use std::fmt; |
60 | use std::marker::PhantomData; |
61 | use std::sync::Arc; |
62 | use std::task::{Wake, Waker}; |
63 | use std::time::Duration; |
64 | |
65 | #[cfg (not(all(loom, feature = "loom" )))] |
66 | use std::time::Instant; |
67 | |
68 | use sync::atomic::AtomicUsize; |
69 | use sync::atomic::Ordering::SeqCst; |
70 | use sync::{Condvar, Mutex}; |
71 | |
72 | /// Creates a parker and an associated unparker. |
73 | /// |
74 | /// # Examples |
75 | /// |
76 | /// ``` |
77 | /// let (p, u) = parking::pair(); |
78 | /// ``` |
79 | pub fn pair() -> (Parker, Unparker) { |
80 | let p: Parker = Parker::new(); |
81 | let u: Unparker = p.unparker(); |
82 | (p, u) |
83 | } |
84 | |
85 | /// Waits for a notification. |
86 | pub struct Parker { |
87 | unparker: Unparker, |
88 | _marker: PhantomData<Cell<()>>, |
89 | } |
90 | |
91 | impl Parker { |
92 | /// Creates a new parker. |
93 | /// |
94 | /// # Examples |
95 | /// |
96 | /// ``` |
97 | /// use parking::Parker; |
98 | /// |
99 | /// let p = Parker::new(); |
100 | /// ``` |
101 | /// |
102 | pub fn new() -> Parker { |
103 | Parker { |
104 | unparker: Unparker { |
105 | inner: Arc::new(Inner { |
106 | state: AtomicUsize::new(EMPTY), |
107 | lock: Mutex::new(()), |
108 | cvar: Condvar::new(), |
109 | }), |
110 | }, |
111 | _marker: PhantomData, |
112 | } |
113 | } |
114 | |
115 | /// Blocks until notified and then goes back into unnotified state. |
116 | /// |
117 | /// # Examples |
118 | /// |
119 | /// ``` |
120 | /// use parking::Parker; |
121 | /// |
122 | /// let p = Parker::new(); |
123 | /// let u = p.unparker(); |
124 | /// |
125 | /// // Notify the parker. |
126 | /// u.unpark(); |
127 | /// |
128 | /// // Wakes up immediately because the parker is notified. |
129 | /// p.park(); |
130 | /// ``` |
131 | pub fn park(&self) { |
132 | self.unparker.inner.park(None); |
133 | } |
134 | |
135 | /// Blocks until notified and then goes back into unnotified state, or times out after |
136 | /// `duration`. |
137 | /// |
138 | /// Returns `true` if notified before the timeout. |
139 | /// |
140 | /// # Examples |
141 | /// |
142 | /// ``` |
143 | /// use std::time::Duration; |
144 | /// use parking::Parker; |
145 | /// |
146 | /// let p = Parker::new(); |
147 | /// |
148 | /// // Wait for a notification, or time out after 500 ms. |
149 | /// p.park_timeout(Duration::from_millis(500)); |
150 | /// ``` |
151 | #[cfg (not(loom))] |
152 | pub fn park_timeout(&self, duration: Duration) -> bool { |
153 | self.unparker.inner.park(Some(duration)) |
154 | } |
155 | |
156 | /// Blocks until notified and then goes back into unnotified state, or times out at `instant`. |
157 | /// |
158 | /// Returns `true` if notified before the deadline. |
159 | /// |
160 | /// # Examples |
161 | /// |
162 | /// ``` |
163 | /// use std::time::{Duration, Instant}; |
164 | /// use parking::Parker; |
165 | /// |
166 | /// let p = Parker::new(); |
167 | /// |
168 | /// // Wait for a notification, or time out after 500 ms. |
169 | /// p.park_deadline(Instant::now() + Duration::from_millis(500)); |
170 | /// ``` |
171 | #[cfg (not(loom))] |
172 | pub fn park_deadline(&self, instant: Instant) -> bool { |
173 | self.unparker |
174 | .inner |
175 | .park(Some(instant.saturating_duration_since(Instant::now()))) |
176 | } |
177 | |
178 | /// Notifies the parker. |
179 | /// |
180 | /// Returns `true` if this call is the first to notify the parker, or `false` if the parker |
181 | /// was already notified. |
182 | /// |
183 | /// # Examples |
184 | /// |
185 | /// ``` |
186 | /// use std::thread; |
187 | /// use std::time::Duration; |
188 | /// use parking::Parker; |
189 | /// |
190 | /// let p = Parker::new(); |
191 | /// |
192 | /// assert_eq!(p.unpark(), true); |
193 | /// assert_eq!(p.unpark(), false); |
194 | /// |
195 | /// // Wakes up immediately. |
196 | /// p.park(); |
197 | /// ``` |
198 | pub fn unpark(&self) -> bool { |
199 | self.unparker.unpark() |
200 | } |
201 | |
202 | /// Returns a handle for unparking. |
203 | /// |
204 | /// The returned [`Unparker`] can be cloned and shared among threads. |
205 | /// |
206 | /// # Examples |
207 | /// |
208 | /// ``` |
209 | /// use parking::Parker; |
210 | /// |
211 | /// let p = Parker::new(); |
212 | /// let u = p.unparker(); |
213 | /// |
214 | /// // Notify the parker. |
215 | /// u.unpark(); |
216 | /// |
217 | /// // Wakes up immediately because the parker is notified. |
218 | /// p.park(); |
219 | /// ``` |
220 | pub fn unparker(&self) -> Unparker { |
221 | self.unparker.clone() |
222 | } |
223 | } |
224 | |
225 | impl Default for Parker { |
226 | fn default() -> Parker { |
227 | Parker::new() |
228 | } |
229 | } |
230 | |
231 | impl fmt::Debug for Parker { |
232 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
233 | f.pad("Parker { .. }" ) |
234 | } |
235 | } |
236 | |
237 | /// Notifies a parker. |
238 | pub struct Unparker { |
239 | inner: Arc<Inner>, |
240 | } |
241 | |
242 | impl Unparker { |
243 | /// Notifies the associated parker. |
244 | /// |
245 | /// Returns `true` if this call is the first to notify the parker, or `false` if the parker |
246 | /// was already notified. |
247 | /// |
248 | /// # Examples |
249 | /// |
250 | /// ``` |
251 | /// use std::thread; |
252 | /// use std::time::Duration; |
253 | /// use parking::Parker; |
254 | /// |
255 | /// let p = Parker::new(); |
256 | /// let u = p.unparker(); |
257 | /// |
258 | /// thread::spawn(move || { |
259 | /// thread::sleep(Duration::from_millis(500)); |
260 | /// u.unpark(); |
261 | /// }); |
262 | /// |
263 | /// // Wakes up when `u.unpark()` notifies and then goes back into unnotified state. |
264 | /// p.park(); |
265 | /// ``` |
266 | pub fn unpark(&self) -> bool { |
267 | self.inner.unpark() |
268 | } |
269 | |
270 | /// Indicates whether this unparker will unpark the associated parker. |
271 | /// |
272 | /// This can be used to avoid unnecessary work before calling `unpark()`. |
273 | /// |
274 | /// # Examples |
275 | /// |
276 | /// ``` |
277 | /// use parking::Parker; |
278 | /// |
279 | /// let p = Parker::new(); |
280 | /// let u = p.unparker(); |
281 | /// |
282 | /// assert!(u.will_unpark(&p)); |
283 | /// ``` |
284 | pub fn will_unpark(&self, parker: &Parker) -> bool { |
285 | Arc::ptr_eq(&self.inner, &parker.unparker.inner) |
286 | } |
287 | |
288 | /// Indicates whether two unparkers will unpark the same parker. |
289 | /// |
290 | /// # Examples |
291 | /// |
292 | /// ``` |
293 | /// use parking::Parker; |
294 | /// |
295 | /// let p = Parker::new(); |
296 | /// let u1 = p.unparker(); |
297 | /// let u2 = p.unparker(); |
298 | /// |
299 | /// assert!(u1.same_parker(&u2)); |
300 | /// ``` |
301 | pub fn same_parker(&self, other: &Unparker) -> bool { |
302 | Arc::ptr_eq(&self.inner, &other.inner) |
303 | } |
304 | } |
305 | |
306 | impl fmt::Debug for Unparker { |
307 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
308 | f.pad("Unparker { .. }" ) |
309 | } |
310 | } |
311 | |
312 | impl Clone for Unparker { |
313 | fn clone(&self) -> Unparker { |
314 | Unparker { |
315 | inner: self.inner.clone(), |
316 | } |
317 | } |
318 | } |
319 | |
320 | impl From<Unparker> for Waker { |
321 | fn from(up: Unparker) -> Self { |
322 | Waker::from(up.inner) |
323 | } |
324 | } |
325 | |
326 | const EMPTY: usize = 0; |
327 | const PARKED: usize = 1; |
328 | const NOTIFIED: usize = 2; |
329 | |
330 | struct Inner { |
331 | state: AtomicUsize, |
332 | lock: Mutex<()>, |
333 | cvar: Condvar, |
334 | } |
335 | |
336 | impl Inner { |
337 | fn park(&self, timeout: Option<Duration>) -> bool { |
338 | // If we were previously notified then we consume this notification and return quickly. |
339 | if self |
340 | .state |
341 | .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) |
342 | .is_ok() |
343 | { |
344 | return true; |
345 | } |
346 | |
347 | // If the timeout is zero, then there is no need to actually block. |
348 | if let Some(dur) = timeout { |
349 | if dur == Duration::from_millis(0) { |
350 | return false; |
351 | } |
352 | } |
353 | |
354 | // Otherwise we need to coordinate going to sleep. |
355 | let mut m = self.lock.lock().unwrap(); |
356 | |
357 | match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { |
358 | Ok(_) => {} |
359 | // Consume this notification to avoid spurious wakeups in the next park. |
360 | Err(NOTIFIED) => { |
361 | // We must read `state` here, even though we know it will be `NOTIFIED`. This is |
362 | // because `unpark` may have been called again since we read `NOTIFIED` in the |
363 | // `compare_exchange` above. We must perform an acquire operation that synchronizes |
364 | // with that `unpark` to observe any writes it made before the call to `unpark`. To |
365 | // do that we must read from the write it made to `state`. |
366 | let old = self.state.swap(EMPTY, SeqCst); |
367 | assert_eq!(old, NOTIFIED, "park state changed unexpectedly" ); |
368 | return true; |
369 | } |
370 | Err(n) => panic!("inconsistent park_timeout state: {}" , n), |
371 | } |
372 | |
373 | match timeout { |
374 | None => { |
375 | loop { |
376 | // Block the current thread on the conditional variable. |
377 | m = self.cvar.wait(m).unwrap(); |
378 | |
379 | if self |
380 | .state |
381 | .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) |
382 | .is_ok() |
383 | { |
384 | // got a notification |
385 | return true; |
386 | } |
387 | } |
388 | } |
389 | Some(timeout) => { |
390 | #[cfg (not(loom))] |
391 | { |
392 | // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a |
393 | // notification we just want to unconditionally set `state` back to `EMPTY`, either |
394 | // consuming a notification or un-flagging ourselves as parked. |
395 | let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap(); |
396 | |
397 | match self.state.swap(EMPTY, SeqCst) { |
398 | NOTIFIED => true, // got a notification |
399 | PARKED => false, // no notification |
400 | n => panic!("inconsistent park_timeout state: {}" , n), |
401 | } |
402 | } |
403 | |
404 | #[cfg (loom)] |
405 | { |
406 | let _ = timeout; |
407 | panic!("park_timeout is not supported under loom" ); |
408 | } |
409 | } |
410 | } |
411 | } |
412 | |
413 | pub fn unpark(&self) -> bool { |
414 | // To ensure the unparked thread will observe any writes we made before this call, we must |
415 | // perform a release operation that `park` can synchronize with. To do that we must write |
416 | // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather |
417 | // than a compare-and-swap that returns if it reads `NOTIFIED` on failure. |
418 | match self.state.swap(NOTIFIED, SeqCst) { |
419 | EMPTY => return true, // no one was waiting |
420 | NOTIFIED => return false, // already unparked |
421 | PARKED => {} // gotta go wake someone up |
422 | _ => panic!("inconsistent state in unpark" ), |
423 | } |
424 | |
425 | // There is a period between when the parked thread sets `state` to `PARKED` (or last |
426 | // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`. |
427 | // If we were to notify during this period it would be ignored and then when the parked |
428 | // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this |
429 | // stage so we can acquire `lock` to wait until it is ready to receive the notification. |
430 | // |
431 | // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes |
432 | // it doesn't get woken only to have to wait for us to release `lock`. |
433 | drop(self.lock.lock().unwrap()); |
434 | self.cvar.notify_one(); |
435 | true |
436 | } |
437 | } |
438 | |
439 | impl Wake for Inner { |
440 | #[inline ] |
441 | fn wake(self: Arc<Self>) { |
442 | self.unpark(); |
443 | } |
444 | |
445 | #[inline ] |
446 | fn wake_by_ref(self: &Arc<Self>) { |
447 | self.unpark(); |
448 | } |
449 | } |
450 | |