1 | use core::borrow::Borrow; |
2 | use core::cell::UnsafeCell; |
3 | use core::fmt; |
4 | use core::marker::PhantomData; |
5 | use core::ops::{Deref, DerefMut}; |
6 | use core::pin::Pin; |
7 | use core::sync::atomic::{AtomicUsize, Ordering}; |
8 | use core::task::Poll; |
9 | use core::usize; |
10 | |
11 | use alloc::sync::Arc; |
12 | |
13 | #[cfg (all(feature = "std" , not(target_family = "wasm" )))] |
14 | use std::time::{Duration, Instant}; |
15 | |
16 | use event_listener::{Event, EventListener}; |
17 | use event_listener_strategy::{easy_wrapper, EventListenerFuture}; |
18 | |
19 | /// An async mutex. |
20 | /// |
21 | /// The locking mechanism uses eventual fairness to ensure locking will be fair on average without |
22 | /// sacrificing performance. This is done by forcing a fair lock whenever a lock operation is |
23 | /// starved for longer than 0.5 milliseconds. |
24 | /// |
25 | /// # Examples |
26 | /// |
27 | /// ``` |
28 | /// # futures_lite::future::block_on(async { |
29 | /// use async_lock::Mutex; |
30 | /// |
31 | /// let m = Mutex::new(1); |
32 | /// |
33 | /// let mut guard = m.lock().await; |
34 | /// *guard = 2; |
35 | /// |
36 | /// assert!(m.try_lock().is_none()); |
37 | /// drop(guard); |
38 | /// assert_eq!(*m.try_lock().unwrap(), 2); |
39 | /// # }) |
40 | /// ``` |
41 | pub struct Mutex<T: ?Sized> { |
42 | /// Current state of the mutex. |
43 | /// |
44 | /// The least significant bit is set to 1 if the mutex is locked. |
45 | /// The other bits hold the number of starved lock operations. |
46 | state: AtomicUsize, |
47 | |
48 | /// Lock operations waiting for the mutex to be released. |
49 | lock_ops: Event, |
50 | |
51 | /// The value inside the mutex. |
52 | data: UnsafeCell<T>, |
53 | } |
54 | |
55 | unsafe impl<T: Send + ?Sized> Send for Mutex<T> {} |
56 | unsafe impl<T: Send + ?Sized> Sync for Mutex<T> {} |
57 | |
58 | impl<T> Mutex<T> { |
59 | /// Creates a new async mutex. |
60 | /// |
61 | /// # Examples |
62 | /// |
63 | /// ``` |
64 | /// use async_lock::Mutex; |
65 | /// |
66 | /// let mutex = Mutex::new(0); |
67 | /// ``` |
68 | pub const fn new(data: T) -> Mutex<T> { |
69 | Mutex { |
70 | state: AtomicUsize::new(0), |
71 | lock_ops: Event::new(), |
72 | data: UnsafeCell::new(data), |
73 | } |
74 | } |
75 | |
76 | /// Consumes the mutex, returning the underlying data. |
77 | /// |
78 | /// # Examples |
79 | /// |
80 | /// ``` |
81 | /// use async_lock::Mutex; |
82 | /// |
83 | /// let mutex = Mutex::new(10); |
84 | /// assert_eq!(mutex.into_inner(), 10); |
85 | /// ``` |
86 | pub fn into_inner(self) -> T { |
87 | self.data.into_inner() |
88 | } |
89 | } |
90 | |
91 | impl<T: ?Sized> Mutex<T> { |
92 | /// Acquires the mutex. |
93 | /// |
94 | /// Returns a guard that releases the mutex when dropped. |
95 | /// |
96 | /// # Examples |
97 | /// |
98 | /// ``` |
99 | /// # futures_lite::future::block_on(async { |
100 | /// use async_lock::Mutex; |
101 | /// |
102 | /// let mutex = Mutex::new(10); |
103 | /// let guard = mutex.lock().await; |
104 | /// assert_eq!(*guard, 10); |
105 | /// # }) |
106 | /// ``` |
107 | #[inline ] |
108 | pub fn lock(&self) -> Lock<'_, T> { |
109 | Lock::_new(LockInner { |
110 | mutex: self, |
111 | acquire_slow: None, |
112 | }) |
113 | } |
114 | |
115 | /// Acquires the mutex using the blocking strategy. |
116 | /// |
117 | /// Returns a guard that releases the mutex when dropped. |
118 | /// |
119 | /// # Blocking |
120 | /// |
121 | /// Rather than using asynchronous waiting, like the [`lock`][Mutex::lock] method, |
122 | /// this method will block the current thread until the lock is acquired. |
123 | /// |
124 | /// This method should not be used in an asynchronous context. It is intended to be |
125 | /// used in a way that a mutex can be used in both asynchronous and synchronous contexts. |
126 | /// Calling this method in an asynchronous context may result in a deadlock. |
127 | /// |
128 | /// # Examples |
129 | /// |
130 | /// ``` |
131 | /// use async_lock::Mutex; |
132 | /// |
133 | /// let mutex = Mutex::new(10); |
134 | /// let guard = mutex.lock_blocking(); |
135 | /// assert_eq!(*guard, 10); |
136 | /// ``` |
137 | #[cfg (all(feature = "std" , not(target_family = "wasm" )))] |
138 | #[inline ] |
139 | pub fn lock_blocking(&self) -> MutexGuard<'_, T> { |
140 | self.lock().wait() |
141 | } |
142 | |
143 | /// Attempts to acquire the mutex. |
144 | /// |
145 | /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a |
146 | /// guard is returned that releases the mutex when dropped. |
147 | /// |
148 | /// # Examples |
149 | /// |
150 | /// ``` |
151 | /// use async_lock::Mutex; |
152 | /// |
153 | /// let mutex = Mutex::new(10); |
154 | /// if let Some(guard) = mutex.try_lock() { |
155 | /// assert_eq!(*guard, 10); |
156 | /// } |
157 | /// # ; |
158 | /// ``` |
159 | #[inline ] |
160 | pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> { |
161 | if self |
162 | .state |
163 | .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire) |
164 | .is_ok() |
165 | { |
166 | Some(MutexGuard(self)) |
167 | } else { |
168 | None |
169 | } |
170 | } |
171 | |
172 | /// Returns a mutable reference to the underlying data. |
173 | /// |
174 | /// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable |
175 | /// borrow statically guarantees the mutex is not already acquired. |
176 | /// |
177 | /// # Examples |
178 | /// |
179 | /// ``` |
180 | /// # futures_lite::future::block_on(async { |
181 | /// use async_lock::Mutex; |
182 | /// |
183 | /// let mut mutex = Mutex::new(0); |
184 | /// *mutex.get_mut() = 10; |
185 | /// assert_eq!(*mutex.lock().await, 10); |
186 | /// # }) |
187 | /// ``` |
188 | pub fn get_mut(&mut self) -> &mut T { |
189 | unsafe { &mut *self.data.get() } |
190 | } |
191 | |
192 | /// Unlocks the mutex directly. |
193 | /// |
194 | /// # Safety |
195 | /// |
196 | /// This function is intended to be used only in the case where the mutex is locked, |
197 | /// and the guard is subsequently forgotten. Calling this while you don't hold a lock |
198 | /// on the mutex will likely lead to UB. |
199 | pub(crate) unsafe fn unlock_unchecked(&self) { |
200 | // Remove the last bit and notify a waiting lock operation. |
201 | self.state.fetch_sub(1, Ordering::Release); |
202 | self.lock_ops.notify(1); |
203 | } |
204 | } |
205 | |
206 | impl<T: ?Sized> Mutex<T> { |
207 | /// Acquires the mutex and clones a reference to it. |
208 | /// |
209 | /// Returns an owned guard that releases the mutex when dropped. |
210 | /// |
211 | /// # Examples |
212 | /// |
213 | /// ``` |
214 | /// # futures_lite::future::block_on(async { |
215 | /// use async_lock::Mutex; |
216 | /// use std::sync::Arc; |
217 | /// |
218 | /// let mutex = Arc::new(Mutex::new(10)); |
219 | /// let guard = mutex.lock_arc().await; |
220 | /// assert_eq!(*guard, 10); |
221 | /// # }) |
222 | /// ``` |
223 | #[inline ] |
224 | pub fn lock_arc(self: &Arc<Self>) -> LockArc<T> { |
225 | LockArc::_new(LockArcInnards::Unpolled { |
226 | mutex: Some(self.clone()), |
227 | }) |
228 | } |
229 | |
230 | /// Acquires the mutex and clones a reference to it using the blocking strategy. |
231 | /// |
232 | /// Returns an owned guard that releases the mutex when dropped. |
233 | /// |
234 | /// # Blocking |
235 | /// |
236 | /// Rather than using asynchronous waiting, like the [`lock_arc`][Mutex::lock_arc] method, |
237 | /// this method will block the current thread until the lock is acquired. |
238 | /// |
239 | /// This method should not be used in an asynchronous context. It is intended to be |
240 | /// used in a way that a mutex can be used in both asynchronous and synchronous contexts. |
241 | /// Calling this method in an asynchronous context may result in a deadlock. |
242 | /// |
243 | /// # Examples |
244 | /// |
245 | /// ``` |
246 | /// use async_lock::Mutex; |
247 | /// use std::sync::Arc; |
248 | /// |
249 | /// let mutex = Arc::new(Mutex::new(10)); |
250 | /// let guard = mutex.lock_arc_blocking(); |
251 | /// assert_eq!(*guard, 10); |
252 | /// ``` |
253 | #[cfg (all(feature = "std" , not(target_family = "wasm" )))] |
254 | #[inline ] |
255 | pub fn lock_arc_blocking(self: &Arc<Self>) -> MutexGuardArc<T> { |
256 | self.lock_arc().wait() |
257 | } |
258 | |
259 | /// Attempts to acquire the mutex and clone a reference to it. |
260 | /// |
261 | /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, an |
262 | /// owned guard is returned that releases the mutex when dropped. |
263 | /// |
264 | /// # Examples |
265 | /// |
266 | /// ``` |
267 | /// use async_lock::Mutex; |
268 | /// use std::sync::Arc; |
269 | /// |
270 | /// let mutex = Arc::new(Mutex::new(10)); |
271 | /// if let Some(guard) = mutex.try_lock() { |
272 | /// assert_eq!(*guard, 10); |
273 | /// } |
274 | /// # ; |
275 | /// ``` |
276 | #[inline ] |
277 | pub fn try_lock_arc(self: &Arc<Self>) -> Option<MutexGuardArc<T>> { |
278 | if self |
279 | .state |
280 | .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire) |
281 | .is_ok() |
282 | { |
283 | Some(MutexGuardArc(self.clone())) |
284 | } else { |
285 | None |
286 | } |
287 | } |
288 | } |
289 | |
290 | impl<T: fmt::Debug + ?Sized> fmt::Debug for Mutex<T> { |
291 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
292 | struct Locked; |
293 | impl fmt::Debug for Locked { |
294 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
295 | f.write_str(data:"<locked>" ) |
296 | } |
297 | } |
298 | |
299 | match self.try_lock() { |
300 | None => f.debug_struct("Mutex" ).field(name:"data" , &Locked).finish(), |
301 | Some(guard: MutexGuard<'_, T>) => f.debug_struct("Mutex" ).field(name:"data" , &&*guard).finish(), |
302 | } |
303 | } |
304 | } |
305 | |
306 | impl<T> From<T> for Mutex<T> { |
307 | fn from(val: T) -> Mutex<T> { |
308 | Mutex::new(data:val) |
309 | } |
310 | } |
311 | |
312 | impl<T: Default + ?Sized> Default for Mutex<T> { |
313 | fn default() -> Mutex<T> { |
314 | Mutex::new(data:Default::default()) |
315 | } |
316 | } |
317 | |
318 | easy_wrapper! { |
319 | /// The future returned by [`Mutex::lock`]. |
320 | pub struct Lock<'a, T: ?Sized>(LockInner<'a, T> => MutexGuard<'a, T>); |
321 | #[cfg (all(feature = "std" , not(target_family = "wasm" )))] |
322 | pub(crate) wait(); |
323 | } |
324 | |
325 | pin_project_lite::pin_project! { |
326 | /// Inner future for acquiring the mutex. |
327 | struct LockInner<'a, T: ?Sized> { |
328 | // Reference to the mutex. |
329 | mutex: &'a Mutex<T>, |
330 | |
331 | // The future that waits for the mutex to become available. |
332 | #[pin] |
333 | acquire_slow: Option<AcquireSlow<&'a Mutex<T>, T>>, |
334 | } |
335 | } |
336 | |
337 | unsafe impl<T: Send + ?Sized> Send for Lock<'_, T> {} |
338 | unsafe impl<T: Sync + ?Sized> Sync for Lock<'_, T> {} |
339 | |
340 | impl<T: ?Sized> fmt::Debug for Lock<'_, T> { |
341 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
342 | f.write_str(data:"Lock { .. }" ) |
343 | } |
344 | } |
345 | |
346 | impl<'a, T: ?Sized> EventListenerFuture for LockInner<'a, T> { |
347 | type Output = MutexGuard<'a, T>; |
348 | |
349 | #[inline ] |
350 | fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>( |
351 | self: Pin<&mut Self>, |
352 | strategy: &mut S, |
353 | context: &mut S::Context, |
354 | ) -> Poll<Self::Output> { |
355 | let mut this = self.project(); |
356 | |
357 | // This may seem weird, but the borrow checker complains otherwise. |
358 | if this.acquire_slow.is_none() { |
359 | match this.mutex.try_lock() { |
360 | Some(guard) => return Poll::Ready(guard), |
361 | None => { |
362 | this.acquire_slow.set(Some(AcquireSlow::new(this.mutex))); |
363 | } |
364 | } |
365 | } |
366 | |
367 | ready!(this |
368 | .acquire_slow |
369 | .as_pin_mut() |
370 | .unwrap() |
371 | .poll_with_strategy(strategy, context)); |
372 | Poll::Ready(MutexGuard(this.mutex)) |
373 | } |
374 | } |
375 | |
376 | easy_wrapper! { |
377 | /// The future returned by [`Mutex::lock_arc`]. |
378 | pub struct LockArc<T: ?Sized>(LockArcInnards<T> => MutexGuardArc<T>); |
379 | #[cfg (all(feature = "std" , not(target_family = "wasm" )))] |
380 | pub(crate) wait(); |
381 | } |
382 | |
383 | pin_project_lite::pin_project! { |
384 | #[project = LockArcInnardsProj] |
385 | enum LockArcInnards<T: ?Sized> { |
386 | /// We have not tried to poll the fast path yet. |
387 | Unpolled { mutex: Option<Arc<Mutex<T>>> }, |
388 | |
389 | /// We are acquiring the mutex through the slow path. |
390 | AcquireSlow { |
391 | #[pin] |
392 | inner: AcquireSlow<Arc<Mutex<T>>, T> |
393 | }, |
394 | } |
395 | } |
396 | |
397 | unsafe impl<T: Send + ?Sized> Send for LockArc<T> {} |
398 | unsafe impl<T: Sync + ?Sized> Sync for LockArc<T> {} |
399 | |
400 | impl<T: ?Sized> fmt::Debug for LockArcInnards<T> { |
401 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
402 | f.write_str(data:"LockArc { .. }" ) |
403 | } |
404 | } |
405 | |
406 | impl<T: ?Sized> EventListenerFuture for LockArcInnards<T> { |
407 | type Output = MutexGuardArc<T>; |
408 | |
409 | fn poll_with_strategy<'a, S: event_listener_strategy::Strategy<'a>>( |
410 | mut self: Pin<&mut Self>, |
411 | strategy: &mut S, |
412 | context: &mut S::Context, |
413 | ) -> Poll<Self::Output> { |
414 | // Set the inner future if needed. |
415 | if let LockArcInnardsProj::Unpolled { mutex } = self.as_mut().project() { |
416 | let mutex = mutex.take().expect("mutex taken more than once" ); |
417 | |
418 | // Try the fast path before trying to register slowly. |
419 | if let Some(guard) = mutex.try_lock_arc() { |
420 | return Poll::Ready(guard); |
421 | } |
422 | |
423 | // Set the inner future to the slow acquire path. |
424 | self.as_mut().set(LockArcInnards::AcquireSlow { |
425 | inner: AcquireSlow::new(mutex), |
426 | }); |
427 | } |
428 | |
429 | // Poll the inner future. |
430 | let value = match self.project() { |
431 | LockArcInnardsProj::AcquireSlow { inner } => { |
432 | ready!(inner.poll_with_strategy(strategy, context)) |
433 | } |
434 | _ => unreachable!(), |
435 | }; |
436 | |
437 | Poll::Ready(MutexGuardArc(value)) |
438 | } |
439 | } |
440 | |
441 | pin_project_lite::pin_project! { |
442 | /// Future for acquiring the mutex slowly. |
443 | struct AcquireSlow<B: Borrow<Mutex<T>>, T: ?Sized> { |
444 | // Reference to the mutex. |
445 | mutex: Option<B>, |
446 | |
447 | // The event listener waiting on the mutex. |
448 | #[pin] |
449 | listener: EventListener, |
450 | |
451 | // The point at which the mutex lock was started. |
452 | start: Start, |
453 | |
454 | // This lock operation is starving. |
455 | starved: bool, |
456 | |
457 | // Capture the `T` lifetime. |
458 | #[pin] |
459 | _marker: PhantomData<T>, |
460 | } |
461 | |
462 | impl<T: ?Sized, B: Borrow<Mutex<T>>> PinnedDrop for AcquireSlow<B, T> { |
463 | fn drop(this: Pin<&mut Self>) { |
464 | // Make sure the starvation counter is decremented. |
465 | this.take_mutex(); |
466 | } |
467 | } |
468 | } |
469 | |
470 | /// `pin_project_lite` doesn't support `#[cfg]` yet, so we have to do this manually. |
471 | struct Start { |
472 | #[cfg (all(feature = "std" , not(target_family = "wasm" )))] |
473 | start: Option<Instant>, |
474 | } |
475 | |
476 | impl<T: ?Sized, B: Borrow<Mutex<T>>> AcquireSlow<B, T> { |
477 | /// Create a new `AcquireSlow` future. |
478 | #[cold ] |
479 | fn new(mutex: B) -> Self { |
480 | // Create a new instance of the listener. |
481 | let listener = { EventListener::new() }; |
482 | |
483 | AcquireSlow { |
484 | mutex: Some(mutex), |
485 | listener, |
486 | start: Start { |
487 | #[cfg (all(feature = "std" , not(target_family = "wasm" )))] |
488 | start: None, |
489 | }, |
490 | starved: false, |
491 | _marker: PhantomData, |
492 | } |
493 | } |
494 | |
495 | /// Take the mutex reference out, decrementing the counter if necessary. |
496 | fn take_mutex(self: Pin<&mut Self>) -> Option<B> { |
497 | let this = self.project(); |
498 | let mutex = this.mutex.take(); |
499 | |
500 | if *this.starved { |
501 | if let Some(mutex) = mutex.as_ref() { |
502 | // Decrement this counter before we exit. |
503 | mutex.borrow().state.fetch_sub(2, Ordering::Release); |
504 | } |
505 | } |
506 | |
507 | mutex |
508 | } |
509 | } |
510 | |
511 | impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> EventListenerFuture for AcquireSlow<B, T> { |
512 | type Output = B; |
513 | |
514 | #[cold ] |
515 | fn poll_with_strategy<'a, S: event_listener_strategy::Strategy<'a>>( |
516 | mut self: Pin<&mut Self>, |
517 | strategy: &mut S, |
518 | context: &mut S::Context, |
519 | ) -> Poll<Self::Output> { |
520 | let mut this = self.as_mut().project(); |
521 | #[cfg (all(feature = "std" , not(target_family = "wasm" )))] |
522 | let start = *this.start.start.get_or_insert_with(Instant::now); |
523 | let mutex = Borrow::<Mutex<T>>::borrow( |
524 | this.mutex.as_ref().expect("future polled after completion" ), |
525 | ); |
526 | |
527 | // Only use this hot loop if we aren't currently starved. |
528 | if !*this.starved { |
529 | loop { |
530 | // Start listening for events. |
531 | if !this.listener.is_listening() { |
532 | this.listener.as_mut().listen(&mutex.lock_ops); |
533 | |
534 | // Try locking if nobody is being starved. |
535 | match mutex |
536 | .state |
537 | .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire) |
538 | .unwrap_or_else(|x| x) |
539 | { |
540 | // Lock acquired! |
541 | 0 => return Poll::Ready(self.take_mutex().unwrap()), |
542 | |
543 | // Lock is held and nobody is starved. |
544 | 1 => {} |
545 | |
546 | // Somebody is starved. |
547 | _ => break, |
548 | } |
549 | } else { |
550 | ready!(strategy.poll(this.listener.as_mut(), context)); |
551 | |
552 | // Try locking if nobody is being starved. |
553 | match mutex |
554 | .state |
555 | .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire) |
556 | .unwrap_or_else(|x| x) |
557 | { |
558 | // Lock acquired! |
559 | 0 => return Poll::Ready(self.take_mutex().unwrap()), |
560 | |
561 | // Lock is held and nobody is starved. |
562 | 1 => {} |
563 | |
564 | // Somebody is starved. |
565 | _ => { |
566 | // Notify the first listener in line because we probably received a |
567 | // notification that was meant for a starved task. |
568 | mutex.lock_ops.notify(1); |
569 | break; |
570 | } |
571 | } |
572 | |
573 | // If waiting for too long, fall back to a fairer locking strategy that will prevent |
574 | // newer lock operations from starving us forever. |
575 | #[cfg (all(feature = "std" , not(target_family = "wasm" )))] |
576 | if start.elapsed() > Duration::from_micros(500) { |
577 | break; |
578 | } |
579 | } |
580 | } |
581 | |
582 | // Increment the number of starved lock operations. |
583 | if mutex.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 { |
584 | // In case of potential overflow, abort. |
585 | crate::abort(); |
586 | } |
587 | |
588 | // Indicate that we are now starving and will use a fairer locking strategy. |
589 | *this.starved = true; |
590 | } |
591 | |
592 | // Fairer locking loop. |
593 | loop { |
594 | if !this.listener.is_listening() { |
595 | // Start listening for events. |
596 | this.listener.as_mut().listen(&mutex.lock_ops); |
597 | |
598 | // Try locking if nobody else is being starved. |
599 | match mutex |
600 | .state |
601 | .compare_exchange(2, 2 | 1, Ordering::Acquire, Ordering::Acquire) |
602 | .unwrap_or_else(|x| x) |
603 | { |
604 | // Lock acquired! |
605 | 2 => return Poll::Ready(self.take_mutex().unwrap()), |
606 | |
607 | // Lock is held by someone. |
608 | s if s % 2 == 1 => {} |
609 | |
610 | // Lock is available. |
611 | _ => { |
612 | // Be fair: notify the first listener and then go wait in line. |
613 | mutex.lock_ops.notify(1); |
614 | } |
615 | } |
616 | } else { |
617 | // Wait for a notification. |
618 | ready!(strategy.poll(this.listener.as_mut(), context)); |
619 | |
620 | // Try acquiring the lock without waiting for others. |
621 | if mutex.state.fetch_or(1, Ordering::Acquire) % 2 == 0 { |
622 | return Poll::Ready(self.take_mutex().unwrap()); |
623 | } |
624 | } |
625 | } |
626 | } |
627 | } |
628 | |
629 | /// A guard that releases the mutex when dropped. |
630 | #[clippy::has_significant_drop] |
631 | pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>); |
632 | |
633 | unsafe impl<T: Send + ?Sized> Send for MutexGuard<'_, T> {} |
634 | unsafe impl<T: Sync + ?Sized> Sync for MutexGuard<'_, T> {} |
635 | |
636 | impl<'a, T: ?Sized> MutexGuard<'a, T> { |
637 | /// Returns a reference to the mutex a guard came from. |
638 | /// |
639 | /// # Examples |
640 | /// |
641 | /// ``` |
642 | /// # futures_lite::future::block_on(async { |
643 | /// use async_lock::{Mutex, MutexGuard}; |
644 | /// |
645 | /// let mutex = Mutex::new(10i32); |
646 | /// let guard = mutex.lock().await; |
647 | /// dbg!(MutexGuard::source(&guard)); |
648 | /// # }) |
649 | /// ``` |
650 | pub fn source(guard: &MutexGuard<'a, T>) -> &'a Mutex<T> { |
651 | guard.0 |
652 | } |
653 | } |
654 | |
655 | impl<T: ?Sized> Drop for MutexGuard<'_, T> { |
656 | #[inline ] |
657 | fn drop(&mut self) { |
658 | // SAFETY: we are dropping the mutex guard, therefore unlocking the mutex. |
659 | unsafe { |
660 | self.0.unlock_unchecked(); |
661 | } |
662 | } |
663 | } |
664 | |
665 | impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuard<'_, T> { |
666 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
667 | fmt::Debug::fmt(&**self, f) |
668 | } |
669 | } |
670 | |
671 | impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuard<'_, T> { |
672 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
673 | (**self).fmt(f) |
674 | } |
675 | } |
676 | |
677 | impl<T: ?Sized> Deref for MutexGuard<'_, T> { |
678 | type Target = T; |
679 | |
680 | fn deref(&self) -> &T { |
681 | unsafe { &*self.0.data.get() } |
682 | } |
683 | } |
684 | |
685 | impl<T: ?Sized> DerefMut for MutexGuard<'_, T> { |
686 | fn deref_mut(&mut self) -> &mut T { |
687 | unsafe { &mut *self.0.data.get() } |
688 | } |
689 | } |
690 | |
691 | /// An owned guard that releases the mutex when dropped. |
692 | #[clippy::has_significant_drop] |
693 | pub struct MutexGuardArc<T: ?Sized>(Arc<Mutex<T>>); |
694 | |
695 | unsafe impl<T: Send + ?Sized> Send for MutexGuardArc<T> {} |
696 | unsafe impl<T: Sync + ?Sized> Sync for MutexGuardArc<T> {} |
697 | |
698 | impl<T: ?Sized> MutexGuardArc<T> { |
699 | /// Returns a reference to the mutex a guard came from. |
700 | /// |
701 | /// # Examples |
702 | /// |
703 | /// ``` |
704 | /// # futures_lite::future::block_on(async { |
705 | /// use async_lock::{Mutex, MutexGuardArc}; |
706 | /// use std::sync::Arc; |
707 | /// |
708 | /// let mutex = Arc::new(Mutex::new(10i32)); |
709 | /// let guard = mutex.lock_arc().await; |
710 | /// dbg!(MutexGuardArc::source(&guard)); |
711 | /// # }) |
712 | /// ``` |
713 | pub fn source(guard: &Self) -> &Arc<Mutex<T>> |
714 | where |
715 | // Required because `MutexGuardArc` implements `Sync` regardless of whether `T` is `Send`, |
716 | // but this method allows dropping `T` from a different thead than it was created in. |
717 | T: Send, |
718 | { |
719 | &guard.0 |
720 | } |
721 | } |
722 | |
723 | impl<T: ?Sized> Drop for MutexGuardArc<T> { |
724 | #[inline ] |
725 | fn drop(&mut self) { |
726 | // SAFETY: we are dropping the mutex guard, therefore unlocking the mutex. |
727 | unsafe { |
728 | self.0.unlock_unchecked(); |
729 | } |
730 | } |
731 | } |
732 | |
733 | impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuardArc<T> { |
734 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
735 | fmt::Debug::fmt(&**self, f) |
736 | } |
737 | } |
738 | |
739 | impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuardArc<T> { |
740 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
741 | (**self).fmt(f) |
742 | } |
743 | } |
744 | |
745 | impl<T: ?Sized> Deref for MutexGuardArc<T> { |
746 | type Target = T; |
747 | |
748 | fn deref(&self) -> &T { |
749 | unsafe { &*self.0.data.get() } |
750 | } |
751 | } |
752 | |
753 | impl<T: ?Sized> DerefMut for MutexGuardArc<T> { |
754 | fn deref_mut(&mut self) -> &mut T { |
755 | unsafe { &mut *self.0.data.get() } |
756 | } |
757 | } |
758 | |
759 | /// Calls a function when dropped. |
760 | struct CallOnDrop<F: Fn()>(F); |
761 | |
762 | impl<F: Fn()> Drop for CallOnDrop<F> { |
763 | fn drop(&mut self) { |
764 | (self.0)(); |
765 | } |
766 | } |
767 | |