| 1 | //! Futures-powered synchronization primitives. |
| 2 | |
| 3 | use alloc::boxed::Box; |
| 4 | use alloc::sync::Arc; |
| 5 | use core::cell::UnsafeCell; |
| 6 | use core::ops::{Deref, DerefMut}; |
| 7 | use core::pin::Pin; |
| 8 | use core::sync::atomic::AtomicPtr; |
| 9 | use core::sync::atomic::Ordering::SeqCst; |
| 10 | use core::{fmt, ptr}; |
| 11 | #[cfg (feature = "bilock" )] |
| 12 | use futures_core::future::Future; |
| 13 | use futures_core::task::{Context, Poll, Waker}; |
| 14 | |
| 15 | /// A type of futures-powered synchronization primitive which is a mutex between |
| 16 | /// two possible owners. |
| 17 | /// |
| 18 | /// This primitive is not as generic as a full-blown mutex but is sufficient for |
| 19 | /// many use cases where there are only two possible owners of a resource. The |
| 20 | /// implementation of `BiLock` can be more optimized for just the two possible |
| 21 | /// owners. |
| 22 | /// |
| 23 | /// Note that it's possible to use this lock through a poll-style interface with |
| 24 | /// the `poll_lock` method but you can also use it as a future with the `lock` |
| 25 | /// method that consumes a `BiLock` and returns a future that will resolve when |
| 26 | /// it's locked. |
| 27 | /// |
| 28 | /// A `BiLock` is typically used for "split" operations where data which serves |
| 29 | /// two purposes wants to be split into two to be worked with separately. For |
| 30 | /// example a TCP stream could be both a reader and a writer or a framing layer |
| 31 | /// could be both a stream and a sink for messages. A `BiLock` enables splitting |
| 32 | /// these two and then using each independently in a futures-powered fashion. |
| 33 | /// |
| 34 | /// This type is only available when the `bilock` feature of this |
| 35 | /// library is activated. |
| 36 | #[derive (Debug)] |
| 37 | #[cfg_attr (docsrs, doc(cfg(feature = "bilock" )))] |
| 38 | pub struct BiLock<T> { |
| 39 | arc: Arc<Inner<T>>, |
| 40 | } |
| 41 | |
| 42 | #[derive (Debug)] |
| 43 | struct Inner<T> { |
| 44 | state: AtomicPtr<Waker>, |
| 45 | value: Option<UnsafeCell<T>>, |
| 46 | } |
| 47 | |
| 48 | unsafe impl<T: Send> Send for Inner<T> {} |
| 49 | unsafe impl<T: Send> Sync for Inner<T> {} |
| 50 | |
| 51 | impl<T> BiLock<T> { |
| 52 | /// Creates a new `BiLock` protecting the provided data. |
| 53 | /// |
| 54 | /// Two handles to the lock are returned, and these are the only two handles |
| 55 | /// that will ever be available to the lock. These can then be sent to separate |
| 56 | /// tasks to be managed there. |
| 57 | /// |
| 58 | /// The data behind the bilock is considered to be pinned, which allows `Pin` |
| 59 | /// references to locked data. However, this means that the locked value |
| 60 | /// will only be available through `Pin<&mut T>` (not `&mut T`) unless `T` is `Unpin`. |
| 61 | /// Similarly, reuniting the lock and extracting the inner value is only |
| 62 | /// possible when `T` is `Unpin`. |
| 63 | pub fn new(t: T) -> (Self, Self) { |
| 64 | let arc = Arc::new(Inner { |
| 65 | state: AtomicPtr::new(ptr::null_mut()), |
| 66 | value: Some(UnsafeCell::new(t)), |
| 67 | }); |
| 68 | |
| 69 | (Self { arc: arc.clone() }, Self { arc }) |
| 70 | } |
| 71 | |
| 72 | /// Attempt to acquire this lock, returning `Pending` if it can't be |
| 73 | /// acquired. |
| 74 | /// |
| 75 | /// This function will acquire the lock in a nonblocking fashion, returning |
| 76 | /// immediately if the lock is already held. If the lock is successfully |
| 77 | /// acquired then `Poll::Ready` is returned with a value that represents |
| 78 | /// the locked value (and can be used to access the protected data). The |
| 79 | /// lock is unlocked when the returned `BiLockGuard` is dropped. |
| 80 | /// |
| 81 | /// If the lock is already held then this function will return |
| 82 | /// `Poll::Pending`. In this case the current task will also be scheduled |
| 83 | /// to receive a notification when the lock would otherwise become |
| 84 | /// available. |
| 85 | /// |
| 86 | /// # Panics |
| 87 | /// |
| 88 | /// This function will panic if called outside the context of a future's |
| 89 | /// task. |
| 90 | pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>> { |
| 91 | let mut waker = None; |
| 92 | loop { |
| 93 | let n = self.arc.state.swap(invalid_ptr(1), SeqCst); |
| 94 | match n as usize { |
| 95 | // Woohoo, we grabbed the lock! |
| 96 | 0 => return Poll::Ready(BiLockGuard { bilock: self }), |
| 97 | |
| 98 | // Oops, someone else has locked the lock |
| 99 | 1 => {} |
| 100 | |
| 101 | // A task was previously blocked on this lock, likely our task, |
| 102 | // so we need to update that task. |
| 103 | _ => unsafe { |
| 104 | let mut prev = Box::from_raw(n); |
| 105 | *prev = cx.waker().clone(); |
| 106 | waker = Some(prev); |
| 107 | }, |
| 108 | } |
| 109 | |
| 110 | // type ascription for safety's sake! |
| 111 | let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone())); |
| 112 | let me = Box::into_raw(me); |
| 113 | |
| 114 | match self.arc.state.compare_exchange(invalid_ptr(1), me, SeqCst, SeqCst) { |
| 115 | // The lock is still locked, but we've now parked ourselves, so |
| 116 | // just report that we're scheduled to receive a notification. |
| 117 | Ok(_) => return Poll::Pending, |
| 118 | |
| 119 | // Oops, looks like the lock was unlocked after our swap above |
| 120 | // and before the compare_exchange. Deallocate what we just |
| 121 | // allocated and go through the loop again. |
| 122 | Err(n) if n.is_null() => unsafe { |
| 123 | waker = Some(Box::from_raw(me)); |
| 124 | }, |
| 125 | |
| 126 | // The top of this loop set the previous state to 1, so if we |
| 127 | // failed the CAS above then it's because the previous value was |
| 128 | // *not* zero or one. This indicates that a task was blocked, |
| 129 | // but we're trying to acquire the lock and there's only one |
| 130 | // other reference of the lock, so it should be impossible for |
| 131 | // that task to ever block itself. |
| 132 | Err(n) => panic!("invalid state: {}" , n as usize), |
| 133 | } |
| 134 | } |
| 135 | } |
| 136 | |
| 137 | /// Perform a "blocking lock" of this lock, consuming this lock handle and |
| 138 | /// returning a future to the acquired lock. |
| 139 | /// |
| 140 | /// This function consumes the `BiLock<T>` and returns a sentinel future, |
| 141 | /// `BiLockAcquire<T>`. The returned future will resolve to |
| 142 | /// `BiLockAcquired<T>` which represents a locked lock similarly to |
| 143 | /// `BiLockGuard<T>`. |
| 144 | /// |
| 145 | /// Note that the returned future will never resolve to an error. |
| 146 | #[cfg (feature = "bilock" )] |
| 147 | #[cfg_attr (docsrs, doc(cfg(feature = "bilock" )))] |
| 148 | pub fn lock(&self) -> BiLockAcquire<'_, T> { |
| 149 | BiLockAcquire { bilock: self } |
| 150 | } |
| 151 | |
| 152 | /// Returns `true` only if the other `BiLock<T>` originated from the same call to `BiLock::new`. |
| 153 | pub fn is_pair_of(&self, other: &Self) -> bool { |
| 154 | Arc::ptr_eq(&self.arc, &other.arc) |
| 155 | } |
| 156 | |
| 157 | /// Attempts to put the two "halves" of a `BiLock<T>` back together and |
| 158 | /// recover the original value. Succeeds only if the two `BiLock<T>`s |
| 159 | /// originated from the same call to `BiLock::new`. |
| 160 | pub fn reunite(self, other: Self) -> Result<T, ReuniteError<T>> |
| 161 | where |
| 162 | T: Unpin, |
| 163 | { |
| 164 | if self.is_pair_of(&other) { |
| 165 | drop(other); |
| 166 | let inner = Arc::try_unwrap(self.arc) |
| 167 | .ok() |
| 168 | .expect("futures: try_unwrap failed in BiLock<T>::reunite" ); |
| 169 | Ok(unsafe { inner.into_value() }) |
| 170 | } else { |
| 171 | Err(ReuniteError(self, other)) |
| 172 | } |
| 173 | } |
| 174 | |
| 175 | fn unlock(&self) { |
| 176 | let n = self.arc.state.swap(ptr::null_mut(), SeqCst); |
| 177 | match n as usize { |
| 178 | // we've locked the lock, shouldn't be possible for us to see an |
| 179 | // unlocked lock. |
| 180 | 0 => panic!("invalid unlocked state" ), |
| 181 | |
| 182 | // Ok, no one else tried to get the lock, we're done. |
| 183 | 1 => {} |
| 184 | |
| 185 | // Another task has parked themselves on this lock, let's wake them |
| 186 | // up as its now their turn. |
| 187 | _ => unsafe { |
| 188 | Box::from_raw(n).wake(); |
| 189 | }, |
| 190 | } |
| 191 | } |
| 192 | } |
| 193 | |
| 194 | impl<T: Unpin> Inner<T> { |
| 195 | unsafe fn into_value(mut self) -> T { |
| 196 | self.value.take().unwrap().into_inner() |
| 197 | } |
| 198 | } |
| 199 | |
| 200 | impl<T> Drop for Inner<T> { |
| 201 | fn drop(&mut self) { |
| 202 | assert!(self.state.load(SeqCst).is_null()); |
| 203 | } |
| 204 | } |
| 205 | |
| 206 | /// Error indicating two `BiLock<T>`s were not two halves of a whole, and |
| 207 | /// thus could not be `reunite`d. |
| 208 | #[cfg_attr (docsrs, doc(cfg(feature = "bilock" )))] |
| 209 | pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>); |
| 210 | |
| 211 | impl<T> fmt::Debug for ReuniteError<T> { |
| 212 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 213 | f.debug_tuple(name:"ReuniteError" ).field(&"..." ).finish() |
| 214 | } |
| 215 | } |
| 216 | |
| 217 | impl<T> fmt::Display for ReuniteError<T> { |
| 218 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 219 | write!(f, "tried to reunite two BiLocks that don't form a pair" ) |
| 220 | } |
| 221 | } |
| 222 | |
| 223 | #[cfg (feature = "std" )] |
| 224 | impl<T: core::any::Any> std::error::Error for ReuniteError<T> {} |
| 225 | |
| 226 | /// Returned RAII guard from the `poll_lock` method. |
| 227 | /// |
| 228 | /// This structure acts as a sentinel to the data in the `BiLock<T>` itself, |
| 229 | /// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be |
| 230 | /// unlocked. |
| 231 | #[derive (Debug)] |
| 232 | #[cfg_attr (docsrs, doc(cfg(feature = "bilock" )))] |
| 233 | pub struct BiLockGuard<'a, T> { |
| 234 | bilock: &'a BiLock<T>, |
| 235 | } |
| 236 | |
| 237 | // We allow parallel access to T via Deref, so Sync bound is also needed here. |
| 238 | unsafe impl<T: Send + Sync> Sync for BiLockGuard<'_, T> {} |
| 239 | |
| 240 | impl<T> Deref for BiLockGuard<'_, T> { |
| 241 | type Target = T; |
| 242 | fn deref(&self) -> &T { |
| 243 | unsafe { &*self.bilock.arc.value.as_ref().unwrap().get() } |
| 244 | } |
| 245 | } |
| 246 | |
| 247 | impl<T: Unpin> DerefMut for BiLockGuard<'_, T> { |
| 248 | fn deref_mut(&mut self) -> &mut T { |
| 249 | unsafe { &mut *self.bilock.arc.value.as_ref().unwrap().get() } |
| 250 | } |
| 251 | } |
| 252 | |
| 253 | impl<T> BiLockGuard<'_, T> { |
| 254 | /// Get a mutable pinned reference to the locked value. |
| 255 | pub fn as_pin_mut(&mut self) -> Pin<&mut T> { |
| 256 | // Safety: we never allow moving a !Unpin value out of a bilock, nor |
| 257 | // allow mutable access to it |
| 258 | unsafe { Pin::new_unchecked(&mut *self.bilock.arc.value.as_ref().unwrap().get()) } |
| 259 | } |
| 260 | } |
| 261 | |
| 262 | impl<T> Drop for BiLockGuard<'_, T> { |
| 263 | fn drop(&mut self) { |
| 264 | self.bilock.unlock(); |
| 265 | } |
| 266 | } |
| 267 | |
| 268 | /// Future returned by `BiLock::lock` which will resolve when the lock is |
| 269 | /// acquired. |
| 270 | #[cfg (feature = "bilock" )] |
| 271 | #[cfg_attr (docsrs, doc(cfg(feature = "bilock" )))] |
| 272 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
| 273 | #[derive (Debug)] |
| 274 | pub struct BiLockAcquire<'a, T> { |
| 275 | bilock: &'a BiLock<T>, |
| 276 | } |
| 277 | |
| 278 | // Pinning is never projected to fields |
| 279 | #[cfg (feature = "bilock" )] |
| 280 | impl<T> Unpin for BiLockAcquire<'_, T> {} |
| 281 | |
| 282 | #[cfg (feature = "bilock" )] |
| 283 | impl<'a, T> Future for BiLockAcquire<'a, T> { |
| 284 | type Output = BiLockGuard<'a, T>; |
| 285 | |
| 286 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 287 | self.bilock.poll_lock(cx) |
| 288 | } |
| 289 | } |
| 290 | |
| 291 | // Based on core::ptr::invalid_mut. Equivalent to `addr as *mut T`, but is strict-provenance compatible. |
| 292 | #[allow (clippy::useless_transmute)] |
| 293 | #[inline ] |
| 294 | fn invalid_ptr<T>(addr: usize) -> *mut T { |
| 295 | // SAFETY: every valid integer is also a valid pointer (as long as you don't dereference that |
| 296 | // pointer). |
| 297 | unsafe { core::mem::transmute(src:addr) } |
| 298 | } |
| 299 | |