1// Copyright 2016 Amanieu d'Antras
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use crate::mutex::MutexGuard;
9use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
10use crate::{deadlock, util};
11use core::{
12 fmt, ptr,
13 sync::atomic::{AtomicPtr, Ordering},
14};
15use lock_api::RawMutex as RawMutex_;
16use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
17use std::ops::DerefMut;
18use std::time::{Duration, Instant};
19
20/// A type indicating whether a timed wait on a condition variable returned
21/// due to a time out or not.
22#[derive(Debug, PartialEq, Eq, Copy, Clone)]
23pub struct WaitTimeoutResult(bool);
24
25impl WaitTimeoutResult {
26 /// Returns whether the wait was known to have timed out.
27 #[inline]
28 pub fn timed_out(self) -> bool {
29 self.0
30 }
31}
32
33/// A Condition Variable
34///
35/// Condition variables represent the ability to block a thread such that it
36/// consumes no CPU time while waiting for an event to occur. Condition
37/// variables are typically associated with a boolean predicate (a condition)
38/// and a mutex. The predicate is always verified inside of the mutex before
39/// determining that thread must block.
40///
41/// Note that this module places one additional restriction over the system
42/// condition variables: each condvar can be used with only one mutex at a
43/// time. Any attempt to use multiple mutexes on the same condition variable
44/// simultaneously will result in a runtime panic. However it is possible to
45/// switch to a different mutex if there are no threads currently waiting on
46/// the condition variable.
47///
48/// # Differences from the standard library `Condvar`
49///
50/// - No spurious wakeups: A wait will only return a non-timeout result if it
51/// was woken up by `notify_one` or `notify_all`.
52/// - `Condvar::notify_all` will only wake up a single thread, the rest are
53/// requeued to wait for the `Mutex` to be unlocked by the thread that was
54/// woken up.
55/// - Only requires 1 word of space, whereas the standard library boxes the
56/// `Condvar` due to platform limitations.
57/// - Can be statically constructed.
58/// - Does not require any drop glue when dropped.
59/// - Inline fast path for the uncontended case.
60///
61/// # Examples
62///
63/// ```
64/// use parking_lot::{Mutex, Condvar};
65/// use std::sync::Arc;
66/// use std::thread;
67///
68/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
69/// let pair2 = pair.clone();
70///
71/// // Inside of our lock, spawn a new thread, and then wait for it to start
72/// thread::spawn(move|| {
73/// let &(ref lock, ref cvar) = &*pair2;
74/// let mut started = lock.lock();
75/// *started = true;
76/// cvar.notify_one();
77/// });
78///
79/// // wait for the thread to start up
80/// let &(ref lock, ref cvar) = &*pair;
81/// let mut started = lock.lock();
82/// if !*started {
83/// cvar.wait(&mut started);
84/// }
85/// // Note that we used an if instead of a while loop above. This is only
86/// // possible because parking_lot's Condvar will never spuriously wake up.
87/// // This means that wait() will only return after notify_one or notify_all is
88/// // called.
89/// ```
90pub struct Condvar {
91 state: AtomicPtr<RawMutex>,
92}
93
94impl Condvar {
95 /// Creates a new condition variable which is ready to be waited on and
96 /// notified.
97 #[inline]
98 pub const fn new() -> Condvar {
99 Condvar {
100 state: AtomicPtr::new(ptr::null_mut()),
101 }
102 }
103
104 /// Wakes up one blocked thread on this condvar.
105 ///
106 /// Returns whether a thread was woken up.
107 ///
108 /// If there is a blocked thread on this condition variable, then it will
109 /// be woken up from its call to `wait` or `wait_timeout`. Calls to
110 /// `notify_one` are not buffered in any way.
111 ///
112 /// To wake up all threads, see `notify_all()`.
113 ///
114 /// # Examples
115 ///
116 /// ```
117 /// use parking_lot::Condvar;
118 ///
119 /// let condvar = Condvar::new();
120 ///
121 /// // do something with condvar, share it with other threads
122 ///
123 /// if !condvar.notify_one() {
124 /// println!("Nobody was listening for this.");
125 /// }
126 /// ```
127 #[inline]
128 pub fn notify_one(&self) -> bool {
129 // Nothing to do if there are no waiting threads
130 let state = self.state.load(Ordering::Relaxed);
131 if state.is_null() {
132 return false;
133 }
134
135 self.notify_one_slow(state)
136 }
137
138 #[cold]
139 fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
140 // Unpark one thread and requeue the rest onto the mutex
141 let from = self as *const _ as usize;
142 let to = mutex as usize;
143 let validate = || {
144 // Make sure that our atomic state still points to the same
145 // mutex. If not then it means that all threads on the current
146 // mutex were woken up and a new waiting thread switched to a
147 // different mutex. In that case we can get away with doing
148 // nothing.
149 if self.state.load(Ordering::Relaxed) != mutex {
150 return RequeueOp::Abort;
151 }
152
153 // Unpark one thread if the mutex is unlocked, otherwise just
154 // requeue everything to the mutex. This is safe to do here
155 // since unlocking the mutex when the parked bit is set requires
156 // locking the queue. There is the possibility of a race if the
157 // mutex gets locked after we check, but that doesn't matter in
158 // this case.
159 if unsafe { (*mutex).mark_parked_if_locked() } {
160 RequeueOp::RequeueOne
161 } else {
162 RequeueOp::UnparkOne
163 }
164 };
165 let callback = |_op, result: UnparkResult| {
166 // Clear our state if there are no more waiting threads
167 if !result.have_more_threads {
168 self.state.store(ptr::null_mut(), Ordering::Relaxed);
169 }
170 TOKEN_NORMAL
171 };
172 let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };
173
174 res.unparked_threads + res.requeued_threads != 0
175 }
176
177 /// Wakes up all blocked threads on this condvar.
178 ///
179 /// Returns the number of threads woken up.
180 ///
181 /// This method will ensure that any current waiters on the condition
182 /// variable are awoken. Calls to `notify_all()` are not buffered in any
183 /// way.
184 ///
185 /// To wake up only one thread, see `notify_one()`.
186 #[inline]
187 pub fn notify_all(&self) -> usize {
188 // Nothing to do if there are no waiting threads
189 let state = self.state.load(Ordering::Relaxed);
190 if state.is_null() {
191 return 0;
192 }
193
194 self.notify_all_slow(state)
195 }
196
197 #[cold]
198 fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
199 // Unpark one thread and requeue the rest onto the mutex
200 let from = self as *const _ as usize;
201 let to = mutex as usize;
202 let validate = || {
203 // Make sure that our atomic state still points to the same
204 // mutex. If not then it means that all threads on the current
205 // mutex were woken up and a new waiting thread switched to a
206 // different mutex. In that case we can get away with doing
207 // nothing.
208 if self.state.load(Ordering::Relaxed) != mutex {
209 return RequeueOp::Abort;
210 }
211
212 // Clear our state since we are going to unpark or requeue all
213 // threads.
214 self.state.store(ptr::null_mut(), Ordering::Relaxed);
215
216 // Unpark one thread if the mutex is unlocked, otherwise just
217 // requeue everything to the mutex. This is safe to do here
218 // since unlocking the mutex when the parked bit is set requires
219 // locking the queue. There is the possibility of a race if the
220 // mutex gets locked after we check, but that doesn't matter in
221 // this case.
222 if unsafe { (*mutex).mark_parked_if_locked() } {
223 RequeueOp::RequeueAll
224 } else {
225 RequeueOp::UnparkOneRequeueRest
226 }
227 };
228 let callback = |op, result: UnparkResult| {
229 // If we requeued threads to the mutex, mark it as having
230 // parked threads. The RequeueAll case is already handled above.
231 if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
232 unsafe { (*mutex).mark_parked() };
233 }
234 TOKEN_NORMAL
235 };
236 let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };
237
238 res.unparked_threads + res.requeued_threads
239 }
240
241 /// Blocks the current thread until this condition variable receives a
242 /// notification.
243 ///
244 /// This function will atomically unlock the mutex specified (represented by
245 /// `mutex_guard`) and block the current thread. This means that any calls
246 /// to `notify_*()` which happen logically after the mutex is unlocked are
247 /// candidates to wake this thread up. When this function call returns, the
248 /// lock specified will have been re-acquired.
249 ///
250 /// # Panics
251 ///
252 /// This function will panic if another thread is waiting on the `Condvar`
253 /// with a different `Mutex` object.
254 #[inline]
255 pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
256 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
257 }
258
259 /// Waits on this condition variable for a notification, timing out after
260 /// the specified time instant.
261 ///
262 /// The semantics of this function are equivalent to `wait()` except that
263 /// the thread will be blocked roughly until `timeout` is reached. This
264 /// method should not be used for precise timing due to anomalies such as
265 /// preemption or platform differences that may not cause the maximum
266 /// amount of time waited to be precisely `timeout`.
267 ///
268 /// Note that the best effort is made to ensure that the time waited is
269 /// measured with a monotonic clock, and not affected by the changes made to
270 /// the system time.
271 ///
272 /// The returned `WaitTimeoutResult` value indicates if the timeout is
273 /// known to have elapsed.
274 ///
275 /// Like `wait`, the lock specified will be re-acquired when this function
276 /// returns, regardless of whether the timeout elapsed or not.
277 ///
278 /// # Panics
279 ///
280 /// This function will panic if another thread is waiting on the `Condvar`
281 /// with a different `Mutex` object.
282 #[inline]
283 pub fn wait_until<T: ?Sized>(
284 &self,
285 mutex_guard: &mut MutexGuard<'_, T>,
286 timeout: Instant,
287 ) -> WaitTimeoutResult {
288 self.wait_until_internal(
289 unsafe { MutexGuard::mutex(mutex_guard).raw() },
290 Some(timeout),
291 )
292 }
293
294 // This is a non-generic function to reduce the monomorphization cost of
295 // using `wait_until`.
296 fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
297 let result;
298 let mut bad_mutex = false;
299 let mut requeued = false;
300 {
301 let addr = self as *const _ as usize;
302 let lock_addr = mutex as *const _ as *mut _;
303 let validate = || {
304 // Ensure we don't use two different mutexes with the same
305 // Condvar at the same time. This is done while locked to
306 // avoid races with notify_one
307 let state = self.state.load(Ordering::Relaxed);
308 if state.is_null() {
309 self.state.store(lock_addr, Ordering::Relaxed);
310 } else if state != lock_addr {
311 bad_mutex = true;
312 return false;
313 }
314 true
315 };
316 let before_sleep = || {
317 // Unlock the mutex before sleeping...
318 unsafe { mutex.unlock() };
319 };
320 let timed_out = |k, was_last_thread| {
321 // If we were requeued to a mutex, then we did not time out.
322 // We'll just park ourselves on the mutex again when we try
323 // to lock it later.
324 requeued = k != addr;
325
326 // If we were the last thread on the queue then we need to
327 // clear our state. This is normally done by the
328 // notify_{one,all} functions when not timing out.
329 if !requeued && was_last_thread {
330 self.state.store(ptr::null_mut(), Ordering::Relaxed);
331 }
332 };
333 result = unsafe { parking_lot_core::park(
334 addr,
335 validate,
336 before_sleep,
337 timed_out,
338 DEFAULT_PARK_TOKEN,
339 timeout,
340 ) };
341 }
342
343 // Panic if we tried to use multiple mutexes with a Condvar. Note
344 // that at this point the MutexGuard is still locked. It will be
345 // unlocked by the unwinding logic.
346 if bad_mutex {
347 panic!("attempted to use a condition variable with more than one mutex");
348 }
349
350 // ... and re-lock it once we are done sleeping
351 if result == ParkResult::Unparked(TOKEN_HANDOFF) {
352 unsafe { deadlock::acquire_resource(mutex as *const _ as usize) };
353 } else {
354 mutex.lock();
355 }
356
357 WaitTimeoutResult(!(result.is_unparked() || requeued))
358 }
359
360 /// Waits on this condition variable for a notification, timing out after a
361 /// specified duration.
362 ///
363 /// The semantics of this function are equivalent to `wait()` except that
364 /// the thread will be blocked for roughly no longer than `timeout`. This
365 /// method should not be used for precise timing due to anomalies such as
366 /// preemption or platform differences that may not cause the maximum
367 /// amount of time waited to be precisely `timeout`.
368 ///
369 /// Note that the best effort is made to ensure that the time waited is
370 /// measured with a monotonic clock, and not affected by the changes made to
371 /// the system time.
372 ///
373 /// The returned `WaitTimeoutResult` value indicates if the timeout is
374 /// known to have elapsed.
375 ///
376 /// Like `wait`, the lock specified will be re-acquired when this function
377 /// returns, regardless of whether the timeout elapsed or not.
378 #[inline]
379 pub fn wait_for<T: ?Sized>(
380 &self,
381 mutex_guard: &mut MutexGuard<'_, T>,
382 timeout: Duration,
383 ) -> WaitTimeoutResult {
384 let deadline = util::to_deadline(timeout);
385 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
386 }
387
388 #[inline]
389 fn wait_while_until_internal<T, F>(
390 &self,
391 mutex_guard: &mut MutexGuard<'_, T>,
392 mut condition: F,
393 timeout: Option<Instant>,
394 ) -> WaitTimeoutResult
395 where
396 T: ?Sized,
397 F: FnMut(&mut T) -> bool,
398 {
399 let mut result = WaitTimeoutResult(false);
400
401 while !result.timed_out() && condition(mutex_guard.deref_mut()) {
402 result =
403 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, timeout);
404 }
405
406 result
407 }
408 /// Blocks the current thread until this condition variable receives a
409 /// notification. If the provided condition evaluates to `false`, then the
410 /// thread is no longer blocked and the operation is completed. If the
411 /// condition evaluates to `true`, then the thread is blocked again and
412 /// waits for another notification before repeating this process.
413 ///
414 /// This function will atomically unlock the mutex specified (represented by
415 /// `mutex_guard`) and block the current thread. This means that any calls
416 /// to `notify_*()` which happen logically after the mutex is unlocked are
417 /// candidates to wake this thread up. When this function call returns, the
418 /// lock specified will have been re-acquired.
419 ///
420 /// # Panics
421 ///
422 /// This function will panic if another thread is waiting on the `Condvar`
423 /// with a different `Mutex` object.
424 #[inline]
425 pub fn wait_while<T, F>(&self, mutex_guard: &mut MutexGuard<'_, T>, condition: F)
426 where
427 T: ?Sized,
428 F: FnMut(&mut T) -> bool,
429 {
430 self.wait_while_until_internal(mutex_guard, condition, None);
431 }
432
433 /// Waits on this condition variable for a notification, timing out after
434 /// the specified time instant. If the provided condition evaluates to
435 /// `false`, then the thread is no longer blocked and the operation is
436 /// completed. If the condition evaluates to `true`, then the thread is
437 /// blocked again and waits for another notification before repeating
438 /// this process.
439 ///
440 /// The semantics of this function are equivalent to `wait()` except that
441 /// the thread will be blocked roughly until `timeout` is reached. This
442 /// method should not be used for precise timing due to anomalies such as
443 /// preemption or platform differences that may not cause the maximum
444 /// amount of time waited to be precisely `timeout`.
445 ///
446 /// Note that the best effort is made to ensure that the time waited is
447 /// measured with a monotonic clock, and not affected by the changes made to
448 /// the system time.
449 ///
450 /// The returned `WaitTimeoutResult` value indicates if the timeout is
451 /// known to have elapsed.
452 ///
453 /// Like `wait`, the lock specified will be re-acquired when this function
454 /// returns, regardless of whether the timeout elapsed or not.
455 ///
456 /// # Panics
457 ///
458 /// This function will panic if another thread is waiting on the `Condvar`
459 /// with a different `Mutex` object.
460 #[inline]
461 pub fn wait_while_until<T, F>(
462 &self,
463 mutex_guard: &mut MutexGuard<'_, T>,
464 condition: F,
465 timeout: Instant,
466 ) -> WaitTimeoutResult
467 where
468 T: ?Sized,
469 F: FnMut(&mut T) -> bool,
470 {
471 self.wait_while_until_internal(mutex_guard, condition, Some(timeout))
472 }
473
474 /// Waits on this condition variable for a notification, timing out after a
475 /// specified duration. If the provided condition evaluates to `false`,
476 /// then the thread is no longer blocked and the operation is completed.
477 /// If the condition evaluates to `true`, then the thread is blocked again
478 /// and waits for another notification before repeating this process.
479 ///
480 /// The semantics of this function are equivalent to `wait()` except that
481 /// the thread will be blocked for roughly no longer than `timeout`. This
482 /// method should not be used for precise timing due to anomalies such as
483 /// preemption or platform differences that may not cause the maximum
484 /// amount of time waited to be precisely `timeout`.
485 ///
486 /// Note that the best effort is made to ensure that the time waited is
487 /// measured with a monotonic clock, and not affected by the changes made to
488 /// the system time.
489 ///
490 /// The returned `WaitTimeoutResult` value indicates if the timeout is
491 /// known to have elapsed.
492 ///
493 /// Like `wait`, the lock specified will be re-acquired when this function
494 /// returns, regardless of whether the timeout elapsed or not.
495 #[inline]
496 pub fn wait_while_for<T: ?Sized, F>(
497 &self,
498 mutex_guard: &mut MutexGuard<'_, T>,
499 condition: F,
500 timeout: Duration,
501 ) -> WaitTimeoutResult
502 where
503 F: FnMut(&mut T) -> bool,
504 {
505 let deadline = util::to_deadline(timeout);
506 self.wait_while_until_internal(mutex_guard, condition, deadline)
507 }
508}
509
510impl Default for Condvar {
511 #[inline]
512 fn default() -> Condvar {
513 Condvar::new()
514 }
515}
516
517impl fmt::Debug for Condvar {
518 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
519 f.pad("Condvar { .. }")
520 }
521}
522
523#[cfg(test)]
524mod tests {
525 use crate::{Condvar, Mutex, MutexGuard};
526 use std::sync::mpsc::channel;
527 use std::sync::Arc;
528 use std::thread;
529 use std::thread::sleep;
530 use std::thread::JoinHandle;
531 use std::time::Duration;
532 use std::time::Instant;
533
534 #[test]
535 fn smoke() {
536 let c = Condvar::new();
537 c.notify_one();
538 c.notify_all();
539 }
540
541 #[test]
542 fn notify_one() {
543 let m = Arc::new(Mutex::new(()));
544 let m2 = m.clone();
545 let c = Arc::new(Condvar::new());
546 let c2 = c.clone();
547
548 let mut g = m.lock();
549 let _t = thread::spawn(move || {
550 let _g = m2.lock();
551 c2.notify_one();
552 });
553 c.wait(&mut g);
554 }
555
556 #[test]
557 fn notify_all() {
558 const N: usize = 10;
559
560 let data = Arc::new((Mutex::new(0), Condvar::new()));
561 let (tx, rx) = channel();
562 for _ in 0..N {
563 let data = data.clone();
564 let tx = tx.clone();
565 thread::spawn(move || {
566 let &(ref lock, ref cond) = &*data;
567 let mut cnt = lock.lock();
568 *cnt += 1;
569 if *cnt == N {
570 tx.send(()).unwrap();
571 }
572 while *cnt != 0 {
573 cond.wait(&mut cnt);
574 }
575 tx.send(()).unwrap();
576 });
577 }
578 drop(tx);
579
580 let &(ref lock, ref cond) = &*data;
581 rx.recv().unwrap();
582 let mut cnt = lock.lock();
583 *cnt = 0;
584 cond.notify_all();
585 drop(cnt);
586
587 for _ in 0..N {
588 rx.recv().unwrap();
589 }
590 }
591
592 #[test]
593 fn notify_one_return_true() {
594 let m = Arc::new(Mutex::new(()));
595 let m2 = m.clone();
596 let c = Arc::new(Condvar::new());
597 let c2 = c.clone();
598
599 let mut g = m.lock();
600 let _t = thread::spawn(move || {
601 let _g = m2.lock();
602 assert!(c2.notify_one());
603 });
604 c.wait(&mut g);
605 }
606
607 #[test]
608 fn notify_one_return_false() {
609 let m = Arc::new(Mutex::new(()));
610 let c = Arc::new(Condvar::new());
611
612 let _t = thread::spawn(move || {
613 let _g = m.lock();
614 assert!(!c.notify_one());
615 });
616 }
617
618 #[test]
619 fn notify_all_return() {
620 const N: usize = 10;
621
622 let data = Arc::new((Mutex::new(0), Condvar::new()));
623 let (tx, rx) = channel();
624 for _ in 0..N {
625 let data = data.clone();
626 let tx = tx.clone();
627 thread::spawn(move || {
628 let &(ref lock, ref cond) = &*data;
629 let mut cnt = lock.lock();
630 *cnt += 1;
631 if *cnt == N {
632 tx.send(()).unwrap();
633 }
634 while *cnt != 0 {
635 cond.wait(&mut cnt);
636 }
637 tx.send(()).unwrap();
638 });
639 }
640 drop(tx);
641
642 let &(ref lock, ref cond) = &*data;
643 rx.recv().unwrap();
644 let mut cnt = lock.lock();
645 *cnt = 0;
646 assert_eq!(cond.notify_all(), N);
647 drop(cnt);
648
649 for _ in 0..N {
650 rx.recv().unwrap();
651 }
652
653 assert_eq!(cond.notify_all(), 0);
654 }
655
656 #[test]
657 fn wait_for() {
658 let m = Arc::new(Mutex::new(()));
659 let m2 = m.clone();
660 let c = Arc::new(Condvar::new());
661 let c2 = c.clone();
662
663 let mut g = m.lock();
664 let no_timeout = c.wait_for(&mut g, Duration::from_millis(1));
665 assert!(no_timeout.timed_out());
666
667 let _t = thread::spawn(move || {
668 let _g = m2.lock();
669 c2.notify_one();
670 });
671 let timeout_res = c.wait_for(&mut g, Duration::from_secs(u64::max_value()));
672 assert!(!timeout_res.timed_out());
673
674 drop(g);
675 }
676
677 #[test]
678 fn wait_until() {
679 let m = Arc::new(Mutex::new(()));
680 let m2 = m.clone();
681 let c = Arc::new(Condvar::new());
682 let c2 = c.clone();
683
684 let mut g = m.lock();
685 let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
686 assert!(no_timeout.timed_out());
687 let _t = thread::spawn(move || {
688 let _g = m2.lock();
689 c2.notify_one();
690 });
691 let timeout_res = c.wait_until(
692 &mut g,
693 Instant::now() + Duration::from_millis(u32::max_value() as u64),
694 );
695 assert!(!timeout_res.timed_out());
696 drop(g);
697 }
698
699 fn spawn_wait_while_notifier(
700 mutex: Arc<Mutex<u32>>,
701 cv: Arc<Condvar>,
702 num_iters: u32,
703 timeout: Option<Instant>,
704 ) -> JoinHandle<()> {
705 thread::spawn(move || {
706 for epoch in 1..=num_iters {
707 // spin to wait for main test thread to block
708 // before notifying it to wake back up and check
709 // its condition.
710 let mut sleep_backoff = Duration::from_millis(1);
711 let _mutex_guard = loop {
712 let mutex_guard = mutex.lock();
713
714 if let Some(timeout) = timeout {
715 if Instant::now() >= timeout {
716 return;
717 }
718 }
719
720 if *mutex_guard == epoch {
721 break mutex_guard;
722 }
723
724 drop(mutex_guard);
725
726 // give main test thread a good chance to
727 // acquire the lock before this thread does.
728 sleep(sleep_backoff);
729 sleep_backoff *= 2;
730 };
731
732 cv.notify_one();
733 }
734 })
735 }
736
737 #[test]
738 fn wait_while_until_internal_does_not_wait_if_initially_false() {
739 let mutex = Arc::new(Mutex::new(0));
740 let cv = Arc::new(Condvar::new());
741
742 let condition = |counter: &mut u32| {
743 *counter += 1;
744 false
745 };
746
747 let mut mutex_guard = mutex.lock();
748 let timeout_result = cv
749 .wait_while_until_internal(&mut mutex_guard, condition, None);
750
751 assert!(!timeout_result.timed_out());
752 assert!(*mutex_guard == 1);
753 }
754
755 #[test]
756 fn wait_while_until_internal_times_out_before_false() {
757 let mutex = Arc::new(Mutex::new(0));
758 let cv = Arc::new(Condvar::new());
759
760 let num_iters = 3;
761 let condition = |counter: &mut u32| {
762 *counter += 1;
763 true
764 };
765
766 let mut mutex_guard = mutex.lock();
767 let timeout = Some(Instant::now() + Duration::from_millis(500));
768 let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, timeout);
769
770 let timeout_result =
771 cv.wait_while_until_internal(&mut mutex_guard, condition, timeout);
772
773 assert!(timeout_result.timed_out());
774 assert!(*mutex_guard == num_iters + 1);
775
776 // prevent deadlock with notifier
777 drop(mutex_guard);
778 handle.join().unwrap();
779 }
780
781 #[test]
782 fn wait_while_until_internal() {
783 let mutex = Arc::new(Mutex::new(0));
784 let cv = Arc::new(Condvar::new());
785
786 let num_iters = 4;
787
788 let condition = |counter: &mut u32| {
789 *counter += 1;
790 *counter <= num_iters
791 };
792
793 let mut mutex_guard = mutex.lock();
794 let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, None);
795
796 let timeout_result =
797 cv.wait_while_until_internal(&mut mutex_guard, condition, None);
798
799 assert!(!timeout_result.timed_out());
800 assert!(*mutex_guard == num_iters + 1);
801
802 let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
803 handle.join().unwrap();
804
805 assert!(!timeout_result.timed_out());
806 assert!(*mutex_guard == num_iters + 2);
807 }
808
809 #[test]
810 #[should_panic]
811 fn two_mutexes() {
812 let m = Arc::new(Mutex::new(()));
813 let m2 = m.clone();
814 let m3 = Arc::new(Mutex::new(()));
815 let c = Arc::new(Condvar::new());
816 let c2 = c.clone();
817
818 // Make sure we don't leave the child thread dangling
819 struct PanicGuard<'a>(&'a Condvar);
820 impl<'a> Drop for PanicGuard<'a> {
821 fn drop(&mut self) {
822 self.0.notify_one();
823 }
824 }
825
826 let (tx, rx) = channel();
827 let g = m.lock();
828 let _t = thread::spawn(move || {
829 let mut g = m2.lock();
830 tx.send(()).unwrap();
831 c2.wait(&mut g);
832 });
833 drop(g);
834 rx.recv().unwrap();
835 let _g = m.lock();
836 let _guard = PanicGuard(&*c);
837 c.wait(&mut m3.lock());
838 }
839
840 #[test]
841 fn two_mutexes_disjoint() {
842 let m = Arc::new(Mutex::new(()));
843 let m2 = m.clone();
844 let m3 = Arc::new(Mutex::new(()));
845 let c = Arc::new(Condvar::new());
846 let c2 = c.clone();
847
848 let mut g = m.lock();
849 let _t = thread::spawn(move || {
850 let _g = m2.lock();
851 c2.notify_one();
852 });
853 c.wait(&mut g);
854 drop(g);
855
856 let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
857 }
858
859 #[test]
860 fn test_debug_condvar() {
861 let c = Condvar::new();
862 assert_eq!(format!("{:?}", c), "Condvar { .. }");
863 }
864
865 #[test]
866 fn test_condvar_requeue() {
867 let m = Arc::new(Mutex::new(()));
868 let m2 = m.clone();
869 let c = Arc::new(Condvar::new());
870 let c2 = c.clone();
871 let t = thread::spawn(move || {
872 let mut g = m2.lock();
873 c2.wait(&mut g);
874 });
875
876 let mut g = m.lock();
877 while !c.notify_one() {
878 // Wait for the thread to get into wait()
879 MutexGuard::bump(&mut g);
880 // Yield, so the other thread gets a chance to do something.
881 // (At least Miri needs this, because it doesn't preempt threads.)
882 thread::yield_now();
883 }
884 // The thread should have been requeued to the mutex, which we wake up now.
885 drop(g);
886 t.join().unwrap();
887 }
888
889 #[test]
890 fn test_issue_129() {
891 let locks = Arc::new((Mutex::new(()), Condvar::new()));
892
893 let (tx, rx) = channel();
894 for _ in 0..4 {
895 let locks = locks.clone();
896 let tx = tx.clone();
897 thread::spawn(move || {
898 let mut guard = locks.0.lock();
899 locks.1.wait(&mut guard);
900 locks.1.wait_for(&mut guard, Duration::from_millis(1));
901 locks.1.notify_one();
902 tx.send(()).unwrap();
903 });
904 }
905
906 thread::sleep(Duration::from_millis(100));
907 locks.1.notify_one();
908
909 for _ in 0..4 {
910 assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
911 }
912 }
913}
914
915/// This module contains an integration test that is heavily inspired from WebKit's own integration
916/// tests for it's own Condvar.
917#[cfg(test)]
918mod webkit_queue_test {
919 use crate::{Condvar, Mutex, MutexGuard};
920 use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
921
922 #[derive(Clone, Copy)]
923 enum Timeout {
924 Bounded(Duration),
925 Forever,
926 }
927
928 #[derive(Clone, Copy)]
929 enum NotifyStyle {
930 One,
931 All,
932 }
933
934 struct Queue {
935 items: VecDeque<usize>,
936 should_continue: bool,
937 }
938
939 impl Queue {
940 fn new() -> Self {
941 Self {
942 items: VecDeque::new(),
943 should_continue: true,
944 }
945 }
946 }
947
948 fn wait<T: ?Sized>(
949 condition: &Condvar,
950 lock: &mut MutexGuard<'_, T>,
951 predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
952 timeout: &Timeout,
953 ) {
954 while !predicate(lock) {
955 match timeout {
956 Timeout::Forever => condition.wait(lock),
957 Timeout::Bounded(bound) => {
958 condition.wait_for(lock, *bound);
959 }
960 }
961 }
962 }
963
964 fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
965 match style {
966 NotifyStyle::One => {
967 condition.notify_one();
968 }
969 NotifyStyle::All => {
970 if should_notify {
971 condition.notify_all();
972 }
973 }
974 }
975 }
976
977 fn run_queue_test(
978 num_producers: usize,
979 num_consumers: usize,
980 max_queue_size: usize,
981 messages_per_producer: usize,
982 notify_style: NotifyStyle,
983 timeout: Timeout,
984 delay: Duration,
985 ) {
986 let input_queue = Arc::new(Mutex::new(Queue::new()));
987 let empty_condition = Arc::new(Condvar::new());
988 let full_condition = Arc::new(Condvar::new());
989
990 let output_vec = Arc::new(Mutex::new(vec![]));
991
992 let consumers = (0..num_consumers)
993 .map(|_| {
994 consumer_thread(
995 input_queue.clone(),
996 empty_condition.clone(),
997 full_condition.clone(),
998 timeout,
999 notify_style,
1000 output_vec.clone(),
1001 max_queue_size,
1002 )
1003 })
1004 .collect::<Vec<_>>();
1005 let producers = (0..num_producers)
1006 .map(|_| {
1007 producer_thread(
1008 messages_per_producer,
1009 input_queue.clone(),
1010 empty_condition.clone(),
1011 full_condition.clone(),
1012 timeout,
1013 notify_style,
1014 max_queue_size,
1015 )
1016 })
1017 .collect::<Vec<_>>();
1018
1019 thread::sleep(delay);
1020
1021 for producer in producers.into_iter() {
1022 producer.join().expect("Producer thread panicked");
1023 }
1024
1025 {
1026 let mut input_queue = input_queue.lock();
1027 input_queue.should_continue = false;
1028 }
1029 empty_condition.notify_all();
1030
1031 for consumer in consumers.into_iter() {
1032 consumer.join().expect("Consumer thread panicked");
1033 }
1034
1035 let mut output_vec = output_vec.lock();
1036 assert_eq!(output_vec.len(), num_producers * messages_per_producer);
1037 output_vec.sort();
1038 for msg_idx in 0..messages_per_producer {
1039 for producer_idx in 0..num_producers {
1040 assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
1041 }
1042 }
1043 }
1044
1045 fn consumer_thread(
1046 input_queue: Arc<Mutex<Queue>>,
1047 empty_condition: Arc<Condvar>,
1048 full_condition: Arc<Condvar>,
1049 timeout: Timeout,
1050 notify_style: NotifyStyle,
1051 output_queue: Arc<Mutex<Vec<usize>>>,
1052 max_queue_size: usize,
1053 ) -> thread::JoinHandle<()> {
1054 thread::spawn(move || loop {
1055 let (should_notify, result) = {
1056 let mut queue = input_queue.lock();
1057 wait(
1058 &*empty_condition,
1059 &mut queue,
1060 |state| -> bool { !state.items.is_empty() || !state.should_continue },
1061 &timeout,
1062 );
1063 if queue.items.is_empty() && !queue.should_continue {
1064 return;
1065 }
1066 let should_notify = queue.items.len() == max_queue_size;
1067 let result = queue.items.pop_front();
1068 std::mem::drop(queue);
1069 (should_notify, result)
1070 };
1071 notify(notify_style, &*full_condition, should_notify);
1072
1073 if let Some(result) = result {
1074 output_queue.lock().push(result);
1075 }
1076 })
1077 }
1078
1079 fn producer_thread(
1080 num_messages: usize,
1081 queue: Arc<Mutex<Queue>>,
1082 empty_condition: Arc<Condvar>,
1083 full_condition: Arc<Condvar>,
1084 timeout: Timeout,
1085 notify_style: NotifyStyle,
1086 max_queue_size: usize,
1087 ) -> thread::JoinHandle<()> {
1088 thread::spawn(move || {
1089 for message in 0..num_messages {
1090 let should_notify = {
1091 let mut queue = queue.lock();
1092 wait(
1093 &*full_condition,
1094 &mut queue,
1095 |state| state.items.len() < max_queue_size,
1096 &timeout,
1097 );
1098 let should_notify = queue.items.is_empty();
1099 queue.items.push_back(message);
1100 std::mem::drop(queue);
1101 should_notify
1102 };
1103 notify(notify_style, &*empty_condition, should_notify);
1104 }
1105 })
1106 }
1107
1108 macro_rules! run_queue_tests {
1109 ( $( $name:ident(
1110 num_producers: $num_producers:expr,
1111 num_consumers: $num_consumers:expr,
1112 max_queue_size: $max_queue_size:expr,
1113 messages_per_producer: $messages_per_producer:expr,
1114 notification_style: $notification_style:expr,
1115 timeout: $timeout:expr,
1116 delay_seconds: $delay_seconds:expr);
1117 )* ) => {
1118 $(#[test]
1119 fn $name() {
1120 let delay = Duration::from_secs($delay_seconds);
1121 run_queue_test(
1122 $num_producers,
1123 $num_consumers,
1124 $max_queue_size,
1125 $messages_per_producer,
1126 $notification_style,
1127 $timeout,
1128 delay,
1129 );
1130 })*
1131 };
1132 }
1133
1134 run_queue_tests! {
1135 sanity_check_queue(
1136 num_producers: 1,
1137 num_consumers: 1,
1138 max_queue_size: 1,
1139 messages_per_producer: 100_000,
1140 notification_style: NotifyStyle::All,
1141 timeout: Timeout::Bounded(Duration::from_secs(1)),
1142 delay_seconds: 0
1143 );
1144 sanity_check_queue_timeout(
1145 num_producers: 1,
1146 num_consumers: 1,
1147 max_queue_size: 1,
1148 messages_per_producer: 100_000,
1149 notification_style: NotifyStyle::All,
1150 timeout: Timeout::Forever,
1151 delay_seconds: 0
1152 );
1153 new_test_without_timeout_5(
1154 num_producers: 1,
1155 num_consumers: 5,
1156 max_queue_size: 1,
1157 messages_per_producer: 100_000,
1158 notification_style: NotifyStyle::All,
1159 timeout: Timeout::Forever,
1160 delay_seconds: 0
1161 );
1162 one_producer_one_consumer_one_slot(
1163 num_producers: 1,
1164 num_consumers: 1,
1165 max_queue_size: 1,
1166 messages_per_producer: 100_000,
1167 notification_style: NotifyStyle::All,
1168 timeout: Timeout::Forever,
1169 delay_seconds: 0
1170 );
1171 one_producer_one_consumer_one_slot_timeout(
1172 num_producers: 1,
1173 num_consumers: 1,
1174 max_queue_size: 1,
1175 messages_per_producer: 100_000,
1176 notification_style: NotifyStyle::All,
1177 timeout: Timeout::Forever,
1178 delay_seconds: 1
1179 );
1180 one_producer_one_consumer_hundred_slots(
1181 num_producers: 1,
1182 num_consumers: 1,
1183 max_queue_size: 100,
1184 messages_per_producer: 1_000_000,
1185 notification_style: NotifyStyle::All,
1186 timeout: Timeout::Forever,
1187 delay_seconds: 0
1188 );
1189 ten_producers_one_consumer_one_slot(
1190 num_producers: 10,
1191 num_consumers: 1,
1192 max_queue_size: 1,
1193 messages_per_producer: 10000,
1194 notification_style: NotifyStyle::All,
1195 timeout: Timeout::Forever,
1196 delay_seconds: 0
1197 );
1198 ten_producers_one_consumer_hundred_slots_notify_all(
1199 num_producers: 10,
1200 num_consumers: 1,
1201 max_queue_size: 100,
1202 messages_per_producer: 10000,
1203 notification_style: NotifyStyle::All,
1204 timeout: Timeout::Forever,
1205 delay_seconds: 0
1206 );
1207 ten_producers_one_consumer_hundred_slots_notify_one(
1208 num_producers: 10,
1209 num_consumers: 1,
1210 max_queue_size: 100,
1211 messages_per_producer: 10000,
1212 notification_style: NotifyStyle::One,
1213 timeout: Timeout::Forever,
1214 delay_seconds: 0
1215 );
1216 one_producer_ten_consumers_one_slot(
1217 num_producers: 1,
1218 num_consumers: 10,
1219 max_queue_size: 1,
1220 messages_per_producer: 10000,
1221 notification_style: NotifyStyle::All,
1222 timeout: Timeout::Forever,
1223 delay_seconds: 0
1224 );
1225 one_producer_ten_consumers_hundred_slots_notify_all(
1226 num_producers: 1,
1227 num_consumers: 10,
1228 max_queue_size: 100,
1229 messages_per_producer: 100_000,
1230 notification_style: NotifyStyle::All,
1231 timeout: Timeout::Forever,
1232 delay_seconds: 0
1233 );
1234 one_producer_ten_consumers_hundred_slots_notify_one(
1235 num_producers: 1,
1236 num_consumers: 10,
1237 max_queue_size: 100,
1238 messages_per_producer: 100_000,
1239 notification_style: NotifyStyle::One,
1240 timeout: Timeout::Forever,
1241 delay_seconds: 0
1242 );
1243 ten_producers_ten_consumers_one_slot(
1244 num_producers: 10,
1245 num_consumers: 10,
1246 max_queue_size: 1,
1247 messages_per_producer: 50000,
1248 notification_style: NotifyStyle::All,
1249 timeout: Timeout::Forever,
1250 delay_seconds: 0
1251 );
1252 ten_producers_ten_consumers_hundred_slots_notify_all(
1253 num_producers: 10,
1254 num_consumers: 10,
1255 max_queue_size: 100,
1256 messages_per_producer: 50000,
1257 notification_style: NotifyStyle::All,
1258 timeout: Timeout::Forever,
1259 delay_seconds: 0
1260 );
1261 ten_producers_ten_consumers_hundred_slots_notify_one(
1262 num_producers: 10,
1263 num_consumers: 10,
1264 max_queue_size: 100,
1265 messages_per_producer: 50000,
1266 notification_style: NotifyStyle::One,
1267 timeout: Timeout::Forever,
1268 delay_seconds: 0
1269 );
1270 }
1271}
1272