1use crate::primitive::sync::atomic::{AtomicUsize, Ordering::SeqCst};
2use crate::primitive::sync::{Arc, Condvar, Mutex};
3use std::fmt;
4use std::marker::PhantomData;
5use std::time::{Duration, Instant};
6
/// A thread parking primitive.
///
/// Conceptually, each `Parker` has an associated token which is initially not present:
///
/// * The [`park`] method blocks the current thread unless or until the token is available, at
///   which point it automatically consumes the token.
///
/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
///   a specified maximum time.
///
/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
///   token is initially absent, [`unpark`] followed by [`park`] will result in the second call
///   returning immediately.
///
/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
/// [`park`] and [`unpark`].
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_utils::sync::Parker;
///
/// let p = Parker::new();
/// let u = p.unparker().clone();
///
/// // Make the token available.
/// u.unpark();
/// // Wakes up immediately and consumes the token.
/// p.park();
///
/// thread::spawn(move || {
///     thread::sleep(Duration::from_millis(500));
///     u.unpark();
/// });
///
/// // Wakes up when `u.unpark()` provides the token.
/// p.park();
/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
/// ```
///
/// [`park`]: Parker::park
/// [`park_timeout`]: Parker::park_timeout
/// [`park_deadline`]: Parker::park_deadline
/// [`unpark`]: Unparker::unpark
pub struct Parker {
    /// Handle to the shared parking state; [`Parker::unparker`] hands out a reference to it so
    /// that it can be cloned and used to wake this parker.
    unparker: Unparker,
    /// Raw-pointer marker: a `*const ()` is neither `Send` nor `Sync`, so this field removes
    /// both automatic impls from `Parker`.
    _marker: PhantomData<*const ()>,
}
57
// SAFETY: the `PhantomData<*const ()>` marker in `Parker` removes both `Send` and `Sync`; this
// impl restores `Send` only, so a `Parker` can be moved to another thread but never shared by
// reference between threads. The state it owns consists of an atomic, a mutex and a condition
// variable (see `Inner`), none of which is tied to the thread that created it.
// NOTE(review): presumably `Sync` is withheld so that only one thread at a time can park on a
// given `Parker` — confirm.
unsafe impl Send for Parker {}
59
60impl Default for Parker {
61 fn default() -> Self {
62 Self {
63 unparker: Unparker {
64 inner: Arc::new(data:Inner {
65 state: AtomicUsize::new(EMPTY),
66 lock: Mutex::new(()),
67 cvar: Condvar::new(),
68 }),
69 },
70 _marker: PhantomData,
71 }
72 }
73}
74
75impl Parker {
76 /// Creates a new `Parker`.
77 ///
78 /// # Examples
79 ///
80 /// ```
81 /// use crossbeam_utils::sync::Parker;
82 ///
83 /// let p = Parker::new();
84 /// ```
85 ///
86 pub fn new() -> Parker {
87 Self::default()
88 }
89
90 /// Blocks the current thread until the token is made available.
91 ///
92 /// # Examples
93 ///
94 /// ```
95 /// use crossbeam_utils::sync::Parker;
96 ///
97 /// let p = Parker::new();
98 /// let u = p.unparker().clone();
99 ///
100 /// // Make the token available.
101 /// u.unpark();
102 ///
103 /// // Wakes up immediately and consumes the token.
104 /// p.park();
105 /// ```
106 pub fn park(&self) {
107 self.unparker.inner.park(None);
108 }
109
110 /// Blocks the current thread until the token is made available, but only for a limited time.
111 ///
112 /// # Examples
113 ///
114 /// ```
115 /// use std::time::Duration;
116 /// use crossbeam_utils::sync::Parker;
117 ///
118 /// let p = Parker::new();
119 ///
120 /// // Waits for the token to become available, but will not wait longer than 500 ms.
121 /// p.park_timeout(Duration::from_millis(500));
122 /// ```
123 pub fn park_timeout(&self, timeout: Duration) {
124 match Instant::now().checked_add(timeout) {
125 Some(deadline) => self.park_deadline(deadline),
126 None => self.park(),
127 }
128 }
129
130 /// Blocks the current thread until the token is made available, or until a certain deadline.
131 ///
132 /// # Examples
133 ///
134 /// ```
135 /// use std::time::{Duration, Instant};
136 /// use crossbeam_utils::sync::Parker;
137 ///
138 /// let p = Parker::new();
139 /// let deadline = Instant::now() + Duration::from_millis(500);
140 ///
141 /// // Waits for the token to become available, but will not wait longer than 500 ms.
142 /// p.park_deadline(deadline);
143 /// ```
144 pub fn park_deadline(&self, deadline: Instant) {
145 self.unparker.inner.park(Some(deadline))
146 }
147
148 /// Returns a reference to an associated [`Unparker`].
149 ///
150 /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
151 ///
152 /// # Examples
153 ///
154 /// ```
155 /// use crossbeam_utils::sync::Parker;
156 ///
157 /// let p = Parker::new();
158 /// let u = p.unparker().clone();
159 ///
160 /// // Make the token available.
161 /// u.unpark();
162 /// // Wakes up immediately and consumes the token.
163 /// p.park();
164 /// ```
165 ///
166 /// [`park`]: Parker::park
167 /// [`park_timeout`]: Parker::park_timeout
168 pub fn unparker(&self) -> &Unparker {
169 &self.unparker
170 }
171
172 /// Converts a `Parker` into a raw pointer.
173 ///
174 /// # Examples
175 ///
176 /// ```
177 /// use crossbeam_utils::sync::Parker;
178 ///
179 /// let p = Parker::new();
180 /// let raw = Parker::into_raw(p);
181 /// # let _ = unsafe { Parker::from_raw(raw) };
182 /// ```
183 pub fn into_raw(this: Parker) -> *const () {
184 Unparker::into_raw(this.unparker)
185 }
186
187 /// Converts a raw pointer into a `Parker`.
188 ///
189 /// # Safety
190 ///
191 /// This method is safe to use only with pointers returned by [`Parker::into_raw`].
192 ///
193 /// # Examples
194 ///
195 /// ```
196 /// use crossbeam_utils::sync::Parker;
197 ///
198 /// let p = Parker::new();
199 /// let raw = Parker::into_raw(p);
200 /// let p = unsafe { Parker::from_raw(raw) };
201 /// ```
202 pub unsafe fn from_raw(ptr: *const ()) -> Parker {
203 Parker {
204 unparker: Unparker::from_raw(ptr),
205 _marker: PhantomData,
206 }
207 }
208}
209
210impl fmt::Debug for Parker {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 f.pad("Parker { .. }")
213 }
214}
215
/// Unparks a thread parked by the associated [`Parker`].
///
/// An `Unparker` can be cloned; every clone refers to the same shared state.
pub struct Unparker {
    /// Reference-counted parking state (token state, lock and condition variable).
    inner: Arc<Inner>,
}
220
// SAFETY: an `Unparker` only holds an `Arc<Inner>`, and `Inner` consists solely of an atomic
// integer, a mutex and a condition variable, all of which are accessed through `&self`.
// NOTE(review): presumably these impls are spelled out because the primitives come from
// `crate::primitive` rather than directly from `std` — confirm they are still required.
unsafe impl Send for Unparker {}
unsafe impl Sync for Unparker {}
223
224impl Unparker {
225 /// Atomically makes the token available if it is not already.
226 ///
227 /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
228 /// any.
229 ///
230 /// # Examples
231 ///
232 /// ```
233 /// use std::thread;
234 /// use std::time::Duration;
235 /// use crossbeam_utils::sync::Parker;
236 ///
237 /// let p = Parker::new();
238 /// let u = p.unparker().clone();
239 ///
240 /// thread::spawn(move || {
241 /// thread::sleep(Duration::from_millis(500));
242 /// u.unpark();
243 /// });
244 ///
245 /// // Wakes up when `u.unpark()` provides the token.
246 /// p.park();
247 /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
248 /// ```
249 ///
250 /// [`park`]: Parker::park
251 /// [`park_timeout`]: Parker::park_timeout
252 pub fn unpark(&self) {
253 self.inner.unpark()
254 }
255
256 /// Converts an `Unparker` into a raw pointer.
257 ///
258 /// # Examples
259 ///
260 /// ```
261 /// use crossbeam_utils::sync::{Parker, Unparker};
262 ///
263 /// let p = Parker::new();
264 /// let u = p.unparker().clone();
265 /// let raw = Unparker::into_raw(u);
266 /// # let _ = unsafe { Unparker::from_raw(raw) };
267 /// ```
268 pub fn into_raw(this: Unparker) -> *const () {
269 Arc::into_raw(this.inner).cast::<()>()
270 }
271
272 /// Converts a raw pointer into an `Unparker`.
273 ///
274 /// # Safety
275 ///
276 /// This method is safe to use only with pointers returned by [`Unparker::into_raw`].
277 ///
278 /// # Examples
279 ///
280 /// ```
281 /// use crossbeam_utils::sync::{Parker, Unparker};
282 ///
283 /// let p = Parker::new();
284 /// let u = p.unparker().clone();
285 ///
286 /// let raw = Unparker::into_raw(u);
287 /// let u = unsafe { Unparker::from_raw(raw) };
288 /// ```
289 pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
290 Unparker {
291 inner: Arc::from_raw(ptr.cast::<Inner>()),
292 }
293 }
294}
295
296impl fmt::Debug for Unparker {
297 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298 f.pad("Unparker { .. }")
299 }
300}
301
302impl Clone for Unparker {
303 fn clone(&self) -> Unparker {
304 Unparker {
305 inner: self.inner.clone(),
306 }
307 }
308}
309
// Values stored in `Inner::state`.

/// No token is available and no thread is parked; this is the initial state.
const EMPTY: usize = 0;
/// Set by `Inner::park` (with `lock` held) before it starts waiting on the condition variable.
const PARKED: usize = 1;
/// The token is available; set by `Inner::unpark` and reset to `EMPTY` when `park` consumes it.
const NOTIFIED: usize = 2;
313
/// Parking state shared between a `Parker` and its `Unparker`s.
struct Inner {
    /// Current token state: one of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    /// Held by `park` from the moment it sets `PARKED` until it waits on `cvar`; `unpark`
    /// acquires and releases it before notifying so that the wake-up cannot be missed.
    lock: Mutex<()>,
    /// Condition variable the parked thread waits on.
    cvar: Condvar,
}
319
impl Inner {
    /// Blocks the calling thread until the token is available or `deadline` has passed.
    ///
    /// With `deadline == None` the wait has no time limit. An available token (`NOTIFIED`) is
    /// consumed by resetting `state` to `EMPTY` before returning. On the timeout path `state` is
    /// likewise reset to `EMPTY`, so a notification that raced with the timeout is consumed too.
    ///
    /// # Panics
    ///
    /// Panics if `state` holds a value that is not expected at that point, or if locking `lock`
    /// or waiting on `cvar` returns an error (both results are unwrapped).
    fn park(&self, deadline: Option<Instant>) {
        // If we were previously notified then we consume this notification and return quickly.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
        {
            return;
        }

        // If the deadline has already passed, then there is no need to actually block.
        if let Some(deadline) = deadline {
            if deadline <= Instant::now() {
                return;
            }
        }

        // Otherwise we need to coordinate going to sleep.
        let mut m = self.lock.lock().unwrap();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            // Consume this notification to avoid spurious wakeups in the next park.
            Err(NOTIFIED) => {
                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
                // because `unpark` may have been called again since we read `NOTIFIED` in the
                // `compare_exchange` above. We must perform an acquire operation that synchronizes
                // with that `unpark` to observe any writes it made before the call to `unpark`. To
                // do that we must read from the write it made to `state`.
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return;
            }
            Err(n) => panic!("inconsistent park_timeout state: {}", n),
        }

        loop {
            // Block the current thread on the conditional variable.
            m = match deadline {
                None => self.cvar.wait(m).unwrap(),
                Some(deadline) => {
                    let now = Instant::now();
                    if now < deadline {
                        // We could check for a timeout here, in the return value of wait_timeout,
                        // but in the case that a timeout and an unpark arrive simultaneously, we
                        // prefer to report the former.
                        self.cvar.wait_timeout(m, deadline - now).unwrap().0
                    } else {
                        // We've timed out; swap out the state back to empty on our way out
                        match self.state.swap(EMPTY, SeqCst) {
                            NOTIFIED | PARKED => return,
                            n => panic!("inconsistent park_timeout state: {}", n),
                        };
                    }
                }
            };

            if self
                .state
                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                .is_ok()
            {
                // got a notification
                return;
            }

            // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
            // in the branch above, when we discover the deadline is in the past
        }
    }

    /// Makes the token available and, if a thread is currently parked, wakes it up.
    ///
    /// # Panics
    ///
    /// Panics if `state` held a value other than `EMPTY`, `PARKED` or `NOTIFIED`, or if locking
    /// `lock` returns an error (the result is unwrapped).
    pub(crate) fn unpark(&self) {
        // To ensure the unparked thread will observe any writes we made before this call, we must
        // perform a release operation that `park` can synchronize with. To do that we must write
        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY => return,    // no one was waiting
            NOTIFIED => return, // already unparked
            PARKED => {}        // gotta go wake someone up
            _ => panic!("inconsistent state in unpark"),
        }

        // There is a period between when the parked thread sets `state` to `PARKED` (or last
        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
        // If we were to notify during this period it would be ignored and then when the parked
        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
        //
        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
        // it doesn't get woken only to have to wait for us to release `lock`.
        drop(self.lock.lock().unwrap());
        self.cvar.notify_one();
    }
}
416