1//! Raw, unsafe reader-writer locking implementation,
2//! doesn't depend on the data protected by the lock.
3//! [`RwLock`](super::RwLock) is implemented in terms of this.
4//!
5//! Splitting the implementation this way allows instantiating
6//! the locking code only once, and also lets us make
7//! [`RwLockReadGuard`](super::RwLockReadGuard) covariant in `T`.
8
9use std::future::Future;
10use std::mem::forget;
11use std::pin::Pin;
12use std::process;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::task::{Context, Poll};
15
16use event_listener::{Event, EventListener};
17
18use crate::futures::Lock;
19use crate::Mutex;
20
21const WRITER_BIT: usize = 1;
22const ONE_READER: usize = 2;
23
24/// A "raw" RwLock that doesn't hold any data.
25pub(super) struct RawRwLock {
26 /// Acquired by the writer.
27 mutex: Mutex<()>,
28
29 /// Event triggered when the last reader is dropped.
30 no_readers: Event,
31
32 /// Event triggered when the writer is dropped.
33 no_writer: Event,
34
35 /// Current state of the lock.
36 ///
37 /// The least significant bit (`WRITER_BIT`) is set to 1 when a writer is holding the lock or
38 /// trying to acquire it.
39 ///
40 /// The upper bits contain the number of currently active readers. Each active reader
41 /// increments the state by `ONE_READER`.
42 state: AtomicUsize,
43}
44
45impl RawRwLock {
46 #[inline]
47 pub(super) const fn new() -> Self {
48 RawRwLock {
49 mutex: Mutex::new(()),
50 no_readers: Event::new(),
51 no_writer: Event::new(),
52 state: AtomicUsize::new(0),
53 }
54 }
55
56 /// Returns `true` iff a read lock was successfully acquired.
57
58 pub(super) fn try_read(&self) -> bool {
59 let mut state = self.state.load(Ordering::Acquire);
60
61 loop {
62 // If there's a writer holding the lock or attempting to acquire it, we cannot acquire
63 // a read lock here.
64 if state & WRITER_BIT != 0 {
65 return false;
66 }
67
68 // Make sure the number of readers doesn't overflow.
69 if state > std::isize::MAX as usize {
70 process::abort();
71 }
72
73 // Increment the number of readers.
74 match self.state.compare_exchange(
75 state,
76 state + ONE_READER,
77 Ordering::AcqRel,
78 Ordering::Acquire,
79 ) {
80 Ok(_) => return true,
81 Err(s) => state = s,
82 }
83 }
84 }
85
86 #[inline]
87 pub(super) fn read(&self) -> RawRead<'_> {
88 RawRead {
89 lock: self,
90 state: self.state.load(Ordering::Acquire),
91 listener: None,
92 }
93 }
94
95 /// Returns `true` iff an upgradable read lock was successfully acquired.
96
97 pub(super) fn try_upgradable_read(&self) -> bool {
98 // First try grabbing the mutex.
99 let lock = if let Some(lock) = self.mutex.try_lock() {
100 lock
101 } else {
102 return false;
103 };
104
105 forget(lock);
106
107 let mut state = self.state.load(Ordering::Acquire);
108
109 // Make sure the number of readers doesn't overflow.
110 if state > std::isize::MAX as usize {
111 process::abort();
112 }
113
114 // Increment the number of readers.
115 loop {
116 match self.state.compare_exchange(
117 state,
118 state + ONE_READER,
119 Ordering::AcqRel,
120 Ordering::Acquire,
121 ) {
122 Ok(_) => return true,
123 Err(s) => state = s,
124 }
125 }
126 }
127
128 #[inline]
129
130 pub(super) fn upgradable_read(&self) -> RawUpgradableRead<'_> {
131 RawUpgradableRead {
132 lock: self,
133 acquire: self.mutex.lock(),
134 }
135 }
136
137 /// Returs `true` iff a write lock was successfully acquired.
138
139 pub(super) fn try_write(&self) -> bool {
140 // First try grabbing the mutex.
141 let lock = if let Some(lock) = self.mutex.try_lock() {
142 lock
143 } else {
144 return false;
145 };
146
147 // If there are no readers, grab the write lock.
148 if self
149 .state
150 .compare_exchange(0, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire)
151 .is_ok()
152 {
153 forget(lock);
154 true
155 } else {
156 drop(lock);
157 false
158 }
159 }
160
161 #[inline]
162
163 pub(super) fn write(&self) -> RawWrite<'_> {
164 RawWrite {
165 lock: self,
166 state: WriteState::Acquiring(self.mutex.lock()),
167 }
168 }
169
170 /// Returns `true` iff a the upgradable read lock was successfully upgraded to a write lock.
171 ///
172 /// # Safety
173 ///
174 /// Caller must hold an upgradable read lock.
175 /// This will attempt to upgrade it to a write lock.
176
177 pub(super) unsafe fn try_upgrade(&self) -> bool {
178 self.state
179 .compare_exchange(ONE_READER, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire)
180 .is_ok()
181 }
182
183 /// # Safety
184 ///
185 /// Caller must hold an upgradable read lock.
186 /// This will upgrade it to a write lock.
187
188 pub(super) unsafe fn upgrade(&self) -> RawUpgrade<'_> {
189 // Set `WRITER_BIT` and decrement the number of readers at the same time.
190 self.state
191 .fetch_sub(ONE_READER - WRITER_BIT, Ordering::SeqCst);
192
193 RawUpgrade {
194 lock: Some(self),
195 listener: None,
196 }
197 }
198
199 /// # Safety
200 ///
201 /// Caller must hold an upgradable read lock.
202 /// This will downgrade it to a stadard read lock.
203 #[inline]
204
205 pub(super) unsafe fn downgrade_upgradable_read(&self) {
206 self.mutex.unlock_unchecked();
207 }
208
209 /// # Safety
210 ///
211 /// Caller must hold a write lock.
212 /// This will downgrade it to a read lock.
213
214 pub(super) unsafe fn downgrade_write(&self) {
215 // Atomically downgrade state.
216 self.state
217 .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst);
218
219 // Release the writer mutex.
220 self.mutex.unlock_unchecked();
221
222 // Trigger the "no writer" event.
223 self.no_writer.notify(1);
224 }
225
226 /// # Safety
227 ///
228 /// Caller must hold a write lock.
229 /// This will downgrade it to an upgradable read lock.
230
231 pub(super) unsafe fn downgrade_to_upgradable(&self) {
232 // Atomically downgrade state.
233 self.state
234 .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst);
235 }
236
237 /// # Safety
238 ///
239 /// Caller must hold a read lock .
240 /// This will unlock that lock.
241
242 pub(super) unsafe fn read_unlock(&self) {
243 // Decrement the number of readers.
244 if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER {
245 // If this was the last reader, trigger the "no readers" event.
246 self.no_readers.notify(1);
247 }
248 }
249
250 /// # Safety
251 ///
252 /// Caller must hold an upgradable read lock.
253 /// This will unlock that lock.
254
255 pub(super) unsafe fn upgradable_read_unlock(&self) {
256 // Decrement the number of readers.
257 if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER {
258 // If this was the last reader, trigger the "no readers" event.
259 self.no_readers.notify(1);
260 }
261
262 // SAFETY: upgradable read guards acquire the writer mutex upon creation.
263 self.mutex.unlock_unchecked();
264 }
265
266 /// # Safety
267 ///
268 /// Caller must hold a write lock.
269 /// This will unlock that lock.
270
271 pub(super) unsafe fn write_unlock(&self) {
272 // Unset `WRITER_BIT`.
273 self.state.fetch_and(!WRITER_BIT, Ordering::SeqCst);
274 // Trigger the "no writer" event.
275 self.no_writer.notify(1);
276
277 // Release the writer lock.
278 // SAFETY: `RwLockWriteGuard` always holds a lock on writer mutex.
279 self.mutex.unlock_unchecked();
280 }
281}
282
283/// The future returned by [`RawRwLock::read`].
284
285pub(super) struct RawRead<'a> {
286 /// The lock that is being acquired.
287 pub(super) lock: &'a RawRwLock,
288
289 /// The last-observed state of the lock.
290 state: usize,
291
292 /// The listener for the "no writers" event.
293 listener: Option<EventListener>,
294}
295
296impl<'a> Future for RawRead<'a> {
297 type Output = ();
298
299 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
300 let this = self.get_mut();
301
302 loop {
303 if this.state & WRITER_BIT == 0 {
304 // Make sure the number of readers doesn't overflow.
305 if this.state > std::isize::MAX as usize {
306 process::abort();
307 }
308
309 // If nobody is holding a write lock or attempting to acquire it, increment the
310 // number of readers.
311 match this.lock.state.compare_exchange(
312 this.state,
313 this.state + ONE_READER,
314 Ordering::AcqRel,
315 Ordering::Acquire,
316 ) {
317 Ok(_) => return Poll::Ready(()),
318 Err(s) => this.state = s,
319 }
320 } else {
321 // Start listening for "no writer" events.
322 let load_ordering = match &mut this.listener {
323 None => {
324 this.listener = Some(this.lock.no_writer.listen());
325
326 // Make sure there really is no writer.
327 Ordering::SeqCst
328 }
329
330 Some(ref mut listener) => {
331 // Wait for the writer to finish.
332 ready!(Pin::new(listener).poll(cx));
333 this.listener = None;
334
335 // Notify the next reader waiting in list.
336 this.lock.no_writer.notify(1);
337
338 // Check the state again.
339 Ordering::Acquire
340 }
341 };
342
343 // Reload the state.
344 this.state = this.lock.state.load(load_ordering);
345 }
346 }
347 }
348}
349
350/// The future returned by [`RawRwLock::upgradable_read`].
351
352pub(super) struct RawUpgradableRead<'a> {
353 /// The lock that is being acquired.
354 pub(super) lock: &'a RawRwLock,
355
356 /// The mutex we are trying to acquire.
357 acquire: Lock<'a, ()>,
358}
359
360impl<'a> Future for RawUpgradableRead<'a> {
361 type Output = ();
362
363 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
364 let this = self.get_mut();
365
366 // Acquire the mutex.
367 let mutex_guard = ready!(Pin::new(&mut this.acquire).poll(cx));
368 forget(mutex_guard);
369
370 let mut state = this.lock.state.load(Ordering::Acquire);
371
372 // Make sure the number of readers doesn't overflow.
373 if state > std::isize::MAX as usize {
374 process::abort();
375 }
376
377 // Increment the number of readers.
378 loop {
379 match this.lock.state.compare_exchange(
380 state,
381 state + ONE_READER,
382 Ordering::AcqRel,
383 Ordering::Acquire,
384 ) {
385 Ok(_) => {
386 return Poll::Ready(());
387 }
388 Err(s) => state = s,
389 }
390 }
391 }
392}
393
394/// The future returned by [`RawRwLock::write`].
395
396pub(super) struct RawWrite<'a> {
397 /// The lock that is being acquired.
398 pub(super) lock: &'a RawRwLock,
399
400 /// Current state fof this future.
401 state: WriteState<'a>,
402}
403
404enum WriteState<'a> {
405 /// We are currently acquiring the inner mutex.
406 Acquiring(Lock<'a, ()>),
407
408 /// We are currently waiting for readers to finish.
409 WaitingReaders {
410 /// The listener for the "no readers" event.
411 listener: Option<EventListener>,
412 },
413
414 /// The future has completed.
415 Acquired,
416}
417
418impl<'a> Future for RawWrite<'a> {
419 type Output = ();
420
421 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
422 let this = self.get_mut();
423
424 loop {
425 match &mut this.state {
426 WriteState::Acquiring(lock) => {
427 // First grab the mutex.
428 let mutex_guard = ready!(Pin::new(lock).poll(cx));
429 forget(mutex_guard);
430
431 // Set `WRITER_BIT` and create a guard that unsets it in case this future is canceled.
432 let new_state = this.lock.state.fetch_or(WRITER_BIT, Ordering::SeqCst);
433
434 // If we just acquired the lock, return.
435 if new_state == WRITER_BIT {
436 this.state = WriteState::Acquired;
437 return Poll::Ready(());
438 }
439
440 // Start waiting for the readers to finish.
441 this.state = WriteState::WaitingReaders {
442 listener: Some(this.lock.no_readers.listen()),
443 };
444 }
445
446 WriteState::WaitingReaders { ref mut listener } => {
447 let load_ordering = if listener.is_some() {
448 Ordering::Acquire
449 } else {
450 Ordering::SeqCst
451 };
452
453 // Check the state again.
454 if this.lock.state.load(load_ordering) == WRITER_BIT {
455 // We are the only ones holding the lock, return `Ready`.
456 this.state = WriteState::Acquired;
457 return Poll::Ready(());
458 }
459
460 // Wait for the readers to finish.
461 match listener {
462 None => {
463 // Register a listener.
464 *listener = Some(this.lock.no_readers.listen());
465 }
466
467 Some(ref mut evl) => {
468 // Wait for the readers to finish.
469 ready!(Pin::new(evl).poll(cx));
470 *listener = None;
471 }
472 };
473 }
474 WriteState::Acquired => panic!("Write lock already acquired"),
475 }
476 }
477 }
478}
479
480impl<'a> Drop for RawWrite<'a> {
481 fn drop(&mut self) {
482 if matches!(self.state, WriteState::WaitingReaders { .. }) {
483 // Safety: we hold a write lock, more or less.
484 unsafe {
485 self.lock.write_unlock();
486 }
487 }
488 }
489}
490
491/// The future returned by [`RawRwLock::upgrade`].
492
493pub(super) struct RawUpgrade<'a> {
494 lock: Option<&'a RawRwLock>,
495
496 /// The event listener we are waiting on.
497 listener: Option<EventListener>,
498}
499
500impl<'a> Future for RawUpgrade<'a> {
501 type Output = &'a RawRwLock;
502
503 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'a RawRwLock> {
504 let this = self.get_mut();
505 let lock = this.lock.expect("cannot poll future after completion");
506
507 // If there are readers, we need to wait for them to finish.
508 loop {
509 let load_ordering = if this.listener.is_some() {
510 Ordering::Acquire
511 } else {
512 Ordering::SeqCst
513 };
514
515 // See if the number of readers is zero.
516 let state = lock.state.load(load_ordering);
517 if state == WRITER_BIT {
518 break;
519 }
520
521 // If there are readers, wait for them to finish.
522 match &mut this.listener {
523 None => {
524 // Start listening for "no readers" events.
525 this.listener = Some(lock.no_readers.listen());
526 }
527
528 Some(ref mut listener) => {
529 // Wait for the readers to finish.
530 ready!(Pin::new(listener).poll(cx));
531 this.listener = None;
532 }
533 }
534 }
535
536 // We are done.
537 Poll::Ready(this.lock.take().unwrap())
538 }
539}
540
541impl<'a> Drop for RawUpgrade<'a> {
542 #[inline]
543 fn drop(&mut self) {
544 if let Some(lock: &RawRwLock) = self.lock {
545 // SAFETY: we are dropping the future that would give us a write lock,
546 // so we don't need said lock anymore.
547 unsafe {
548 lock.write_unlock();
549 }
550 }
551 }
552}
553
554impl<'a> RawUpgrade<'a> {
555 /// Whether the future returned `Poll::Ready(..)` at some point.
556 #[inline]
557 pub(super) fn is_ready(&self) -> bool {
558 self.lock.is_none()
559 }
560}
561