//! Raw, unsafe reader-writer locking implementation that doesn't depend on
//! the data protected by the lock.
3 | //! [`RwLock`](super::RwLock) is implemented in terms of this. |
4 | //! |
5 | //! Splitting the implementation this way allows instantiating |
6 | //! the locking code only once, and also lets us make |
7 | //! [`RwLockReadGuard`](super::RwLockReadGuard) covariant in `T`. |
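//!
//! As a rough sketch of how the two layers fit together (the wrapper and guard
//! types live in the parent module; the field names below are illustrative, not
//! the actual definitions):
//!
//! ```ignore
//! struct RwLock<T> {
//!     raw: RawRwLock,          // locking state, shared by all guards
//!     value: UnsafeCell<T>,    // the data protected by the lock
//! }
//!
//! impl<T> RwLock<T> {
//!     async fn read(&self) -> RwLockReadGuard<'_, T> {
//!         self.raw.read().await;         // wait until a read lock is held
//!         RwLockReadGuard { lock: self } // the guard calls `raw.read_unlock()` when dropped
//!     }
//! }
//! ```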
8 | |
9 | use std::future::Future; |
10 | use std::mem::forget; |
11 | use std::pin::Pin; |
12 | use std::process; |
13 | use std::sync::atomic::{AtomicUsize, Ordering}; |
14 | use std::task::{Context, Poll}; |
15 | |
16 | use event_listener::{Event, EventListener}; |
17 | |
18 | use crate::futures::Lock; |
19 | use crate::Mutex; |
20 | |
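/// Set in `state` while a writer holds the lock or is waiting to acquire it.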
21 | const WRITER_BIT: usize = 1; |
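/// Amount added to `state` for each active reader; the reader count lives in the bits above
/// `WRITER_BIT`.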
22 | const ONE_READER: usize = 2; |
23 | |
24 | /// A "raw" RwLock that doesn't hold any data. |
25 | pub(super) struct RawRwLock { |
26 | /// Acquired by the writer. |
27 | mutex: Mutex<()>, |
28 | |
29 | /// Event triggered when the last reader is dropped. |
30 | no_readers: Event, |
31 | |
32 | /// Event triggered when the writer is dropped. |
33 | no_writer: Event, |
34 | |
35 | /// Current state of the lock. |
36 | /// |
37 | /// The least significant bit (`WRITER_BIT`) is set to 1 when a writer is holding the lock or |
38 | /// trying to acquire it. |
39 | /// |
40 | /// The upper bits contain the number of currently active readers. Each active reader |
41 | /// increments the state by `ONE_READER`. |
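    ///
    /// For example, two active readers plus a writer waiting to acquire the lock are encoded as
    /// `2 * ONE_READER | WRITER_BIT`, i.e. `5`.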
42 | state: AtomicUsize, |
43 | } |
44 | |
45 | impl RawRwLock { |
    #[inline]
47 | pub(super) const fn new() -> Self { |
48 | RawRwLock { |
49 | mutex: Mutex::new(()), |
50 | no_readers: Event::new(), |
51 | no_writer: Event::new(), |
52 | state: AtomicUsize::new(0), |
53 | } |
54 | } |
55 | |
    /// Returns `true` iff a read lock was successfully acquired.
58 | pub(super) fn try_read(&self) -> bool { |
59 | let mut state = self.state.load(Ordering::Acquire); |
60 | |
61 | loop { |
62 | // If there's a writer holding the lock or attempting to acquire it, we cannot acquire |
63 | // a read lock here. |
64 | if state & WRITER_BIT != 0 { |
65 | return false; |
66 | } |
67 | |
68 | // Make sure the number of readers doesn't overflow. |
69 | if state > std::isize::MAX as usize { |
70 | process::abort(); |
71 | } |
72 | |
73 | // Increment the number of readers. |
74 | match self.state.compare_exchange( |
75 | state, |
76 | state + ONE_READER, |
77 | Ordering::AcqRel, |
78 | Ordering::Acquire, |
79 | ) { |
80 | Ok(_) => return true, |
81 | Err(s) => state = s, |
82 | } |
83 | } |
84 | } |
85 | |
    #[inline]
87 | pub(super) fn read(&self) -> RawRead<'_> { |
88 | RawRead { |
89 | lock: self, |
90 | state: self.state.load(Ordering::Acquire), |
91 | listener: None, |
92 | } |
93 | } |
94 | |
    /// Returns `true` iff an upgradable read lock was successfully acquired.
97 | pub(super) fn try_upgradable_read(&self) -> bool { |
98 | // First try grabbing the mutex. |
99 | let lock = if let Some(lock) = self.mutex.try_lock() { |
100 | lock |
101 | } else { |
102 | return false; |
103 | }; |
104 | |
105 | forget(lock); |
106 | |
107 | let mut state = self.state.load(Ordering::Acquire); |
108 | |
109 | // Make sure the number of readers doesn't overflow. |
110 | if state > std::isize::MAX as usize { |
111 | process::abort(); |
112 | } |
113 | |
114 | // Increment the number of readers. |
115 | loop { |
116 | match self.state.compare_exchange( |
117 | state, |
118 | state + ONE_READER, |
119 | Ordering::AcqRel, |
120 | Ordering::Acquire, |
121 | ) { |
122 | Ok(_) => return true, |
123 | Err(s) => state = s, |
124 | } |
125 | } |
126 | } |
127 | |
    #[inline]
130 | pub(super) fn upgradable_read(&self) -> RawUpgradableRead<'_> { |
131 | RawUpgradableRead { |
132 | lock: self, |
133 | acquire: self.mutex.lock(), |
134 | } |
135 | } |
136 | |
    /// Returns `true` iff a write lock was successfully acquired.
139 | pub(super) fn try_write(&self) -> bool { |
140 | // First try grabbing the mutex. |
141 | let lock = if let Some(lock) = self.mutex.try_lock() { |
142 | lock |
143 | } else { |
144 | return false; |
145 | }; |
146 | |
147 | // If there are no readers, grab the write lock. |
148 | if self |
149 | .state |
150 | .compare_exchange(0, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire) |
151 | .is_ok() |
152 | { |
153 | forget(lock); |
154 | true |
155 | } else { |
156 | drop(lock); |
157 | false |
158 | } |
159 | } |
160 | |
    #[inline]
163 | pub(super) fn write(&self) -> RawWrite<'_> { |
164 | RawWrite { |
165 | lock: self, |
166 | state: WriteState::Acquiring(self.mutex.lock()), |
167 | } |
168 | } |
169 | |
    /// Returns `true` iff the upgradable read lock was successfully upgraded to a write lock.
    ///
    /// # Safety
    ///
    /// Caller must hold an upgradable read lock.
    /// This will attempt to upgrade it to a write lock.
177 | pub(super) unsafe fn try_upgrade(&self) -> bool { |
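        // The upgrade succeeds only if we are the sole reader: the state must be exactly
        // `ONE_READER`, and it is swapped for `WRITER_BIT` in a single atomic step.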
178 | self.state |
179 | .compare_exchange(ONE_READER, WRITER_BIT, Ordering::AcqRel, Ordering::Acquire) |
180 | .is_ok() |
181 | } |
182 | |
183 | /// # Safety |
184 | /// |
185 | /// Caller must hold an upgradable read lock. |
    /// This will upgrade it to a write lock.
188 | pub(super) unsafe fn upgrade(&self) -> RawUpgrade<'_> { |
189 | // Set `WRITER_BIT` and decrement the number of readers at the same time. |
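        // E.g. with a single (upgradable) reader the state goes from `ONE_READER` (2) to
        // `WRITER_BIT` (1), hence the subtraction of `ONE_READER - WRITER_BIT`.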
190 | self.state |
191 | .fetch_sub(ONE_READER - WRITER_BIT, Ordering::SeqCst); |
192 | |
193 | RawUpgrade { |
194 | lock: Some(self), |
195 | listener: None, |
196 | } |
197 | } |
198 | |
199 | /// # Safety |
200 | /// |
201 | /// Caller must hold an upgradable read lock. |
    /// This will downgrade it to a standard read lock.
    #[inline]
205 | pub(super) unsafe fn downgrade_upgradable_read(&self) { |
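        // The reader count contributed by the upgradable read lock stays in place; releasing the
        // writer mutex is what turns it into a standard read lock.
        // SAFETY: the caller holds the upgradable read lock, which owns the writer mutex.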
206 | self.mutex.unlock_unchecked(); |
207 | } |
208 | |
209 | /// # Safety |
210 | /// |
211 | /// Caller must hold a write lock. |
    /// This will downgrade it to a read lock.
214 | pub(super) unsafe fn downgrade_write(&self) { |
215 | // Atomically downgrade state. |
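        // While the write lock is held the state is exactly `WRITER_BIT` (1); adding
        // `ONE_READER - WRITER_BIT` turns it into `ONE_READER` (2).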
216 | self.state |
217 | .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst); |
218 | |
219 | // Release the writer mutex. |
220 | self.mutex.unlock_unchecked(); |
221 | |
222 | // Trigger the "no writer" event. |
223 | self.no_writer.notify(1); |
224 | } |
225 | |
226 | /// # Safety |
227 | /// |
228 | /// Caller must hold a write lock. |
    /// This will downgrade it to an upgradable read lock.
231 | pub(super) unsafe fn downgrade_to_upgradable(&self) { |
232 | // Atomically downgrade state. |
233 | self.state |
234 | .fetch_add(ONE_READER - WRITER_BIT, Ordering::SeqCst); |
235 | } |
236 | |
237 | /// # Safety |
238 | /// |
    /// Caller must hold a read lock.
    /// This will unlock that lock.
242 | pub(super) unsafe fn read_unlock(&self) { |
243 | // Decrement the number of readers. |
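        // `fetch_sub` returns the previous state; masking out `WRITER_BIT` leaves the reader
        // count, so `ONE_READER` means we were the last reader.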
244 | if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER { |
245 | // If this was the last reader, trigger the "no readers" event. |
246 | self.no_readers.notify(1); |
247 | } |
248 | } |
249 | |
250 | /// # Safety |
251 | /// |
252 | /// Caller must hold an upgradable read lock. |
    /// This will unlock that lock.
255 | pub(super) unsafe fn upgradable_read_unlock(&self) { |
256 | // Decrement the number of readers. |
257 | if self.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER { |
258 | // If this was the last reader, trigger the "no readers" event. |
259 | self.no_readers.notify(1); |
260 | } |
261 | |
262 | // SAFETY: upgradable read guards acquire the writer mutex upon creation. |
263 | self.mutex.unlock_unchecked(); |
264 | } |
265 | |
266 | /// # Safety |
267 | /// |
268 | /// Caller must hold a write lock. |
    /// This will unlock that lock.
271 | pub(super) unsafe fn write_unlock(&self) { |
272 | // Unset `WRITER_BIT`. |
273 | self.state.fetch_and(!WRITER_BIT, Ordering::SeqCst); |
274 | // Trigger the "no writer" event. |
275 | self.no_writer.notify(1); |
276 | |
277 | // Release the writer lock. |
278 | // SAFETY: `RwLockWriteGuard` always holds a lock on writer mutex. |
279 | self.mutex.unlock_unchecked(); |
280 | } |
281 | } |
282 | |
/// The future returned by [`RawRwLock::read`].
285 | pub(super) struct RawRead<'a> { |
286 | /// The lock that is being acquired. |
287 | pub(super) lock: &'a RawRwLock, |
288 | |
289 | /// The last-observed state of the lock. |
290 | state: usize, |
291 | |
292 | /// The listener for the "no writers" event. |
293 | listener: Option<EventListener>, |
294 | } |
295 | |
296 | impl<'a> Future for RawRead<'a> { |
297 | type Output = (); |
298 | |
299 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
300 | let this = self.get_mut(); |
301 | |
302 | loop { |
303 | if this.state & WRITER_BIT == 0 { |
304 | // Make sure the number of readers doesn't overflow. |
305 | if this.state > std::isize::MAX as usize { |
306 | process::abort(); |
307 | } |
308 | |
309 | // If nobody is holding a write lock or attempting to acquire it, increment the |
310 | // number of readers. |
311 | match this.lock.state.compare_exchange( |
312 | this.state, |
313 | this.state + ONE_READER, |
314 | Ordering::AcqRel, |
315 | Ordering::Acquire, |
316 | ) { |
317 | Ok(_) => return Poll::Ready(()), |
318 | Err(s) => this.state = s, |
319 | } |
320 | } else { |
321 | // Start listening for "no writer" events. |
322 | let load_ordering = match &mut this.listener { |
323 | None => { |
324 | this.listener = Some(this.lock.no_writer.listen()); |
325 | |
326 | // Make sure there really is no writer. |
327 | Ordering::SeqCst |
328 | } |
329 | |
330 | Some(ref mut listener) => { |
331 | // Wait for the writer to finish. |
332 | ready!(Pin::new(listener).poll(cx)); |
333 | this.listener = None; |
334 | |
                        // Notify the next reader waiting in line.
336 | this.lock.no_writer.notify(1); |
337 | |
338 | // Check the state again. |
339 | Ordering::Acquire |
340 | } |
341 | }; |
342 | |
343 | // Reload the state. |
344 | this.state = this.lock.state.load(load_ordering); |
345 | } |
346 | } |
347 | } |
348 | } |
349 | |
/// The future returned by [`RawRwLock::upgradable_read`].
352 | pub(super) struct RawUpgradableRead<'a> { |
353 | /// The lock that is being acquired. |
354 | pub(super) lock: &'a RawRwLock, |
355 | |
356 | /// The mutex we are trying to acquire. |
357 | acquire: Lock<'a, ()>, |
358 | } |
359 | |
360 | impl<'a> Future for RawUpgradableRead<'a> { |
361 | type Output = (); |
362 | |
363 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
364 | let this = self.get_mut(); |
365 | |
366 | // Acquire the mutex. |
367 | let mutex_guard = ready!(Pin::new(&mut this.acquire).poll(cx)); |
368 | forget(mutex_guard); |
369 | |
370 | let mut state = this.lock.state.load(Ordering::Acquire); |
371 | |
372 | // Make sure the number of readers doesn't overflow. |
373 | if state > std::isize::MAX as usize { |
374 | process::abort(); |
375 | } |
376 | |
377 | // Increment the number of readers. |
378 | loop { |
379 | match this.lock.state.compare_exchange( |
380 | state, |
381 | state + ONE_READER, |
382 | Ordering::AcqRel, |
383 | Ordering::Acquire, |
384 | ) { |
385 | Ok(_) => { |
386 | return Poll::Ready(()); |
387 | } |
388 | Err(s) => state = s, |
389 | } |
390 | } |
391 | } |
392 | } |
393 | |
/// The future returned by [`RawRwLock::write`].
396 | pub(super) struct RawWrite<'a> { |
397 | /// The lock that is being acquired. |
398 | pub(super) lock: &'a RawRwLock, |
399 | |
    /// Current state of this future.
401 | state: WriteState<'a>, |
402 | } |
403 | |
404 | enum WriteState<'a> { |
405 | /// We are currently acquiring the inner mutex. |
406 | Acquiring(Lock<'a, ()>), |
407 | |
408 | /// We are currently waiting for readers to finish. |
409 | WaitingReaders { |
410 | /// The listener for the "no readers" event. |
411 | listener: Option<EventListener>, |
412 | }, |
413 | |
414 | /// The future has completed. |
415 | Acquired, |
416 | } |
417 | |
418 | impl<'a> Future for RawWrite<'a> { |
419 | type Output = (); |
420 | |
421 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
422 | let this = self.get_mut(); |
423 | |
424 | loop { |
425 | match &mut this.state { |
426 | WriteState::Acquiring(lock) => { |
427 | // First grab the mutex. |
428 | let mutex_guard = ready!(Pin::new(lock).poll(cx)); |
429 | forget(mutex_guard); |
430 | |
                    // Set `WRITER_BIT`. If this future is canceled while waiting for the readers
                    // to finish, the `Drop` impl below unsets it again.
                    let prev_state = this.lock.state.fetch_or(WRITER_BIT, Ordering::SeqCst);

                    // If there were no readers, we have just acquired the write lock.
                    if prev_state == 0 {
                        this.state = WriteState::Acquired;
                        return Poll::Ready(());
                    }
439 | |
440 | // Start waiting for the readers to finish. |
441 | this.state = WriteState::WaitingReaders { |
442 | listener: Some(this.lock.no_readers.listen()), |
443 | }; |
444 | } |
445 | |
446 | WriteState::WaitingReaders { ref mut listener } => { |
447 | let load_ordering = if listener.is_some() { |
448 | Ordering::Acquire |
449 | } else { |
450 | Ordering::SeqCst |
451 | }; |
452 | |
453 | // Check the state again. |
454 | if this.lock.state.load(load_ordering) == WRITER_BIT { |
455 | // We are the only ones holding the lock, return `Ready`. |
456 | this.state = WriteState::Acquired; |
457 | return Poll::Ready(()); |
458 | } |
459 | |
460 | // Wait for the readers to finish. |
461 | match listener { |
462 | None => { |
463 | // Register a listener. |
464 | *listener = Some(this.lock.no_readers.listen()); |
465 | } |
466 | |
467 | Some(ref mut evl) => { |
468 | // Wait for the readers to finish. |
469 | ready!(Pin::new(evl).poll(cx)); |
470 | *listener = None; |
471 | } |
472 | }; |
473 | } |
                WriteState::Acquired => panic!("Write lock already acquired"),
475 | } |
476 | } |
477 | } |
478 | } |
479 | |
480 | impl<'a> Drop for RawWrite<'a> { |
481 | fn drop(&mut self) { |
482 | if matches!(self.state, WriteState::WaitingReaders { .. }) { |
            // SAFETY: while waiting for the readers we have already set `WRITER_BIT` and we hold
            // the writer mutex, which is exactly what `write_unlock` expects and releases.
484 | unsafe { |
485 | self.lock.write_unlock(); |
486 | } |
487 | } |
488 | } |
489 | } |
490 | |
/// The future returned by [`RawRwLock::upgrade`].
493 | pub(super) struct RawUpgrade<'a> { |
494 | lock: Option<&'a RawRwLock>, |
495 | |
496 | /// The event listener we are waiting on. |
497 | listener: Option<EventListener>, |
498 | } |
499 | |
500 | impl<'a> Future for RawUpgrade<'a> { |
501 | type Output = &'a RawRwLock; |
502 | |
503 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'a RawRwLock> { |
504 | let this = self.get_mut(); |
        let lock = this.lock.expect("cannot poll future after completion");
506 | |
507 | // If there are readers, we need to wait for them to finish. |
508 | loop { |
509 | let load_ordering = if this.listener.is_some() { |
510 | Ordering::Acquire |
511 | } else { |
512 | Ordering::SeqCst |
513 | }; |
514 | |
515 | // See if the number of readers is zero. |
516 | let state = lock.state.load(load_ordering); |
517 | if state == WRITER_BIT { |
518 | break; |
519 | } |
520 | |
521 | // If there are readers, wait for them to finish. |
522 | match &mut this.listener { |
523 | None => { |
524 | // Start listening for "no readers" events. |
525 | this.listener = Some(lock.no_readers.listen()); |
526 | } |
527 | |
528 | Some(ref mut listener) => { |
529 | // Wait for the readers to finish. |
530 | ready!(Pin::new(listener).poll(cx)); |
531 | this.listener = None; |
532 | } |
533 | } |
534 | } |
535 | |
536 | // We are done. |
537 | Poll::Ready(this.lock.take().unwrap()) |
538 | } |
539 | } |
540 | |
541 | impl<'a> Drop for RawUpgrade<'a> { |
    #[inline]
543 | fn drop(&mut self) { |
        if let Some(lock) = self.lock {
            // SAFETY: `upgrade()` already set `WRITER_BIT` and we still hold the writer mutex, so
            // dropping the future before completion must release the half-acquired write lock.
547 | unsafe { |
548 | lock.write_unlock(); |
549 | } |
550 | } |
551 | } |
552 | } |
553 | |
554 | impl<'a> RawUpgrade<'a> { |
555 | /// Whether the future returned `Poll::Ready(..)` at some point. |
    #[inline]
557 | pub(super) fn is_ready(&self) -> bool { |
558 | self.lock.is_none() |
559 | } |
560 | } |
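
// A minimal smoke test sketch for the synchronous `try_*` paths above. It only uses items
// defined in this module, so it should compile as part of the crate; treat it as an
// illustration of the intended lock discipline rather than the crate's actual test suite.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn try_lock_discipline() {
        let lock = RawRwLock::new();

        // Readers may share the lock, but they exclude writers.
        assert!(lock.try_read());
        assert!(lock.try_read());
        assert!(!lock.try_write());

        // SAFETY: we hold the two read locks acquired above.
        unsafe {
            lock.read_unlock();
            lock.read_unlock();
        }

        // Once all readers are gone a writer can get in, and it excludes readers.
        assert!(lock.try_write());
        assert!(!lock.try_read());

        // SAFETY: we hold the write lock acquired above.
        unsafe { lock.write_unlock() };
    }
}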
561 | |