1use std::borrow::Borrow;
2use std::cell::UnsafeCell;
3use std::fmt;
4use std::future::Future;
5use std::marker::PhantomData;
6use std::mem;
7use std::ops::{Deref, DerefMut};
8use std::pin::Pin;
9use std::process;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::Arc;
12use std::task::{Context, Poll};
13
14// Note: we cannot use `target_family = "wasm"` here because it requires Rust 1.54.
15#[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
16use std::time::{Duration, Instant};
17
18use std::usize;
19
20use event_listener::{Event, EventListener};
21
/// An async mutex.
///
/// The locking mechanism uses eventual fairness to ensure locking will be fair on average without
/// sacrificing performance. This is done by forcing a fair lock whenever a lock operation is
/// starved for longer than 0.5 milliseconds.
///
/// The starvation timer relies on `std::time::Instant` and is only compiled on targets other
/// than `wasm32`/`wasm64`.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_lock::Mutex;
///
/// let m = Mutex::new(1);
///
/// let mut guard = m.lock().await;
/// *guard = 2;
///
/// assert!(m.try_lock().is_none());
/// drop(guard);
/// assert_eq!(*m.try_lock().unwrap(), 2);
/// # })
/// ```
pub struct Mutex<T: ?Sized> {
    /// Current state of the mutex.
    ///
    /// The least significant bit is set to 1 if the mutex is locked.
    /// The other bits hold the number of starved lock operations, so every starved operation
    /// contributes 2 to the raw value.
    state: AtomicUsize,

    /// Lock operations waiting for the mutex to be released.
    ///
    /// One listener is notified each time the mutex is unlocked.
    lock_ops: Event,

    /// The value inside the mutex.
    ///
    /// Reached either through a guard or through exclusive access to the mutex itself
    /// (`get_mut`, `into_inner`).
    data: UnsafeCell<T>,
}

// Both impls require `T: Send`: whoever holds the lock gets `&mut T`, possibly on another
// thread. `T: Sync` is not required because the lock bit admits one holder at a time.
unsafe impl<T: Send + ?Sized> Send for Mutex<T> {}
unsafe impl<T: Send + ?Sized> Sync for Mutex<T> {}
60
61impl<T> Mutex<T> {
62 /// Creates a new async mutex.
63 ///
64 /// # Examples
65 ///
66 /// ```
67 /// use async_lock::Mutex;
68 ///
69 /// let mutex = Mutex::new(0);
70 /// ```
71 pub const fn new(data: T) -> Mutex<T> {
72 Mutex {
73 state: AtomicUsize::new(0),
74 lock_ops: Event::new(),
75 data: UnsafeCell::new(data),
76 }
77 }
78
79 /// Consumes the mutex, returning the underlying data.
80 ///
81 /// # Examples
82 ///
83 /// ```
84 /// use async_lock::Mutex;
85 ///
86 /// let mutex = Mutex::new(10);
87 /// assert_eq!(mutex.into_inner(), 10);
88 /// ```
89 pub fn into_inner(self) -> T {
90 self.data.into_inner()
91 }
92}
93
94impl<T: ?Sized> Mutex<T> {
95 /// Acquires the mutex.
96 ///
97 /// Returns a guard that releases the mutex when dropped.
98 ///
99 /// # Examples
100 ///
101 /// ```
102 /// # futures_lite::future::block_on(async {
103 /// use async_lock::Mutex;
104 ///
105 /// let mutex = Mutex::new(10);
106 /// let guard = mutex.lock().await;
107 /// assert_eq!(*guard, 10);
108 /// # })
109 /// ```
110 #[inline]
111 pub fn lock(&self) -> Lock<'_, T> {
112 Lock {
113 mutex: self,
114 acquire_slow: None,
115 }
116 }
117
118 /// Attempts to acquire the mutex.
119 ///
120 /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a
121 /// guard is returned that releases the mutex when dropped.
122 ///
123 /// # Examples
124 ///
125 /// ```
126 /// use async_lock::Mutex;
127 ///
128 /// let mutex = Mutex::new(10);
129 /// if let Some(guard) = mutex.try_lock() {
130 /// assert_eq!(*guard, 10);
131 /// }
132 /// # ;
133 /// ```
134 #[inline]
135 pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
136 if self
137 .state
138 .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
139 .is_ok()
140 {
141 Some(MutexGuard(self))
142 } else {
143 None
144 }
145 }
146
147 /// Returns a mutable reference to the underlying data.
148 ///
149 /// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable
150 /// borrow statically guarantees the mutex is not already acquired.
151 ///
152 /// # Examples
153 ///
154 /// ```
155 /// # futures_lite::future::block_on(async {
156 /// use async_lock::Mutex;
157 ///
158 /// let mut mutex = Mutex::new(0);
159 /// *mutex.get_mut() = 10;
160 /// assert_eq!(*mutex.lock().await, 10);
161 /// # })
162 /// ```
163 pub fn get_mut(&mut self) -> &mut T {
164 unsafe { &mut *self.data.get() }
165 }
166
167 /// Unlocks the mutex directly.
168 ///
169 /// # Safety
170 ///
171 /// This function is intended to be used only in the case where the mutex is locked,
172 /// and the guard is subsequently forgotten. Calling this while you don't hold a lock
173 /// on the mutex will likely lead to UB.
174 pub(crate) unsafe fn unlock_unchecked(&self) {
175 // Remove the last bit and notify a waiting lock operation.
176 self.state.fetch_sub(1, Ordering::Release);
177 self.lock_ops.notify(1);
178 }
179}
180
181impl<T: ?Sized> Mutex<T> {
182 /// Acquires the mutex and clones a reference to it.
183 ///
184 /// Returns an owned guard that releases the mutex when dropped.
185 ///
186 /// # Examples
187 ///
188 /// ```
189 /// # futures_lite::future::block_on(async {
190 /// use async_lock::Mutex;
191 /// use std::sync::Arc;
192 ///
193 /// let mutex = Arc::new(Mutex::new(10));
194 /// let guard = mutex.lock_arc().await;
195 /// assert_eq!(*guard, 10);
196 /// # })
197 /// ```
198 #[inline]
199 pub fn lock_arc(self: &Arc<Self>) -> LockArc<T> {
200 LockArc(LockArcInnards::Unpolled(self.clone()))
201 }
202
203 /// Attempts to acquire the mutex and clone a reference to it.
204 ///
205 /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, an
206 /// owned guard is returned that releases the mutex when dropped.
207 ///
208 /// # Examples
209 ///
210 /// ```
211 /// use async_lock::Mutex;
212 /// use std::sync::Arc;
213 ///
214 /// let mutex = Arc::new(Mutex::new(10));
215 /// if let Some(guard) = mutex.try_lock() {
216 /// assert_eq!(*guard, 10);
217 /// }
218 /// # ;
219 /// ```
220 #[inline]
221 pub fn try_lock_arc(self: &Arc<Self>) -> Option<MutexGuardArc<T>> {
222 if self
223 .state
224 .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
225 .is_ok()
226 {
227 Some(MutexGuardArc(self.clone()))
228 } else {
229 None
230 }
231 }
232}
233
234impl<T: fmt::Debug + ?Sized> fmt::Debug for Mutex<T> {
235 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236 struct Locked;
237 impl fmt::Debug for Locked {
238 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239 f.write_str(data:"<locked>")
240 }
241 }
242
243 match self.try_lock() {
244 None => f.debug_struct("Mutex").field(name:"data", &Locked).finish(),
245 Some(guard: MutexGuard<'_, T>) => f.debug_struct("Mutex").field(name:"data", &&*guard).finish(),
246 }
247 }
248}
249
250impl<T> From<T> for Mutex<T> {
251 fn from(val: T) -> Mutex<T> {
252 Mutex::new(data:val)
253 }
254}
255
256impl<T: Default + ?Sized> Default for Mutex<T> {
257 fn default() -> Mutex<T> {
258 Mutex::new(data:Default::default())
259 }
260}
261
/// The future returned by [`Mutex::lock`].
///
/// Its output is a [`MutexGuard`] borrowing the mutex for `'a`.
pub struct Lock<'a, T: ?Sized> {
    /// Reference to the mutex.
    mutex: &'a Mutex<T>,

    /// The future that waits for the mutex to become available.
    ///
    /// Starts out as `None`; it is created only after a `try_lock` attempt has failed.
    acquire_slow: Option<AcquireSlow<&'a Mutex<T>, T>>,
}

// The thread-safety bounds are stated by hand, in terms of `T` only.
// NOTE(review): `Sync` asks for `T: Sync` although nothing visible here reaches `T` through a
// shared `&Lock` (polling needs `&mut`) — re-check if `&self` methods are ever added.
unsafe impl<T: Send + ?Sized> Send for Lock<'_, T> {}
unsafe impl<T: Sync + ?Sized> Sync for Lock<'_, T> {}

// Declared unconditionally so that `poll` may call `Pin::get_mut`.
impl<'a, T: ?Sized> Unpin for Lock<'a, T> {}
275
276impl<T: ?Sized> fmt::Debug for Lock<'_, T> {
277 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
278 f.write_str(data:"Lock { .. }")
279 }
280}
281
282impl<'a, T: ?Sized> Future for Lock<'a, T> {
283 type Output = MutexGuard<'a, T>;
284
285 #[inline]
286 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
287 let this = self.get_mut();
288
289 loop {
290 match this.acquire_slow.as_mut() {
291 None => {
292 // Try the fast path before trying to register slowly.
293 match this.mutex.try_lock() {
294 Some(guard) => return Poll::Ready(guard),
295 None => {
296 this.acquire_slow = Some(AcquireSlow::new(this.mutex));
297 }
298 }
299 }
300
301 Some(acquire_slow) => {
302 // Continue registering slowly.
303 let value = ready!(Pin::new(acquire_slow).poll(cx));
304 return Poll::Ready(MutexGuard(value));
305 }
306 }
307 }
308 }
309}
310
/// The future returned by [`Mutex::lock_arc`].
///
/// Its output is a [`MutexGuardArc`], which owns an `Arc` to the mutex.
pub struct LockArc<T: ?Sized>(LockArcInnards<T>);

/// State machine behind [`LockArc`].
enum LockArcInnards<T: ?Sized> {
    /// We have not tried to poll the fast path yet.
    Unpolled(Arc<Mutex<T>>),

    /// We are acquiring the mutex through the slow path.
    AcquireSlow(AcquireSlow<Arc<Mutex<T>>, T>),

    /// Empty hole to make taking easier.
    ///
    /// `poll` swaps this in while it works on the previous state; polling while this is the
    /// stored state panics with "future polled after completion".
    Empty,
}

// The thread-safety bounds are stated by hand, in terms of `T` only.
// NOTE(review): `Sync` asks for `T: Sync` although nothing visible here reaches `T` through a
// shared `&LockArc` (polling needs `&mut`) — re-check if `&self` methods are ever added.
unsafe impl<T: Send + ?Sized> Send for LockArc<T> {}
unsafe impl<T: Sync + ?Sized> Sync for LockArc<T> {}

// Declared unconditionally so that `poll` may call `Pin::get_mut`.
impl<T: ?Sized> Unpin for LockArc<T> {}
329
330impl<T: ?Sized> fmt::Debug for LockArc<T> {
331 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332 f.write_str(data:"LockArc { .. }")
333 }
334}
335
336impl<T: ?Sized> Future for LockArc<T> {
337 type Output = MutexGuardArc<T>;
338
339 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
340 let this = self.get_mut();
341
342 loop {
343 match mem::replace(&mut this.0, LockArcInnards::Empty) {
344 LockArcInnards::Unpolled(mutex) => {
345 // Try the fast path before trying to register slowly.
346 match mutex.try_lock_arc() {
347 Some(guard) => return Poll::Ready(guard),
348 None => {
349 *this = LockArc(LockArcInnards::AcquireSlow(AcquireSlow::new(
350 mutex.clone(),
351 )));
352 }
353 }
354 }
355
356 LockArcInnards::AcquireSlow(mut acquire_slow) => {
357 // Continue registering slowly.
358 let value = match Pin::new(&mut acquire_slow).poll(cx) {
359 Poll::Pending => {
360 *this = LockArc(LockArcInnards::AcquireSlow(acquire_slow));
361 return Poll::Pending;
362 }
363 Poll::Ready(value) => value,
364 };
365 return Poll::Ready(MutexGuardArc(value));
366 }
367
368 LockArcInnards::Empty => panic!("future polled after completion"),
369 }
370 }
371 }
372}
373
/// Future for acquiring the mutex slowly.
///
/// `B` is the handle through which the mutex is reached (`&Mutex<T>` for [`Lock`],
/// `Arc<Mutex<T>>` for [`LockArc`]). The handle is returned as the future's output once the
/// lock has been acquired.
struct AcquireSlow<B: Borrow<Mutex<T>>, T: ?Sized> {
    /// Reference to the mutex.
    ///
    /// Becomes `None` once `take_mutex` has run, either on completion or on drop.
    mutex: Option<B>,

    /// The event listener waiting on the mutex.
    listener: Option<EventListener>,

    /// The point at which the mutex lock was started.
    ///
    /// Filled in on the first poll. Not present on `wasm32`/`wasm64` targets.
    #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
    start: Option<Instant>,

    /// This lock operation is starving.
    ///
    /// Once set, this operation has added 2 to the mutex's `state`; `take_mutex` subtracts it
    /// again.
    starved: bool,

    /// Marks the type parameter `T` as used; no `T` value is stored here.
    _marker: PhantomData<T>,
}

// `poll` accesses the fields through a plain `&mut`, so the future is `Unpin` whenever the
// handle type is.
impl<B: Borrow<Mutex<T>> + Unpin, T: ?Sized> Unpin for AcquireSlow<B, T> {}
394
395impl<T: ?Sized, B: Borrow<Mutex<T>>> AcquireSlow<B, T> {
396 /// Create a new `AcquireSlow` future.
397 #[cold]
398 fn new(mutex: B) -> Self {
399 AcquireSlow {
400 mutex: Some(mutex),
401 listener: None,
402 #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
403 start: None,
404 starved: false,
405 _marker: PhantomData,
406 }
407 }
408
409 /// Take the mutex reference out, decrementing the counter if necessary.
410 fn take_mutex(&mut self) -> Option<B> {
411 let mutex = self.mutex.take();
412
413 if self.starved {
414 if let Some(mutex) = mutex.as_ref() {
415 // Decrement this counter before we exit.
416 mutex.borrow().state.fetch_sub(2, Ordering::Release);
417 }
418 }
419
420 mutex
421 }
422}
423
impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> Future for AcquireSlow<B, T> {
    type Output = B;

    /// Waits for the mutex and resolves to the handle `B` once the lock bit has been set on
    /// behalf of this operation.
    ///
    /// Two strategies are used. The first loop only takes the lock from state 0 (free, nobody
    /// starved). It is left when another operation is seen to be starved, or (on non-wasm
    /// targets) once more than 500 microseconds have passed since the first poll. The
    /// operation then registers itself as starved and continues in the second, fairer loop.
    ///
    /// # Panics
    ///
    /// Panics with "future polled after completion" if the mutex handle has already been taken.
    ///
    /// NOTE(review): `ready!` is defined outside this view; it is presumably the usual macro
    /// that returns `Poll::Pending` early — confirm.
    #[cold]
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        // Record the time of the first poll; later polls reuse the stored instant.
        #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
        let start = *this.start.get_or_insert_with(Instant::now);
        let mutex = this
            .mutex
            .as_ref()
            .expect("future polled after completion")
            .borrow();

        // Only use this hot loop if we aren't currently starved.
        if !this.starved {
            loop {
                // Either register a new listener or wait on the one we already have.
                match &mut this.listener {
                    None => {
                        // Start listening for events.
                        // NOTE(review): registering before re-checking the state presumably
                        // avoids missing an unlock that happens in between — confirm against
                        // `event_listener`'s guarantees.
                        this.listener = Some(mutex.lock_ops.listen());

                        // Try locking if nobody is being starved.
                        match mutex
                            .state
                            .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
                            .unwrap_or_else(|x| x)
                        {
                            // Lock acquired!
                            0 => return Poll::Ready(this.take_mutex().unwrap()),

                            // Lock is held and nobody is starved.
                            1 => {}

                            // Somebody is starved.
                            _ => break,
                        }
                    }
                    Some(ref mut listener) => {
                        // Wait for a notification.
                        ready!(Pin::new(listener).poll(cx));
                        this.listener = None;

                        // Try locking if nobody is being starved.
                        match mutex
                            .state
                            .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
                            .unwrap_or_else(|x| x)
                        {
                            // Lock acquired!
                            0 => return Poll::Ready(this.take_mutex().unwrap()),

                            // Lock is held and nobody is starved.
                            1 => {}

                            // Somebody is starved.
                            _ => {
                                // Notify the first listener in line because we probably received a
                                // notification that was meant for a starved task.
                                mutex.lock_ops.notify(1);
                                break;
                            }
                        }

                        // If waiting for too long, fall back to a fairer locking strategy that will prevent
                        // newer lock operations from starving us forever.
                        #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
                        if start.elapsed() > Duration::from_micros(500) {
                            break;
                        }
                    }
                }
            }

            // Increment the number of starved lock operations.
            // The count lives above the lock bit, hence the step of 2.
            if mutex.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 {
                // In case of potential overflow, abort.
                process::abort();
            }

            // Indicate that we are now starving and will use a fairer locking strategy.
            // From here on `take_mutex` subtracts the 2 added above.
            this.starved = true;
        }

        // Fairer locking loop.
        loop {
            match &mut this.listener {
                None => {
                    // Start listening for events.
                    this.listener = Some(mutex.lock_ops.listen());

                    // Try locking if nobody else is being starved.
                    // State 2 means "unlocked, exactly one starved operation" (ourselves).
                    match mutex
                        .state
                        .compare_exchange(2, 2 | 1, Ordering::Acquire, Ordering::Acquire)
                        .unwrap_or_else(|x| x)
                    {
                        // Lock acquired!
                        2 => return Poll::Ready(this.take_mutex().unwrap()),

                        // Lock is held by someone.
                        s if s % 2 == 1 => {}

                        // Lock is available.
                        _ => {
                            // Be fair: notify the first listener and then go wait in line.
                            mutex.lock_ops.notify(1);
                        }
                    }
                }
                Some(ref mut listener) => {
                    // Wait for a notification.
                    ready!(Pin::new(listener).poll(cx));
                    this.listener = None;

                    // Try acquiring the lock without waiting for others.
                    // `fetch_or` sets the lock bit unconditionally; we own the lock only if the
                    // bit was clear beforehand.
                    if mutex.state.fetch_or(1, Ordering::Acquire) % 2 == 0 {
                        return Poll::Ready(this.take_mutex().unwrap());
                    }
                }
            }
        }
    }
}
549
550impl<T: ?Sized, B: Borrow<Mutex<T>>> Drop for AcquireSlow<B, T> {
551 fn drop(&mut self) {
552 // Make sure the starvation counter is decremented.
553 self.take_mutex();
554 }
555}
556
/// A guard that releases the mutex when dropped.
///
/// Dereferences to the value protected by the mutex it borrows.
#[clippy::has_significant_drop]
pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>);

// A guard hands out `&mut T` (so sending it needs `T: Send`) and `&T` through a shared
// reference to the guard (so sharing it needs `T: Sync`).
unsafe impl<T: Send + ?Sized> Send for MutexGuard<'_, T> {}
unsafe impl<T: Sync + ?Sized> Sync for MutexGuard<'_, T> {}
563
564impl<'a, T: ?Sized> MutexGuard<'a, T> {
565 /// Returns a reference to the mutex a guard came from.
566 ///
567 /// # Examples
568 ///
569 /// ```
570 /// # futures_lite::future::block_on(async {
571 /// use async_lock::{Mutex, MutexGuard};
572 ///
573 /// let mutex = Mutex::new(10i32);
574 /// let guard = mutex.lock().await;
575 /// dbg!(MutexGuard::source(&guard));
576 /// # })
577 /// ```
578 pub fn source(guard: &MutexGuard<'a, T>) -> &'a Mutex<T> {
579 guard.0
580 }
581}
582
583impl<T: ?Sized> Drop for MutexGuard<'_, T> {
584 #[inline]
585 fn drop(&mut self) {
586 // SAFETY: we are dropping the mutex guard, therefore unlocking the mutex.
587 unsafe {
588 self.0.unlock_unchecked();
589 }
590 }
591}
592
593impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuard<'_, T> {
594 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
595 fmt::Debug::fmt(&**self, f)
596 }
597}
598
599impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuard<'_, T> {
600 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
601 (**self).fmt(f)
602 }
603}
604
605impl<T: ?Sized> Deref for MutexGuard<'_, T> {
606 type Target = T;
607
608 fn deref(&self) -> &T {
609 unsafe { &*self.0.data.get() }
610 }
611}
612
613impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
614 fn deref_mut(&mut self) -> &mut T {
615 unsafe { &mut *self.0.data.get() }
616 }
617}
618
/// An owned guard that releases the mutex when dropped.
///
/// Holds an `Arc` to the mutex instead of a borrow, and dereferences to the protected value.
#[clippy::has_significant_drop]
pub struct MutexGuardArc<T: ?Sized>(Arc<Mutex<T>>);

// A guard hands out `&mut T` (so sending it needs `T: Send`) and `&T` through a shared
// reference to the guard (so sharing it needs `T: Sync`).
unsafe impl<T: Send + ?Sized> Send for MutexGuardArc<T> {}
unsafe impl<T: Sync + ?Sized> Sync for MutexGuardArc<T> {}
625
626impl<T: ?Sized> MutexGuardArc<T> {
627 /// Returns a reference to the mutex a guard came from.
628 ///
629 /// # Examples
630 ///
631 /// ```
632 /// # futures_lite::future::block_on(async {
633 /// use async_lock::{Mutex, MutexGuardArc};
634 /// use std::sync::Arc;
635 ///
636 /// let mutex = Arc::new(Mutex::new(10i32));
637 /// let guard = mutex.lock_arc().await;
638 /// dbg!(MutexGuardArc::source(&guard));
639 /// # })
640 /// ```
641 pub fn source(guard: &Self) -> &Arc<Mutex<T>>
642 where
643 // Required because `MutexGuardArc` implements `Sync` regardless of whether `T` is `Send`,
644 // but this method allows dropping `T` from a different thead than it was created in.
645 T: Send,
646 {
647 &guard.0
648 }
649}
650
651impl<T: ?Sized> Drop for MutexGuardArc<T> {
652 #[inline]
653 fn drop(&mut self) {
654 // SAFETY: we are dropping the mutex guard, therefore unlocking the mutex.
655 unsafe {
656 self.0.unlock_unchecked();
657 }
658 }
659}
660
661impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuardArc<T> {
662 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
663 fmt::Debug::fmt(&**self, f)
664 }
665}
666
667impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuardArc<T> {
668 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
669 (**self).fmt(f)
670 }
671}
672
673impl<T: ?Sized> Deref for MutexGuardArc<T> {
674 type Target = T;
675
676 fn deref(&self) -> &T {
677 unsafe { &*self.0.data.get() }
678 }
679}
680
681impl<T: ?Sized> DerefMut for MutexGuardArc<T> {
682 fn deref_mut(&mut self) -> &mut T {
683 unsafe { &mut *self.0.data.get() }
684 }
685}
686
/// Runs the wrapped closure once, at the moment the wrapper is dropped.
struct CallOnDrop<F: Fn()>(F);

impl<F: Fn()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        let callback = &self.0;
        callback();
    }
}
695