//! Raw, unsafe reader-writer locking implementation that
//! doesn't depend on the data protected by the lock.
3 | //! [`RwLock`](super::RwLock) is implemented in terms of this. |
4 | //! |
5 | //! Splitting the implementation this way allows instantiating |
6 | //! the locking code only once, and also lets us make |
7 | //! [`RwLockReadGuard`](super::RwLockReadGuard) covariant in `T`. |
8 | |
9 | use core::marker::PhantomPinned; |
10 | use core::mem::forget; |
11 | use core::pin::Pin; |
12 | use core::task::Poll; |
13 | |
14 | use crate::sync::atomic::{AtomicUsize, Ordering}; |
15 | |
16 | use event_listener::{Event, EventListener}; |
17 | use event_listener_strategy::{EventListenerFuture, Strategy}; |
18 | |
19 | use crate::futures::Lock; |
20 | use crate::Mutex; |
21 | |
/// Flag in the lock state that is set while a writer holds the lock or is trying to acquire it.
const WRITER_BIT: usize = 0b01;
/// Amount added to the lock state for every active reader; the reader count lives in the bits
/// above `WRITER_BIT`.
const ONE_READER: usize = 0b10;
24 | |
/// A "raw" RwLock that doesn't hold any data.
///
/// It only tracks who holds the lock; callers are responsible for pairing every successful
/// acquisition with the matching `*_unlock` / `downgrade_*` call.
pub(super) struct RawRwLock {
    /// Acquired by the writer.
    ///
    /// Upgradable readers also take this mutex, so at most one writer or upgradable reader
    /// exists at a time.
    mutex: Mutex<()>,

    /// Event triggered when the last reader is dropped.
    no_readers: Event,

    /// Event triggered when the writer is dropped.
    no_writer: Event,

    /// Current state of the lock.
    ///
    /// The least significant bit (`WRITER_BIT`) is set to 1 when a writer is holding the lock or
    /// trying to acquire it.
    ///
    /// The upper bits contain the number of currently active readers. Each active reader
    /// increments the state by `ONE_READER`.
    state: AtomicUsize,
}
45 | |
46 | impl RawRwLock { |
47 | const_fn! { |
48 | const_if: #[cfg(not(loom))]; |
49 | #[inline ] |
50 | pub(super) const fn new() -> Self { |
51 | RawRwLock { |
52 | mutex: Mutex::new(()), |
53 | no_readers: Event::new(), |
54 | no_writer: Event::new(), |
55 | state: AtomicUsize::new(0), |
56 | } |
57 | } |
58 | } |
59 | |
60 | /// Returns `true` iff a read lock was successfully acquired. |
61 | pub(super) fn try_read(&self) -> bool { |
62 | let mut state = self.state.load(Ordering::Acquire); |
63 | |
64 | loop { |
65 | // If there's a writer holding the lock or attempting to acquire it, we cannot acquire |
66 | // a read lock here. |
67 | if state & WRITER_BIT != 0 { |
68 | return false; |
69 | } |
70 | |
71 | // Make sure the number of readers doesn't overflow. |
72 | if state > core::isize::MAX as usize { |
73 | crate::abort(); |
74 | } |
75 | |
76 | // Increment the number of readers. |
77 | match self.state.compare_exchange( |
78 | state, |
79 | state + ONE_READER, |
80 | Ordering::AcqRel, |
81 | Ordering::Acquire, |
82 | ) { |
83 | Ok(_) => return true, |
84 | Err(s) => state = s, |
85 | } |
86 | } |
87 | } |
88 | |
89 | #[inline ] |
90 | pub(super) fn read(&self) -> RawRead<'_> { |
91 | RawRead { |
92 | lock: self, |
93 | state: self.state.load(Ordering::Acquire), |
94 | listener: None, |
95 | _pin: PhantomPinned, |
96 | } |
97 | } |
98 | |
99 | /// Returns `true` iff an upgradable read lock was successfully acquired. |
100 | |
101 | pub(super) fn try_upgradable_read(&self) -> bool { |
102 | // First try grabbing the mutex. |
103 | let lock = if let Some(lock) = self.mutex.try_lock() { |
104 | lock |
105 | } else { |
106 | return false; |
107 | }; |
108 | |
109 | forget(lock); |
110 | |
111 | let mut state = self.state.load(Ordering::Acquire); |
112 | |
113 | // Make sure the number of readers doesn't overflow. |
114 | if state > core::isize::MAX as usize { |
115 | crate::abort(); |
116 | } |
117 | |
118 | // Increment the number of readers. |
119 | loop { |
120 | match self.state.compare_exchange( |
121 | state, |
122 | state + ONE_READER, |
123 | Ordering::AcqRel, |
124 | Ordering::Acquire, |
125 | ) { |
126 | Ok(_) => return true, |
127 | Err(s) => state = s, |
128 | } |
129 | } |
130 | } |
131 | |
132 | #[inline ] |
133 | |
134 | pub(super) fn upgradable_read(&self) -> RawUpgradableRead<'_> { |
135 | RawUpgradableRead { |
136 | lock: self, |
137 | acquire: self.mutex.lock(), |
138 | } |
139 | } |
140 | |
141 | /// Returs `true` iff a write lock was successfully acquired. |
142 | |
143 | pub(super) fn try_write(&self) -> bool { |
144 | // First try grabbing the mutex. |
145 | let lock = if let Some(lock) = self.mutex.try_lock() { |
146 | lock |
147 | } else { |
148 | return false; |
149 | }; |
150 | |
151 | // If there are no readers, grab the write lock. |
152 | if self |
153 | .state |
154 | .compare_exchange(0, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire) |
155 | .is_ok() |
156 | { |
157 | forget(lock); |
158 | true |
159 | } else { |
160 | drop(lock); |
161 | false |
162 | } |
163 | } |
164 | |
165 | #[inline ] |
166 | |
167 | pub(super) fn write(&self) -> RawWrite<'_> { |
168 | RawWrite { |
169 | lock: self, |
170 | no_readers: None, |
171 | state: WriteState::Acquiring { |
172 | lock: self.mutex.lock(), |
173 | }, |
174 | } |
175 | } |
176 | |
177 | /// Returns `true` iff a the upgradable read lock was successfully upgraded to a write lock. |
178 | /// |
179 | /// # Safety |
180 | /// |
181 | /// Caller must hold an upgradable read lock. |
182 | /// This will attempt to upgrade it to a write lock. |
183 | |
184 | pub(super) unsafe fn try_upgrade(&self) -> bool { |
185 | self.state |
186 | .compare_exchange(ONE_READER, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire) |
187 | .is_ok() |
188 | } |
189 | |
190 | /// # Safety |
191 | /// |
192 | /// Caller must hold an upgradable read lock. |
193 | /// This will upgrade it to a write lock. |
194 | |
195 | pub(super) unsafe fn upgrade(&self) -> RawUpgrade<'_> { |
196 | // Set `WRITER_BIT` and decrement the number of readers at the same time. |
197 | self.state |
198 | .fetch_sub(ONE_READER - WRITER_BIT, Ordering::SeqCst); |
199 | |
200 | RawUpgrade { |
201 | lock: Some(self), |
202 | listener: None, |
203 | _pin: PhantomPinned, |
204 | } |
205 | } |
206 | |
/// Turns an upgradable read lock into a standard read lock.
///
/// Only the writer mutex is released; the reader count in `state` is left untouched.
///
/// # Safety
///
/// Caller must hold an upgradable read lock.
/// This will downgrade it to a standard read lock.
#[inline]
pub(super) unsafe fn downgrade_upgradable_read(&self) {
    self.mutex.unlock_unchecked();
}
216 | |
/// Turns a write lock into a read lock.
///
/// # Safety
///
/// Caller must hold a write lock.
/// This will downgrade it to a read lock.
pub(super) unsafe fn downgrade_write(&self) {
    // Atomically downgrade state: adding `ONE_READER - WRITER_BIT` clears `WRITER_BIT` and
    // registers the caller as a reader in a single step.
    self.state
        .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst);

    // Release the writer mutex.
    self.mutex.unlock_unchecked();

    // Trigger the "no writer" event.
    self.no_writer.notify(1);
}
233 | |
/// Turns a write lock into an upgradable read lock.
///
/// The writer mutex is not released here and the "no writer" event is not triggered.
///
/// # Safety
///
/// Caller must hold a write lock.
/// This will downgrade it to an upgradable read lock.
pub(super) unsafe fn downgrade_to_upgradable(&self) {
    // Atomically downgrade state: adding `ONE_READER - WRITER_BIT` clears `WRITER_BIT` and
    // registers the caller as a reader in a single step.
    self.state
        .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst);
}
244 | |
245 | /// # Safety |
246 | /// |
247 | /// Caller must hold a read lock . |
248 | /// This will unlock that lock. |
249 | |
250 | pub(super) unsafe fn read_unlock(&self) { |
251 | // Decrement the number of readers. |
252 | if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER { |
253 | // If this was the last reader, trigger the "no readers" event. |
254 | self.no_readers.notify(1); |
255 | } |
256 | } |
257 | |
258 | /// # Safety |
259 | /// |
260 | /// Caller must hold an upgradable read lock. |
261 | /// This will unlock that lock. |
262 | |
263 | pub(super) unsafe fn upgradable_read_unlock(&self) { |
264 | // Decrement the number of readers. |
265 | if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER { |
266 | // If this was the last reader, trigger the "no readers" event. |
267 | self.no_readers.notify(1); |
268 | } |
269 | |
270 | // SAFETY: upgradable read guards acquire the writer mutex upon creation. |
271 | self.mutex.unlock_unchecked(); |
272 | } |
273 | |
/// Releases a write lock.
///
/// `WRITER_BIT` is cleared and the "no writer" event is triggered before the writer mutex is
/// released.
///
/// # Safety
///
/// Caller must hold a write lock.
/// This will unlock that lock.
pub(super) unsafe fn write_unlock(&self) {
    // Unset `WRITER_BIT`.
    self.state.fetch_and(!WRITER_BIT, Ordering::SeqCst);
    // Trigger the "no writer" event.
    self.no_writer.notify(1);

    // Release the writer lock.
    // SAFETY: `RwLockWriteGuard` always holds a lock on writer mutex.
    self.mutex.unlock_unchecked();
}
289 | } |
290 | |
pin_project_lite::pin_project! {
    /// The future returned by [`RawRwLock::read`].
    ///
    /// Resolves once the reader count of the lock has been incremented on behalf of the caller.
    pub(super) struct RawRead<'a> {
        // The lock that is being acquired.
        pub(super) lock: &'a RawRwLock,

        // The last-observed state of the lock; refreshed whenever an exchange fails or after
        // waiting for a writer.
        state: usize,

        // The listener for the "no writer" event; `None` until a writer has been observed.
        listener: Option<EventListener>,

        // Making this type `!Unpin` enables future optimizations.
        #[pin]
        _pin: PhantomPinned
    }
}
309 | |
310 | impl<'a> EventListenerFuture for RawRead<'a> { |
311 | type Output = (); |
312 | |
313 | fn poll_with_strategy<'x, S: Strategy<'x>>( |
314 | self: Pin<&mut Self>, |
315 | strategy: &mut S, |
316 | cx: &mut S::Context, |
317 | ) -> Poll<()> { |
318 | let this = self.project(); |
319 | |
320 | loop { |
321 | if *this.state & WRITER_BIT == 0 { |
322 | // Make sure the number of readers doesn't overflow. |
323 | if *this.state > core::isize::MAX as usize { |
324 | crate::abort(); |
325 | } |
326 | |
327 | // If nobody is holding a write lock or attempting to acquire it, increment the |
328 | // number of readers. |
329 | match this.lock.state.compare_exchange( |
330 | *this.state, |
331 | *this.state + ONE_READER, |
332 | Ordering::AcqRel, |
333 | Ordering::Acquire, |
334 | ) { |
335 | Ok(_) => return Poll::Ready(()), |
336 | Err(s) => *this.state = s, |
337 | } |
338 | } else { |
339 | // Start listening for "no writer" events. |
340 | let load_ordering = if this.listener.is_none() { |
341 | *this.listener = Some(this.lock.no_writer.listen()); |
342 | |
343 | // Make sure there really is no writer. |
344 | Ordering::SeqCst |
345 | } else { |
346 | // Wait for the writer to finish. |
347 | ready!(strategy.poll(this.listener, cx)); |
348 | |
349 | // Notify the next reader waiting in list. |
350 | this.lock.no_writer.notify(1); |
351 | |
352 | // Check the state again. |
353 | Ordering::Acquire |
354 | }; |
355 | |
356 | // Reload the state. |
357 | *this.state = this.lock.state.load(load_ordering); |
358 | } |
359 | } |
360 | } |
361 | } |
362 | |
pin_project_lite::pin_project! {
    /// The future returned by [`RawRwLock::upgradable_read`].
    ///
    /// Resolves once the writer mutex is held and the caller has been counted as a reader.
    pub(super) struct RawUpgradableRead<'a> {
        // The lock that is being acquired.
        pub(super) lock: &'a RawRwLock,

        // The mutex we are trying to acquire (the lock future of `RawRwLock::mutex`).
        #[pin]
        acquire: Lock<'a, ()>,
    }
}
374 | |
375 | impl<'a> EventListenerFuture for RawUpgradableRead<'a> { |
376 | type Output = (); |
377 | |
378 | fn poll_with_strategy<'x, S: Strategy<'x>>( |
379 | self: Pin<&mut Self>, |
380 | strategy: &mut S, |
381 | cx: &mut S::Context, |
382 | ) -> Poll<()> { |
383 | let this = self.project(); |
384 | |
385 | // Acquire the mutex. |
386 | let mutex_guard = ready!(this.acquire.poll_with_strategy(strategy, cx)); |
387 | forget(mutex_guard); |
388 | |
389 | // Load the current state. |
390 | let mut state = this.lock.state.load(Ordering::Acquire); |
391 | |
392 | // Make sure the number of readers doesn't overflow. |
393 | if state > core::isize::MAX as usize { |
394 | crate::abort(); |
395 | } |
396 | |
397 | // Increment the number of readers. |
398 | loop { |
399 | match this.lock.state.compare_exchange( |
400 | state, |
401 | state + ONE_READER, |
402 | Ordering::AcqRel, |
403 | Ordering::Acquire, |
404 | ) { |
405 | Ok(_) => { |
406 | return Poll::Ready(()); |
407 | } |
408 | Err(s) => state = s, |
409 | } |
410 | } |
411 | } |
412 | } |
413 | |
pin_project_lite::pin_project! {
    /// The future returned by [`RawRwLock::write`].
    ///
    /// If it is dropped while waiting for readers, the partially acquired write lock is
    /// released again.
    pub(super) struct RawWrite<'a> {
        // The lock that is being acquired.
        pub(super) lock: &'a RawRwLock,

        // Our listener for the "no readers" event.
        no_readers: Option<EventListener>,

        // Current state of this future.
        #[pin]
        state: WriteState<'a>,
    }

    impl PinnedDrop for RawWrite<'_> {
        fn drop(this: Pin<&mut Self>) {
            let this = this.project();

            // Only the `WaitingReaders` phase needs cleanup: there the mutex is held and
            // `WRITER_BIT` is set, but the lock was not handed to the caller yet.
            if matches!(this.state.project(), WriteStateProj::WaitingReaders) {
                // Safety: we hold a write lock, more or less.
                unsafe {
                    this.lock.write_unlock();
                }
            }
        }
    }
}
442 | |
pin_project_lite::pin_project! {
    // The phases a `RawWrite` future moves through, in order.
    #[project = WriteStateProj]
    #[project_replace = WriteStateProjReplace]
    enum WriteState<'a> {
        // We are currently acquiring the inner mutex.
        Acquiring { #[pin] lock: Lock<'a, ()> },

        // We are currently waiting for readers to finish (mutex held, `WRITER_BIT` set).
        WaitingReaders,

        // The future has completed; polling it again panics.
        Acquired,
    }
}
457 | |
458 | impl<'a> EventListenerFuture for RawWrite<'a> { |
459 | type Output = (); |
460 | |
461 | fn poll_with_strategy<'x, S: Strategy<'x>>( |
462 | self: Pin<&mut Self>, |
463 | strategy: &mut S, |
464 | cx: &mut S::Context, |
465 | ) -> Poll<()> { |
466 | let mut this = self.project(); |
467 | |
468 | loop { |
469 | match this.state.as_mut().project() { |
470 | WriteStateProj::Acquiring { lock } => { |
471 | // First grab the mutex. |
472 | let mutex_guard = ready!(lock.poll_with_strategy(strategy, cx)); |
473 | forget(mutex_guard); |
474 | |
475 | // Set `WRITER_BIT` and create a guard that unsets it in case this future is canceled. |
476 | let new_state = this.lock.state.fetch_or(WRITER_BIT, Ordering::SeqCst); |
477 | |
478 | // If we just acquired the lock, return. |
479 | if new_state == WRITER_BIT { |
480 | this.state.as_mut().set(WriteState::Acquired); |
481 | return Poll::Ready(()); |
482 | } |
483 | |
484 | // Start waiting for the readers to finish. |
485 | *this.no_readers = Some(this.lock.no_readers.listen()); |
486 | this.state.as_mut().set(WriteState::WaitingReaders); |
487 | } |
488 | |
489 | WriteStateProj::WaitingReaders => { |
490 | let load_ordering = if this.no_readers.is_some() { |
491 | Ordering::Acquire |
492 | } else { |
493 | Ordering::SeqCst |
494 | }; |
495 | |
496 | // Check the state again. |
497 | if this.lock.state.load(load_ordering) == WRITER_BIT { |
498 | // We are the only ones holding the lock, return `Ready`. |
499 | this.state.as_mut().set(WriteState::Acquired); |
500 | return Poll::Ready(()); |
501 | } |
502 | |
503 | // Wait for the readers to finish. |
504 | if this.no_readers.is_none() { |
505 | // Register a listener. |
506 | *this.no_readers = Some(this.lock.no_readers.listen()); |
507 | } else { |
508 | // Wait for the readers to finish. |
509 | ready!(strategy.poll(this.no_readers, cx)); |
510 | }; |
511 | } |
512 | WriteStateProj::Acquired => panic!("Write lock already acquired" ), |
513 | } |
514 | } |
515 | } |
516 | } |
517 | |
pin_project_lite::pin_project! {
    /// The future returned by [`RawRwLock::upgrade`].
    ///
    /// If it is dropped before completing, the write lock it was acquiring is released.
    pub(super) struct RawUpgrade<'a> {
        // The lock being upgraded. Set to `None` once the future has completed, so that
        // dropping it afterwards no longer unlocks anything.
        lock: Option<&'a RawRwLock>,

        // The event listener we are waiting on.
        listener: Option<EventListener>,

        // Keeping this future `!Unpin` enables future optimizations.
        #[pin]
        _pin: PhantomPinned
    }

    impl PinnedDrop for RawUpgrade<'_> {
        fn drop(this: Pin<&mut Self>) {
            let this = this.project();
            // `lock` is still `Some` only if the future never returned `Poll::Ready`.
            if let Some(lock) = this.lock {
                // SAFETY: we are dropping the future that would give us a write lock,
                // so we don't need said lock anymore.
                unsafe {
                    lock.write_unlock();
                }
            }
        }
    }
}
545 | |
546 | impl<'a> EventListenerFuture for RawUpgrade<'a> { |
547 | type Output = &'a RawRwLock; |
548 | |
549 | fn poll_with_strategy<'x, S: Strategy<'x>>( |
550 | self: Pin<&mut Self>, |
551 | strategy: &mut S, |
552 | cx: &mut S::Context, |
553 | ) -> Poll<&'a RawRwLock> { |
554 | let this = self.project(); |
555 | let lock = this.lock.expect("cannot poll future after completion" ); |
556 | |
557 | // If there are readers, we need to wait for them to finish. |
558 | loop { |
559 | let load_ordering = if this.listener.is_some() { |
560 | Ordering::Acquire |
561 | } else { |
562 | Ordering::SeqCst |
563 | }; |
564 | |
565 | // See if the number of readers is zero. |
566 | let state = lock.state.load(load_ordering); |
567 | if state == WRITER_BIT { |
568 | break; |
569 | } |
570 | |
571 | // If there are readers, wait for them to finish. |
572 | if this.listener.is_none() { |
573 | // Start listening for "no readers" events. |
574 | *this.listener = Some(lock.no_readers.listen()); |
575 | } else { |
576 | // Wait for the readers to finish. |
577 | ready!(strategy.poll(this.listener, cx)); |
578 | }; |
579 | } |
580 | |
581 | // We are done. |
582 | Poll::Ready(this.lock.take().unwrap()) |
583 | } |
584 | } |
585 | |
586 | impl<'a> RawUpgrade<'a> { |
587 | /// Whether the future returned `Poll::Ready(..)` at some point. |
588 | #[inline ] |
589 | pub(super) fn is_ready(&self) -> bool { |
590 | self.lock.is_none() |
591 | } |
592 | } |
593 | |