1use crate::primitive::sync::atomic::AtomicUsize;
2use crate::primitive::sync::{Arc, Condvar, Mutex};
3use core::sync::atomic::Ordering::SeqCst;
4use std::fmt;
5use std::marker::PhantomData;
6use std::time::{Duration, Instant};
7
8/// A thread parking primitive.
9///
10/// Conceptually, each `Parker` has an associated token which is initially not present:
11///
12/// * The [`park`] method blocks the current thread unless or until the token is available, at
13/// which point it automatically consumes the token.
14///
15/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
16/// a specified maximum time.
17///
18/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
19/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call
20/// returning immediately.
21///
22/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
23/// [`park`] and [`unpark`].
24///
25/// # Examples
26///
27/// ```
28/// use std::thread;
29/// use std::time::Duration;
30/// use crossbeam_utils::sync::Parker;
31///
32/// let p = Parker::new();
33/// let u = p.unparker().clone();
34///
35/// // Make the token available.
36/// u.unpark();
37/// // Wakes up immediately and consumes the token.
38/// p.park();
39///
40/// thread::spawn(move || {
41/// thread::sleep(Duration::from_millis(500));
42/// u.unpark();
43/// });
44///
45/// // Wakes up when `u.unpark()` provides the token.
46/// p.park();
47/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
48/// ```
49///
50/// [`park`]: Parker::park
51/// [`park_timeout`]: Parker::park_timeout
52/// [`park_deadline`]: Parker::park_deadline
53/// [`unpark`]: Unparker::unpark
54pub struct Parker {
55 unparker: Unparker,
56 _marker: PhantomData<*const ()>,
57}
58
59unsafe impl Send for Parker {}
60
61impl Default for Parker {
62 fn default() -> Self {
63 Self {
64 unparker: Unparker {
65 inner: Arc::new(data:Inner {
66 state: AtomicUsize::new(EMPTY),
67 lock: Mutex::new(()),
68 cvar: Condvar::new(),
69 }),
70 },
71 _marker: PhantomData,
72 }
73 }
74}
75
76impl Parker {
77 /// Creates a new `Parker`.
78 ///
79 /// # Examples
80 ///
81 /// ```
82 /// use crossbeam_utils::sync::Parker;
83 ///
84 /// let p = Parker::new();
85 /// ```
86 ///
87 pub fn new() -> Parker {
88 Self::default()
89 }
90
91 /// Blocks the current thread until the token is made available.
92 ///
93 /// # Examples
94 ///
95 /// ```
96 /// use crossbeam_utils::sync::Parker;
97 ///
98 /// let p = Parker::new();
99 /// let u = p.unparker().clone();
100 ///
101 /// // Make the token available.
102 /// u.unpark();
103 ///
104 /// // Wakes up immediately and consumes the token.
105 /// p.park();
106 /// ```
107 pub fn park(&self) {
108 self.unparker.inner.park(None);
109 }
110
111 /// Blocks the current thread until the token is made available, but only for a limited time.
112 ///
113 /// # Examples
114 ///
115 /// ```
116 /// use std::time::Duration;
117 /// use crossbeam_utils::sync::Parker;
118 ///
119 /// let p = Parker::new();
120 ///
121 /// // Waits for the token to become available, but will not wait longer than 500 ms.
122 /// p.park_timeout(Duration::from_millis(500));
123 /// ```
124 pub fn park_timeout(&self, timeout: Duration) {
125 match Instant::now().checked_add(timeout) {
126 Some(deadline) => self.park_deadline(deadline),
127 None => self.park(),
128 }
129 }
130
131 /// Blocks the current thread until the token is made available, or until a certain deadline.
132 ///
133 /// # Examples
134 ///
135 /// ```
136 /// use std::time::{Duration, Instant};
137 /// use crossbeam_utils::sync::Parker;
138 ///
139 /// let p = Parker::new();
140 /// let deadline = Instant::now() + Duration::from_millis(500);
141 ///
142 /// // Waits for the token to become available, but will not wait longer than 500 ms.
143 /// p.park_deadline(deadline);
144 /// ```
145 pub fn park_deadline(&self, deadline: Instant) {
146 self.unparker.inner.park(Some(deadline))
147 }
148
149 /// Returns a reference to an associated [`Unparker`].
150 ///
151 /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
152 ///
153 /// # Examples
154 ///
155 /// ```
156 /// use crossbeam_utils::sync::Parker;
157 ///
158 /// let p = Parker::new();
159 /// let u = p.unparker().clone();
160 ///
161 /// // Make the token available.
162 /// u.unpark();
163 /// // Wakes up immediately and consumes the token.
164 /// p.park();
165 /// ```
166 ///
167 /// [`park`]: Parker::park
168 /// [`park_timeout`]: Parker::park_timeout
169 pub fn unparker(&self) -> &Unparker {
170 &self.unparker
171 }
172
173 /// Converts a `Parker` into a raw pointer.
174 ///
175 /// # Examples
176 ///
177 /// ```
178 /// use crossbeam_utils::sync::Parker;
179 ///
180 /// let p = Parker::new();
181 /// let raw = Parker::into_raw(p);
182 /// # let _ = unsafe { Parker::from_raw(raw) };
183 /// ```
184 pub fn into_raw(this: Parker) -> *const () {
185 Unparker::into_raw(this.unparker)
186 }
187
188 /// Converts a raw pointer into a `Parker`.
189 ///
190 /// # Safety
191 ///
192 /// This method is safe to use only with pointers returned by [`Parker::into_raw`].
193 ///
194 /// # Examples
195 ///
196 /// ```
197 /// use crossbeam_utils::sync::Parker;
198 ///
199 /// let p = Parker::new();
200 /// let raw = Parker::into_raw(p);
201 /// let p = unsafe { Parker::from_raw(raw) };
202 /// ```
203 pub unsafe fn from_raw(ptr: *const ()) -> Parker {
204 Parker {
205 unparker: Unparker::from_raw(ptr),
206 _marker: PhantomData,
207 }
208 }
209}
210
211impl fmt::Debug for Parker {
212 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213 f.pad("Parker { .. }")
214 }
215}
216
217/// Unparks a thread parked by the associated [`Parker`].
218pub struct Unparker {
219 inner: Arc<Inner>,
220}
221
222unsafe impl Send for Unparker {}
223unsafe impl Sync for Unparker {}
224
225impl Unparker {
226 /// Atomically makes the token available if it is not already.
227 ///
228 /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
229 /// any.
230 ///
231 /// # Examples
232 ///
233 /// ```
234 /// use std::thread;
235 /// use std::time::Duration;
236 /// use crossbeam_utils::sync::Parker;
237 ///
238 /// let p = Parker::new();
239 /// let u = p.unparker().clone();
240 ///
241 /// thread::spawn(move || {
242 /// thread::sleep(Duration::from_millis(500));
243 /// u.unpark();
244 /// });
245 ///
246 /// // Wakes up when `u.unpark()` provides the token.
247 /// p.park();
248 /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
249 /// ```
250 ///
251 /// [`park`]: Parker::park
252 /// [`park_timeout`]: Parker::park_timeout
253 pub fn unpark(&self) {
254 self.inner.unpark()
255 }
256
257 /// Converts an `Unparker` into a raw pointer.
258 ///
259 /// # Examples
260 ///
261 /// ```
262 /// use crossbeam_utils::sync::{Parker, Unparker};
263 ///
264 /// let p = Parker::new();
265 /// let u = p.unparker().clone();
266 /// let raw = Unparker::into_raw(u);
267 /// # let _ = unsafe { Unparker::from_raw(raw) };
268 /// ```
269 pub fn into_raw(this: Unparker) -> *const () {
270 Arc::into_raw(this.inner).cast::<()>()
271 }
272
273 /// Converts a raw pointer into an `Unparker`.
274 ///
275 /// # Safety
276 ///
277 /// This method is safe to use only with pointers returned by [`Unparker::into_raw`].
278 ///
279 /// # Examples
280 ///
281 /// ```
282 /// use crossbeam_utils::sync::{Parker, Unparker};
283 ///
284 /// let p = Parker::new();
285 /// let u = p.unparker().clone();
286 ///
287 /// let raw = Unparker::into_raw(u);
288 /// let u = unsafe { Unparker::from_raw(raw) };
289 /// ```
290 pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
291 Unparker {
292 inner: Arc::from_raw(ptr.cast::<Inner>()),
293 }
294 }
295}
296
297impl fmt::Debug for Unparker {
298 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
299 f.pad("Unparker { .. }")
300 }
301}
302
303impl Clone for Unparker {
304 fn clone(&self) -> Unparker {
305 Unparker {
306 inner: self.inner.clone(),
307 }
308 }
309}
310
/// `Inner::state` value: the token is absent and no thread has committed to waiting.
const EMPTY: usize = 0;
/// `Inner::state` value: a thread has committed to waiting in `Inner::park`.
const PARKED: usize = 1;
/// `Inner::state` value: the token is present and will be consumed by the next `Inner::park`.
const NOTIFIED: usize = 2;
314
/// Parking state shared between a `Parker` and all of its `Unparker` handles.
struct Inner {
    /// Current protocol state: one of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    /// Held by `park` from just before it publishes `PARKED` until it is waiting on `cvar`;
    /// `unpark` briefly acquires it so that its notification cannot arrive in between.
    lock: Mutex<()>,
    /// Condition variable on which `park` sleeps and which `unpark` signals.
    cvar: Condvar,
}
320
321impl Inner {
322 fn park(&self, deadline: Option<Instant>) {
323 // If we were previously notified then we consume this notification and return quickly.
324 if self
325 .state
326 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
327 .is_ok()
328 {
329 return;
330 }
331
332 // If the timeout is zero, then there is no need to actually block.
333 if let Some(deadline) = deadline {
334 if deadline <= Instant::now() {
335 return;
336 }
337 }
338
339 // Otherwise we need to coordinate going to sleep.
340 let mut m = self.lock.lock().unwrap();
341
342 match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
343 Ok(_) => {}
344 // Consume this notification to avoid spurious wakeups in the next park.
345 Err(NOTIFIED) => {
346 // We must read `state` here, even though we know it will be `NOTIFIED`. This is
347 // because `unpark` may have been called again since we read `NOTIFIED` in the
348 // `compare_exchange` above. We must perform an acquire operation that synchronizes
349 // with that `unpark` to observe any writes it made before the call to `unpark`. To
350 // do that we must read from the write it made to `state`.
351 let old = self.state.swap(EMPTY, SeqCst);
352 assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
353 return;
354 }
355 Err(n) => panic!("inconsistent park_timeout state: {}", n),
356 }
357
358 loop {
359 // Block the current thread on the conditional variable.
360 m = match deadline {
361 None => self.cvar.wait(m).unwrap(),
362 Some(deadline) => {
363 let now = Instant::now();
364 if now < deadline {
365 // We could check for a timeout here, in the return value of wait_timeout,
366 // but in the case that a timeout and an unpark arrive simultaneously, we
367 // prefer to report the former.
368 self.cvar.wait_timeout(m, deadline - now).unwrap().0
369 } else {
370 // We've timed out; swap out the state back to empty on our way out
371 match self.state.swap(EMPTY, SeqCst) {
372 NOTIFIED | PARKED => return,
373 n => panic!("inconsistent park_timeout state: {}", n),
374 };
375 }
376 }
377 };
378
379 if self
380 .state
381 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
382 .is_ok()
383 {
384 // got a notification
385 return;
386 }
387
388 // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
389 // in the branch above, when we discover the deadline is in the past
390 }
391 }
392
393 pub(crate) fn unpark(&self) {
394 // To ensure the unparked thread will observe any writes we made before this call, we must
395 // perform a release operation that `park` can synchronize with. To do that we must write
396 // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
397 // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
398 match self.state.swap(NOTIFIED, SeqCst) {
399 EMPTY => return, // no one was waiting
400 NOTIFIED => return, // already unparked
401 PARKED => {} // gotta go wake someone up
402 _ => panic!("inconsistent state in unpark"),
403 }
404
405 // There is a period between when the parked thread sets `state` to `PARKED` (or last
406 // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
407 // If we were to notify during this period it would be ignored and then when the parked
408 // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
409 // stage so we can acquire `lock` to wait until it is ready to receive the notification.
410 //
411 // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
412 // it doesn't get woken only to have to wait for us to release `lock`.
413 drop(self.lock.lock().unwrap());
414 self.cvar.notify_one();
415 }
416}
417