1 | //! Futures-powered synchronization primitives. |
2 | |
3 | use alloc::boxed::Box; |
4 | use alloc::sync::Arc; |
5 | use core::cell::UnsafeCell; |
6 | use core::ops::{Deref, DerefMut}; |
7 | use core::pin::Pin; |
8 | use core::sync::atomic::AtomicPtr; |
9 | use core::sync::atomic::Ordering::SeqCst; |
10 | use core::{fmt, ptr}; |
11 | #[cfg (feature = "bilock" )] |
12 | use futures_core::future::Future; |
13 | use futures_core::task::{Context, Poll, Waker}; |
14 | |
15 | /// A type of futures-powered synchronization primitive which is a mutex between |
16 | /// two possible owners. |
17 | /// |
18 | /// This primitive is not as generic as a full-blown mutex but is sufficient for |
19 | /// many use cases where there are only two possible owners of a resource. The |
20 | /// implementation of `BiLock` can be more optimized for just the two possible |
21 | /// owners. |
22 | /// |
23 | /// Note that it's possible to use this lock through a poll-style interface with |
24 | /// the `poll_lock` method but you can also use it as a future with the `lock` |
25 | /// method that consumes a `BiLock` and returns a future that will resolve when |
26 | /// it's locked. |
27 | /// |
28 | /// A `BiLock` is typically used for "split" operations where data which serves |
29 | /// two purposes wants to be split into two to be worked with separately. For |
30 | /// example a TCP stream could be both a reader and a writer or a framing layer |
31 | /// could be both a stream and a sink for messages. A `BiLock` enables splitting |
32 | /// these two and then using each independently in a futures-powered fashion. |
33 | /// |
34 | /// This type is only available when the `bilock` feature of this |
35 | /// library is activated. |
36 | #[derive (Debug)] |
37 | #[cfg_attr (docsrs, doc(cfg(feature = "bilock" )))] |
38 | pub struct BiLock<T> { |
39 | arc: Arc<Inner<T>>, |
40 | } |
41 | |
/// State shared by the two `BiLock` handles.
#[derive(Debug)]
struct Inner<T> {
    /// Lock word. As used by `poll_lock`/`unlock`: a null pointer means
    /// "unlocked", the address `1` means "locked", and any other value is a
    /// raw `Box<Waker>` stored by a handle that is waiting for the lock.
    state: AtomicPtr<Waker>,
    /// The protected value. `into_value` moves it out with `Option::take`.
    value: Option<UnsafeCell<T>>,
}

// NOTE(review): the soundness argument is not written down in this file;
// presumably access to `value` is serialised through `state`, so sharing an
// `Inner` across threads only ever hands `T` to one thread at a time, which is
// why `T: Send` (and not `T: Sync`) is the bound for both impls. Confirm.
unsafe impl<T> Send for Inner<T> where T: Send {}
unsafe impl<T> Sync for Inner<T> where T: Send {}
50 | |
51 | impl<T> BiLock<T> { |
52 | /// Creates a new `BiLock` protecting the provided data. |
53 | /// |
54 | /// Two handles to the lock are returned, and these are the only two handles |
55 | /// that will ever be available to the lock. These can then be sent to separate |
56 | /// tasks to be managed there. |
57 | /// |
58 | /// The data behind the bilock is considered to be pinned, which allows `Pin` |
59 | /// references to locked data. However, this means that the locked value |
60 | /// will only be available through `Pin<&mut T>` (not `&mut T`) unless `T` is `Unpin`. |
61 | /// Similarly, reuniting the lock and extracting the inner value is only |
62 | /// possible when `T` is `Unpin`. |
63 | pub fn new(t: T) -> (Self, Self) { |
64 | let arc = Arc::new(Inner { |
65 | state: AtomicPtr::new(ptr::null_mut()), |
66 | value: Some(UnsafeCell::new(t)), |
67 | }); |
68 | |
69 | (Self { arc: arc.clone() }, Self { arc }) |
70 | } |
71 | |
72 | /// Attempt to acquire this lock, returning `Pending` if it can't be |
73 | /// acquired. |
74 | /// |
75 | /// This function will acquire the lock in a nonblocking fashion, returning |
76 | /// immediately if the lock is already held. If the lock is successfully |
77 | /// acquired then `Poll::Ready` is returned with a value that represents |
78 | /// the locked value (and can be used to access the protected data). The |
79 | /// lock is unlocked when the returned `BiLockGuard` is dropped. |
80 | /// |
81 | /// If the lock is already held then this function will return |
82 | /// `Poll::Pending`. In this case the current task will also be scheduled |
83 | /// to receive a notification when the lock would otherwise become |
84 | /// available. |
85 | /// |
86 | /// # Panics |
87 | /// |
88 | /// This function will panic if called outside the context of a future's |
89 | /// task. |
90 | pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>> { |
91 | let mut waker = None; |
92 | loop { |
93 | let n = self.arc.state.swap(invalid_ptr(1), SeqCst); |
94 | match n as usize { |
95 | // Woohoo, we grabbed the lock! |
96 | 0 => return Poll::Ready(BiLockGuard { bilock: self }), |
97 | |
98 | // Oops, someone else has locked the lock |
99 | 1 => {} |
100 | |
101 | // A task was previously blocked on this lock, likely our task, |
102 | // so we need to update that task. |
103 | _ => unsafe { |
104 | let mut prev = Box::from_raw(n); |
105 | *prev = cx.waker().clone(); |
106 | waker = Some(prev); |
107 | }, |
108 | } |
109 | |
110 | // type ascription for safety's sake! |
111 | let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone())); |
112 | let me = Box::into_raw(me); |
113 | |
114 | match self.arc.state.compare_exchange(invalid_ptr(1), me, SeqCst, SeqCst) { |
115 | // The lock is still locked, but we've now parked ourselves, so |
116 | // just report that we're scheduled to receive a notification. |
117 | Ok(_) => return Poll::Pending, |
118 | |
119 | // Oops, looks like the lock was unlocked after our swap above |
120 | // and before the compare_exchange. Deallocate what we just |
121 | // allocated and go through the loop again. |
122 | Err(n) if n.is_null() => unsafe { |
123 | waker = Some(Box::from_raw(me)); |
124 | }, |
125 | |
126 | // The top of this loop set the previous state to 1, so if we |
127 | // failed the CAS above then it's because the previous value was |
128 | // *not* zero or one. This indicates that a task was blocked, |
129 | // but we're trying to acquire the lock and there's only one |
130 | // other reference of the lock, so it should be impossible for |
131 | // that task to ever block itself. |
132 | Err(n) => panic!("invalid state: {}" , n as usize), |
133 | } |
134 | } |
135 | } |
136 | |
137 | /// Perform a "blocking lock" of this lock, consuming this lock handle and |
138 | /// returning a future to the acquired lock. |
139 | /// |
140 | /// This function consumes the `BiLock<T>` and returns a sentinel future, |
141 | /// `BiLockAcquire<T>`. The returned future will resolve to |
142 | /// `BiLockAcquired<T>` which represents a locked lock similarly to |
143 | /// `BiLockGuard<T>`. |
144 | /// |
145 | /// Note that the returned future will never resolve to an error. |
146 | #[cfg (feature = "bilock" )] |
147 | #[cfg_attr (docsrs, doc(cfg(feature = "bilock" )))] |
148 | pub fn lock(&self) -> BiLockAcquire<'_, T> { |
149 | BiLockAcquire { bilock: self } |
150 | } |
151 | |
152 | /// Attempts to put the two "halves" of a `BiLock<T>` back together and |
153 | /// recover the original value. Succeeds only if the two `BiLock<T>`s |
154 | /// originated from the same call to `BiLock::new`. |
155 | pub fn reunite(self, other: Self) -> Result<T, ReuniteError<T>> |
156 | where |
157 | T: Unpin, |
158 | { |
159 | if Arc::ptr_eq(&self.arc, &other.arc) { |
160 | drop(other); |
161 | let inner = Arc::try_unwrap(self.arc) |
162 | .ok() |
163 | .expect("futures: try_unwrap failed in BiLock<T>::reunite" ); |
164 | Ok(unsafe { inner.into_value() }) |
165 | } else { |
166 | Err(ReuniteError(self, other)) |
167 | } |
168 | } |
169 | |
170 | fn unlock(&self) { |
171 | let n = self.arc.state.swap(ptr::null_mut(), SeqCst); |
172 | match n as usize { |
173 | // we've locked the lock, shouldn't be possible for us to see an |
174 | // unlocked lock. |
175 | 0 => panic!("invalid unlocked state" ), |
176 | |
177 | // Ok, no one else tried to get the lock, we're done. |
178 | 1 => {} |
179 | |
180 | // Another task has parked themselves on this lock, let's wake them |
181 | // up as its now their turn. |
182 | _ => unsafe { |
183 | Box::from_raw(n).wake(); |
184 | }, |
185 | } |
186 | } |
187 | } |
188 | |
189 | impl<T: Unpin> Inner<T> { |
190 | unsafe fn into_value(mut self) -> T { |
191 | self.value.take().unwrap().into_inner() |
192 | } |
193 | } |
194 | |
impl<T> Drop for Inner<T> {
    /// Asserts that the lock word is null when the shared state is destroyed.
    ///
    /// A non-null lock word at this point would mean the lock is still marked
    /// as held (`1`) or still stores a parked waker pointer; in either case
    /// this panics.
    fn drop(&mut self) {
        assert!(self.state.load(SeqCst).is_null());
    }
}
200 | |
/// Error indicating two `BiLock<T>`s were not two halves of a whole, and
/// thus could not be `reunite`d.
///
/// `BiLock::reunite` returns both handles inside this error, in the order
/// `(self, other)`, so the caller gets them back unchanged.
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>);
205 | |
206 | impl<T> fmt::Debug for ReuniteError<T> { |
207 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
208 | f.debug_tuple(name:"ReuniteError" ).field(&"..." ).finish() |
209 | } |
210 | } |
211 | |
212 | impl<T> fmt::Display for ReuniteError<T> { |
213 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
214 | write!(f, "tried to reunite two BiLocks that don't form a pair" ) |
215 | } |
216 | } |
217 | |
// `ReuniteError` is a standard error type when `std` is available; the trait's
// default methods are used as-is.
#[cfg(feature = "std")]
impl<T> std::error::Error for ReuniteError<T> where T: core::any::Any {}
220 | |
221 | /// Returned RAII guard from the `poll_lock` method. |
222 | /// |
223 | /// This structure acts as a sentinel to the data in the `BiLock<T>` itself, |
224 | /// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be |
225 | /// unlocked. |
226 | #[derive (Debug)] |
227 | #[cfg_attr (docsrs, doc(cfg(feature = "bilock" )))] |
228 | pub struct BiLockGuard<'a, T> { |
229 | bilock: &'a BiLock<T>, |
230 | } |
231 | |
232 | // We allow parallel access to T via Deref, so Sync bound is also needed here. |
233 | unsafe impl<T: Send + Sync> Sync for BiLockGuard<'_, T> {} |
234 | |
235 | impl<T> Deref for BiLockGuard<'_, T> { |
236 | type Target = T; |
237 | fn deref(&self) -> &T { |
238 | unsafe { &*self.bilock.arc.value.as_ref().unwrap().get() } |
239 | } |
240 | } |
241 | |
242 | impl<T: Unpin> DerefMut for BiLockGuard<'_, T> { |
243 | fn deref_mut(&mut self) -> &mut T { |
244 | unsafe { &mut *self.bilock.arc.value.as_ref().unwrap().get() } |
245 | } |
246 | } |
247 | |
248 | impl<T> BiLockGuard<'_, T> { |
249 | /// Get a mutable pinned reference to the locked value. |
250 | pub fn as_pin_mut(&mut self) -> Pin<&mut T> { |
251 | // Safety: we never allow moving a !Unpin value out of a bilock, nor |
252 | // allow mutable access to it |
253 | unsafe { Pin::new_unchecked(&mut *self.bilock.arc.value.as_ref().unwrap().get()) } |
254 | } |
255 | } |
256 | |
257 | impl<T> Drop for BiLockGuard<'_, T> { |
258 | fn drop(&mut self) { |
259 | self.bilock.unlock(); |
260 | } |
261 | } |
262 | |
/// Future created by `BiLock::lock`; it resolves to a `BiLockGuard` once the
/// lock has been acquired.
#[cfg(feature = "bilock")]
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct BiLockAcquire<'a, T> {
    /// The handle whose lock is being acquired.
    bilock: &'a BiLock<T>,
}
272 | |
// The future never projects its pin onto a field, so it can be `Unpin` no
// matter what `T` is.
#[cfg(feature = "bilock")]
impl<'a, T> Unpin for BiLockAcquire<'a, T> {}
276 | |
#[cfg(feature = "bilock")]
impl<'a, T> Future for BiLockAcquire<'a, T> {
    type Output = BiLockGuard<'a, T>;

    /// Delegates to `BiLock::poll_lock` on the borrowed handle.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The field is a shared reference, so it can simply be copied out.
        let handle: &'a BiLock<T> = self.bilock;
        handle.poll_lock(cx)
    }
}
285 | |
/// Builds a `*mut T` whose address is `addr`.
///
/// Based on `core::ptr::invalid_mut`. Equivalent to `addr as *mut T`, but is
/// strict-provenance compatible. The result only carries an address (this file
/// uses it for the sentinel value `1` in the lock word) and must never be
/// dereferenced.
// The transmute is deliberate, see above; clippy would otherwise suggest `as`.
#[allow(clippy::useless_transmute)]
#[inline]
fn invalid_ptr<T>(addr: usize) -> *mut T {
    // SAFETY: every valid integer is also a valid pointer (as long as you don't dereference that
    // pointer).
    unsafe { core::mem::transmute(addr) }
}
294 | |