1// Copyright 2016 Amanieu d'Antras
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use crate::elision::{have_elision, AtomicElisionExt};
9use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
10use crate::util;
11use core::{
12 cell::Cell,
13 sync::atomic::{AtomicUsize, Ordering},
14};
15use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade};
16use parking_lot_core::{
17 self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
18};
19use std::time::{Duration, Instant};
20
21// This reader-writer lock implementation is based on Boost's upgrade_mutex:
22// https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432
23//
24// This implementation uses 2 wait queues, one at key [addr] and one at key
25// [addr + 1]. The primary queue is used for all new waiting threads, and the
26// secondary queue is used by the thread which has acquired WRITER_BIT but is
27// waiting for the remaining readers to exit the lock.
28//
// This implementation is fair between readers and writers since it uses the
// order in which threads first started queuing to alternate between read phases
// and write phases. In particular, it is not vulnerable to writer starvation
// since readers will block if there is a pending writer.
33
// Layout of the state word: the four lowest bits are flags and every bit
// above them belongs to the reader counter.

// Set while one or more threads are waiting in the main queue.
const PARKED_BIT: usize = 1 << 0;
// Set while the thread that owns WRITER_BIT is itself parked. Only valid
// together with WRITER_BIT.
const WRITER_PARKED_BIT: usize = 1 << 1;
// Set while a reader holds an upgradable lock. Requires a non-zero reader
// count and is never set together with WRITER_BIT.
const UPGRADABLE_BIT: usize = 1 << 2;
// With a zero reader count: a writer holds the exclusive lock.
// With a non-zero reader count: a writer is waiting for the readers to leave.
const WRITER_BIT: usize = 1 << 3;
// Increment that represents a single reader in the counter.
const ONE_READER: usize = 1 << 4;
// Selects the reader-counter bits, i.e. everything above the four flag bits.
const READERS_MASK: usize = !(ONE_READER - 1);
48
// Token indicating what type of lock a queued thread is trying to acquire.
// Each token carries the state bits that the corresponding lock adds to the
// state word, which is what allows `wake_parked_threads` to sum the tokens of
// the threads it decides to wake.
const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);
53
/// Raw reader-writer lock type backed by the parking lot.
///
/// The whole lock lives in a single atomic word: the four low bits are the
/// `PARKED_BIT`, `WRITER_PARKED_BIT`, `UPGRADABLE_BIT` and `WRITER_BIT` flags,
/// and the remaining bits count readers in units of `ONE_READER`.
pub struct RawRwLock {
    // Flag bits plus the reader count.
    state: AtomicUsize,
}
58
59unsafe impl lock_api::RawRwLock for RawRwLock {
60 const INIT: RawRwLock = RawRwLock {
61 state: AtomicUsize::new(0),
62 };
63
64 type GuardMarker = crate::GuardMarker;
65
66 #[inline]
67 fn lock_exclusive(&self) {
68 if self
69 .state
70 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
71 .is_err()
72 {
73 let result = self.lock_exclusive_slow(None);
74 debug_assert!(result);
75 }
76 self.deadlock_acquire();
77 }
78
79 #[inline]
80 fn try_lock_exclusive(&self) -> bool {
81 if self
82 .state
83 .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
84 .is_ok()
85 {
86 self.deadlock_acquire();
87 true
88 } else {
89 false
90 }
91 }
92
93 #[inline]
94 unsafe fn unlock_exclusive(&self) {
95 self.deadlock_release();
96 if self
97 .state
98 .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
99 .is_ok()
100 {
101 return;
102 }
103 self.unlock_exclusive_slow(false);
104 }
105
106 #[inline]
107 fn lock_shared(&self) {
108 if !self.try_lock_shared_fast(false) {
109 let result = self.lock_shared_slow(false, None);
110 debug_assert!(result);
111 }
112 self.deadlock_acquire();
113 }
114
115 #[inline]
116 fn try_lock_shared(&self) -> bool {
117 let result = if self.try_lock_shared_fast(false) {
118 true
119 } else {
120 self.try_lock_shared_slow(false)
121 };
122 if result {
123 self.deadlock_acquire();
124 }
125 result
126 }
127
128 #[inline]
129 unsafe fn unlock_shared(&self) {
130 self.deadlock_release();
131 let state = if have_elision() {
132 self.state.elision_fetch_sub_release(ONE_READER)
133 } else {
134 self.state.fetch_sub(ONE_READER, Ordering::Release)
135 };
136 if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
137 self.unlock_shared_slow();
138 }
139 }
140
141 #[inline]
142 fn is_locked(&self) -> bool {
143 let state = self.state.load(Ordering::Relaxed);
144 state & (WRITER_BIT | READERS_MASK) != 0
145 }
146
147 #[inline]
148 fn is_locked_exclusive(&self) -> bool {
149 let state = self.state.load(Ordering::Relaxed);
150 state & (WRITER_BIT) != 0
151 }
152}
153
154unsafe impl lock_api::RawRwLockFair for RawRwLock {
155 #[inline]
156 unsafe fn unlock_shared_fair(&self) {
157 // Shared unlocking is always fair in this implementation.
158 self.unlock_shared();
159 }
160
161 #[inline]
162 unsafe fn unlock_exclusive_fair(&self) {
163 self.deadlock_release();
164 if self
165 .state
166 .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
167 .is_ok()
168 {
169 return;
170 }
171 self.unlock_exclusive_slow(true);
172 }
173
174 #[inline]
175 unsafe fn bump_shared(&self) {
176 if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT)
177 == ONE_READER | WRITER_BIT
178 {
179 self.bump_shared_slow();
180 }
181 }
182
183 #[inline]
184 unsafe fn bump_exclusive(&self) {
185 if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
186 self.bump_exclusive_slow();
187 }
188 }
189}
190
191unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
192 #[inline]
193 unsafe fn downgrade(&self) {
194 let state: usize = self
195 .state
196 .fetch_add(ONE_READER - WRITER_BIT, order:Ordering::Release);
197
198 // Wake up parked shared and upgradable threads if there are any
199 if state & PARKED_BIT != 0 {
200 self.downgrade_slow();
201 }
202 }
203}
204
205unsafe impl lock_api::RawRwLockTimed for RawRwLock {
206 type Duration = Duration;
207 type Instant = Instant;
208
209 #[inline]
210 fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
211 let result = if self.try_lock_shared_fast(false) {
212 true
213 } else {
214 self.lock_shared_slow(false, util::to_deadline(timeout))
215 };
216 if result {
217 self.deadlock_acquire();
218 }
219 result
220 }
221
222 #[inline]
223 fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
224 let result = if self.try_lock_shared_fast(false) {
225 true
226 } else {
227 self.lock_shared_slow(false, Some(timeout))
228 };
229 if result {
230 self.deadlock_acquire();
231 }
232 result
233 }
234
235 #[inline]
236 fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
237 let result = if self
238 .state
239 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
240 .is_ok()
241 {
242 true
243 } else {
244 self.lock_exclusive_slow(util::to_deadline(timeout))
245 };
246 if result {
247 self.deadlock_acquire();
248 }
249 result
250 }
251
252 #[inline]
253 fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
254 let result = if self
255 .state
256 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
257 .is_ok()
258 {
259 true
260 } else {
261 self.lock_exclusive_slow(Some(timeout))
262 };
263 if result {
264 self.deadlock_acquire();
265 }
266 result
267 }
268}
269
270unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
271 #[inline]
272 fn lock_shared_recursive(&self) {
273 if !self.try_lock_shared_fast(recursive:true) {
274 let result: bool = self.lock_shared_slow(recursive:true, timeout:None);
275 debug_assert!(result);
276 }
277 self.deadlock_acquire();
278 }
279
280 #[inline]
281 fn try_lock_shared_recursive(&self) -> bool {
282 let result: bool = if self.try_lock_shared_fast(recursive:true) {
283 true
284 } else {
285 self.try_lock_shared_slow(recursive:true)
286 };
287 if result {
288 self.deadlock_acquire();
289 }
290 result
291 }
292}
293
294unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
295 #[inline]
296 fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
297 let result = if self.try_lock_shared_fast(true) {
298 true
299 } else {
300 self.lock_shared_slow(true, util::to_deadline(timeout))
301 };
302 if result {
303 self.deadlock_acquire();
304 }
305 result
306 }
307
308 #[inline]
309 fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
310 let result = if self.try_lock_shared_fast(true) {
311 true
312 } else {
313 self.lock_shared_slow(true, Some(timeout))
314 };
315 if result {
316 self.deadlock_acquire();
317 }
318 result
319 }
320}
321
322unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
323 #[inline]
324 fn lock_upgradable(&self) {
325 if !self.try_lock_upgradable_fast() {
326 let result = self.lock_upgradable_slow(None);
327 debug_assert!(result);
328 }
329 self.deadlock_acquire();
330 }
331
332 #[inline]
333 fn try_lock_upgradable(&self) -> bool {
334 let result = if self.try_lock_upgradable_fast() {
335 true
336 } else {
337 self.try_lock_upgradable_slow()
338 };
339 if result {
340 self.deadlock_acquire();
341 }
342 result
343 }
344
345 #[inline]
346 unsafe fn unlock_upgradable(&self) {
347 self.deadlock_release();
348 let state = self.state.load(Ordering::Relaxed);
349 if state & PARKED_BIT == 0 {
350 if self
351 .state
352 .compare_exchange_weak(
353 state,
354 state - (ONE_READER | UPGRADABLE_BIT),
355 Ordering::Release,
356 Ordering::Relaxed,
357 )
358 .is_ok()
359 {
360 return;
361 }
362 }
363 self.unlock_upgradable_slow(false);
364 }
365
366 #[inline]
367 unsafe fn upgrade(&self) {
368 let state = self.state.fetch_sub(
369 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
370 Ordering::Acquire,
371 );
372 if state & READERS_MASK != ONE_READER {
373 let result = self.upgrade_slow(None);
374 debug_assert!(result);
375 }
376 }
377
378 #[inline]
379 unsafe fn try_upgrade(&self) -> bool {
380 if self
381 .state
382 .compare_exchange_weak(
383 ONE_READER | UPGRADABLE_BIT,
384 WRITER_BIT,
385 Ordering::Acquire,
386 Ordering::Relaxed,
387 )
388 .is_ok()
389 {
390 true
391 } else {
392 self.try_upgrade_slow()
393 }
394 }
395}
396
397unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
398 #[inline]
399 unsafe fn unlock_upgradable_fair(&self) {
400 self.deadlock_release();
401 let state = self.state.load(Ordering::Relaxed);
402 if state & PARKED_BIT == 0 {
403 if self
404 .state
405 .compare_exchange_weak(
406 state,
407 state - (ONE_READER | UPGRADABLE_BIT),
408 Ordering::Release,
409 Ordering::Relaxed,
410 )
411 .is_ok()
412 {
413 return;
414 }
415 }
416 self.unlock_upgradable_slow(false);
417 }
418
419 #[inline]
420 unsafe fn bump_upgradable(&self) {
421 if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT {
422 self.bump_upgradable_slow();
423 }
424 }
425}
426
427unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
428 #[inline]
429 unsafe fn downgrade_upgradable(&self) {
430 let state: usize = self.state.fetch_sub(UPGRADABLE_BIT, order:Ordering::Relaxed);
431
432 // Wake up parked upgradable threads if there are any
433 if state & PARKED_BIT != 0 {
434 self.downgrade_slow();
435 }
436 }
437
438 #[inline]
439 unsafe fn downgrade_to_upgradable(&self) {
440 let state: usize = self.state.fetch_add(
441 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
442 order:Ordering::Release,
443 );
444
445 // Wake up parked shared threads if there are any
446 if state & PARKED_BIT != 0 {
447 self.downgrade_to_upgradable_slow();
448 }
449 }
450}
451
452unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
453 #[inline]
454 fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
455 let result = if self.try_lock_upgradable_fast() {
456 true
457 } else {
458 self.lock_upgradable_slow(Some(timeout))
459 };
460 if result {
461 self.deadlock_acquire();
462 }
463 result
464 }
465
466 #[inline]
467 fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
468 let result = if self.try_lock_upgradable_fast() {
469 true
470 } else {
471 self.lock_upgradable_slow(util::to_deadline(timeout))
472 };
473 if result {
474 self.deadlock_acquire();
475 }
476 result
477 }
478
479 #[inline]
480 unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool {
481 let state = self.state.fetch_sub(
482 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
483 Ordering::Relaxed,
484 );
485 if state & READERS_MASK == ONE_READER {
486 true
487 } else {
488 self.upgrade_slow(Some(timeout))
489 }
490 }
491
492 #[inline]
493 unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool {
494 let state = self.state.fetch_sub(
495 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
496 Ordering::Relaxed,
497 );
498 if state & READERS_MASK == ONE_READER {
499 true
500 } else {
501 self.upgrade_slow(util::to_deadline(timeout))
502 }
503 }
504}
505
506impl RawRwLock {
507 #[inline(always)]
508 fn try_lock_shared_fast(&self, recursive: bool) -> bool {
509 let state = self.state.load(Ordering::Relaxed);
510
511 // We can't allow grabbing a shared lock if there is a writer, even if
512 // the writer is still waiting for the remaining readers to exit.
513 if state & WRITER_BIT != 0 {
514 // To allow recursive locks, we make an exception and allow readers
515 // to skip ahead of a pending writer to avoid deadlocking, at the
516 // cost of breaking the fairness guarantees.
517 if !recursive || state & READERS_MASK == 0 {
518 return false;
519 }
520 }
521
522 // Use hardware lock elision to avoid cache conflicts when multiple
523 // readers try to acquire the lock. We only do this if the lock is
524 // completely empty since elision handles conflicts poorly.
525 if have_elision() && state == 0 {
526 self.state
527 .elision_compare_exchange_acquire(0, ONE_READER)
528 .is_ok()
529 } else if let Some(new_state) = state.checked_add(ONE_READER) {
530 self.state
531 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
532 .is_ok()
533 } else {
534 false
535 }
536 }
537
538 #[cold]
539 fn try_lock_shared_slow(&self, recursive: bool) -> bool {
540 let mut state = self.state.load(Ordering::Relaxed);
541 loop {
542 // This mirrors the condition in try_lock_shared_fast
543 if state & WRITER_BIT != 0 {
544 if !recursive || state & READERS_MASK == 0 {
545 return false;
546 }
547 }
548 if have_elision() && state == 0 {
549 match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
550 Ok(_) => return true,
551 Err(x) => state = x,
552 }
553 } else {
554 match self.state.compare_exchange_weak(
555 state,
556 state
557 .checked_add(ONE_READER)
558 .expect("RwLock reader count overflow"),
559 Ordering::Acquire,
560 Ordering::Relaxed,
561 ) {
562 Ok(_) => return true,
563 Err(x) => state = x,
564 }
565 }
566 }
567 }
568
569 #[inline(always)]
570 fn try_lock_upgradable_fast(&self) -> bool {
571 let state = self.state.load(Ordering::Relaxed);
572
573 // We can't grab an upgradable lock if there is already a writer or
574 // upgradable reader.
575 if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
576 return false;
577 }
578
579 if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
580 self.state
581 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
582 .is_ok()
583 } else {
584 false
585 }
586 }
587
588 #[cold]
589 fn try_lock_upgradable_slow(&self) -> bool {
590 let mut state = self.state.load(Ordering::Relaxed);
591 loop {
592 // This mirrors the condition in try_lock_upgradable_fast
593 if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
594 return false;
595 }
596
597 match self.state.compare_exchange_weak(
598 state,
599 state
600 .checked_add(ONE_READER | UPGRADABLE_BIT)
601 .expect("RwLock reader count overflow"),
602 Ordering::Acquire,
603 Ordering::Relaxed,
604 ) {
605 Ok(_) => return true,
606 Err(x) => state = x,
607 }
608 }
609 }
610
611 #[cold]
612 fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
613 let try_lock = |state: &mut usize| {
614 loop {
615 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
616 return false;
617 }
618
619 // Grab WRITER_BIT if it isn't set, even if there are parked threads.
620 match self.state.compare_exchange_weak(
621 *state,
622 *state | WRITER_BIT,
623 Ordering::Acquire,
624 Ordering::Relaxed,
625 ) {
626 Ok(_) => return true,
627 Err(x) => *state = x,
628 }
629 }
630 };
631
632 // Step 1: grab exclusive ownership of WRITER_BIT
633 let timed_out = !self.lock_common(
634 timeout,
635 TOKEN_EXCLUSIVE,
636 try_lock,
637 WRITER_BIT | UPGRADABLE_BIT,
638 );
639 if timed_out {
640 return false;
641 }
642
643 // Step 2: wait for all remaining readers to exit the lock.
644 self.wait_for_readers(timeout, 0)
645 }
646
647 #[cold]
648 fn unlock_exclusive_slow(&self, force_fair: bool) {
649 // There are threads to unpark. Try to unpark as many as we can.
650 let callback = |mut new_state, result: UnparkResult| {
651 // If we are using a fair unlock then we should keep the
652 // rwlock locked and hand it off to the unparked threads.
653 if result.unparked_threads != 0 && (force_fair || result.be_fair) {
654 if result.have_more_threads {
655 new_state |= PARKED_BIT;
656 }
657 self.state.store(new_state, Ordering::Release);
658 TOKEN_HANDOFF
659 } else {
660 // Clear the parked bit if there are no more parked threads.
661 if result.have_more_threads {
662 self.state.store(PARKED_BIT, Ordering::Release);
663 } else {
664 self.state.store(0, Ordering::Release);
665 }
666 TOKEN_NORMAL
667 }
668 };
669 // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
670 unsafe {
671 self.wake_parked_threads(0, callback);
672 }
673 }
674
    /// Slow path for shared acquisition, optionally bounded by `timeout`.
    ///
    /// `recursive` selects the admission rule that lets a reader pass a
    /// pending writer while other readers are present. The closure below is
    /// the acquisition attempt handed to `lock_common`, which takes care of
    /// spinning and parking; this function returns whatever `lock_common`
    /// returns.
    ///
    /// Panics with "RwLock reader count overflow" if adding a reader would
    /// overflow the state word.
    #[cold]
    fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                // Use hardware lock elision to avoid cache conflicts when multiple
                // readers try to acquire the lock. We only do this if the lock is
                // completely empty since elision handles conflicts poorly.
                if have_elision() && *state == 0 {
                    match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                        Ok(_) => return true,
                        // Fall through with the observed value; the checks
                        // below run against it in the same iteration.
                        Err(x) => *state = x,
                    }
                }

                // This is the same condition as try_lock_shared_fast
                if *state & WRITER_BIT != 0 {
                    if !recursive || *state & READERS_MASK == 0 {
                        return false;
                    }
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        // Readers only have to wait while WRITER_BIT is set, so that is the
        // flag `lock_common` validates before parking.
        self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
    }
721
    /// Slow path for releasing a shared lock: wakes the writer parked on the
    /// secondary queue.
    ///
    /// `unlock_shared` only calls this when the reader being released was the
    /// last one and WRITER_PARKED_BIT was set.
    #[cold]
    fn unlock_shared_slow(&self) {
        // At this point WRITER_PARKED_BIT is set and the READERS_MASK bits are
        // all zero. We just need to wake up a potentially sleeping pending
        // writer.
        // Using the 2nd key at addr + 1
        let addr = self as *const _ as usize + 1;
        let callback = |_result: UnparkResult| {
            // Clear the WRITER_PARKED_BIT here since there can only be one
            // parked writer thread.
            self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
            TOKEN_NORMAL
        };
        // SAFETY:
        // * `addr` is an address we control.
        // * `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            parking_lot_core::unpark_one(addr, callback);
        }
    }
741
    /// Slow path for upgradable acquisition, optionally bounded by `timeout`.
    ///
    /// The closure below is the acquisition attempt handed to `lock_common`,
    /// which takes care of spinning and parking; this function returns
    /// whatever `lock_common` returns.
    ///
    /// Panics with "RwLock reader count overflow" if adding a reader would
    /// overflow the state word.
    #[cold]
    fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                // Same admission rule as try_lock_upgradable_fast: an existing
                // writer or upgradable reader excludes us.
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER | UPGRADABLE_BIT)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        // Parking is only valid while one of the flags that blocks us is set.
        self.lock_common(
            timeout,
            TOKEN_UPGRADABLE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        )
    }
780
    /// Slow path for releasing the upgradable lock.
    ///
    /// While PARKED_BIT is clear the lock is simply released with a CAS loop.
    /// Once PARKED_BIT is seen, parked threads are woken through
    /// `wake_parked_threads` and the callback decides between handing the lock
    /// off to them (`TOKEN_HANDOFF`) and a plain release (`TOKEN_NORMAL`).
    ///
    /// `force_fair` requests the hand-off even when the unpark result does not
    /// ask for fairness.
    #[cold]
    fn unlock_upgradable_slow(&self, force_fair: bool) {
        // Just release the lock if there are no parked threads.
        let mut state = self.state.load(Ordering::Relaxed);
        while state & PARKED_BIT == 0 {
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(x) => state = x,
            }
        }

        // There are threads to unpark. Try to unpark as many as we can.
        // `new_state` is the sum of the tokens of the threads being woken.
        // NOTE(review): both CAS loops in this callback use Relaxed ordering,
        // while the no-parked-thread loop above releases with Release —
        // confirm that this is intentional.
        let callback = |new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            let mut state = self.state.load(Ordering::Relaxed);
            if force_fair || result.be_fair {
                // Fall back to normal unpark on overflow. Panicking is
                // not allowed in parking_lot callbacks.
                while let Some(mut new_state) =
                    (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
                {
                    if result.have_more_threads {
                        new_state |= PARKED_BIT;
                    } else {
                        new_state &= !PARKED_BIT;
                    }
                    match self.state.compare_exchange_weak(
                        state,
                        new_state,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return TOKEN_HANDOFF,
                        Err(x) => state = x,
                    }
                }
            }

            // Otherwise just release the upgradable lock and update PARKED_BIT.
            loop {
                let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                } else {
                    new_state &= !PARKED_BIT;
                }
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return TOKEN_NORMAL,
                    Err(x) => state = x,
                }
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }
849
850 #[cold]
851 fn try_upgrade_slow(&self) -> bool {
852 let mut state = self.state.load(Ordering::Relaxed);
853 loop {
854 if state & READERS_MASK != ONE_READER {
855 return false;
856 }
857 match self.state.compare_exchange_weak(
858 state,
859 state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
860 Ordering::Relaxed,
861 Ordering::Relaxed,
862 ) {
863 Ok(_) => return true,
864 Err(x) => state = x,
865 }
866 }
867 }
868
869 #[cold]
870 fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
871 self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT)
872 }
873
874 #[cold]
875 fn downgrade_slow(&self) {
876 // We only reach this point if PARKED_BIT is set.
877 let callback = |_, result: UnparkResult| {
878 // Clear the parked bit if there no more parked threads
879 if !result.have_more_threads {
880 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
881 }
882 TOKEN_NORMAL
883 };
884 // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
885 unsafe {
886 self.wake_parked_threads(ONE_READER, callback);
887 }
888 }
889
890 #[cold]
891 fn downgrade_to_upgradable_slow(&self) {
892 // We only reach this point if PARKED_BIT is set.
893 let callback = |_, result: UnparkResult| {
894 // Clear the parked bit if there no more parked threads
895 if !result.have_more_threads {
896 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
897 }
898 TOKEN_NORMAL
899 };
900 // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
901 unsafe {
902 self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
903 }
904 }
905
    /// Slow path of `bump_shared`: releases the shared lock and immediately
    /// queues for it again.
    ///
    /// Both calls do their own deadlock-detection bookkeeping, so none is
    /// needed here.
    #[cold]
    unsafe fn bump_shared_slow(&self) {
        self.unlock_shared();
        self.lock_shared();
    }
911
    /// Slow path of `bump_exclusive`: hands the exclusive lock to parked
    /// threads with a forced-fair unlock, then acquires it again.
    #[cold]
    fn bump_exclusive_slow(&self) {
        // `unlock_exclusive_slow` does no deadlock bookkeeping of its own,
        // whereas `lock_exclusive` re-registers the resource on return.
        self.deadlock_release();
        self.unlock_exclusive_slow(true);
        self.lock_exclusive();
    }
918
    /// Slow path of `bump_upgradable`: hands the upgradable lock to parked
    /// threads with a forced-fair unlock, then acquires it again.
    #[cold]
    fn bump_upgradable_slow(&self) {
        // `unlock_upgradable_slow` does no deadlock bookkeeping of its own,
        // whereas `lock_upgradable` re-registers the resource on return.
        self.deadlock_release();
        self.unlock_upgradable_slow(true);
        self.lock_upgradable();
    }
925
    /// Common code for waking up parked threads after releasing WRITER_BIT or
    /// UPGRADABLE_BIT.
    ///
    /// `new_state` is the state the wake-up starts from. The park token of
    /// every thread selected for unparking is added to it, and the final sum
    /// is passed to `callback` together with the `UnparkResult`. The token
    /// that `callback` returns is forwarded to `unpark_filter`.
    ///
    /// # Safety
    ///
    /// `callback` must uphold the requirements of the `callback` parameter to
    /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in
    /// `parking_lot`.
    #[inline]
    unsafe fn wake_parked_threads(
        &self,
        new_state: usize,
        callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
    ) {
        // We must wake up at least one upgrader or writer if there is one,
        // otherwise they may end up parked indefinitely since unlock_shared
        // does not call wake_parked_threads.
        // A `Cell` lets the filter closure update the running state while the
        // callback closure reads it afterwards.
        let new_state = Cell::new(new_state);
        let addr = self as *const _ as usize;
        let filter = |ParkToken(token)| {
            let s = new_state.get();

            // If we are waking up a writer, don't wake anything else.
            if s & WRITER_BIT != 0 {
                return FilterOp::Stop;
            }

            // Otherwise wake *all* readers and one upgrader/writer.
            if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
                // Skip writers and upgradable readers if we already have
                // a writer/upgradable reader.
                FilterOp::Skip
            } else {
                // NOTE(review): plain `+` would panic on overflow in builds
                // with overflow checks — presumably unreachable in practice;
                // confirm.
                new_state.set(s + token);
                FilterOp::Unpark
            }
        };
        let callback = |result| callback(new_state.get(), result);
        // SAFETY:
        // * `addr` is an address we control.
        // * `filter` does not panic or call into any function of `parking_lot`.
        // * `callback` safety responsibility is on caller
        parking_lot_core::unpark_filter(addr, filter, callback);
    }
970
    /// Common code for waiting for readers to exit the lock after acquiring
    /// WRITER_BIT.
    ///
    /// Spins first, then parks on the secondary queue (key `addr + 1`) with
    /// WRITER_PARKED_BIT set. Returns `true` once the reader count is zero.
    ///
    /// If the park times out, WRITER_BIT and WRITER_PARKED_BIT are removed,
    /// `prev_value` is added back to the state word, threads parked on the
    /// main queue are woken if PARKED_BIT was set, and `false` is returned.
    #[inline]
    fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
        // At this point WRITER_BIT is already set, we just need to wait for the
        // remaining readers to exit the lock.
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Acquire);
        while state & READERS_MASK != 0 {
            // Spin a few times to wait for readers to exit
            if spinwait.spin() {
                state = self.state.load(Ordering::Acquire);
                continue;
            }

            // Set the parked bit
            if state & WRITER_PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | WRITER_PARKED_BIT,
                    Ordering::Acquire,
                    Ordering::Acquire,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            // Using the 2nd key at addr + 1
            let addr = self as *const _ as usize + 1;
            // Only go to sleep if there is still a reader to wait for and our
            // parked flag has not been cleared in the meantime.
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
            };
            let before_sleep = || {};
            let timed_out = |_, _| {};
            // SAFETY:
            // * `addr` is an address we control.
            // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            // * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_EXCLUSIVE,
                    timeout,
                )
            };
            match park_result {
                // We still need to re-check the state if we are unparked
                // since a previous writer timing-out could have allowed
                // another reader to sneak in before we parked.
                ParkResult::Unparked(_) | ParkResult::Invalid => {
                    state = self.state.load(Ordering::Acquire);
                    continue;
                }

                // Timeout expired
                ParkResult::TimedOut => {
                    // We need to release WRITER_BIT and revert back to
                    // our previous value. We also wake up any threads that
                    // might be waiting on WRITER_BIT.
                    let state = self.state.fetch_add(
                        prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT),
                        Ordering::Relaxed,
                    );
                    if state & PARKED_BIT != 0 {
                        let callback = |_, result: UnparkResult| {
                            // Clear the parked bit if there no more parked threads
                            if !result.have_more_threads {
                                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                            }
                            TOKEN_NORMAL
                        };
                        // Starting the filter with UPGRADABLE_BIT set makes it
                        // skip queued writers and upgradable readers, so only
                        // shared waiters are woken here, whatever `prev_value`
                        // was.
                        // SAFETY: `callback` does not panic or call any function of `parking_lot`.
                        unsafe {
                            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
                        }
                    }
                    return false;
                }
            }
        }
        true
    }
1059
    /// Common code for acquiring a lock
    ///
    /// Repeatedly runs `try_lock`, spinning and then parking on the main queue
    /// between attempts.
    ///
    /// * `timeout` - optional deadline forwarded to `parking_lot_core::park`.
    /// * `token` - park token identifying the kind of lock being requested.
    /// * `try_lock` - acquisition attempt; receives the last observed state,
    ///   may update it, and returns `true` once the lock has been taken.
    /// * `validate_flags` - state bits of which at least one must still be set
    ///   for parking to go ahead.
    ///
    /// Returns `true` when `try_lock` succeeds or the lock is handed off by
    /// the unparking thread, and `false` when the park times out.
    #[inline]
    fn lock_common(
        &self,
        timeout: Option<Instant>,
        token: ParkToken,
        mut try_lock: impl FnMut(&mut usize) -> bool,
        validate_flags: usize,
    ) -> bool {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Attempt to grab the lock
            if try_lock(&mut state) {
                return true;
            }

            // If there are no parked threads, try spinning a few times.
            if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Set the parked bit
            if state & PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            let addr = self as *const _ as usize;
            // Only go to sleep if PARKED_BIT is still set and the lock is
            // still held in a way that blocks this request.
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & PARKED_BIT != 0 && (state & validate_flags != 0)
            };
            let before_sleep = || {};
            let timed_out = |_, was_last_thread| {
                // Clear the parked bit if we were the last parked thread
                if was_last_thread {
                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                }
            };

            // SAFETY:
            // * `addr` is an address we control.
            // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            // * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
            };
            match park_result {
                // The thread that unparked us passed the lock on to us
                // directly without unlocking it.
                ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                // We were unparked normally, try acquiring the lock again
                ParkResult::Unparked(_) => (),

                // The validation function failed, try locking again
                ParkResult::Invalid => (),

                // Timeout expired
                ParkResult::TimedOut => return false,
            }

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }
1137
1138 #[inline]
1139 fn deadlock_acquire(&self) {
1140 unsafe { deadlock::acquire_resource(self as *const _ as usize) };
1141 unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
1142 }
1143
1144 #[inline]
1145 fn deadlock_release(&self) {
1146 unsafe { deadlock::release_resource(self as *const _ as usize) };
1147 unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
1148 }
1149}
1150