//! Raw, unsafe reader-writer locking implementation that
//! doesn't depend on the data protected by the lock.
3//! [`RwLock`](super::RwLock) is implemented in terms of this.
4//!
5//! Splitting the implementation this way allows instantiating
6//! the locking code only once, and also lets us make
7//! [`RwLockReadGuard`](super::RwLockReadGuard) covariant in `T`.
8
9use core::mem::forget;
10use core::pin::Pin;
11use core::sync::atomic::{AtomicUsize, Ordering};
12use core::task::Poll;
13
14use event_listener::{Event, EventListener};
15use event_listener_strategy::{EventListenerFuture, Strategy};
16
17use crate::futures::Lock;
18use crate::Mutex;
19
/// Least significant bit of the lock state: set while a writer is holding the lock or
/// trying to acquire it.
const WRITER_BIT: usize = 1;
/// Amount added to the lock state for each active reader. The reader count therefore lives in
/// the bits above `WRITER_BIT`.
const ONE_READER: usize = 2;
22
/// A "raw" RwLock that doesn't hold any data.
///
/// It only tracks who holds the lock; the protected value lives in the wrapper built on top
/// of this type.
pub(super) struct RawRwLock {
    /// Acquired by the writer.
    ///
    /// Upgradable readers also take this mutex, so at most one writer or upgradable reader
    /// can be active at a time.
    mutex: Mutex<()>,

    /// Event triggered when the last reader is dropped.
    no_readers: Event,

    /// Event triggered when the writer is dropped.
    no_writer: Event,

    /// Current state of the lock.
    ///
    /// The least significant bit (`WRITER_BIT`) is set to 1 when a writer is holding the lock or
    /// trying to acquire it.
    ///
    /// The upper bits contain the number of currently active readers. Each active reader
    /// increments the state by `ONE_READER`.
    state: AtomicUsize,
}
43
44impl RawRwLock {
45 #[inline]
46 pub(super) const fn new() -> Self {
47 RawRwLock {
48 mutex: Mutex::new(()),
49 no_readers: Event::new(),
50 no_writer: Event::new(),
51 state: AtomicUsize::new(0),
52 }
53 }
54
55 /// Returns `true` iff a read lock was successfully acquired.
56 pub(super) fn try_read(&self) -> bool {
57 let mut state = self.state.load(Ordering::Acquire);
58
59 loop {
60 // If there's a writer holding the lock or attempting to acquire it, we cannot acquire
61 // a read lock here.
62 if state & WRITER_BIT != 0 {
63 return false;
64 }
65
66 // Make sure the number of readers doesn't overflow.
67 if state > core::isize::MAX as usize {
68 crate::abort();
69 }
70
71 // Increment the number of readers.
72 match self.state.compare_exchange(
73 state,
74 state + ONE_READER,
75 Ordering::AcqRel,
76 Ordering::Acquire,
77 ) {
78 Ok(_) => return true,
79 Err(s) => state = s,
80 }
81 }
82 }
83
84 #[inline]
85 pub(super) fn read(&self) -> RawRead<'_> {
86 RawRead {
87 lock: self,
88 state: self.state.load(Ordering::Acquire),
89 listener: EventListener::new(),
90 }
91 }
92
93 /// Returns `true` iff an upgradable read lock was successfully acquired.
94
95 pub(super) fn try_upgradable_read(&self) -> bool {
96 // First try grabbing the mutex.
97 let lock = if let Some(lock) = self.mutex.try_lock() {
98 lock
99 } else {
100 return false;
101 };
102
103 forget(lock);
104
105 let mut state = self.state.load(Ordering::Acquire);
106
107 // Make sure the number of readers doesn't overflow.
108 if state > core::isize::MAX as usize {
109 crate::abort();
110 }
111
112 // Increment the number of readers.
113 loop {
114 match self.state.compare_exchange(
115 state,
116 state + ONE_READER,
117 Ordering::AcqRel,
118 Ordering::Acquire,
119 ) {
120 Ok(_) => return true,
121 Err(s) => state = s,
122 }
123 }
124 }
125
126 #[inline]
127
128 pub(super) fn upgradable_read(&self) -> RawUpgradableRead<'_> {
129 RawUpgradableRead {
130 lock: self,
131 acquire: self.mutex.lock(),
132 }
133 }
134
135 /// Returs `true` iff a write lock was successfully acquired.
136
137 pub(super) fn try_write(&self) -> bool {
138 // First try grabbing the mutex.
139 let lock = if let Some(lock) = self.mutex.try_lock() {
140 lock
141 } else {
142 return false;
143 };
144
145 // If there are no readers, grab the write lock.
146 if self
147 .state
148 .compare_exchange(0, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire)
149 .is_ok()
150 {
151 forget(lock);
152 true
153 } else {
154 drop(lock);
155 false
156 }
157 }
158
159 #[inline]
160
161 pub(super) fn write(&self) -> RawWrite<'_> {
162 RawWrite {
163 lock: self,
164 no_readers: EventListener::new(),
165 state: WriteState::Acquiring {
166 lock: self.mutex.lock(),
167 },
168 }
169 }
170
171 /// Returns `true` iff a the upgradable read lock was successfully upgraded to a write lock.
172 ///
173 /// # Safety
174 ///
175 /// Caller must hold an upgradable read lock.
176 /// This will attempt to upgrade it to a write lock.
177
178 pub(super) unsafe fn try_upgrade(&self) -> bool {
179 self.state
180 .compare_exchange(ONE_READER, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire)
181 .is_ok()
182 }
183
184 /// # Safety
185 ///
186 /// Caller must hold an upgradable read lock.
187 /// This will upgrade it to a write lock.
188
189 pub(super) unsafe fn upgrade(&self) -> RawUpgrade<'_> {
190 // Set `WRITER_BIT` and decrement the number of readers at the same time.
191 self.state
192 .fetch_sub(ONE_READER - WRITER_BIT, Ordering::SeqCst);
193
194 RawUpgrade {
195 lock: Some(self),
196 listener: EventListener::new(),
197 }
198 }
199
200 /// # Safety
201 ///
202 /// Caller must hold an upgradable read lock.
203 /// This will downgrade it to a stadard read lock.
204 #[inline]
205
206 pub(super) unsafe fn downgrade_upgradable_read(&self) {
207 self.mutex.unlock_unchecked();
208 }
209
210 /// # Safety
211 ///
212 /// Caller must hold a write lock.
213 /// This will downgrade it to a read lock.
214
215 pub(super) unsafe fn downgrade_write(&self) {
216 // Atomically downgrade state.
217 self.state
218 .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst);
219
220 // Release the writer mutex.
221 self.mutex.unlock_unchecked();
222
223 // Trigger the "no writer" event.
224 self.no_writer.notify(1);
225 }
226
227 /// # Safety
228 ///
229 /// Caller must hold a write lock.
230 /// This will downgrade it to an upgradable read lock.
231
232 pub(super) unsafe fn downgrade_to_upgradable(&self) {
233 // Atomically downgrade state.
234 self.state
235 .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst);
236 }
237
238 /// # Safety
239 ///
240 /// Caller must hold a read lock .
241 /// This will unlock that lock.
242
243 pub(super) unsafe fn read_unlock(&self) {
244 // Decrement the number of readers.
245 if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER {
246 // If this was the last reader, trigger the "no readers" event.
247 self.no_readers.notify(1);
248 }
249 }
250
251 /// # Safety
252 ///
253 /// Caller must hold an upgradable read lock.
254 /// This will unlock that lock.
255
256 pub(super) unsafe fn upgradable_read_unlock(&self) {
257 // Decrement the number of readers.
258 if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER {
259 // If this was the last reader, trigger the "no readers" event.
260 self.no_readers.notify(1);
261 }
262
263 // SAFETY: upgradable read guards acquire the writer mutex upon creation.
264 self.mutex.unlock_unchecked();
265 }
266
267 /// # Safety
268 ///
269 /// Caller must hold a write lock.
270 /// This will unlock that lock.
271
272 pub(super) unsafe fn write_unlock(&self) {
273 // Unset `WRITER_BIT`.
274 self.state.fetch_and(!WRITER_BIT, Ordering::SeqCst);
275 // Trigger the "no writer" event.
276 self.no_writer.notify(1);
277
278 // Release the writer lock.
279 // SAFETY: `RwLockWriteGuard` always holds a lock on writer mutex.
280 self.mutex.unlock_unchecked();
281 }
282}
283
pin_project_lite::pin_project! {
    /// The future returned by [`RawRwLock::read`].
    ///
    /// Resolves once a read lock has been acquired.
    pub(super) struct RawRead<'a> {
        // The lock that is being acquired.
        pub(super) lock: &'a RawRwLock,

        // The last-observed state of the lock; refreshed whenever a compare-exchange fails
        // or after waiting on the listener.
        state: usize,

        // The listener for the "no writer" event.
        #[pin]
        listener: EventListener,
    }
}
299
300impl<'a> EventListenerFuture for RawRead<'a> {
301 type Output = ();
302
303 fn poll_with_strategy<'x, S: Strategy<'x>>(
304 self: Pin<&mut Self>,
305 strategy: &mut S,
306 cx: &mut S::Context,
307 ) -> Poll<()> {
308 let mut this = self.project();
309
310 loop {
311 if *this.state & WRITER_BIT == 0 {
312 // Make sure the number of readers doesn't overflow.
313 if *this.state > core::isize::MAX as usize {
314 crate::abort();
315 }
316
317 // If nobody is holding a write lock or attempting to acquire it, increment the
318 // number of readers.
319 match this.lock.state.compare_exchange(
320 *this.state,
321 *this.state + ONE_READER,
322 Ordering::AcqRel,
323 Ordering::Acquire,
324 ) {
325 Ok(_) => return Poll::Ready(()),
326 Err(s) => *this.state = s,
327 }
328 } else {
329 // Start listening for "no writer" events.
330 let load_ordering = if !this.listener.is_listening() {
331 this.listener.as_mut().listen(&this.lock.no_writer);
332
333 // Make sure there really is no writer.
334 Ordering::SeqCst
335 } else {
336 // Wait for the writer to finish.
337 ready!(strategy.poll(this.listener.as_mut(), cx));
338
339 // Notify the next reader waiting in list.
340 this.lock.no_writer.notify(1);
341
342 // Check the state again.
343 Ordering::Acquire
344 };
345
346 // Reload the state.
347 *this.state = this.lock.state.load(load_ordering);
348 }
349 }
350 }
351}
352
pin_project_lite::pin_project! {
    /// The future returned by [`RawRwLock::upgradable_read`].
    ///
    /// Resolves once an upgradable read lock has been acquired.
    pub(super) struct RawUpgradableRead<'a> {
        // The lock that is being acquired.
        pub(super) lock: &'a RawRwLock,

        // The future acquiring the writer mutex, which every upgradable reader must hold.
        #[pin]
        acquire: Lock<'a, ()>,
    }
}
364
365impl<'a> EventListenerFuture for RawUpgradableRead<'a> {
366 type Output = ();
367
368 fn poll_with_strategy<'x, S: Strategy<'x>>(
369 self: Pin<&mut Self>,
370 strategy: &mut S,
371 cx: &mut S::Context,
372 ) -> Poll<()> {
373 let this = self.project();
374
375 // Acquire the mutex.
376 let mutex_guard = ready!(this.acquire.poll_with_strategy(strategy, cx));
377 forget(mutex_guard);
378
379 // Load the current state.
380 let mut state = this.lock.state.load(Ordering::Acquire);
381
382 // Make sure the number of readers doesn't overflow.
383 if state > core::isize::MAX as usize {
384 crate::abort();
385 }
386
387 // Increment the number of readers.
388 loop {
389 match this.lock.state.compare_exchange(
390 state,
391 state + ONE_READER,
392 Ordering::AcqRel,
393 Ordering::Acquire,
394 ) {
395 Ok(_) => {
396 return Poll::Ready(());
397 }
398 Err(s) => state = s,
399 }
400 }
401 }
402}
403
404pin_project_lite::pin_project! {
405 /// The future returned by [`RawRwLock::write`].
406
407 pub(super) struct RawWrite<'a> {
408 // The lock that is being acquired.
409 pub(super) lock: &'a RawRwLock,
410
411 // Our listener for the "no readers" event.
412 #[pin]
413 no_readers: EventListener,
414
415 // Current state fof this future.
416 #[pin]
417 state: WriteState<'a>,
418 }
419
420 impl PinnedDrop for RawWrite<'_> {
421 fn drop(this: Pin<&mut Self>) {
422 let this = this.project();
423
424 if matches!(this.state.project(), WriteStateProj::WaitingReaders) {
425 // Safety: we hold a write lock, more or less.
426 unsafe {
427 this.lock.write_unlock();
428 }
429 }
430 }
431 }
432}
433
pin_project_lite::pin_project! {
    // The phases a `RawWrite` future goes through, in order.
    #[project = WriteStateProj]
    #[project_replace = WriteStateProjReplace]
    enum WriteState<'a> {
        // We are currently acquiring the inner mutex.
        Acquiring { #[pin] lock: Lock<'a, ()> },

        // We are currently waiting for readers to finish.
        // The mutex is held and `WRITER_BIT` is set in this phase.
        WaitingReaders,

        // The future has completed.
        Acquired,
    }
}
448
449impl<'a> EventListenerFuture for RawWrite<'a> {
450 type Output = ();
451
452 fn poll_with_strategy<'x, S: Strategy<'x>>(
453 self: Pin<&mut Self>,
454 strategy: &mut S,
455 cx: &mut S::Context,
456 ) -> Poll<()> {
457 let mut this = self.project();
458
459 loop {
460 match this.state.as_mut().project() {
461 WriteStateProj::Acquiring { lock } => {
462 // First grab the mutex.
463 let mutex_guard = ready!(lock.poll_with_strategy(strategy, cx));
464 forget(mutex_guard);
465
466 // Set `WRITER_BIT` and create a guard that unsets it in case this future is canceled.
467 let new_state = this.lock.state.fetch_or(WRITER_BIT, Ordering::SeqCst);
468
469 // If we just acquired the lock, return.
470 if new_state == WRITER_BIT {
471 this.state.as_mut().set(WriteState::Acquired);
472 return Poll::Ready(());
473 }
474
475 // Start waiting for the readers to finish.
476 this.no_readers.as_mut().listen(&this.lock.no_readers);
477 this.state.as_mut().set(WriteState::WaitingReaders);
478 }
479
480 WriteStateProj::WaitingReaders => {
481 let load_ordering = if this.no_readers.is_listening() {
482 Ordering::Acquire
483 } else {
484 Ordering::SeqCst
485 };
486
487 // Check the state again.
488 if this.lock.state.load(load_ordering) == WRITER_BIT {
489 // We are the only ones holding the lock, return `Ready`.
490 this.state.as_mut().set(WriteState::Acquired);
491 return Poll::Ready(());
492 }
493
494 // Wait for the readers to finish.
495 if !this.no_readers.is_listening() {
496 // Register a listener.
497 this.no_readers.as_mut().listen(&this.lock.no_readers);
498 } else {
499 // Wait for the readers to finish.
500 ready!(strategy.poll(this.no_readers.as_mut(), cx));
501 };
502 }
503 WriteStateProj::Acquired => panic!("Write lock already acquired"),
504 }
505 }
506 }
507}
508
509pin_project_lite::pin_project! {
510 /// The future returned by [`RawRwLock::upgrade`].
511
512 pub(super) struct RawUpgrade<'a> {
513 lock: Option<&'a RawRwLock>,
514
515 // The event listener we are waiting on.
516 #[pin]
517 listener: EventListener,
518 }
519
520 impl PinnedDrop for RawUpgrade<'_> {
521 fn drop(this: Pin<&mut Self>) {
522 let this = this.project();
523 if let Some(lock) = this.lock {
524 // SAFETY: we are dropping the future that would give us a write lock,
525 // so we don't need said lock anymore.
526 unsafe {
527 lock.write_unlock();
528 }
529 }
530 }
531 }
532}
533
534impl<'a> EventListenerFuture for RawUpgrade<'a> {
535 type Output = &'a RawRwLock;
536
537 fn poll_with_strategy<'x, S: Strategy<'x>>(
538 self: Pin<&mut Self>,
539 strategy: &mut S,
540 cx: &mut S::Context,
541 ) -> Poll<&'a RawRwLock> {
542 let mut this = self.project();
543 let lock = this.lock.expect("cannot poll future after completion");
544
545 // If there are readers, we need to wait for them to finish.
546 loop {
547 let load_ordering = if this.listener.is_listening() {
548 Ordering::Acquire
549 } else {
550 Ordering::SeqCst
551 };
552
553 // See if the number of readers is zero.
554 let state = lock.state.load(load_ordering);
555 if state == WRITER_BIT {
556 break;
557 }
558
559 // If there are readers, wait for them to finish.
560 if !this.listener.is_listening() {
561 // Start listening for "no readers" events.
562 this.listener.as_mut().listen(&lock.no_readers);
563 } else {
564 // Wait for the readers to finish.
565 ready!(strategy.poll(this.listener.as_mut(), cx));
566 };
567 }
568
569 // We are done.
570 Poll::Ready(this.lock.take().unwrap())
571 }
572}
573
impl<'a> RawUpgrade<'a> {
    /// Whether the future returned `Poll::Ready(..)` at some point.
    ///
    /// Polling to completion takes the lock reference out of `self.lock`, so an empty
    /// option means the future has already resolved.
    #[inline]
    pub(super) fn is_ready(&self) -> bool {
        self.lock.is_none()
    }
}
581