1// Copyright 2016 Amanieu d'Antras
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use crate::mutex::MutexGuard;
9use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
10use crate::{deadlock, util};
11use core::{
12 fmt, ptr,
13 sync::atomic::{AtomicPtr, Ordering},
14};
15use lock_api::RawMutex as RawMutex_;
16use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
17use std::ops::DerefMut;
18use std::time::{Duration, Instant};
19
20/// A type indicating whether a timed wait on a condition variable returned
21/// due to a time out or not.
22#[derive(Debug, PartialEq, Eq, Copy, Clone)]
23pub struct WaitTimeoutResult(bool);
24
25impl WaitTimeoutResult {
26 /// Returns whether the wait was known to have timed out.
27 #[inline]
28 pub fn timed_out(self) -> bool {
29 self.0
30 }
31}
32
33/// A Condition Variable
34///
35/// Condition variables represent the ability to block a thread such that it
36/// consumes no CPU time while waiting for an event to occur. Condition
37/// variables are typically associated with a boolean predicate (a condition)
38/// and a mutex. The predicate is always verified inside of the mutex before
39/// determining that thread must block.
40///
41/// Note that this module places one additional restriction over the system
42/// condition variables: each condvar can be used with only one mutex at a
43/// time. Any attempt to use multiple mutexes on the same condition variable
44/// simultaneously will result in a runtime panic. However it is possible to
45/// switch to a different mutex if there are no threads currently waiting on
46/// the condition variable.
47///
48/// # Differences from the standard library `Condvar`
49///
50/// - No spurious wakeups: A wait will only return a non-timeout result if it
51/// was woken up by `notify_one` or `notify_all`.
52/// - `Condvar::notify_all` will only wake up a single thread, the rest are
53/// requeued to wait for the `Mutex` to be unlocked by the thread that was
54/// woken up.
55/// - Only requires 1 word of space, whereas the standard library boxes the
56/// `Condvar` due to platform limitations.
57/// - Can be statically constructed.
58/// - Does not require any drop glue when dropped.
59/// - Inline fast path for the uncontended case.
60///
61/// # Examples
62///
63/// ```
64/// use parking_lot::{Mutex, Condvar};
65/// use std::sync::Arc;
66/// use std::thread;
67///
68/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
69/// let pair2 = pair.clone();
70///
71/// // Inside of our lock, spawn a new thread, and then wait for it to start
72/// thread::spawn(move|| {
73/// let &(ref lock, ref cvar) = &*pair2;
74/// let mut started = lock.lock();
75/// *started = true;
76/// cvar.notify_one();
77/// });
78///
79/// // wait for the thread to start up
80/// let &(ref lock, ref cvar) = &*pair;
81/// let mut started = lock.lock();
82/// if !*started {
83/// cvar.wait(&mut started);
84/// }
85/// // Note that we used an if instead of a while loop above. This is only
86/// // possible because parking_lot's Condvar will never spuriously wake up.
87/// // This means that wait() will only return after notify_one or notify_all is
88/// // called.
89/// ```
90pub struct Condvar {
91 state: AtomicPtr<RawMutex>,
92}
93
94impl Condvar {
95 /// Creates a new condition variable which is ready to be waited on and
96 /// notified.
97 #[inline]
98 pub const fn new() -> Condvar {
99 Condvar {
100 state: AtomicPtr::new(ptr::null_mut()),
101 }
102 }
103
104 /// Wakes up one blocked thread on this condvar.
105 ///
106 /// Returns whether a thread was woken up.
107 ///
108 /// If there is a blocked thread on this condition variable, then it will
109 /// be woken up from its call to `wait` or `wait_timeout`. Calls to
110 /// `notify_one` are not buffered in any way.
111 ///
112 /// To wake up all threads, see `notify_all()`.
113 ///
114 /// # Examples
115 ///
116 /// ```
117 /// use parking_lot::Condvar;
118 ///
119 /// let condvar = Condvar::new();
120 ///
121 /// // do something with condvar, share it with other threads
122 ///
123 /// if !condvar.notify_one() {
124 /// println!("Nobody was listening for this.");
125 /// }
126 /// ```
127 #[inline]
128 pub fn notify_one(&self) -> bool {
129 // Nothing to do if there are no waiting threads
130 let state = self.state.load(Ordering::Relaxed);
131 if state.is_null() {
132 return false;
133 }
134
135 self.notify_one_slow(state)
136 }
137
138 #[cold]
139 fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
140 // Unpark one thread and requeue the rest onto the mutex
141 let from = self as *const _ as usize;
142 let to = mutex as usize;
143 let validate = || {
144 // Make sure that our atomic state still points to the same
145 // mutex. If not then it means that all threads on the current
146 // mutex were woken up and a new waiting thread switched to a
147 // different mutex. In that case we can get away with doing
148 // nothing.
149 if self.state.load(Ordering::Relaxed) != mutex {
150 return RequeueOp::Abort;
151 }
152
153 // Unpark one thread if the mutex is unlocked, otherwise just
154 // requeue everything to the mutex. This is safe to do here
155 // since unlocking the mutex when the parked bit is set requires
156 // locking the queue. There is the possibility of a race if the
157 // mutex gets locked after we check, but that doesn't matter in
158 // this case.
159 if unsafe { (*mutex).mark_parked_if_locked() } {
160 RequeueOp::RequeueOne
161 } else {
162 RequeueOp::UnparkOne
163 }
164 };
165 let callback = |_op, result: UnparkResult| {
166 // Clear our state if there are no more waiting threads
167 if !result.have_more_threads {
168 self.state.store(ptr::null_mut(), Ordering::Relaxed);
169 }
170 TOKEN_NORMAL
171 };
172 let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };
173
174 res.unparked_threads + res.requeued_threads != 0
175 }
176
177 /// Wakes up all blocked threads on this condvar.
178 ///
179 /// Returns the number of threads woken up.
180 ///
181 /// This method will ensure that any current waiters on the condition
182 /// variable are awoken. Calls to `notify_all()` are not buffered in any
183 /// way.
184 ///
185 /// To wake up only one thread, see `notify_one()`.
186 #[inline]
187 pub fn notify_all(&self) -> usize {
188 // Nothing to do if there are no waiting threads
189 let state = self.state.load(Ordering::Relaxed);
190 if state.is_null() {
191 return 0;
192 }
193
194 self.notify_all_slow(state)
195 }
196
197 #[cold]
198 fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
199 // Unpark one thread and requeue the rest onto the mutex
200 let from = self as *const _ as usize;
201 let to = mutex as usize;
202 let validate = || {
203 // Make sure that our atomic state still points to the same
204 // mutex. If not then it means that all threads on the current
205 // mutex were woken up and a new waiting thread switched to a
206 // different mutex. In that case we can get away with doing
207 // nothing.
208 if self.state.load(Ordering::Relaxed) != mutex {
209 return RequeueOp::Abort;
210 }
211
212 // Clear our state since we are going to unpark or requeue all
213 // threads.
214 self.state.store(ptr::null_mut(), Ordering::Relaxed);
215
216 // Unpark one thread if the mutex is unlocked, otherwise just
217 // requeue everything to the mutex. This is safe to do here
218 // since unlocking the mutex when the parked bit is set requires
219 // locking the queue. There is the possibility of a race if the
220 // mutex gets locked after we check, but that doesn't matter in
221 // this case.
222 if unsafe { (*mutex).mark_parked_if_locked() } {
223 RequeueOp::RequeueAll
224 } else {
225 RequeueOp::UnparkOneRequeueRest
226 }
227 };
228 let callback = |op, result: UnparkResult| {
229 // If we requeued threads to the mutex, mark it as having
230 // parked threads. The RequeueAll case is already handled above.
231 if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
232 unsafe { (*mutex).mark_parked() };
233 }
234 TOKEN_NORMAL
235 };
236 let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };
237
238 res.unparked_threads + res.requeued_threads
239 }
240
241 /// Blocks the current thread until this condition variable receives a
242 /// notification.
243 ///
244 /// This function will atomically unlock the mutex specified (represented by
245 /// `mutex_guard`) and block the current thread. This means that any calls
246 /// to `notify_*()` which happen logically after the mutex is unlocked are
247 /// candidates to wake this thread up. When this function call returns, the
248 /// lock specified will have been re-acquired.
249 ///
250 /// # Panics
251 ///
252 /// This function will panic if another thread is waiting on the `Condvar`
253 /// with a different `Mutex` object.
254 #[inline]
255 pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
256 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
257 }
258
259 /// Waits on this condition variable for a notification, timing out after
260 /// the specified time instant.
261 ///
262 /// The semantics of this function are equivalent to `wait()` except that
263 /// the thread will be blocked roughly until `timeout` is reached. This
264 /// method should not be used for precise timing due to anomalies such as
265 /// preemption or platform differences that may not cause the maximum
266 /// amount of time waited to be precisely `timeout`.
267 ///
268 /// Note that the best effort is made to ensure that the time waited is
269 /// measured with a monotonic clock, and not affected by the changes made to
270 /// the system time.
271 ///
272 /// The returned `WaitTimeoutResult` value indicates if the timeout is
273 /// known to have elapsed.
274 ///
275 /// Like `wait`, the lock specified will be re-acquired when this function
276 /// returns, regardless of whether the timeout elapsed or not.
277 ///
278 /// # Panics
279 ///
280 /// This function will panic if another thread is waiting on the `Condvar`
281 /// with a different `Mutex` object.
282 #[inline]
283 pub fn wait_until<T: ?Sized>(
284 &self,
285 mutex_guard: &mut MutexGuard<'_, T>,
286 timeout: Instant,
287 ) -> WaitTimeoutResult {
288 self.wait_until_internal(
289 unsafe { MutexGuard::mutex(mutex_guard).raw() },
290 Some(timeout),
291 )
292 }
293
294 // This is a non-generic function to reduce the monomorphization cost of
295 // using `wait_until`.
296 fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
297 let result;
298 let mut bad_mutex = false;
299 let mut requeued = false;
300 {
301 let addr = self as *const _ as usize;
302 let lock_addr = mutex as *const _ as *mut _;
303 let validate = || {
304 // Ensure we don't use two different mutexes with the same
305 // Condvar at the same time. This is done while locked to
306 // avoid races with notify_one
307 let state = self.state.load(Ordering::Relaxed);
308 if state.is_null() {
309 self.state.store(lock_addr, Ordering::Relaxed);
310 } else if state != lock_addr {
311 bad_mutex = true;
312 return false;
313 }
314 true
315 };
316 let before_sleep = || {
317 // Unlock the mutex before sleeping...
318 unsafe { mutex.unlock() };
319 };
320 let timed_out = |k, was_last_thread| {
321 // If we were requeued to a mutex, then we did not time out.
322 // We'll just park ourselves on the mutex again when we try
323 // to lock it later.
324 requeued = k != addr;
325
326 // If we were the last thread on the queue then we need to
327 // clear our state. This is normally done by the
328 // notify_{one,all} functions when not timing out.
329 if !requeued && was_last_thread {
330 self.state.store(ptr::null_mut(), Ordering::Relaxed);
331 }
332 };
333 result = unsafe {
334 parking_lot_core::park(
335 addr,
336 validate,
337 before_sleep,
338 timed_out,
339 DEFAULT_PARK_TOKEN,
340 timeout,
341 )
342 };
343 }
344
345 // Panic if we tried to use multiple mutexes with a Condvar. Note
346 // that at this point the MutexGuard is still locked. It will be
347 // unlocked by the unwinding logic.
348 if bad_mutex {
349 panic!("attempted to use a condition variable with more than one mutex");
350 }
351
352 // ... and re-lock it once we are done sleeping
353 if result == ParkResult::Unparked(TOKEN_HANDOFF) {
354 unsafe { deadlock::acquire_resource(mutex as *const _ as usize) };
355 } else {
356 mutex.lock();
357 }
358
359 WaitTimeoutResult(!(result.is_unparked() || requeued))
360 }
361
362 /// Waits on this condition variable for a notification, timing out after a
363 /// specified duration.
364 ///
365 /// The semantics of this function are equivalent to `wait()` except that
366 /// the thread will be blocked for roughly no longer than `timeout`. This
367 /// method should not be used for precise timing due to anomalies such as
368 /// preemption or platform differences that may not cause the maximum
369 /// amount of time waited to be precisely `timeout`.
370 ///
371 /// Note that the best effort is made to ensure that the time waited is
372 /// measured with a monotonic clock, and not affected by the changes made to
373 /// the system time.
374 ///
375 /// The returned `WaitTimeoutResult` value indicates if the timeout is
376 /// known to have elapsed.
377 ///
378 /// Like `wait`, the lock specified will be re-acquired when this function
379 /// returns, regardless of whether the timeout elapsed or not.
380 #[inline]
381 pub fn wait_for<T: ?Sized>(
382 &self,
383 mutex_guard: &mut MutexGuard<'_, T>,
384 timeout: Duration,
385 ) -> WaitTimeoutResult {
386 let deadline = util::to_deadline(timeout);
387 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
388 }
389
390 #[inline]
391 fn wait_while_until_internal<T, F>(
392 &self,
393 mutex_guard: &mut MutexGuard<'_, T>,
394 mut condition: F,
395 timeout: Option<Instant>,
396 ) -> WaitTimeoutResult
397 where
398 T: ?Sized,
399 F: FnMut(&mut T) -> bool,
400 {
401 let mut result = WaitTimeoutResult(false);
402
403 while !result.timed_out() && condition(mutex_guard.deref_mut()) {
404 result =
405 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, timeout);
406 }
407
408 result
409 }
410 /// Blocks the current thread until this condition variable receives a
411 /// notification. If the provided condition evaluates to `false`, then the
412 /// thread is no longer blocked and the operation is completed. If the
413 /// condition evaluates to `true`, then the thread is blocked again and
414 /// waits for another notification before repeating this process.
415 ///
416 /// This function will atomically unlock the mutex specified (represented by
417 /// `mutex_guard`) and block the current thread. This means that any calls
418 /// to `notify_*()` which happen logically after the mutex is unlocked are
419 /// candidates to wake this thread up. When this function call returns, the
420 /// lock specified will have been re-acquired.
421 ///
422 /// # Panics
423 ///
424 /// This function will panic if another thread is waiting on the `Condvar`
425 /// with a different `Mutex` object.
426 #[inline]
427 pub fn wait_while<T, F>(&self, mutex_guard: &mut MutexGuard<'_, T>, condition: F)
428 where
429 T: ?Sized,
430 F: FnMut(&mut T) -> bool,
431 {
432 self.wait_while_until_internal(mutex_guard, condition, None);
433 }
434
435 /// Waits on this condition variable for a notification, timing out after
436 /// the specified time instant. If the provided condition evaluates to
437 /// `false`, then the thread is no longer blocked and the operation is
438 /// completed. If the condition evaluates to `true`, then the thread is
439 /// blocked again and waits for another notification before repeating
440 /// this process.
441 ///
442 /// The semantics of this function are equivalent to `wait()` except that
443 /// the thread will be blocked roughly until `timeout` is reached. This
444 /// method should not be used for precise timing due to anomalies such as
445 /// preemption or platform differences that may not cause the maximum
446 /// amount of time waited to be precisely `timeout`.
447 ///
448 /// Note that the best effort is made to ensure that the time waited is
449 /// measured with a monotonic clock, and not affected by the changes made to
450 /// the system time.
451 ///
452 /// The returned `WaitTimeoutResult` value indicates if the timeout is
453 /// known to have elapsed.
454 ///
455 /// Like `wait`, the lock specified will be re-acquired when this function
456 /// returns, regardless of whether the timeout elapsed or not.
457 ///
458 /// # Panics
459 ///
460 /// This function will panic if another thread is waiting on the `Condvar`
461 /// with a different `Mutex` object.
462 #[inline]
463 pub fn wait_while_until<T, F>(
464 &self,
465 mutex_guard: &mut MutexGuard<'_, T>,
466 condition: F,
467 timeout: Instant,
468 ) -> WaitTimeoutResult
469 where
470 T: ?Sized,
471 F: FnMut(&mut T) -> bool,
472 {
473 self.wait_while_until_internal(mutex_guard, condition, Some(timeout))
474 }
475
476 /// Waits on this condition variable for a notification, timing out after a
477 /// specified duration. If the provided condition evaluates to `false`,
478 /// then the thread is no longer blocked and the operation is completed.
479 /// If the condition evaluates to `true`, then the thread is blocked again
480 /// and waits for another notification before repeating this process.
481 ///
482 /// The semantics of this function are equivalent to `wait()` except that
483 /// the thread will be blocked for roughly no longer than `timeout`. This
484 /// method should not be used for precise timing due to anomalies such as
485 /// preemption or platform differences that may not cause the maximum
486 /// amount of time waited to be precisely `timeout`.
487 ///
488 /// Note that the best effort is made to ensure that the time waited is
489 /// measured with a monotonic clock, and not affected by the changes made to
490 /// the system time.
491 ///
492 /// The returned `WaitTimeoutResult` value indicates if the timeout is
493 /// known to have elapsed.
494 ///
495 /// Like `wait`, the lock specified will be re-acquired when this function
496 /// returns, regardless of whether the timeout elapsed or not.
497 #[inline]
498 pub fn wait_while_for<T: ?Sized, F>(
499 &self,
500 mutex_guard: &mut MutexGuard<'_, T>,
501 condition: F,
502 timeout: Duration,
503 ) -> WaitTimeoutResult
504 where
505 F: FnMut(&mut T) -> bool,
506 {
507 let deadline = util::to_deadline(timeout);
508 self.wait_while_until_internal(mutex_guard, condition, deadline)
509 }
510}
511
512impl Default for Condvar {
513 #[inline]
514 fn default() -> Condvar {
515 Condvar::new()
516 }
517}
518
519impl fmt::Debug for Condvar {
520 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
521 f.pad("Condvar { .. }")
522 }
523}
524
525#[cfg(test)]
526mod tests {
527 use crate::{Condvar, Mutex, MutexGuard};
528 use std::sync::mpsc::channel;
529 use std::sync::Arc;
530 use std::thread;
531 use std::thread::sleep;
532 use std::thread::JoinHandle;
533 use std::time::Duration;
534 use std::time::Instant;
535
536 #[test]
537 fn smoke() {
538 let c = Condvar::new();
539 c.notify_one();
540 c.notify_all();
541 }
542
543 #[test]
544 fn notify_one() {
545 let m = Arc::new(Mutex::new(()));
546 let m2 = m.clone();
547 let c = Arc::new(Condvar::new());
548 let c2 = c.clone();
549
550 let mut g = m.lock();
551 let _t = thread::spawn(move || {
552 let _g = m2.lock();
553 c2.notify_one();
554 });
555 c.wait(&mut g);
556 }
557
558 #[test]
559 fn notify_all() {
560 const N: usize = 10;
561
562 let data = Arc::new((Mutex::new(0), Condvar::new()));
563 let (tx, rx) = channel();
564 for _ in 0..N {
565 let data = data.clone();
566 let tx = tx.clone();
567 thread::spawn(move || {
568 let (lock, cond) = &*data;
569 let mut cnt = lock.lock();
570 *cnt += 1;
571 if *cnt == N {
572 tx.send(()).unwrap();
573 }
574 while *cnt != 0 {
575 cond.wait(&mut cnt);
576 }
577 tx.send(()).unwrap();
578 });
579 }
580 drop(tx);
581
582 let (lock, cond) = &*data;
583 rx.recv().unwrap();
584 let mut cnt = lock.lock();
585 *cnt = 0;
586 cond.notify_all();
587 drop(cnt);
588
589 for _ in 0..N {
590 rx.recv().unwrap();
591 }
592 }
593
594 #[test]
595 fn notify_one_return_true() {
596 let m = Arc::new(Mutex::new(()));
597 let m2 = m.clone();
598 let c = Arc::new(Condvar::new());
599 let c2 = c.clone();
600
601 let mut g = m.lock();
602 let _t = thread::spawn(move || {
603 let _g = m2.lock();
604 assert!(c2.notify_one());
605 });
606 c.wait(&mut g);
607 }
608
609 #[test]
610 fn notify_one_return_false() {
611 let m = Arc::new(Mutex::new(()));
612 let c = Arc::new(Condvar::new());
613
614 let _t = thread::spawn(move || {
615 let _g = m.lock();
616 assert!(!c.notify_one());
617 });
618 }
619
620 #[test]
621 fn notify_all_return() {
622 const N: usize = 10;
623
624 let data = Arc::new((Mutex::new(0), Condvar::new()));
625 let (tx, rx) = channel();
626 for _ in 0..N {
627 let data = data.clone();
628 let tx = tx.clone();
629 thread::spawn(move || {
630 let (lock, cond) = &*data;
631 let mut cnt = lock.lock();
632 *cnt += 1;
633 if *cnt == N {
634 tx.send(()).unwrap();
635 }
636 while *cnt != 0 {
637 cond.wait(&mut cnt);
638 }
639 tx.send(()).unwrap();
640 });
641 }
642 drop(tx);
643
644 let (lock, cond) = &*data;
645 rx.recv().unwrap();
646 let mut cnt = lock.lock();
647 *cnt = 0;
648 assert_eq!(cond.notify_all(), N);
649 drop(cnt);
650
651 for _ in 0..N {
652 rx.recv().unwrap();
653 }
654
655 assert_eq!(cond.notify_all(), 0);
656 }
657
658 #[test]
659 fn wait_for() {
660 let m = Arc::new(Mutex::new(()));
661 let m2 = m.clone();
662 let c = Arc::new(Condvar::new());
663 let c2 = c.clone();
664
665 let mut g = m.lock();
666 let no_timeout = c.wait_for(&mut g, Duration::from_millis(1));
667 assert!(no_timeout.timed_out());
668
669 let _t = thread::spawn(move || {
670 let _g = m2.lock();
671 c2.notify_one();
672 });
673 let timeout_res = c.wait_for(&mut g, Duration::from_secs(u64::max_value()));
674 assert!(!timeout_res.timed_out());
675
676 drop(g);
677 }
678
679 #[test]
680 fn wait_until() {
681 let m = Arc::new(Mutex::new(()));
682 let m2 = m.clone();
683 let c = Arc::new(Condvar::new());
684 let c2 = c.clone();
685
686 let mut g = m.lock();
687 let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
688 assert!(no_timeout.timed_out());
689 let _t = thread::spawn(move || {
690 let _g = m2.lock();
691 c2.notify_one();
692 });
693 let timeout_res = c.wait_until(
694 &mut g,
695 Instant::now() + Duration::from_millis(u32::max_value() as u64),
696 );
697 assert!(!timeout_res.timed_out());
698 drop(g);
699 }
700
701 fn spawn_wait_while_notifier(
702 mutex: Arc<Mutex<u32>>,
703 cv: Arc<Condvar>,
704 num_iters: u32,
705 timeout: Option<Instant>,
706 ) -> JoinHandle<()> {
707 thread::spawn(move || {
708 for epoch in 1..=num_iters {
709 // spin to wait for main test thread to block
710 // before notifying it to wake back up and check
711 // its condition.
712 let mut sleep_backoff = Duration::from_millis(1);
713 let _mutex_guard = loop {
714 let mutex_guard = mutex.lock();
715
716 if let Some(timeout) = timeout {
717 if Instant::now() >= timeout {
718 return;
719 }
720 }
721
722 if *mutex_guard == epoch {
723 break mutex_guard;
724 }
725
726 drop(mutex_guard);
727
728 // give main test thread a good chance to
729 // acquire the lock before this thread does.
730 sleep(sleep_backoff);
731 sleep_backoff *= 2;
732 };
733
734 cv.notify_one();
735 }
736 })
737 }
738
739 #[test]
740 fn wait_while_until_internal_does_not_wait_if_initially_false() {
741 let mutex = Arc::new(Mutex::new(0));
742 let cv = Arc::new(Condvar::new());
743
744 let condition = |counter: &mut u32| {
745 *counter += 1;
746 false
747 };
748
749 let mut mutex_guard = mutex.lock();
750 let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
751
752 assert!(!timeout_result.timed_out());
753 assert!(*mutex_guard == 1);
754 }
755
756 #[test]
757 fn wait_while_until_internal_times_out_before_false() {
758 let mutex = Arc::new(Mutex::new(0));
759 let cv = Arc::new(Condvar::new());
760
761 let num_iters = 3;
762 let condition = |counter: &mut u32| {
763 *counter += 1;
764 true
765 };
766
767 let mut mutex_guard = mutex.lock();
768 let timeout = Some(Instant::now() + Duration::from_millis(500));
769 let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, timeout);
770
771 let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, timeout);
772
773 assert!(timeout_result.timed_out());
774 assert!(*mutex_guard == num_iters + 1);
775
776 // prevent deadlock with notifier
777 drop(mutex_guard);
778 handle.join().unwrap();
779 }
780
781 #[test]
782 fn wait_while_until_internal() {
783 let mutex = Arc::new(Mutex::new(0));
784 let cv = Arc::new(Condvar::new());
785
786 let num_iters = 4;
787
788 let condition = |counter: &mut u32| {
789 *counter += 1;
790 *counter <= num_iters
791 };
792
793 let mut mutex_guard = mutex.lock();
794 let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, None);
795
796 let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
797
798 assert!(!timeout_result.timed_out());
799 assert!(*mutex_guard == num_iters + 1);
800
801 let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
802 handle.join().unwrap();
803
804 assert!(!timeout_result.timed_out());
805 assert!(*mutex_guard == num_iters + 2);
806 }
807
808 #[test]
809 #[should_panic]
810 fn two_mutexes() {
811 let m = Arc::new(Mutex::new(()));
812 let m2 = m.clone();
813 let m3 = Arc::new(Mutex::new(()));
814 let c = Arc::new(Condvar::new());
815 let c2 = c.clone();
816
817 // Make sure we don't leave the child thread dangling
818 struct PanicGuard<'a>(&'a Condvar);
819 impl<'a> Drop for PanicGuard<'a> {
820 fn drop(&mut self) {
821 self.0.notify_one();
822 }
823 }
824
825 let (tx, rx) = channel();
826 let g = m.lock();
827 let _t = thread::spawn(move || {
828 let mut g = m2.lock();
829 tx.send(()).unwrap();
830 c2.wait(&mut g);
831 });
832 drop(g);
833 rx.recv().unwrap();
834 let _g = m.lock();
835 let _guard = PanicGuard(&c);
836 c.wait(&mut m3.lock());
837 }
838
839 #[test]
840 fn two_mutexes_disjoint() {
841 let m = Arc::new(Mutex::new(()));
842 let m2 = m.clone();
843 let m3 = Arc::new(Mutex::new(()));
844 let c = Arc::new(Condvar::new());
845 let c2 = c.clone();
846
847 let mut g = m.lock();
848 let _t = thread::spawn(move || {
849 let _g = m2.lock();
850 c2.notify_one();
851 });
852 c.wait(&mut g);
853 drop(g);
854
855 let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
856 }
857
858 #[test]
859 fn test_debug_condvar() {
860 let c = Condvar::new();
861 assert_eq!(format!("{:?}", c), "Condvar { .. }");
862 }
863
864 #[test]
865 fn test_condvar_requeue() {
866 let m = Arc::new(Mutex::new(()));
867 let m2 = m.clone();
868 let c = Arc::new(Condvar::new());
869 let c2 = c.clone();
870 let t = thread::spawn(move || {
871 let mut g = m2.lock();
872 c2.wait(&mut g);
873 });
874
875 let mut g = m.lock();
876 while !c.notify_one() {
877 // Wait for the thread to get into wait()
878 MutexGuard::bump(&mut g);
879 // Yield, so the other thread gets a chance to do something.
880 // (At least Miri needs this, because it doesn't preempt threads.)
881 thread::yield_now();
882 }
883 // The thread should have been requeued to the mutex, which we wake up now.
884 drop(g);
885 t.join().unwrap();
886 }
887
888 #[test]
889 fn test_issue_129() {
890 let locks = Arc::new((Mutex::new(()), Condvar::new()));
891
892 let (tx, rx) = channel();
893 for _ in 0..4 {
894 let locks = locks.clone();
895 let tx = tx.clone();
896 thread::spawn(move || {
897 let mut guard = locks.0.lock();
898 locks.1.wait(&mut guard);
899 locks.1.wait_for(&mut guard, Duration::from_millis(1));
900 locks.1.notify_one();
901 tx.send(()).unwrap();
902 });
903 }
904
905 thread::sleep(Duration::from_millis(100));
906 locks.1.notify_one();
907
908 for _ in 0..4 {
909 assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
910 }
911 }
912}
913
914/// This module contains an integration test that is heavily inspired from WebKit's own integration
915/// tests for it's own Condvar.
916#[cfg(test)]
917mod webkit_queue_test {
918 use crate::{Condvar, Mutex, MutexGuard};
919 use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
920
921 #[derive(Clone, Copy)]
922 enum Timeout {
923 Bounded(Duration),
924 Forever,
925 }
926
927 #[derive(Clone, Copy)]
928 enum NotifyStyle {
929 One,
930 All,
931 }
932
933 struct Queue {
934 items: VecDeque<usize>,
935 should_continue: bool,
936 }
937
938 impl Queue {
939 fn new() -> Self {
940 Self {
941 items: VecDeque::new(),
942 should_continue: true,
943 }
944 }
945 }
946
947 fn wait<T: ?Sized>(
948 condition: &Condvar,
949 lock: &mut MutexGuard<'_, T>,
950 predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
951 timeout: &Timeout,
952 ) {
953 while !predicate(lock) {
954 match timeout {
955 Timeout::Forever => condition.wait(lock),
956 Timeout::Bounded(bound) => {
957 condition.wait_for(lock, *bound);
958 }
959 }
960 }
961 }
962
963 fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
964 match style {
965 NotifyStyle::One => {
966 condition.notify_one();
967 }
968 NotifyStyle::All => {
969 if should_notify {
970 condition.notify_all();
971 }
972 }
973 }
974 }
975
976 fn run_queue_test(
977 num_producers: usize,
978 num_consumers: usize,
979 max_queue_size: usize,
980 messages_per_producer: usize,
981 notify_style: NotifyStyle,
982 timeout: Timeout,
983 delay: Duration,
984 ) {
985 let input_queue = Arc::new(Mutex::new(Queue::new()));
986 let empty_condition = Arc::new(Condvar::new());
987 let full_condition = Arc::new(Condvar::new());
988
989 let output_vec = Arc::new(Mutex::new(vec![]));
990
991 let consumers = (0..num_consumers)
992 .map(|_| {
993 consumer_thread(
994 input_queue.clone(),
995 empty_condition.clone(),
996 full_condition.clone(),
997 timeout,
998 notify_style,
999 output_vec.clone(),
1000 max_queue_size,
1001 )
1002 })
1003 .collect::<Vec<_>>();
1004 let producers = (0..num_producers)
1005 .map(|_| {
1006 producer_thread(
1007 messages_per_producer,
1008 input_queue.clone(),
1009 empty_condition.clone(),
1010 full_condition.clone(),
1011 timeout,
1012 notify_style,
1013 max_queue_size,
1014 )
1015 })
1016 .collect::<Vec<_>>();
1017
1018 thread::sleep(delay);
1019
1020 for producer in producers.into_iter() {
1021 producer.join().expect("Producer thread panicked");
1022 }
1023
1024 {
1025 let mut input_queue = input_queue.lock();
1026 input_queue.should_continue = false;
1027 }
1028 empty_condition.notify_all();
1029
1030 for consumer in consumers.into_iter() {
1031 consumer.join().expect("Consumer thread panicked");
1032 }
1033
1034 let mut output_vec = output_vec.lock();
1035 assert_eq!(output_vec.len(), num_producers * messages_per_producer);
1036 output_vec.sort();
1037 for msg_idx in 0..messages_per_producer {
1038 for producer_idx in 0..num_producers {
1039 assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
1040 }
1041 }
1042 }
1043
1044 fn consumer_thread(
1045 input_queue: Arc<Mutex<Queue>>,
1046 empty_condition: Arc<Condvar>,
1047 full_condition: Arc<Condvar>,
1048 timeout: Timeout,
1049 notify_style: NotifyStyle,
1050 output_queue: Arc<Mutex<Vec<usize>>>,
1051 max_queue_size: usize,
1052 ) -> thread::JoinHandle<()> {
1053 thread::spawn(move || loop {
1054 let (should_notify, result) = {
1055 let mut queue = input_queue.lock();
1056 wait(
1057 &empty_condition,
1058 &mut queue,
1059 |state| -> bool { !state.items.is_empty() || !state.should_continue },
1060 &timeout,
1061 );
1062 if queue.items.is_empty() && !queue.should_continue {
1063 return;
1064 }
1065 let should_notify = queue.items.len() == max_queue_size;
1066 let result = queue.items.pop_front();
1067 std::mem::drop(queue);
1068 (should_notify, result)
1069 };
1070 notify(notify_style, &full_condition, should_notify);
1071
1072 if let Some(result) = result {
1073 output_queue.lock().push(result);
1074 }
1075 })
1076 }
1077
1078 fn producer_thread(
1079 num_messages: usize,
1080 queue: Arc<Mutex<Queue>>,
1081 empty_condition: Arc<Condvar>,
1082 full_condition: Arc<Condvar>,
1083 timeout: Timeout,
1084 notify_style: NotifyStyle,
1085 max_queue_size: usize,
1086 ) -> thread::JoinHandle<()> {
1087 thread::spawn(move || {
1088 for message in 0..num_messages {
1089 let should_notify = {
1090 let mut queue = queue.lock();
1091 wait(
1092 &full_condition,
1093 &mut queue,
1094 |state| state.items.len() < max_queue_size,
1095 &timeout,
1096 );
1097 let should_notify = queue.items.is_empty();
1098 queue.items.push_back(message);
1099 std::mem::drop(queue);
1100 should_notify
1101 };
1102 notify(notify_style, &empty_condition, should_notify);
1103 }
1104 })
1105 }
1106
1107 macro_rules! run_queue_tests {
1108 ( $( $name:ident(
1109 num_producers: $num_producers:expr,
1110 num_consumers: $num_consumers:expr,
1111 max_queue_size: $max_queue_size:expr,
1112 messages_per_producer: $messages_per_producer:expr,
1113 notification_style: $notification_style:expr,
1114 timeout: $timeout:expr,
1115 delay_seconds: $delay_seconds:expr);
1116 )* ) => {
1117 $(#[test]
1118 fn $name() {
1119 let delay = Duration::from_secs($delay_seconds);
1120 run_queue_test(
1121 $num_producers,
1122 $num_consumers,
1123 $max_queue_size,
1124 $messages_per_producer,
1125 $notification_style,
1126 $timeout,
1127 delay,
1128 );
1129 })*
1130 };
1131 }
1132
1133 run_queue_tests! {
1134 sanity_check_queue(
1135 num_producers: 1,
1136 num_consumers: 1,
1137 max_queue_size: 1,
1138 messages_per_producer: 100_000,
1139 notification_style: NotifyStyle::All,
1140 timeout: Timeout::Bounded(Duration::from_secs(1)),
1141 delay_seconds: 0
1142 );
1143 sanity_check_queue_timeout(
1144 num_producers: 1,
1145 num_consumers: 1,
1146 max_queue_size: 1,
1147 messages_per_producer: 100_000,
1148 notification_style: NotifyStyle::All,
1149 timeout: Timeout::Forever,
1150 delay_seconds: 0
1151 );
1152 new_test_without_timeout_5(
1153 num_producers: 1,
1154 num_consumers: 5,
1155 max_queue_size: 1,
1156 messages_per_producer: 100_000,
1157 notification_style: NotifyStyle::All,
1158 timeout: Timeout::Forever,
1159 delay_seconds: 0
1160 );
1161 one_producer_one_consumer_one_slot(
1162 num_producers: 1,
1163 num_consumers: 1,
1164 max_queue_size: 1,
1165 messages_per_producer: 100_000,
1166 notification_style: NotifyStyle::All,
1167 timeout: Timeout::Forever,
1168 delay_seconds: 0
1169 );
1170 one_producer_one_consumer_one_slot_timeout(
1171 num_producers: 1,
1172 num_consumers: 1,
1173 max_queue_size: 1,
1174 messages_per_producer: 100_000,
1175 notification_style: NotifyStyle::All,
1176 timeout: Timeout::Forever,
1177 delay_seconds: 1
1178 );
1179 one_producer_one_consumer_hundred_slots(
1180 num_producers: 1,
1181 num_consumers: 1,
1182 max_queue_size: 100,
1183 messages_per_producer: 1_000_000,
1184 notification_style: NotifyStyle::All,
1185 timeout: Timeout::Forever,
1186 delay_seconds: 0
1187 );
1188 ten_producers_one_consumer_one_slot(
1189 num_producers: 10,
1190 num_consumers: 1,
1191 max_queue_size: 1,
1192 messages_per_producer: 10000,
1193 notification_style: NotifyStyle::All,
1194 timeout: Timeout::Forever,
1195 delay_seconds: 0
1196 );
1197 ten_producers_one_consumer_hundred_slots_notify_all(
1198 num_producers: 10,
1199 num_consumers: 1,
1200 max_queue_size: 100,
1201 messages_per_producer: 10000,
1202 notification_style: NotifyStyle::All,
1203 timeout: Timeout::Forever,
1204 delay_seconds: 0
1205 );
1206 ten_producers_one_consumer_hundred_slots_notify_one(
1207 num_producers: 10,
1208 num_consumers: 1,
1209 max_queue_size: 100,
1210 messages_per_producer: 10000,
1211 notification_style: NotifyStyle::One,
1212 timeout: Timeout::Forever,
1213 delay_seconds: 0
1214 );
1215 one_producer_ten_consumers_one_slot(
1216 num_producers: 1,
1217 num_consumers: 10,
1218 max_queue_size: 1,
1219 messages_per_producer: 10000,
1220 notification_style: NotifyStyle::All,
1221 timeout: Timeout::Forever,
1222 delay_seconds: 0
1223 );
1224 one_producer_ten_consumers_hundred_slots_notify_all(
1225 num_producers: 1,
1226 num_consumers: 10,
1227 max_queue_size: 100,
1228 messages_per_producer: 100_000,
1229 notification_style: NotifyStyle::All,
1230 timeout: Timeout::Forever,
1231 delay_seconds: 0
1232 );
1233 one_producer_ten_consumers_hundred_slots_notify_one(
1234 num_producers: 1,
1235 num_consumers: 10,
1236 max_queue_size: 100,
1237 messages_per_producer: 100_000,
1238 notification_style: NotifyStyle::One,
1239 timeout: Timeout::Forever,
1240 delay_seconds: 0
1241 );
1242 ten_producers_ten_consumers_one_slot(
1243 num_producers: 10,
1244 num_consumers: 10,
1245 max_queue_size: 1,
1246 messages_per_producer: 50000,
1247 notification_style: NotifyStyle::All,
1248 timeout: Timeout::Forever,
1249 delay_seconds: 0
1250 );
1251 ten_producers_ten_consumers_hundred_slots_notify_all(
1252 num_producers: 10,
1253 num_consumers: 10,
1254 max_queue_size: 100,
1255 messages_per_producer: 50000,
1256 notification_style: NotifyStyle::All,
1257 timeout: Timeout::Forever,
1258 delay_seconds: 0
1259 );
1260 ten_producers_ten_consumers_hundred_slots_notify_one(
1261 num_producers: 10,
1262 num_consumers: 10,
1263 max_queue_size: 100,
1264 messages_per_producer: 50000,
1265 notification_style: NotifyStyle::One,
1266 timeout: Timeout::Forever,
1267 delay_seconds: 0
1268 );
1269 }
1270}
1271