// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::elision::{have_elision, AtomicElisionExt};
use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
use crate::util;
use core::{
    cell::Cell,
    sync::atomic::{AtomicUsize, Ordering},
};
use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade};
use parking_lot_core::{
    self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
};
use std::time::{Duration, Instant};

// This reader-writer lock implementation is based on Boost's upgrade_mutex:
// https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432
//
// This implementation uses 2 wait queues, one at key [addr] and one at key
// [addr + 1]. The primary queue is used for all new waiting threads, and the
// secondary queue is used by the thread which has acquired WRITER_BIT but is
// waiting for the remaining readers to exit the lock.
//
// This implementation is fair between readers and writers since it uses the
// order in which threads first started queuing to alternate between read phases
// and write phases. In particular, it is not vulnerable to write starvation
// since readers will block if there is a pending writer.

// There is at least one thread in the main queue.
const PARKED_BIT: usize = 0b0001;
// There is a parked thread holding WRITER_BIT. WRITER_BIT must be set.
const WRITER_PARKED_BIT: usize = 0b0010;
// A reader is holding an upgradable lock. The reader count must be non-zero and
// WRITER_BIT must not be set.
const UPGRADABLE_BIT: usize = 0b0100;
// If the reader count is zero: a writer is currently holding an exclusive lock.
// Otherwise: a writer is waiting for the remaining readers to exit the lock.
const WRITER_BIT: usize = 0b1000;
// Mask of bits used to count readers.
const READERS_MASK: usize = !0b1111;
// Base unit for counting readers.
const ONE_READER: usize = 0b10000;
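//
// For illustration: a state value of 3 * ONE_READER | UPGRADABLE_BIT | PARKED_BIT
// (0b11_0101) describes three readers, one of which holds the upgradable lock,
// with at least one thread parked in the main queue. The reader count is
// recovered as (state & READERS_MASK) / ONE_READER = 3.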

// Token indicating what type of lock a queued thread is trying to acquire
const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);
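//
// The token values double as state increments: when handing the lock off,
// `wake_parked_threads` adds the token of each unparked thread directly onto
// the lock state (see `new_state.set(s + token)` in its filter).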

/// Raw reader-writer lock type backed by the parking lot.
pub struct RawRwLock {
    state: AtomicUsize,
}
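
// This raw lock is not normally used on its own: the crate wraps it as
// `lock_api::RwLock<RawRwLock, T>` and exposes it as `parking_lot::RwLock<T>`.
// A minimal usage sketch of that wrapper:
//
//     let lock = parking_lot::RwLock::new(0);
//     {
//         let r1 = lock.read();
//         let r2 = lock.read(); // shared guards can coexist
//         assert_eq!(*r1 + *r2, 0);
//     } // shared guards released here
//     *lock.write() += 1; // exclusive access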

unsafe impl lock_api::RawRwLock for RawRwLock {
    const INIT: RawRwLock = RawRwLock {
        state: AtomicUsize::new(0),
    };

    type GuardMarker = crate::GuardMarker;

    #[inline]
    fn lock_exclusive(&self) {
        if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            let result = self.lock_exclusive_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_exclusive(&self) -> bool {
        if self
            .state
            .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            self.deadlock_acquire();
            true
        } else {
            false
        }
    }

    #[inline]
    unsafe fn unlock_exclusive(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(false);
    }

    #[inline]
    fn lock_shared(&self) {
        if !self.try_lock_shared_fast(false) {
            let result = self.lock_shared_slow(false, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared(&self) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.try_lock_shared_slow(false)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn unlock_shared(&self) {
        self.deadlock_release();
        let state = if have_elision() {
            self.state.elision_fetch_sub_release(ONE_READER)
        } else {
            self.state.fetch_sub(ONE_READER, Ordering::Release)
        };
        if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
            self.unlock_shared_slow();
        }
    }

    #[inline]
    fn is_locked(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        state & (WRITER_BIT | READERS_MASK) != 0
    }

    #[inline]
    fn is_locked_exclusive(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        state & (WRITER_BIT) != 0
    }
}

unsafe impl lock_api::RawRwLockFair for RawRwLock {
    #[inline]
    unsafe fn unlock_shared_fair(&self) {
        // Shared unlocking is always fair in this implementation.
        self.unlock_shared();
    }

    #[inline]
    unsafe fn unlock_exclusive_fair(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(true);
    }

    #[inline]
    unsafe fn bump_shared(&self) {
        if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT)
            == ONE_READER | WRITER_BIT
        {
            self.bump_shared_slow();
        }
    }

    #[inline]
    unsafe fn bump_exclusive(&self) {
        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
            self.bump_exclusive_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
    #[inline]
    unsafe fn downgrade(&self) {
        let state = self
            .state
            .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);

        // Wake up parked shared and upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockTimed for RawRwLock {
    type Duration = Duration;
    type Instant = Instant;

    #[inline]
    fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
    #[inline]
    fn lock_shared_recursive(&self) {
        if !self.try_lock_shared_fast(true) {
            let result = self.lock_shared_slow(true, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared_recursive(&self) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.try_lock_shared_slow(true)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
    #[inline]
    fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
    #[inline]
    fn lock_upgradable(&self) {
        if !self.try_lock_upgradable_fast() {
            let result = self.lock_upgradable_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_upgradable(&self) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.try_lock_upgradable_slow()
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn unlock_upgradable(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    unsafe fn upgrade(&self) {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Acquire,
        );
        if state & READERS_MASK != ONE_READER {
            let result = self.upgrade_slow(None);
            debug_assert!(result);
        }
    }

    #[inline]
    unsafe fn try_upgrade(&self) -> bool {
        if self
            .state
            .compare_exchange_weak(
                ONE_READER | UPGRADABLE_BIT,
                WRITER_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            true
        } else {
            self.try_upgrade_slow()
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
    #[inline]
    unsafe fn unlock_upgradable_fair(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    unsafe fn bump_upgradable(&self) {
        if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT {
            self.bump_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
    #[inline]
    unsafe fn downgrade_upgradable(&self) {
        let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);

        // Wake up parked upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }

    #[inline]
    unsafe fn downgrade_to_upgradable(&self) {
        let state = self.state.fetch_add(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Release,
        );

        // Wake up parked shared threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_to_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
    #[inline]
    fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(Some(timeout))
        }
    }

    #[inline]
    unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(util::to_deadline(timeout))
        }
    }
}

impl RawRwLock {
    #[inline(always)]
    fn try_lock_shared_fast(&self, recursive: bool) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't allow grabbing a shared lock if there is a writer, even if
        // the writer is still waiting for the remaining readers to exit.
        if state & WRITER_BIT != 0 {
            // To allow recursive locks, we make an exception and allow readers
            // to skip ahead of a pending writer to avoid deadlocking, at the
            // cost of breaking the fairness guarantees.
            if !recursive || state & READERS_MASK == 0 {
                return false;
            }
        }

        // Use hardware lock elision to avoid cache conflicts when multiple
        // readers try to acquire the lock. We only do this if the lock is
        // completely empty since elision handles conflicts poorly.
        if have_elision() && state == 0 {
            self.state
                .elision_compare_exchange_acquire(0, ONE_READER)
                .is_ok()
        } else if let Some(new_state) = state.checked_add(ONE_READER) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_shared_slow(&self, recursive: bool) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_shared_fast
            if state & WRITER_BIT != 0 {
                if !recursive || state & READERS_MASK == 0 {
                    return false;
                }
            }
            if have_elision() && state == 0 {
                match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            } else {
                match self.state.compare_exchange_weak(
                    state,
                    state
                        .checked_add(ONE_READER)
                        .expect("RwLock reader count overflow"),
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            }
        }
    }

    #[inline(always)]
    fn try_lock_upgradable_fast(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't grab an upgradable lock if there is already a writer or
        // upgradable reader.
        if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
            return false;
        }

        if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_upgradable_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_upgradable_fast
            if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                return false;
            }

            match self.state.compare_exchange_weak(
                state,
                state
                    .checked_add(ONE_READER | UPGRADABLE_BIT)
                    .expect("RwLock reader count overflow"),
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                // Grab WRITER_BIT if it isn't set, even if there are parked threads.
                match self.state.compare_exchange_weak(
                    *state,
                    *state | WRITER_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => *state = x,
                }
            }
        };

        // Step 1: grab exclusive ownership of WRITER_BIT
        let timed_out = !self.lock_common(
            timeout,
            TOKEN_EXCLUSIVE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        );
        if timed_out {
            return false;
        }

        // Step 2: wait for all remaining readers to exit the lock.
        self.wait_for_readers(timeout, 0)
    }

    #[cold]
    fn unlock_exclusive_slow(&self, force_fair: bool) {
        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |mut new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                }
                self.state.store(new_state, Ordering::Release);
                TOKEN_HANDOFF
            } else {
                // Clear the parked bit if there are no more parked threads.
                if result.have_more_threads {
                    self.state.store(PARKED_BIT, Ordering::Release);
                } else {
                    self.state.store(0, Ordering::Release);
                }
                TOKEN_NORMAL
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                // Use hardware lock elision to avoid cache conflicts when multiple
                // readers try to acquire the lock. We only do this if the lock is
                // completely empty since elision handles conflicts poorly.
                if have_elision() && *state == 0 {
                    match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                        Ok(_) => return true,
                        Err(x) => *state = x,
                    }
                }

                // This is the same condition as try_lock_shared_fast
                if *state & WRITER_BIT != 0 {
                    if !recursive || *state & READERS_MASK == 0 {
                        return false;
                    }
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
    }

    #[cold]
    fn unlock_shared_slow(&self) {
        // At this point WRITER_PARKED_BIT is set and READERS_MASK is empty. We
        // just need to wake up a potentially sleeping pending writer.
        // Using the 2nd key at addr + 1
        let addr = self as *const _ as usize + 1;
        let callback = |_result: UnparkResult| {
            // Clear the WRITER_PARKED_BIT here since there can only be one
            // parked writer thread.
            self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
            TOKEN_NORMAL
        };
        // SAFETY:
        // * `addr` is an address we control.
        // * `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            parking_lot_core::unpark_one(addr, callback);
        }
    }

    #[cold]
    fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER | UPGRADABLE_BIT)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(
            timeout,
            TOKEN_UPGRADABLE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        )
    }

    #[cold]
    fn unlock_upgradable_slow(&self, force_fair: bool) {
        // Just release the lock if there are no parked threads.
        let mut state = self.state.load(Ordering::Relaxed);
        while state & PARKED_BIT == 0 {
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(x) => state = x,
            }
        }

        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            let mut state = self.state.load(Ordering::Relaxed);
            if force_fair || result.be_fair {
                // Fall back to normal unpark on overflow. Panicking is
                // not allowed in parking_lot callbacks.
                while let Some(mut new_state) =
                    (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
                {
                    if result.have_more_threads {
                        new_state |= PARKED_BIT;
                    } else {
                        new_state &= !PARKED_BIT;
                    }
                    match self.state.compare_exchange_weak(
                        state,
                        new_state,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return TOKEN_HANDOFF,
                        Err(x) => state = x,
                    }
                }
            }

            // Otherwise just release the upgradable lock and update PARKED_BIT.
            loop {
                let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                } else {
                    new_state &= !PARKED_BIT;
                }
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return TOKEN_NORMAL,
                    Err(x) => state = x,
                }
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn try_upgrade_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if state & READERS_MASK != ONE_READER {
                return false;
            }
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
        self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT)
    }

    #[cold]
    fn downgrade_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER, callback);
        }
    }

    #[cold]
    fn downgrade_to_upgradable_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
        }
    }

    #[cold]
    unsafe fn bump_shared_slow(&self) {
        self.unlock_shared();
        self.lock_shared();
    }

    #[cold]
    fn bump_exclusive_slow(&self) {
        self.deadlock_release();
        self.unlock_exclusive_slow(true);
        self.lock_exclusive();
    }

    #[cold]
    fn bump_upgradable_slow(&self) {
        self.deadlock_release();
        self.unlock_upgradable_slow(true);
        self.lock_upgradable();
    }

    /// Common code for waking up parked threads after releasing WRITER_BIT or
    /// UPGRADABLE_BIT.
    ///
    /// # Safety
    ///
    /// `callback` must uphold the requirements of the `callback` parameter to
    /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in
    /// `parking_lot`.
    #[inline]
    unsafe fn wake_parked_threads(
        &self,
        new_state: usize,
        callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
    ) {
        // We must wake up at least one upgrader or writer if there is one,
        // otherwise they may end up parked indefinitely since unlock_shared
        // does not call wake_parked_threads.
        let new_state = Cell::new(new_state);
        let addr = self as *const _ as usize;
        let filter = |ParkToken(token)| {
            let s = new_state.get();

            // If we are waking up a writer, don't wake anything else.
            if s & WRITER_BIT != 0 {
                return FilterOp::Stop;
            }

            // Otherwise wake *all* readers and one upgrader/writer.
            if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
                // Skip writers and upgradable readers if we already have
                // a writer/upgradable reader.
                FilterOp::Skip
            } else {
                new_state.set(s + token);
                FilterOp::Unpark
            }
        };
        let callback = |result| callback(new_state.get(), result);
        // SAFETY:
        // * `addr` is an address we control.
        // * `filter` does not panic or call into any function of `parking_lot`.
        // * `callback` safety responsibility is on caller
        parking_lot_core::unpark_filter(addr, filter, callback);
    }

    // Common code for waiting for readers to exit the lock after acquiring
    // WRITER_BIT.
    #[inline]
    fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
        // At this point WRITER_BIT is already set, we just need to wait for the
        // remaining readers to exit the lock.
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Acquire);
        while state & READERS_MASK != 0 {
            // Spin a few times to wait for readers to exit
            if spinwait.spin() {
                state = self.state.load(Ordering::Acquire);
                continue;
            }

            // Set the parked bit
            if state & WRITER_PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | WRITER_PARKED_BIT,
                    Ordering::Acquire,
                    Ordering::Acquire,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            // Using the 2nd key at addr + 1
            let addr = self as *const _ as usize + 1;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
            };
            let before_sleep = || {};
            let timed_out = |_, _| {};
            // SAFETY:
            // * `addr` is an address we control.
            // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            // * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_EXCLUSIVE,
                    timeout,
                )
            };
            match park_result {
                // We still need to re-check the state if we are unparked
                // since a previous writer timing-out could have allowed
                // another reader to sneak in before we parked.
                ParkResult::Unparked(_) | ParkResult::Invalid => {
                    state = self.state.load(Ordering::Acquire);
                    continue;
                }

                // Timeout expired
                ParkResult::TimedOut => {
                    // We need to release WRITER_BIT and revert back to
                    // our previous value. We also wake up any threads that
                    // might be waiting on WRITER_BIT.
                    let state = self.state.fetch_add(
                        prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT),
                        Ordering::Relaxed,
                    );
                    if state & PARKED_BIT != 0 {
                        let callback = |_, result: UnparkResult| {
                            // Clear the parked bit if there are no more parked threads
                            if !result.have_more_threads {
                                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                            }
                            TOKEN_NORMAL
                        };
                        // SAFETY: `callback` does not panic or call any function of `parking_lot`.
                        unsafe {
                            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
                        }
                    }
                    return false;
                }
            }
        }
        true
    }

    /// Common code for acquiring a lock
    #[inline]
    fn lock_common(
        &self,
        timeout: Option<Instant>,
        token: ParkToken,
        mut try_lock: impl FnMut(&mut usize) -> bool,
        validate_flags: usize,
    ) -> bool {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Attempt to grab the lock
            if try_lock(&mut state) {
                return true;
            }

            // If there are no parked threads, try spinning a few times.
            if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Set the parked bit
            if state & PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            let addr = self as *const _ as usize;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & PARKED_BIT != 0 && (state & validate_flags != 0)
            };
            let before_sleep = || {};
            let timed_out = |_, was_last_thread| {
                // Clear the parked bit if we were the last parked thread
                if was_last_thread {
                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                }
            };

            // SAFETY:
            // * `addr` is an address we control.
            // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            // * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
            };
            match park_result {
                // The thread that unparked us passed the lock on to us
                // directly without unlocking it.
                ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                // We were unparked normally, try acquiring the lock again
                ParkResult::Unparked(_) => (),

                // The validation function failed, try locking again
                ParkResult::Invalid => (),

                // Timeout expired
                ParkResult::TimedOut => return false,
            }

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

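    // Register/unregister both parking addresses used by this lock with the
    // deadlock detector: `addr` for the main queue and `addr + 1` for the
    // queue a writer waits on while the remaining readers drain.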
    #[inline]
    fn deadlock_acquire(&self) {
        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
    }

    #[inline]
    fn deadlock_release(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
    }
}
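
// A minimal, illustrative sanity check of the state-bit layout above; the
// module and test names are arbitrary. It verifies that the reader count
// occupies the bits above the four flag bits, so adding ONE_READER per reader
// never disturbs the flags.
#[cfg(test)]
mod state_layout_sanity {
    use super::*;

    #[test]
    fn reader_count_does_not_overlap_flag_bits() {
        let flags = PARKED_BIT | WRITER_PARKED_BIT | UPGRADABLE_BIT | WRITER_BIT;
        assert_eq!(flags & READERS_MASK, 0);
        assert_eq!(ONE_READER & flags, 0);

        // Three readers with a writer waiting for them to exit: WRITER_BIT is
        // set while the reader count is non-zero and the writer is parked on
        // the secondary queue.
        let state = 3 * ONE_READER | WRITER_BIT | WRITER_PARKED_BIT;
        assert_eq!((state & READERS_MASK) / ONE_READER, 3);
        assert_ne!(state & WRITER_BIT, 0);
    }
}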