| 1 | //! Thread parking and unparking. |
| 2 | //! |
| 3 | //! A [`Parker`] is in either the notified or unnotified state. The [`park()`][`Parker::park()`] method blocks |
| 4 | //! the current thread until the [`Parker`] becomes notified and then puts it back into the unnotified |
| 5 | //! state. The [`unpark()`][`Unparker::unpark()`] method puts it into the notified state. |
| 6 | //! |
| 7 | //! This API is similar to [`thread::park()`] and [`Thread::unpark()`] from the standard library. |
| 8 | //! The difference is that the state "token" managed by those functions is shared across an entire |
| 9 | //! thread, and anyone can call [`thread::current()`] to access it. If you use `park` and `unpark`, |
| 10 | //! but you also call a function that uses `park` and `unpark` internally, that function could |
| 11 | //! cause a deadlock by consuming a wakeup that was intended for you. The [`Parker`] object in this |
| 12 | //! crate avoids that problem by managing its own state, which isn't shared with unrelated callers. |
| 13 | //! |
| 14 | //! [`thread::park()`]: https://doc.rust-lang.org/std/thread/fn.park.html |
| 15 | //! [`Thread::unpark()`]: https://doc.rust-lang.org/std/thread/struct.Thread.html#method.unpark |
| 16 | //! [`thread::current()`]: https://doc.rust-lang.org/std/thread/fn.current.html |
| 17 | //! |
| 18 | //! # Examples |
| 19 | //! |
| 20 | //! ``` |
| 21 | //! use std::thread; |
| 22 | //! use std::time::Duration; |
| 23 | //! use parking::Parker; |
| 24 | //! |
| 25 | //! let p = Parker::new(); |
| 26 | //! let u = p.unparker(); |
| 27 | //! |
| 28 | //! // Notify the parker. |
| 29 | //! u.unpark(); |
| 30 | //! |
| 31 | //! // Wakes up immediately because the parker is notified. |
| 32 | //! p.park(); |
| 33 | //! |
| 34 | //! thread::spawn(move || { |
| 35 | //! thread::sleep(Duration::from_millis(500)); |
| 36 | //! u.unpark(); |
| 37 | //! }); |
| 38 | //! |
| 39 | //! // Wakes up when `u.unpark()` notifies and then goes back into unnotified state. |
| 40 | //! p.park(); |
| 41 | //! ``` |
| 42 | |
| 43 | #![forbid (unsafe_code)] |
| 44 | #![warn (missing_docs, missing_debug_implementations, rust_2018_idioms)] |
| 45 | #![doc ( |
| 46 | html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
| 47 | )] |
| 48 | #![doc ( |
| 49 | html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
| 50 | )] |
| 51 | |
| 52 | #[cfg (not(all(loom, feature = "loom" )))] |
| 53 | use std::sync; |
| 54 | |
| 55 | #[cfg (all(loom, feature = "loom" ))] |
| 56 | use loom::sync; |
| 57 | |
| 58 | use std::cell::Cell; |
| 59 | use std::fmt; |
| 60 | use std::marker::PhantomData; |
| 61 | use std::sync::Arc; |
| 62 | use std::task::{Wake, Waker}; |
| 63 | use std::time::Duration; |
| 64 | |
| 65 | #[cfg (not(all(loom, feature = "loom" )))] |
| 66 | use std::time::Instant; |
| 67 | |
| 68 | use sync::atomic::AtomicUsize; |
| 69 | use sync::atomic::Ordering::SeqCst; |
| 70 | use sync::{Condvar, Mutex}; |
| 71 | |
| 72 | /// Creates a parker and an associated unparker. |
| 73 | /// |
| 74 | /// # Examples |
| 75 | /// |
| 76 | /// ``` |
| 77 | /// let (p, u) = parking::pair(); |
| 78 | /// ``` |
| 79 | pub fn pair() -> (Parker, Unparker) { |
| 80 | let p: Parker = Parker::new(); |
| 81 | let u: Unparker = p.unparker(); |
| 82 | (p, u) |
| 83 | } |
| 84 | |
/// Waits for a notification.
///
/// A `Parker` holds an [`Unparker`] handle to its own notification state; additional handles
/// are obtained with [`Parker::unparker()`].
pub struct Parker {
    // Handle to the shared state; the parker's methods forward to the `Inner` stored inside it.
    unparker: Unparker,
    // Zero-sized marker: `Cell<()>` is not `Sync`, so holding it makes `Parker` `!Sync` as well.
    _marker: PhantomData<Cell<()>>,
}
| 90 | |
| 91 | impl Parker { |
| 92 | /// Creates a new parker. |
| 93 | /// |
| 94 | /// # Examples |
| 95 | /// |
| 96 | /// ``` |
| 97 | /// use parking::Parker; |
| 98 | /// |
| 99 | /// let p = Parker::new(); |
| 100 | /// ``` |
| 101 | /// |
| 102 | pub fn new() -> Parker { |
| 103 | Parker { |
| 104 | unparker: Unparker { |
| 105 | inner: Arc::new(Inner { |
| 106 | state: AtomicUsize::new(EMPTY), |
| 107 | lock: Mutex::new(()), |
| 108 | cvar: Condvar::new(), |
| 109 | }), |
| 110 | }, |
| 111 | _marker: PhantomData, |
| 112 | } |
| 113 | } |
| 114 | |
| 115 | /// Blocks until notified and then goes back into unnotified state. |
| 116 | /// |
| 117 | /// # Examples |
| 118 | /// |
| 119 | /// ``` |
| 120 | /// use parking::Parker; |
| 121 | /// |
| 122 | /// let p = Parker::new(); |
| 123 | /// let u = p.unparker(); |
| 124 | /// |
| 125 | /// // Notify the parker. |
| 126 | /// u.unpark(); |
| 127 | /// |
| 128 | /// // Wakes up immediately because the parker is notified. |
| 129 | /// p.park(); |
| 130 | /// ``` |
| 131 | pub fn park(&self) { |
| 132 | self.unparker.inner.park(None); |
| 133 | } |
| 134 | |
| 135 | /// Blocks until notified and then goes back into unnotified state, or times out after |
| 136 | /// `duration`. |
| 137 | /// |
| 138 | /// Returns `true` if notified before the timeout. |
| 139 | /// |
| 140 | /// # Examples |
| 141 | /// |
| 142 | /// ``` |
| 143 | /// use std::time::Duration; |
| 144 | /// use parking::Parker; |
| 145 | /// |
| 146 | /// let p = Parker::new(); |
| 147 | /// |
| 148 | /// // Wait for a notification, or time out after 500 ms. |
| 149 | /// p.park_timeout(Duration::from_millis(500)); |
| 150 | /// ``` |
| 151 | #[cfg (not(loom))] |
| 152 | pub fn park_timeout(&self, duration: Duration) -> bool { |
| 153 | self.unparker.inner.park(Some(duration)) |
| 154 | } |
| 155 | |
| 156 | /// Blocks until notified and then goes back into unnotified state, or times out at `instant`. |
| 157 | /// |
| 158 | /// Returns `true` if notified before the deadline. |
| 159 | /// |
| 160 | /// # Examples |
| 161 | /// |
| 162 | /// ``` |
| 163 | /// use std::time::{Duration, Instant}; |
| 164 | /// use parking::Parker; |
| 165 | /// |
| 166 | /// let p = Parker::new(); |
| 167 | /// |
| 168 | /// // Wait for a notification, or time out after 500 ms. |
| 169 | /// p.park_deadline(Instant::now() + Duration::from_millis(500)); |
| 170 | /// ``` |
| 171 | #[cfg (not(loom))] |
| 172 | pub fn park_deadline(&self, instant: Instant) -> bool { |
| 173 | self.unparker |
| 174 | .inner |
| 175 | .park(Some(instant.saturating_duration_since(Instant::now()))) |
| 176 | } |
| 177 | |
| 178 | /// Notifies the parker. |
| 179 | /// |
| 180 | /// Returns `true` if this call is the first to notify the parker, or `false` if the parker |
| 181 | /// was already notified. |
| 182 | /// |
| 183 | /// # Examples |
| 184 | /// |
| 185 | /// ``` |
| 186 | /// use std::thread; |
| 187 | /// use std::time::Duration; |
| 188 | /// use parking::Parker; |
| 189 | /// |
| 190 | /// let p = Parker::new(); |
| 191 | /// |
| 192 | /// assert_eq!(p.unpark(), true); |
| 193 | /// assert_eq!(p.unpark(), false); |
| 194 | /// |
| 195 | /// // Wakes up immediately. |
| 196 | /// p.park(); |
| 197 | /// ``` |
| 198 | pub fn unpark(&self) -> bool { |
| 199 | self.unparker.unpark() |
| 200 | } |
| 201 | |
| 202 | /// Returns a handle for unparking. |
| 203 | /// |
| 204 | /// The returned [`Unparker`] can be cloned and shared among threads. |
| 205 | /// |
| 206 | /// # Examples |
| 207 | /// |
| 208 | /// ``` |
| 209 | /// use parking::Parker; |
| 210 | /// |
| 211 | /// let p = Parker::new(); |
| 212 | /// let u = p.unparker(); |
| 213 | /// |
| 214 | /// // Notify the parker. |
| 215 | /// u.unpark(); |
| 216 | /// |
| 217 | /// // Wakes up immediately because the parker is notified. |
| 218 | /// p.park(); |
| 219 | /// ``` |
| 220 | pub fn unparker(&self) -> Unparker { |
| 221 | self.unparker.clone() |
| 222 | } |
| 223 | } |
| 224 | |
| 225 | impl Default for Parker { |
| 226 | fn default() -> Parker { |
| 227 | Parker::new() |
| 228 | } |
| 229 | } |
| 230 | |
| 231 | impl fmt::Debug for Parker { |
| 232 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 233 | f.pad("Parker { .. }" ) |
| 234 | } |
| 235 | } |
| 236 | |
/// Notifies a parker.
///
/// Handles are created with [`Parker::unparker()`] and can be cloned; every clone refers to the
/// same parker.
pub struct Unparker {
    // Reference-counted state shared with the parker and with every clone of this handle.
    inner: Arc<Inner>,
}
| 241 | |
| 242 | impl Unparker { |
| 243 | /// Notifies the associated parker. |
| 244 | /// |
| 245 | /// Returns `true` if this call is the first to notify the parker, or `false` if the parker |
| 246 | /// was already notified. |
| 247 | /// |
| 248 | /// # Examples |
| 249 | /// |
| 250 | /// ``` |
| 251 | /// use std::thread; |
| 252 | /// use std::time::Duration; |
| 253 | /// use parking::Parker; |
| 254 | /// |
| 255 | /// let p = Parker::new(); |
| 256 | /// let u = p.unparker(); |
| 257 | /// |
| 258 | /// thread::spawn(move || { |
| 259 | /// thread::sleep(Duration::from_millis(500)); |
| 260 | /// u.unpark(); |
| 261 | /// }); |
| 262 | /// |
| 263 | /// // Wakes up when `u.unpark()` notifies and then goes back into unnotified state. |
| 264 | /// p.park(); |
| 265 | /// ``` |
| 266 | pub fn unpark(&self) -> bool { |
| 267 | self.inner.unpark() |
| 268 | } |
| 269 | |
| 270 | /// Indicates whether this unparker will unpark the associated parker. |
| 271 | /// |
| 272 | /// This can be used to avoid unnecessary work before calling `unpark()`. |
| 273 | /// |
| 274 | /// # Examples |
| 275 | /// |
| 276 | /// ``` |
| 277 | /// use parking::Parker; |
| 278 | /// |
| 279 | /// let p = Parker::new(); |
| 280 | /// let u = p.unparker(); |
| 281 | /// |
| 282 | /// assert!(u.will_unpark(&p)); |
| 283 | /// ``` |
| 284 | pub fn will_unpark(&self, parker: &Parker) -> bool { |
| 285 | Arc::ptr_eq(&self.inner, &parker.unparker.inner) |
| 286 | } |
| 287 | |
| 288 | /// Indicates whether two unparkers will unpark the same parker. |
| 289 | /// |
| 290 | /// # Examples |
| 291 | /// |
| 292 | /// ``` |
| 293 | /// use parking::Parker; |
| 294 | /// |
| 295 | /// let p = Parker::new(); |
| 296 | /// let u1 = p.unparker(); |
| 297 | /// let u2 = p.unparker(); |
| 298 | /// |
| 299 | /// assert!(u1.same_parker(&u2)); |
| 300 | /// ``` |
| 301 | pub fn same_parker(&self, other: &Unparker) -> bool { |
| 302 | Arc::ptr_eq(&self.inner, &other.inner) |
| 303 | } |
| 304 | } |
| 305 | |
| 306 | impl fmt::Debug for Unparker { |
| 307 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 308 | f.pad("Unparker { .. }" ) |
| 309 | } |
| 310 | } |
| 311 | |
| 312 | impl Clone for Unparker { |
| 313 | fn clone(&self) -> Unparker { |
| 314 | Unparker { |
| 315 | inner: self.inner.clone(), |
| 316 | } |
| 317 | } |
| 318 | } |
| 319 | |
| 320 | impl From<Unparker> for Waker { |
| 321 | fn from(up: Unparker) -> Self { |
| 322 | Waker::from(up.inner) |
| 323 | } |
| 324 | } |
| 325 | |
// No thread is parked and no notification is pending.
const EMPTY: usize = 0;
// A thread has announced that it is (about to be) blocked in `Inner::park`.
const PARKED: usize = 1;
// A notification is pending; the next `park` consumes it without blocking.
const NOTIFIED: usize = 2;

/// State shared between a parker and all of its unparker handles.
struct Inner {
    // One of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    // Held by the parking thread from the moment it sets `PARKED` until it waits on `cvar`, so
    // that `unpark` can wait for it to be ready to receive the wakeup.
    lock: Mutex<()>,
    // Signalled by `unpark` to wake the parked thread.
    cvar: Condvar,
}

impl Inner {
    /// Blocks until a notification arrives or, if `timeout` is `Some`, until it has elapsed.
    ///
    /// Returns `true` if a notification was consumed and `false` if the wait timed out. With
    /// `timeout == None` the call only returns once notified. On return `state` is `EMPTY`.
    ///
    /// A timed wait keeps waiting after a spurious condition-variable wakeup, so `false` is
    /// only returned once the full timeout has elapsed.
    ///
    /// # Panics
    ///
    /// Panics if `state` holds an unexpected value, if `lock` is poisoned, or (under `loom`) if
    /// a timeout is supplied.
    fn park(&self, timeout: Option<Duration>) -> bool {
        // If we were previously notified then we consume this notification and return quickly.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
        {
            return true;
        }

        // If the timeout is zero, then there is no need to actually block.
        if let Some(dur) = timeout {
            if dur == Duration::from_millis(0) {
                return false;
            }
        }

        // Otherwise we need to coordinate going to sleep.
        let mut m = self.lock.lock().unwrap();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            // Consume this notification to avoid spurious wakeups in the next park.
            Err(NOTIFIED) => {
                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
                // because `unpark` may have been called again since we read `NOTIFIED` in the
                // `compare_exchange` above. We must perform an acquire operation that synchronizes
                // with that `unpark` to observe any writes it made before the call to `unpark`. To
                // do that we must read from the write it made to `state`.
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return true;
            }
            Err(n) => panic!("inconsistent park_timeout state: {}", n),
        }

        match timeout {
            None => {
                loop {
                    // Block the current thread on the condition variable.
                    m = self.cvar.wait(m).unwrap();

                    if self
                        .state
                        .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                        .is_ok()
                    {
                        // got a notification
                        return true;
                    }

                    // Spurious wakeup: `state` is still `PARKED`, so go back to sleep.
                }
            }
            Some(timeout) => {
                #[cfg(not(loom))]
                {
                    // `Condvar::wait_timeout` may return early without a notification (spurious
                    // wakeup). Track an absolute deadline and keep waiting for whatever time is
                    // left, so that `false` is only reported once the timeout really elapsed.
                    // `checked_add` avoids the overflow panic of `Instant + Duration`; when the
                    // deadline is not representable we just wait with the full timeout again.
                    let deadline = Instant::now().checked_add(timeout);

                    loop {
                        let remaining = match deadline {
                            Some(deadline) => deadline.saturating_duration_since(Instant::now()),
                            None => timeout,
                        };

                        if remaining == Duration::from_millis(0) {
                            // The deadline has passed.
                            break;
                        }

                        m = self.cvar.wait_timeout(m, remaining).unwrap().0;

                        if self
                            .state
                            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                            .is_ok()
                        {
                            // got a notification
                            return true;
                        }
                    }

                    // Timed out: unconditionally set `state` back to `EMPTY`, either consuming
                    // a notification that raced with the deadline or un-flagging ourselves as
                    // parked. `lock` is still held here.
                    match self.state.swap(EMPTY, SeqCst) {
                        NOTIFIED => true, // got a notification
                        PARKED => false,  // no notification
                        n => panic!("inconsistent park_timeout state: {}", n),
                    }
                }

                #[cfg(loom)]
                {
                    let _ = timeout;
                    panic!("park_timeout is not supported under loom");
                }
            }
        }
    }

    /// Stores a notification and wakes the parked thread, if there is one.
    ///
    /// Returns `false` if a notification was already pending and `true` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `state` holds an unexpected value or if `lock` is poisoned.
    pub fn unpark(&self) -> bool {
        // To ensure the unparked thread will observe any writes we made before this call, we must
        // perform a release operation that `park` can synchronize with. To do that we must write
        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY => return true,     // no one was waiting
            NOTIFIED => return false, // already unparked
            PARKED => {}              // gotta go wake someone up
            _ => panic!("inconsistent state in unpark"),
        }

        // There is a period between when the parked thread sets `state` to `PARKED` (or last
        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
        // If we were to notify during this period it would be ignored and then when the parked
        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
        //
        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
        // it doesn't get woken only to have to wait for us to release `lock`.
        drop(self.lock.lock().unwrap());
        self.cvar.notify_one();
        true
    }
}
| 438 | |
| 439 | impl Wake for Inner { |
| 440 | #[inline ] |
| 441 | fn wake(self: Arc<Self>) { |
| 442 | self.unpark(); |
| 443 | } |
| 444 | |
| 445 | #[inline ] |
| 446 | fn wake_by_ref(self: &Arc<Self>) { |
| 447 | self.unpark(); |
| 448 | } |
| 449 | } |
| 450 | |