1use std::cell::UnsafeCell;
2use std::marker::PhantomData;
3use std::ops::{Deref, DerefMut};
4use std::pin::Pin;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::sync::{Arc, Mutex as StdMutex};
7use std::{fmt, mem};
8
9use slab::Slab;
10
11use futures_core::future::{FusedFuture, Future};
12use futures_core::task::{Context, Poll, Waker};
13
14/// A futures-aware mutex.
15///
16/// # Fairness
17///
18/// This mutex provides no fairness guarantees. Tasks may not acquire the mutex
19/// in the order that they requested the lock, and it's possible for a single task
20/// which repeatedly takes the lock to starve other tasks, which may be left waiting
21/// indefinitely.
22pub struct Mutex<T: ?Sized> {
23 state: AtomicUsize,
24 waiters: StdMutex<Slab<Waiter>>,
25 value: UnsafeCell<T>,
26}
27
28impl<T: ?Sized> fmt::Debug for Mutex<T> {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 let state = self.state.load(Ordering::SeqCst);
31 f.debug_struct("Mutex")
32 .field("is_locked", &((state & IS_LOCKED) != 0))
33 .field("has_waiters", &((state & HAS_WAITERS) != 0))
34 .finish()
35 }
36}
37
38impl<T> From<T> for Mutex<T> {
39 fn from(t: T) -> Self {
40 Self::new(t)
41 }
42}
43
44impl<T: Default> Default for Mutex<T> {
45 fn default() -> Self {
46 Self::new(Default::default())
47 }
48}
49
/// Bookkeeping entry for one pending lock future.
enum Waiter {
    /// The future is parked; this waker notifies its task.
    Waiting(Waker),
    /// The waker has already been consumed by a call to [`Waiter::wake`].
    Woken,
}

impl Waiter {
    /// Makes sure this entry will notify the task behind `waker`.
    ///
    /// The stored waker is kept when it already wakes the same task; otherwise
    /// (including when the entry is `Woken`) it is replaced by a clone of `waker`.
    fn register(&mut self, waker: &Waker) {
        let already_registered =
            matches!(&*self, Self::Waiting(current) if waker.will_wake(current));
        if !already_registered {
            *self = Self::Waiting(waker.clone());
        }
    }

    /// Marks the entry as `Woken`, waking the stored waker if there was one.
    ///
    /// Calling this on an entry that is already `Woken` does nothing.
    fn wake(&mut self) {
        if let Self::Waiting(waker) = mem::replace(self, Self::Woken) {
            waker.wake();
        }
    }
}
70
// Flag bits stored in `Mutex::state`.
/// Set while the mutex is held.
const IS_LOCKED: usize = 0b01;
/// Set while at least one waiter is registered in the waiter slab.
const HAS_WAITERS: usize = 0b10;
73
74impl<T> Mutex<T> {
75 /// Creates a new futures-aware mutex.
76 pub fn new(t: T) -> Self {
77 Self {
78 state: AtomicUsize::new(0),
79 waiters: StdMutex::new(Slab::new()),
80 value: UnsafeCell::new(t),
81 }
82 }
83
84 /// Consumes this mutex, returning the underlying data.
85 ///
86 /// # Examples
87 ///
88 /// ```
89 /// use futures::lock::Mutex;
90 ///
91 /// let mutex = Mutex::new(0);
92 /// assert_eq!(mutex.into_inner(), 0);
93 /// ```
94 pub fn into_inner(self) -> T {
95 self.value.into_inner()
96 }
97}
98
99impl<T: ?Sized> Mutex<T> {
100 /// Attempt to acquire the lock immediately.
101 ///
102 /// If the lock is currently held, this will return `None`.
103 pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
104 let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire);
105 if (old_state & IS_LOCKED) == 0 {
106 Some(MutexGuard { mutex: self })
107 } else {
108 None
109 }
110 }
111
112 /// Attempt to acquire the lock immediately.
113 ///
114 /// If the lock is currently held, this will return `None`.
115 pub fn try_lock_owned(self: &Arc<Self>) -> Option<OwnedMutexGuard<T>> {
116 let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire);
117 if (old_state & IS_LOCKED) == 0 {
118 Some(OwnedMutexGuard { mutex: self.clone() })
119 } else {
120 None
121 }
122 }
123
124 /// Acquire the lock asynchronously.
125 ///
126 /// This method returns a future that will resolve once the lock has been
127 /// successfully acquired.
128 pub fn lock(&self) -> MutexLockFuture<'_, T> {
129 MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
130 }
131
132 /// Acquire the lock asynchronously.
133 ///
134 /// This method returns a future that will resolve once the lock has been
135 /// successfully acquired.
136 pub fn lock_owned(self: Arc<Self>) -> OwnedMutexLockFuture<T> {
137 OwnedMutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
138 }
139
140 /// Returns a mutable reference to the underlying data.
141 ///
142 /// Since this call borrows the `Mutex` mutably, no actual locking needs to
143 /// take place -- the mutable borrow statically guarantees no locks exist.
144 ///
145 /// # Examples
146 ///
147 /// ```
148 /// # futures::executor::block_on(async {
149 /// use futures::lock::Mutex;
150 ///
151 /// let mut mutex = Mutex::new(0);
152 /// *mutex.get_mut() = 10;
153 /// assert_eq!(*mutex.lock().await, 10);
154 /// # });
155 /// ```
156 pub fn get_mut(&mut self) -> &mut T {
157 // We know statically that there are no other references to `self`, so
158 // there's no need to lock the inner mutex.
159 unsafe { &mut *self.value.get() }
160 }
161
162 fn remove_waker(&self, wait_key: usize, wake_another: bool) {
163 if wait_key != WAIT_KEY_NONE {
164 let mut waiters = self.waiters.lock().unwrap();
165 match waiters.remove(wait_key) {
166 Waiter::Waiting(_) => {}
167 Waiter::Woken => {
168 // We were awoken, but then dropped before we could
169 // wake up to acquire the lock. Wake up another
170 // waiter.
171 if wake_another {
172 if let Some((_i, waiter)) = waiters.iter_mut().next() {
173 waiter.wake();
174 }
175 }
176 }
177 }
178 if waiters.is_empty() {
179 self.state.fetch_and(!HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
180 }
181 }
182 }
183
184 // Unlocks the mutex. Called by MutexGuard and MappedMutexGuard when they are
185 // dropped.
186 fn unlock(&self) {
187 let old_state = self.state.fetch_and(!IS_LOCKED, Ordering::AcqRel);
188 if (old_state & HAS_WAITERS) != 0 {
189 let mut waiters = self.waiters.lock().unwrap();
190 if let Some((_i, waiter)) = waiters.iter_mut().next() {
191 waiter.wake();
192 }
193 }
194 }
195}
196
// Value of a lock future's `wait_key` while it owns no slot in the waiter `Slab`.
const WAIT_KEY_NONE: usize = usize::MAX;
199
200/// A future which resolves when the target mutex has been successfully acquired, owned version.
201pub struct OwnedMutexLockFuture<T: ?Sized> {
202 // `None` indicates that the mutex was successfully acquired.
203 mutex: Option<Arc<Mutex<T>>>,
204 wait_key: usize,
205}
206
207impl<T: ?Sized> fmt::Debug for OwnedMutexLockFuture<T> {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 f.debug_struct("OwnedMutexLockFuture")
210 .field("was_acquired", &self.mutex.is_none())
211 .field("mutex", &self.mutex)
212 .field(
213 "wait_key",
214 &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }),
215 )
216 .finish()
217 }
218}
219
220impl<T: ?Sized> FusedFuture for OwnedMutexLockFuture<T> {
221 fn is_terminated(&self) -> bool {
222 self.mutex.is_none()
223 }
224}
225
226impl<T: ?Sized> Future for OwnedMutexLockFuture<T> {
227 type Output = OwnedMutexGuard<T>;
228
229 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
230 let this = self.get_mut();
231
232 let mutex = this.mutex.as_ref().expect("polled OwnedMutexLockFuture after completion");
233
234 if let Some(lock) = mutex.try_lock_owned() {
235 mutex.remove_waker(this.wait_key, false);
236 this.mutex = None;
237 return Poll::Ready(lock);
238 }
239
240 {
241 let mut waiters = mutex.waiters.lock().unwrap();
242 if this.wait_key == WAIT_KEY_NONE {
243 this.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
244 if waiters.len() == 1 {
245 mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
246 }
247 } else {
248 waiters[this.wait_key].register(cx.waker());
249 }
250 }
251
252 // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
253 // attempting to acquire the lock again.
254 if let Some(lock) = mutex.try_lock_owned() {
255 mutex.remove_waker(this.wait_key, false);
256 this.mutex = None;
257 return Poll::Ready(lock);
258 }
259
260 Poll::Pending
261 }
262}
263
264impl<T: ?Sized> Drop for OwnedMutexLockFuture<T> {
265 fn drop(&mut self) {
266 if let Some(mutex) = self.mutex.as_ref() {
267 // This future was dropped before it acquired the mutex.
268 //
269 // Remove ourselves from the map, waking up another waiter if we
270 // had been awoken to acquire the lock.
271 mutex.remove_waker(self.wait_key, true);
272 }
273 }
274}
275
276/// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods.
277/// When this structure is dropped (falls out of scope), the lock will be
278/// unlocked.
279pub struct OwnedMutexGuard<T: ?Sized> {
280 mutex: Arc<Mutex<T>>,
281}
282
283impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> {
284 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285 f.debug_struct("OwnedMutexGuard")
286 .field("value", &&**self)
287 .field("mutex", &self.mutex)
288 .finish()
289 }
290}
291
292impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
293 fn drop(&mut self) {
294 self.mutex.unlock()
295 }
296}
297
298impl<T: ?Sized> Deref for OwnedMutexGuard<T> {
299 type Target = T;
300 fn deref(&self) -> &T {
301 unsafe { &*self.mutex.value.get() }
302 }
303}
304
305impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> {
306 fn deref_mut(&mut self) -> &mut T {
307 unsafe { &mut *self.mutex.value.get() }
308 }
309}
310
311/// A future which resolves when the target mutex has been successfully acquired.
312pub struct MutexLockFuture<'a, T: ?Sized> {
313 // `None` indicates that the mutex was successfully acquired.
314 mutex: Option<&'a Mutex<T>>,
315 wait_key: usize,
316}
317
318impl<T: ?Sized> fmt::Debug for MutexLockFuture<'_, T> {
319 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320 f.debug_struct("MutexLockFuture")
321 .field("was_acquired", &self.mutex.is_none())
322 .field("mutex", &self.mutex)
323 .field(
324 "wait_key",
325 &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }),
326 )
327 .finish()
328 }
329}
330
331impl<T: ?Sized> FusedFuture for MutexLockFuture<'_, T> {
332 fn is_terminated(&self) -> bool {
333 self.mutex.is_none()
334 }
335}
336
337impl<'a, T: ?Sized> Future for MutexLockFuture<'a, T> {
338 type Output = MutexGuard<'a, T>;
339
340 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
341 let mutex = self.mutex.expect("polled MutexLockFuture after completion");
342
343 if let Some(lock) = mutex.try_lock() {
344 mutex.remove_waker(self.wait_key, false);
345 self.mutex = None;
346 return Poll::Ready(lock);
347 }
348
349 {
350 let mut waiters = mutex.waiters.lock().unwrap();
351 if self.wait_key == WAIT_KEY_NONE {
352 self.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
353 if waiters.len() == 1 {
354 mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
355 }
356 } else {
357 waiters[self.wait_key].register(cx.waker());
358 }
359 }
360
361 // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
362 // attempting to acquire the lock again.
363 if let Some(lock) = mutex.try_lock() {
364 mutex.remove_waker(self.wait_key, false);
365 self.mutex = None;
366 return Poll::Ready(lock);
367 }
368
369 Poll::Pending
370 }
371}
372
373impl<T: ?Sized> Drop for MutexLockFuture<'_, T> {
374 fn drop(&mut self) {
375 if let Some(mutex) = self.mutex {
376 // This future was dropped before it acquired the mutex.
377 //
378 // Remove ourselves from the map, waking up another waiter if we
379 // had been awoken to acquire the lock.
380 mutex.remove_waker(self.wait_key, true);
381 }
382 }
383}
384
385/// An RAII guard returned by the `lock` and `try_lock` methods.
386/// When this structure is dropped (falls out of scope), the lock will be
387/// unlocked.
388pub struct MutexGuard<'a, T: ?Sized> {
389 mutex: &'a Mutex<T>,
390}
391
392impl<'a, T: ?Sized> MutexGuard<'a, T> {
393 /// Returns a locked view over a portion of the locked data.
394 ///
395 /// # Example
396 ///
397 /// ```
398 /// # futures::executor::block_on(async {
399 /// use futures::lock::{Mutex, MutexGuard};
400 ///
401 /// let data = Mutex::new(Some("value".to_string()));
402 /// {
403 /// let locked_str = MutexGuard::map(data.lock().await, |opt| opt.as_mut().unwrap());
404 /// assert_eq!(&*locked_str, "value");
405 /// }
406 /// # });
407 /// ```
408 #[inline]
409 pub fn map<U: ?Sized, F>(this: Self, f: F) -> MappedMutexGuard<'a, T, U>
410 where
411 F: FnOnce(&mut T) -> &mut U,
412 {
413 let mutex = this.mutex;
414 let value = f(unsafe { &mut *this.mutex.value.get() });
415 // Don't run the `drop` method for MutexGuard. The ownership of the underlying
416 // locked state is being moved to the returned MappedMutexGuard.
417 mem::forget(this);
418 MappedMutexGuard { mutex, value, _marker: PhantomData }
419 }
420}
421
422impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
423 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
424 f.debug_struct("MutexGuard").field("value", &&**self).field("mutex", &self.mutex).finish()
425 }
426}
427
428impl<T: ?Sized> Drop for MutexGuard<'_, T> {
429 fn drop(&mut self) {
430 self.mutex.unlock()
431 }
432}
433
434impl<T: ?Sized> Deref for MutexGuard<'_, T> {
435 type Target = T;
436 fn deref(&self) -> &T {
437 unsafe { &*self.mutex.value.get() }
438 }
439}
440
441impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
442 fn deref_mut(&mut self) -> &mut T {
443 unsafe { &mut *self.mutex.value.get() }
444 }
445}
446
447/// An RAII guard returned by the `MutexGuard::map` and `MappedMutexGuard::map` methods.
448/// When this structure is dropped (falls out of scope), the lock will be unlocked.
449pub struct MappedMutexGuard<'a, T: ?Sized, U: ?Sized> {
450 mutex: &'a Mutex<T>,
451 value: *mut U,
452 _marker: PhantomData<&'a mut U>,
453}
454
455impl<'a, T: ?Sized, U: ?Sized> MappedMutexGuard<'a, T, U> {
456 /// Returns a locked view over a portion of the locked data.
457 ///
458 /// # Example
459 ///
460 /// ```
461 /// # futures::executor::block_on(async {
462 /// use futures::lock::{MappedMutexGuard, Mutex, MutexGuard};
463 ///
464 /// let data = Mutex::new(Some("value".to_string()));
465 /// {
466 /// let locked_str = MutexGuard::map(data.lock().await, |opt| opt.as_mut().unwrap());
467 /// let locked_char = MappedMutexGuard::map(locked_str, |s| s.get_mut(0..1).unwrap());
468 /// assert_eq!(&*locked_char, "v");
469 /// }
470 /// # });
471 /// ```
472 #[inline]
473 pub fn map<V: ?Sized, F>(this: Self, f: F) -> MappedMutexGuard<'a, T, V>
474 where
475 F: FnOnce(&mut U) -> &mut V,
476 {
477 let mutex = this.mutex;
478 let value = f(unsafe { &mut *this.value });
479 // Don't run the `drop` method for MappedMutexGuard. The ownership of the underlying
480 // locked state is being moved to the returned MappedMutexGuard.
481 mem::forget(this);
482 MappedMutexGuard { mutex, value, _marker: PhantomData }
483 }
484}
485
486impl<T: ?Sized, U: ?Sized + fmt::Debug> fmt::Debug for MappedMutexGuard<'_, T, U> {
487 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
488 f.debug_struct("MappedMutexGuard")
489 .field("value", &&**self)
490 .field("mutex", &self.mutex)
491 .finish()
492 }
493}
494
495impl<T: ?Sized, U: ?Sized> Drop for MappedMutexGuard<'_, T, U> {
496 fn drop(&mut self) {
497 self.mutex.unlock()
498 }
499}
500
501impl<T: ?Sized, U: ?Sized> Deref for MappedMutexGuard<'_, T, U> {
502 type Target = U;
503 fn deref(&self) -> &U {
504 unsafe { &*self.value }
505 }
506}
507
508impl<T: ?Sized, U: ?Sized> DerefMut for MappedMutexGuard<'_, T, U> {
509 fn deref_mut(&mut self) -> &mut U {
510 unsafe { &mut *self.value }
511 }
512}
513
514// Mutexes can be moved freely between threads and acquired on any thread so long
515// as the inner value can be safely sent between threads.
516unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
517unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
518
519// It's safe to switch which thread the acquire is being attempted on so long as
520// `T` can be accessed on that thread.
521unsafe impl<T: ?Sized + Send> Send for MutexLockFuture<'_, T> {}
522
523// doesn't have any interesting `&self` methods (only Debug)
524unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {}
525
526// It's safe to switch which thread the acquire is being attempted on so long as
527// `T` can be accessed on that thread.
528unsafe impl<T: ?Sized + Send> Send for OwnedMutexLockFuture<T> {}
529
530// doesn't have any interesting `&self` methods (only Debug)
531unsafe impl<T: ?Sized> Sync for OwnedMutexLockFuture<T> {}
532
533// Safe to send since we don't track any thread-specific details-- the inner
534// lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
535unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
536unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
537
538unsafe impl<T: ?Sized + Send> Send for OwnedMutexGuard<T> {}
539unsafe impl<T: ?Sized + Sync> Sync for OwnedMutexGuard<T> {}
540
541unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {}
542unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {}
543
/// Formatting a guard with `{:?}` must terminate, both for a plain guard and
/// for a mapped guard derived from it.
#[test]
fn test_mutex_guard_debug_not_recurse() {
    let mutex = Mutex::new(42);

    let plain = mutex.try_lock().unwrap();
    let _ = format!("{:?}", plain);

    let mapped = MutexGuard::map(plain, |n| n);
    let _ = format!("{:?}", mapped);
}
552