1 | //! Raw, unsafe reader-writer locking implementation, |
2 | //! doesn't depend on the data protected by the lock. |
3 | //! [`RwLock`](super::RwLock) is implemented in terms of this. |
4 | //! |
5 | //! Splitting the implementation this way allows instantiating |
6 | //! the locking code only once, and also lets us make |
7 | //! [`RwLockReadGuard`](super::RwLockReadGuard) covariant in `T`. |
8 | |
9 | use core::mem::forget; |
10 | use core::pin::Pin; |
11 | use core::sync::atomic::{AtomicUsize, Ordering}; |
12 | use core::task::Poll; |
13 | |
14 | use event_listener::{Event, EventListener}; |
15 | use event_listener_strategy::{EventListenerFuture, Strategy}; |
16 | |
17 | use crate::futures::Lock; |
18 | use crate::Mutex; |
19 | |
20 | const WRITER_BIT: usize = 1; |
21 | const ONE_READER: usize = 2; |
22 | |
23 | /// A "raw" RwLock that doesn't hold any data. |
24 | pub(super) struct RawRwLock { |
25 | /// Acquired by the writer. |
26 | mutex: Mutex<()>, |
27 | |
28 | /// Event triggered when the last reader is dropped. |
29 | no_readers: Event, |
30 | |
31 | /// Event triggered when the writer is dropped. |
32 | no_writer: Event, |
33 | |
34 | /// Current state of the lock. |
35 | /// |
36 | /// The least significant bit (`WRITER_BIT`) is set to 1 when a writer is holding the lock or |
37 | /// trying to acquire it. |
38 | /// |
39 | /// The upper bits contain the number of currently active readers. Each active reader |
40 | /// increments the state by `ONE_READER`. |
41 | state: AtomicUsize, |
42 | } |
43 | |
44 | impl RawRwLock { |
45 | #[inline ] |
46 | pub(super) const fn new() -> Self { |
47 | RawRwLock { |
48 | mutex: Mutex::new(()), |
49 | no_readers: Event::new(), |
50 | no_writer: Event::new(), |
51 | state: AtomicUsize::new(0), |
52 | } |
53 | } |
54 | |
55 | /// Returns `true` iff a read lock was successfully acquired. |
56 | pub(super) fn try_read(&self) -> bool { |
57 | let mut state = self.state.load(Ordering::Acquire); |
58 | |
59 | loop { |
60 | // If there's a writer holding the lock or attempting to acquire it, we cannot acquire |
61 | // a read lock here. |
62 | if state & WRITER_BIT != 0 { |
63 | return false; |
64 | } |
65 | |
66 | // Make sure the number of readers doesn't overflow. |
67 | if state > core::isize::MAX as usize { |
68 | crate::abort(); |
69 | } |
70 | |
71 | // Increment the number of readers. |
72 | match self.state.compare_exchange( |
73 | state, |
74 | state + ONE_READER, |
75 | Ordering::AcqRel, |
76 | Ordering::Acquire, |
77 | ) { |
78 | Ok(_) => return true, |
79 | Err(s) => state = s, |
80 | } |
81 | } |
82 | } |
83 | |
84 | #[inline ] |
85 | pub(super) fn read(&self) -> RawRead<'_> { |
86 | RawRead { |
87 | lock: self, |
88 | state: self.state.load(Ordering::Acquire), |
89 | listener: EventListener::new(), |
90 | } |
91 | } |
92 | |
93 | /// Returns `true` iff an upgradable read lock was successfully acquired. |
94 | |
95 | pub(super) fn try_upgradable_read(&self) -> bool { |
96 | // First try grabbing the mutex. |
97 | let lock = if let Some(lock) = self.mutex.try_lock() { |
98 | lock |
99 | } else { |
100 | return false; |
101 | }; |
102 | |
103 | forget(lock); |
104 | |
105 | let mut state = self.state.load(Ordering::Acquire); |
106 | |
107 | // Make sure the number of readers doesn't overflow. |
108 | if state > core::isize::MAX as usize { |
109 | crate::abort(); |
110 | } |
111 | |
112 | // Increment the number of readers. |
113 | loop { |
114 | match self.state.compare_exchange( |
115 | state, |
116 | state + ONE_READER, |
117 | Ordering::AcqRel, |
118 | Ordering::Acquire, |
119 | ) { |
120 | Ok(_) => return true, |
121 | Err(s) => state = s, |
122 | } |
123 | } |
124 | } |
125 | |
126 | #[inline ] |
127 | |
128 | pub(super) fn upgradable_read(&self) -> RawUpgradableRead<'_> { |
129 | RawUpgradableRead { |
130 | lock: self, |
131 | acquire: self.mutex.lock(), |
132 | } |
133 | } |
134 | |
135 | /// Returs `true` iff a write lock was successfully acquired. |
136 | |
137 | pub(super) fn try_write(&self) -> bool { |
138 | // First try grabbing the mutex. |
139 | let lock = if let Some(lock) = self.mutex.try_lock() { |
140 | lock |
141 | } else { |
142 | return false; |
143 | }; |
144 | |
145 | // If there are no readers, grab the write lock. |
146 | if self |
147 | .state |
148 | .compare_exchange(0, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire) |
149 | .is_ok() |
150 | { |
151 | forget(lock); |
152 | true |
153 | } else { |
154 | drop(lock); |
155 | false |
156 | } |
157 | } |
158 | |
159 | #[inline ] |
160 | |
161 | pub(super) fn write(&self) -> RawWrite<'_> { |
162 | RawWrite { |
163 | lock: self, |
164 | no_readers: EventListener::new(), |
165 | state: WriteState::Acquiring { |
166 | lock: self.mutex.lock(), |
167 | }, |
168 | } |
169 | } |
170 | |
171 | /// Returns `true` iff a the upgradable read lock was successfully upgraded to a write lock. |
172 | /// |
173 | /// # Safety |
174 | /// |
175 | /// Caller must hold an upgradable read lock. |
176 | /// This will attempt to upgrade it to a write lock. |
177 | |
178 | pub(super) unsafe fn try_upgrade(&self) -> bool { |
179 | self.state |
180 | .compare_exchange(ONE_READER, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire) |
181 | .is_ok() |
182 | } |
183 | |
184 | /// # Safety |
185 | /// |
186 | /// Caller must hold an upgradable read lock. |
187 | /// This will upgrade it to a write lock. |
188 | |
189 | pub(super) unsafe fn upgrade(&self) -> RawUpgrade<'_> { |
190 | // Set `WRITER_BIT` and decrement the number of readers at the same time. |
191 | self.state |
192 | .fetch_sub(ONE_READER - WRITER_BIT, Ordering::SeqCst); |
193 | |
194 | RawUpgrade { |
195 | lock: Some(self), |
196 | listener: EventListener::new(), |
197 | } |
198 | } |
199 | |
200 | /// # Safety |
201 | /// |
202 | /// Caller must hold an upgradable read lock. |
203 | /// This will downgrade it to a stadard read lock. |
204 | #[inline ] |
205 | |
206 | pub(super) unsafe fn downgrade_upgradable_read(&self) { |
207 | self.mutex.unlock_unchecked(); |
208 | } |
209 | |
210 | /// # Safety |
211 | /// |
212 | /// Caller must hold a write lock. |
213 | /// This will downgrade it to a read lock. |
214 | |
215 | pub(super) unsafe fn downgrade_write(&self) { |
216 | // Atomically downgrade state. |
217 | self.state |
218 | .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst); |
219 | |
220 | // Release the writer mutex. |
221 | self.mutex.unlock_unchecked(); |
222 | |
223 | // Trigger the "no writer" event. |
224 | self.no_writer.notify(1); |
225 | } |
226 | |
227 | /// # Safety |
228 | /// |
229 | /// Caller must hold a write lock. |
230 | /// This will downgrade it to an upgradable read lock. |
231 | |
232 | pub(super) unsafe fn downgrade_to_upgradable(&self) { |
233 | // Atomically downgrade state. |
234 | self.state |
235 | .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst); |
236 | } |
237 | |
238 | /// # Safety |
239 | /// |
240 | /// Caller must hold a read lock . |
241 | /// This will unlock that lock. |
242 | |
243 | pub(super) unsafe fn read_unlock(&self) { |
244 | // Decrement the number of readers. |
245 | if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER { |
246 | // If this was the last reader, trigger the "no readers" event. |
247 | self.no_readers.notify(1); |
248 | } |
249 | } |
250 | |
251 | /// # Safety |
252 | /// |
253 | /// Caller must hold an upgradable read lock. |
254 | /// This will unlock that lock. |
255 | |
256 | pub(super) unsafe fn upgradable_read_unlock(&self) { |
257 | // Decrement the number of readers. |
258 | if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER { |
259 | // If this was the last reader, trigger the "no readers" event. |
260 | self.no_readers.notify(1); |
261 | } |
262 | |
263 | // SAFETY: upgradable read guards acquire the writer mutex upon creation. |
264 | self.mutex.unlock_unchecked(); |
265 | } |
266 | |
267 | /// # Safety |
268 | /// |
269 | /// Caller must hold a write lock. |
270 | /// This will unlock that lock. |
271 | |
272 | pub(super) unsafe fn write_unlock(&self) { |
273 | // Unset `WRITER_BIT`. |
274 | self.state.fetch_and(!WRITER_BIT, Ordering::SeqCst); |
275 | // Trigger the "no writer" event. |
276 | self.no_writer.notify(1); |
277 | |
278 | // Release the writer lock. |
279 | // SAFETY: `RwLockWriteGuard` always holds a lock on writer mutex. |
280 | self.mutex.unlock_unchecked(); |
281 | } |
282 | } |
283 | |
284 | pin_project_lite::pin_project! { |
285 | /// The future returned by [`RawRwLock::read`]. |
286 | |
287 | pub(super) struct RawRead<'a> { |
288 | // The lock that is being acquired. |
289 | pub(super) lock: &'a RawRwLock, |
290 | |
291 | // The last-observed state of the lock. |
292 | state: usize, |
293 | |
294 | // The listener for the "no writers" event. |
295 | #[pin] |
296 | listener: EventListener, |
297 | } |
298 | } |
299 | |
300 | impl<'a> EventListenerFuture for RawRead<'a> { |
301 | type Output = (); |
302 | |
303 | fn poll_with_strategy<'x, S: Strategy<'x>>( |
304 | self: Pin<&mut Self>, |
305 | strategy: &mut S, |
306 | cx: &mut S::Context, |
307 | ) -> Poll<()> { |
308 | let mut this = self.project(); |
309 | |
310 | loop { |
311 | if *this.state & WRITER_BIT == 0 { |
312 | // Make sure the number of readers doesn't overflow. |
313 | if *this.state > core::isize::MAX as usize { |
314 | crate::abort(); |
315 | } |
316 | |
317 | // If nobody is holding a write lock or attempting to acquire it, increment the |
318 | // number of readers. |
319 | match this.lock.state.compare_exchange( |
320 | *this.state, |
321 | *this.state + ONE_READER, |
322 | Ordering::AcqRel, |
323 | Ordering::Acquire, |
324 | ) { |
325 | Ok(_) => return Poll::Ready(()), |
326 | Err(s) => *this.state = s, |
327 | } |
328 | } else { |
329 | // Start listening for "no writer" events. |
330 | let load_ordering = if !this.listener.is_listening() { |
331 | this.listener.as_mut().listen(&this.lock.no_writer); |
332 | |
333 | // Make sure there really is no writer. |
334 | Ordering::SeqCst |
335 | } else { |
336 | // Wait for the writer to finish. |
337 | ready!(strategy.poll(this.listener.as_mut(), cx)); |
338 | |
339 | // Notify the next reader waiting in list. |
340 | this.lock.no_writer.notify(1); |
341 | |
342 | // Check the state again. |
343 | Ordering::Acquire |
344 | }; |
345 | |
346 | // Reload the state. |
347 | *this.state = this.lock.state.load(load_ordering); |
348 | } |
349 | } |
350 | } |
351 | } |
352 | |
353 | pin_project_lite::pin_project! { |
354 | /// The future returned by [`RawRwLock::upgradable_read`]. |
355 | pub(super) struct RawUpgradableRead<'a> { |
356 | // The lock that is being acquired. |
357 | pub(super) lock: &'a RawRwLock, |
358 | |
359 | // The mutex we are trying to acquire. |
360 | #[pin] |
361 | acquire: Lock<'a, ()>, |
362 | } |
363 | } |
364 | |
365 | impl<'a> EventListenerFuture for RawUpgradableRead<'a> { |
366 | type Output = (); |
367 | |
368 | fn poll_with_strategy<'x, S: Strategy<'x>>( |
369 | self: Pin<&mut Self>, |
370 | strategy: &mut S, |
371 | cx: &mut S::Context, |
372 | ) -> Poll<()> { |
373 | let this = self.project(); |
374 | |
375 | // Acquire the mutex. |
376 | let mutex_guard = ready!(this.acquire.poll_with_strategy(strategy, cx)); |
377 | forget(mutex_guard); |
378 | |
379 | // Load the current state. |
380 | let mut state = this.lock.state.load(Ordering::Acquire); |
381 | |
382 | // Make sure the number of readers doesn't overflow. |
383 | if state > core::isize::MAX as usize { |
384 | crate::abort(); |
385 | } |
386 | |
387 | // Increment the number of readers. |
388 | loop { |
389 | match this.lock.state.compare_exchange( |
390 | state, |
391 | state + ONE_READER, |
392 | Ordering::AcqRel, |
393 | Ordering::Acquire, |
394 | ) { |
395 | Ok(_) => { |
396 | return Poll::Ready(()); |
397 | } |
398 | Err(s) => state = s, |
399 | } |
400 | } |
401 | } |
402 | } |
403 | |
404 | pin_project_lite::pin_project! { |
405 | /// The future returned by [`RawRwLock::write`]. |
406 | |
407 | pub(super) struct RawWrite<'a> { |
408 | // The lock that is being acquired. |
409 | pub(super) lock: &'a RawRwLock, |
410 | |
411 | // Our listener for the "no readers" event. |
412 | #[pin] |
413 | no_readers: EventListener, |
414 | |
415 | // Current state fof this future. |
416 | #[pin] |
417 | state: WriteState<'a>, |
418 | } |
419 | |
420 | impl PinnedDrop for RawWrite<'_> { |
421 | fn drop(this: Pin<&mut Self>) { |
422 | let this = this.project(); |
423 | |
424 | if matches!(this.state.project(), WriteStateProj::WaitingReaders) { |
425 | // Safety: we hold a write lock, more or less. |
426 | unsafe { |
427 | this.lock.write_unlock(); |
428 | } |
429 | } |
430 | } |
431 | } |
432 | } |
433 | |
434 | pin_project_lite::pin_project! { |
435 | #[project = WriteStateProj] |
436 | #[project_replace = WriteStateProjReplace] |
437 | enum WriteState<'a> { |
438 | // We are currently acquiring the inner mutex. |
439 | Acquiring { #[pin] lock: Lock<'a, ()> }, |
440 | |
441 | // We are currently waiting for readers to finish. |
442 | WaitingReaders, |
443 | |
444 | // The future has completed. |
445 | Acquired, |
446 | } |
447 | } |
448 | |
449 | impl<'a> EventListenerFuture for RawWrite<'a> { |
450 | type Output = (); |
451 | |
452 | fn poll_with_strategy<'x, S: Strategy<'x>>( |
453 | self: Pin<&mut Self>, |
454 | strategy: &mut S, |
455 | cx: &mut S::Context, |
456 | ) -> Poll<()> { |
457 | let mut this = self.project(); |
458 | |
459 | loop { |
460 | match this.state.as_mut().project() { |
461 | WriteStateProj::Acquiring { lock } => { |
462 | // First grab the mutex. |
463 | let mutex_guard = ready!(lock.poll_with_strategy(strategy, cx)); |
464 | forget(mutex_guard); |
465 | |
466 | // Set `WRITER_BIT` and create a guard that unsets it in case this future is canceled. |
467 | let new_state = this.lock.state.fetch_or(WRITER_BIT, Ordering::SeqCst); |
468 | |
469 | // If we just acquired the lock, return. |
470 | if new_state == WRITER_BIT { |
471 | this.state.as_mut().set(WriteState::Acquired); |
472 | return Poll::Ready(()); |
473 | } |
474 | |
475 | // Start waiting for the readers to finish. |
476 | this.no_readers.as_mut().listen(&this.lock.no_readers); |
477 | this.state.as_mut().set(WriteState::WaitingReaders); |
478 | } |
479 | |
480 | WriteStateProj::WaitingReaders => { |
481 | let load_ordering = if this.no_readers.is_listening() { |
482 | Ordering::Acquire |
483 | } else { |
484 | Ordering::SeqCst |
485 | }; |
486 | |
487 | // Check the state again. |
488 | if this.lock.state.load(load_ordering) == WRITER_BIT { |
489 | // We are the only ones holding the lock, return `Ready`. |
490 | this.state.as_mut().set(WriteState::Acquired); |
491 | return Poll::Ready(()); |
492 | } |
493 | |
494 | // Wait for the readers to finish. |
495 | if !this.no_readers.is_listening() { |
496 | // Register a listener. |
497 | this.no_readers.as_mut().listen(&this.lock.no_readers); |
498 | } else { |
499 | // Wait for the readers to finish. |
500 | ready!(strategy.poll(this.no_readers.as_mut(), cx)); |
501 | }; |
502 | } |
503 | WriteStateProj::Acquired => panic!("Write lock already acquired" ), |
504 | } |
505 | } |
506 | } |
507 | } |
508 | |
509 | pin_project_lite::pin_project! { |
510 | /// The future returned by [`RawRwLock::upgrade`]. |
511 | |
512 | pub(super) struct RawUpgrade<'a> { |
513 | lock: Option<&'a RawRwLock>, |
514 | |
515 | // The event listener we are waiting on. |
516 | #[pin] |
517 | listener: EventListener, |
518 | } |
519 | |
520 | impl PinnedDrop for RawUpgrade<'_> { |
521 | fn drop(this: Pin<&mut Self>) { |
522 | let this = this.project(); |
523 | if let Some(lock) = this.lock { |
524 | // SAFETY: we are dropping the future that would give us a write lock, |
525 | // so we don't need said lock anymore. |
526 | unsafe { |
527 | lock.write_unlock(); |
528 | } |
529 | } |
530 | } |
531 | } |
532 | } |
533 | |
534 | impl<'a> EventListenerFuture for RawUpgrade<'a> { |
535 | type Output = &'a RawRwLock; |
536 | |
537 | fn poll_with_strategy<'x, S: Strategy<'x>>( |
538 | self: Pin<&mut Self>, |
539 | strategy: &mut S, |
540 | cx: &mut S::Context, |
541 | ) -> Poll<&'a RawRwLock> { |
542 | let mut this = self.project(); |
543 | let lock = this.lock.expect("cannot poll future after completion" ); |
544 | |
545 | // If there are readers, we need to wait for them to finish. |
546 | loop { |
547 | let load_ordering = if this.listener.is_listening() { |
548 | Ordering::Acquire |
549 | } else { |
550 | Ordering::SeqCst |
551 | }; |
552 | |
553 | // See if the number of readers is zero. |
554 | let state = lock.state.load(load_ordering); |
555 | if state == WRITER_BIT { |
556 | break; |
557 | } |
558 | |
559 | // If there are readers, wait for them to finish. |
560 | if !this.listener.is_listening() { |
561 | // Start listening for "no readers" events. |
562 | this.listener.as_mut().listen(&lock.no_readers); |
563 | } else { |
564 | // Wait for the readers to finish. |
565 | ready!(strategy.poll(this.listener.as_mut(), cx)); |
566 | }; |
567 | } |
568 | |
569 | // We are done. |
570 | Poll::Ready(this.lock.take().unwrap()) |
571 | } |
572 | } |
573 | |
574 | impl<'a> RawUpgrade<'a> { |
575 | /// Whether the future returned `Poll::Ready(..)` at some point. |
576 | #[inline ] |
577 | pub(super) fn is_ready(&self) -> bool { |
578 | self.lock.is_none() |
579 | } |
580 | } |
581 | |