1 | //! Futures-powered synchronization primitives. |
2 | |
3 | use alloc::boxed::Box; |
4 | use alloc::sync::Arc; |
5 | use core::cell::UnsafeCell; |
6 | use core::ops::{Deref, DerefMut}; |
7 | use core::pin::Pin; |
8 | use core::sync::atomic::AtomicPtr; |
9 | use core::sync::atomic::Ordering::SeqCst; |
10 | use core::{fmt, ptr}; |
11 | #[cfg (feature = "bilock" )] |
12 | use futures_core::future::Future; |
13 | use futures_core::task::{Context, Poll, Waker}; |
14 | |
/// A type of futures-powered synchronization primitive which is a mutex between
/// two possible owners.
///
/// This primitive is not as generic as a full-blown mutex but is sufficient for
/// many use cases where there are only two possible owners of a resource. The
/// implementation of `BiLock` can be more optimized for just the two possible
/// owners.
///
/// Note that it's possible to use this lock through a poll-style interface with
/// the `poll_lock` method but you can also use it as a future with the `lock`
/// method, which borrows a `BiLock` and returns a future that will resolve when
/// it's locked.
///
/// A `BiLock` is typically used for "split" operations where data which serves
/// two purposes wants to be split into two to be worked with separately. For
/// example a TCP stream could be both a reader and a writer or a framing layer
/// could be both a stream and a sink for messages. A `BiLock` enables splitting
/// these two and then using each independently in a futures-powered fashion.
///
/// This type is only available when the `bilock` feature of this
/// library is activated.
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
pub struct BiLock<T> {
    // Shared lock state; `BiLock::new` creates one `Inner` and hands out two handles to it.
    arc: Arc<Inner<T>>,
}
41 | |
/// State shared by the two `BiLock` handles of a pair.
#[derive(Debug)]
struct Inner<T> {
    // Lock word. `poll_lock` and `unlock` interpret the pointer as an integer:
    //   0 (null) - unlocked
    //   1        - locked, no task parked
    //   other    - locked, and the value is a raw `Box<Waker>` for the parked task
    state: AtomicPtr<Waker>,
    // The protected value. Kept in an `Option` so that `into_value` can `take` it out of
    // this type, which implements `Drop`.
    value: Option<UnsafeCell<T>>,
}
47 | |
// SAFETY (as relied upon in this file): the `UnsafeCell` in `value` is only dereferenced
// through a `BiLockGuard`, and `poll_lock` only hands out a guard when its atomic swap on
// `state` observed the unlocked (null) value. `T: Send` is required because the value is
// reachable from whichever thread currently holds the guard.
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
50 | |
impl<T> BiLock<T> {
    /// Creates a new `BiLock` protecting the provided data.
    ///
    /// Two handles to the lock are returned, and these are the only two handles
    /// that will ever be available to the lock. These can then be sent to separate
    /// tasks to be managed there.
    ///
    /// The data behind the bilock is considered to be pinned, which allows `Pin`
    /// references to locked data. However, this means that the locked value
    /// will only be available through `Pin<&mut T>` (not `&mut T`) unless `T` is `Unpin`.
    /// Similarly, reuniting the lock and extracting the inner value is only
    /// possible when `T` is `Unpin`.
    pub fn new(t: T) -> (Self, Self) {
        // A null `state` means "unlocked"; see `poll_lock` for the full encoding.
        let arc = Arc::new(Inner {
            state: AtomicPtr::new(ptr::null_mut()),
            value: Some(UnsafeCell::new(t)),
        });

        (Self { arc: arc.clone() }, Self { arc })
    }

    /// Attempt to acquire this lock, returning `Pending` if it can't be
    /// acquired.
    ///
    /// This function will acquire the lock in a nonblocking fashion, returning
    /// immediately if the lock is already held. If the lock is successfully
    /// acquired then `Poll::Ready` is returned with a value that represents
    /// the locked value (and can be used to access the protected data). The
    /// lock is unlocked when the returned `BiLockGuard` is dropped.
    ///
    /// If the lock is already held then this function will return
    /// `Poll::Pending`. In this case the waker of `cx` is stored in the lock so
    /// that the current task receives a notification when the lock is released.
    ///
    /// # Panics
    ///
    /// Panics with an "invalid state" message if, after this call has marked the
    /// lock as locked, the lock word is found to hold another parked waker. With
    /// only two handles per lock this indicates a broken internal invariant.
    pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>> {
        // Boxed waker carried across loop iterations so the allocation can be reused.
        let mut waker = None;
        loop {
            // Unconditionally mark the lock as "locked, nobody parked" and inspect what
            // was there before.
            let n = self.arc.state.swap(invalid_ptr(1), SeqCst);
            match n as usize {
                // Woohoo, we grabbed the lock!
                0 => return Poll::Ready(BiLockGuard { bilock: self }),

                // Oops, someone else has locked the lock
                1 => {}

                // A task was previously blocked on this lock, likely our task,
                // so we need to update that task.
                //
                // SAFETY: the only values other than 0 and 1 ever stored in `state` come
                // from `Box::into_raw` below, and the swap above removed this one from
                // `state`, so this call now owns the box.
                _ => unsafe {
                    let mut prev = Box::from_raw(n);
                    *prev = cx.waker().clone();
                    waker = Some(prev);
                },
            }

            // type ascription for safety's sake!
            let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone()));
            let me = Box::into_raw(me);

            // Publish our waker, but only if the lock is still in the state we left it in.
            match self.arc.state.compare_exchange(invalid_ptr(1), me, SeqCst, SeqCst) {
                // The lock is still locked, but we've now parked ourselves, so
                // just report that we're scheduled to receive a notification.
                Ok(_) => return Poll::Pending,

                // Oops, looks like the lock was unlocked after our swap above
                // and before the compare_exchange. Take back the box we just
                // turned into a raw pointer (it is reused or dropped later) and
                // go through the loop again.
                //
                // SAFETY: the compare_exchange failed, so `me` was never stored in
                // `state` and is still exclusively ours.
                Err(n) if n.is_null() => unsafe {
                    waker = Some(Box::from_raw(me));
                },

                // The top of this loop set the previous state to 1, so if we
                // failed the CAS above then it's because the previous value was
                // *not* zero or one. This indicates that a task was blocked,
                // but we're trying to acquire the lock and there's only one
                // other reference of the lock, so it should be impossible for
                // that task to ever block itself.
                Err(n) => panic!("invalid state: {}", n as usize),
            }
        }
    }

    /// Returns a future that resolves once this lock has been acquired.
    ///
    /// This method borrows the `BiLock<T>` and returns a `BiLockAcquire<'_, T>`
    /// future. That future resolves to a `BiLockGuard<'_, T>`, the same guard
    /// type that `poll_lock` returns; the lock is released when the guard is
    /// dropped.
    ///
    /// Note that the returned future will never resolve to an error.
    #[cfg(feature = "bilock")]
    #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
    pub fn lock(&self) -> BiLockAcquire<'_, T> {
        BiLockAcquire { bilock: self }
    }

    /// Returns `true` only if the other `BiLock<T>` originated from the same call to `BiLock::new`.
    pub fn is_pair_of(&self, other: &Self) -> bool {
        // Both halves of a pair share one allocation, so pointer identity is the test.
        Arc::ptr_eq(&self.arc, &other.arc)
    }

    /// Attempts to put the two "halves" of a `BiLock<T>` back together and
    /// recover the original value. Succeeds only if the two `BiLock<T>`s
    /// originated from the same call to `BiLock::new`.
    ///
    /// # Errors
    ///
    /// Returns a `ReuniteError` carrying both handles (in the order `self`,
    /// `other`) when they are not halves of the same pair.
    ///
    /// # Panics
    ///
    /// Panics if the shared state is still referenced elsewhere once `other`
    /// has been dropped, i.e. if `Arc::try_unwrap` fails.
    pub fn reunite(self, other: Self) -> Result<T, ReuniteError<T>>
    where
        T: Unpin,
    {
        if self.is_pair_of(&other) {
            // Drop the second handle first so that `self.arc` is the last reference.
            drop(other);
            let inner = Arc::try_unwrap(self.arc)
                .ok()
                .expect("futures: try_unwrap failed in BiLock<T>::reunite");
            Ok(unsafe { inner.into_value() })
        } else {
            Err(ReuniteError(self, other))
        }
    }

    /// Releases the lock and wakes the parked task, if there is one.
    ///
    /// Called from the `Drop` implementation of `BiLockGuard`.
    fn unlock(&self) {
        // Reset the lock word to "unlocked" and inspect what it held.
        let n = self.arc.state.swap(ptr::null_mut(), SeqCst);
        match n as usize {
            // we've locked the lock, shouldn't be possible for us to see an
            // unlocked lock.
            0 => panic!("invalid unlocked state"),

            // Ok, no one else tried to get the lock, we're done.
            1 => {}

            // Another task has parked themselves on this lock, let's wake them
            // up as it's now their turn.
            //
            // SAFETY: any value other than 0 or 1 was stored by `poll_lock` from
            // `Box::into_raw`, and the swap above took it out of `state`, so the box is
            // reclaimed exactly once here.
            _ => unsafe {
                Box::from_raw(n).wake();
            },
        }
    }
}
193 | |
194 | impl<T: Unpin> Inner<T> { |
195 | unsafe fn into_value(mut self) -> T { |
196 | self.value.take().unwrap().into_inner() |
197 | } |
198 | } |
199 | |
impl<T> Drop for Inner<T> {
    /// Checks, when the shared state is destroyed, that the lock word is back at its
    /// unlocked (null) value.
    ///
    /// # Panics
    ///
    /// Panics if `state` is non-null, meaning the lock was still marked as held (or a
    /// waker was still parked in it) when the last handle went away.
    fn drop(&mut self) {
        assert!(self.state.load(SeqCst).is_null());
    }
}
205 | |
/// Error indicating two `BiLock<T>`s were not two halves of a whole, and
/// thus could not be `reunite`d.
///
/// Both handles are handed back to the caller: field `0` is the handle `reunite`
/// was called on and field `1` is the `other` argument.
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>);
210 | |
211 | impl<T> fmt::Debug for ReuniteError<T> { |
212 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
213 | f.debug_tuple(name:"ReuniteError" ).field(&"..." ).finish() |
214 | } |
215 | } |
216 | |
217 | impl<T> fmt::Display for ReuniteError<T> { |
218 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
219 | write!(f, "tried to reunite two BiLocks that don't form a pair" ) |
220 | } |
221 | } |
222 | |
// Gated on the `std` feature because the impl names `std::error::Error`.
// The `Any` bound restricts this impl to `T: 'static`.
#[cfg(feature = "std")]
impl<T: core::any::Any> std::error::Error for ReuniteError<T> {}
225 | |
/// Returned RAII guard from the `poll_lock` method.
///
/// This structure acts as a sentinel to the data in the `BiLock<T>` itself,
/// implementing `Deref` to `T`, and `DerefMut` when `T` is `Unpin`. For other
/// types, mutable access is available through `as_pin_mut`. When dropped, the
/// lock will be unlocked.
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
pub struct BiLockGuard<'a, T> {
    // The handle this guard was acquired through; used to reach the value and to unlock.
    bilock: &'a BiLock<T>,
}
236 | |
// We allow parallel access to T via Deref, so Sync bound is also needed here:
// a `&BiLockGuard` shared between threads hands out `&T` on each of them.
unsafe impl<T: Send + Sync> Sync for BiLockGuard<'_, T> {}
239 | |
240 | impl<T> Deref for BiLockGuard<'_, T> { |
241 | type Target = T; |
242 | fn deref(&self) -> &T { |
243 | unsafe { &*self.bilock.arc.value.as_ref().unwrap().get() } |
244 | } |
245 | } |
246 | |
247 | impl<T: Unpin> DerefMut for BiLockGuard<'_, T> { |
248 | fn deref_mut(&mut self) -> &mut T { |
249 | unsafe { &mut *self.bilock.arc.value.as_ref().unwrap().get() } |
250 | } |
251 | } |
252 | |
253 | impl<T> BiLockGuard<'_, T> { |
254 | /// Get a mutable pinned reference to the locked value. |
255 | pub fn as_pin_mut(&mut self) -> Pin<&mut T> { |
256 | // Safety: we never allow moving a !Unpin value out of a bilock, nor |
257 | // allow mutable access to it |
258 | unsafe { Pin::new_unchecked(&mut *self.bilock.arc.value.as_ref().unwrap().get()) } |
259 | } |
260 | } |
261 | |
262 | impl<T> Drop for BiLockGuard<'_, T> { |
263 | fn drop(&mut self) { |
264 | self.bilock.unlock(); |
265 | } |
266 | } |
267 | |
/// Future returned by `BiLock::lock` which will resolve when the lock is
/// acquired.
///
/// The future borrows the `BiLock` it was created from and resolves to a
/// `BiLockGuard` for that lock.
#[cfg(feature = "bilock")]
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct BiLockAcquire<'a, T> {
    // The handle whose `poll_lock` is driven each time this future is polled.
    bilock: &'a BiLock<T>,
}
277 | |
// Pinning is never projected to fields: the only field is a shared reference to the
// `BiLock`, which `poll` merely reads.
#[cfg(feature = "bilock")]
impl<T> Unpin for BiLockAcquire<'_, T> {}
281 | |
#[cfg(feature = "bilock")]
impl<'a, T> Future for BiLockAcquire<'a, T> {
    type Output = BiLockGuard<'a, T>;

    /// Tries to take the lock once, registering the waker of `cx` if it is currently held.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Copy the shared reference out so the guard borrows the `BiLock` for `'a`,
        // not this future.
        let bilock: &'a BiLock<T> = self.bilock;
        bilock.poll_lock(cx)
    }
}
290 | |
// Based on core::ptr::invalid_mut. Equivalent to `addr as *mut T`, but is strict-provenance compatible.
//
// Builds a pointer whose address is `addr`. It is used in this file for the sentinel
// "locked" value (address 1) stored in the lock word; the result must never be dereferenced.
// The clippy allow is needed because clippy would otherwise suggest replacing the transmute
// with an `as` cast.
#[allow(clippy::useless_transmute)]
#[inline]
fn invalid_ptr<T>(addr: usize) -> *mut T {
    // SAFETY: every valid integer is also a valid pointer (as long as you don't dereference that
    // pointer).
    unsafe { core::mem::transmute(addr) }
}
299 | |