1 | // Copyright 2016 Amanieu d'Antras |
2 | // |
3 | // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or |
4 | // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or |
5 | // http://opensource.org/licenses/MIT>, at your option. This file may not be |
6 | // copied, modified, or distributed except according to those terms. |
7 | |
8 | use crate::elision::{have_elision, AtomicElisionExt}; |
9 | use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL}; |
10 | use crate::util; |
11 | use core::{ |
12 | cell::Cell, |
13 | sync::atomic::{AtomicUsize, Ordering}, |
14 | }; |
15 | use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade}; |
16 | use parking_lot_core::{ |
17 | self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken, |
18 | }; |
19 | use std::time::{Duration, Instant}; |
20 | |
21 | // This reader-writer lock implementation is based on Boost's upgrade_mutex: |
22 | // https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432 |
23 | // |
24 | // This implementation uses 2 wait queues, one at key [addr] and one at key |
25 | // [addr + 1]. The primary queue is used for all new waiting threads, and the |
26 | // secondary queue is used by the thread which has acquired WRITER_BIT but is |
27 | // waiting for the remaining readers to exit the lock. |
28 | // |
29 | // This implementation is fair between readers and writers since it uses the |
30 | // order in which threads first started queuing to alternate between read phases |
// and write phases. In particular, it is not vulnerable to writer starvation
32 | // since readers will block if there is a pending writer. |
33 | |
34 | // There is at least one thread in the main queue. |
35 | const PARKED_BIT: usize = 0b0001; |
36 | // There is a parked thread holding WRITER_BIT. WRITER_BIT must be set. |
37 | const WRITER_PARKED_BIT: usize = 0b0010; |
38 | // A reader is holding an upgradable lock. The reader count must be non-zero and |
39 | // WRITER_BIT must not be set. |
40 | const UPGRADABLE_BIT: usize = 0b0100; |
41 | // If the reader count is zero: a writer is currently holding an exclusive lock. |
42 | // Otherwise: a writer is waiting for the remaining readers to exit the lock. |
43 | const WRITER_BIT: usize = 0b1000; |
44 | // Mask of bits used to count readers. |
45 | const READERS_MASK: usize = !0b1111; |
46 | // Base unit for counting readers. |
47 | const ONE_READER: usize = 0b10000; |
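// For example (illustrative): a state of `2 * ONE_READER | UPGRADABLE_BIT`
// (0b10_0100) describes two readers, one of which holds the upgradable lock,
// with no writer involved and nobody parked.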
48 | |
49 | // Token indicating what type of lock a queued thread is trying to acquire |
50 | const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER); |
51 | const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT); |
52 | const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT); |
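// Each token is also exactly the amount that gets added to the lock state
// when the corresponding thread is woken up (see `wake_parked_threads`),
// which is what allows a fair unlock to hand the lock over directly.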
53 | |
54 | /// Raw reader-writer lock type backed by the parking lot. |
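///
/// This type is normally used through the `lock_api` wrapper types rather
/// than driven by hand; a minimal usage sketch via the crate's `RwLock`
/// alias:
///
/// ```
/// let lock = parking_lot::RwLock::new(0);
/// *lock.write() += 1;
/// assert_eq!(*lock.read(), 1);
/// ```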
55 | pub struct RawRwLock { |
56 | state: AtomicUsize, |
57 | } |
58 | |
59 | unsafe impl lock_api::RawRwLock for RawRwLock { |
60 | const INIT: RawRwLock = RawRwLock { |
61 | state: AtomicUsize::new(0), |
62 | }; |
63 | |
64 | type GuardMarker = crate::GuardMarker; |
65 | |
66 | #[inline ] |
67 | fn lock_exclusive(&self) { |
68 | if self |
69 | .state |
70 | .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) |
71 | .is_err() |
72 | { |
73 | let result = self.lock_exclusive_slow(None); |
74 | debug_assert!(result); |
75 | } |
76 | self.deadlock_acquire(); |
77 | } |
78 | |
79 | #[inline ] |
80 | fn try_lock_exclusive(&self) -> bool { |
81 | if self |
82 | .state |
83 | .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) |
84 | .is_ok() |
85 | { |
86 | self.deadlock_acquire(); |
87 | true |
88 | } else { |
89 | false |
90 | } |
91 | } |
92 | |
93 | #[inline ] |
94 | unsafe fn unlock_exclusive(&self) { |
95 | self.deadlock_release(); |
96 | if self |
97 | .state |
98 | .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed) |
99 | .is_ok() |
100 | { |
101 | return; |
102 | } |
103 | self.unlock_exclusive_slow(false); |
104 | } |
105 | |
106 | #[inline ] |
107 | fn lock_shared(&self) { |
108 | if !self.try_lock_shared_fast(false) { |
109 | let result = self.lock_shared_slow(false, None); |
110 | debug_assert!(result); |
111 | } |
112 | self.deadlock_acquire(); |
113 | } |
114 | |
115 | #[inline ] |
116 | fn try_lock_shared(&self) -> bool { |
117 | let result = if self.try_lock_shared_fast(false) { |
118 | true |
119 | } else { |
120 | self.try_lock_shared_slow(false) |
121 | }; |
122 | if result { |
123 | self.deadlock_acquire(); |
124 | } |
125 | result |
126 | } |
127 | |
128 | #[inline ] |
129 | unsafe fn unlock_shared(&self) { |
130 | self.deadlock_release(); |
131 | let state = if have_elision() { |
132 | self.state.elision_fetch_sub_release(ONE_READER) |
133 | } else { |
134 | self.state.fetch_sub(ONE_READER, Ordering::Release) |
135 | }; |
136 | if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) { |
137 | self.unlock_shared_slow(); |
138 | } |
139 | } |
140 | |
141 | #[inline ] |
142 | fn is_locked(&self) -> bool { |
143 | let state = self.state.load(Ordering::Relaxed); |
144 | state & (WRITER_BIT | READERS_MASK) != 0 |
145 | } |
146 | |
147 | #[inline ] |
148 | fn is_locked_exclusive(&self) -> bool { |
149 | let state = self.state.load(Ordering::Relaxed); |
150 | state & (WRITER_BIT) != 0 |
151 | } |
152 | } |
153 | |
154 | unsafe impl lock_api::RawRwLockFair for RawRwLock { |
155 | #[inline ] |
156 | unsafe fn unlock_shared_fair(&self) { |
157 | // Shared unlocking is always fair in this implementation. |
158 | self.unlock_shared(); |
159 | } |
160 | |
161 | #[inline ] |
162 | unsafe fn unlock_exclusive_fair(&self) { |
163 | self.deadlock_release(); |
164 | if self |
165 | .state |
166 | .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed) |
167 | .is_ok() |
168 | { |
169 | return; |
170 | } |
171 | self.unlock_exclusive_slow(true); |
172 | } |
173 | |
174 | #[inline ] |
175 | unsafe fn bump_shared(&self) { |
176 | if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT) |
177 | == ONE_READER | WRITER_BIT |
178 | { |
179 | self.bump_shared_slow(); |
180 | } |
181 | } |
182 | |
183 | #[inline ] |
184 | unsafe fn bump_exclusive(&self) { |
185 | if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 { |
186 | self.bump_exclusive_slow(); |
187 | } |
188 | } |
189 | } |
190 | |
191 | unsafe impl lock_api::RawRwLockDowngrade for RawRwLock { |
192 | #[inline ] |
193 | unsafe fn downgrade(&self) { |
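        // Atomically trade WRITER_BIT for a single reader reference: adding
        // `ONE_READER - WRITER_BIT` clears WRITER_BIT and bumps the reader
        // count by one in a single operation.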
        let state = self
            .state
            .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);
197 | |
198 | // Wake up parked shared and upgradable threads if there are any |
199 | if state & PARKED_BIT != 0 { |
200 | self.downgrade_slow(); |
201 | } |
202 | } |
203 | } |
204 | |
205 | unsafe impl lock_api::RawRwLockTimed for RawRwLock { |
206 | type Duration = Duration; |
207 | type Instant = Instant; |
208 | |
209 | #[inline ] |
210 | fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool { |
211 | let result = if self.try_lock_shared_fast(false) { |
212 | true |
213 | } else { |
214 | self.lock_shared_slow(false, util::to_deadline(timeout)) |
215 | }; |
216 | if result { |
217 | self.deadlock_acquire(); |
218 | } |
219 | result |
220 | } |
221 | |
222 | #[inline ] |
223 | fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool { |
224 | let result = if self.try_lock_shared_fast(false) { |
225 | true |
226 | } else { |
227 | self.lock_shared_slow(false, Some(timeout)) |
228 | }; |
229 | if result { |
230 | self.deadlock_acquire(); |
231 | } |
232 | result |
233 | } |
234 | |
235 | #[inline ] |
236 | fn try_lock_exclusive_for(&self, timeout: Duration) -> bool { |
237 | let result = if self |
238 | .state |
239 | .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) |
240 | .is_ok() |
241 | { |
242 | true |
243 | } else { |
244 | self.lock_exclusive_slow(util::to_deadline(timeout)) |
245 | }; |
246 | if result { |
247 | self.deadlock_acquire(); |
248 | } |
249 | result |
250 | } |
251 | |
252 | #[inline ] |
253 | fn try_lock_exclusive_until(&self, timeout: Instant) -> bool { |
254 | let result = if self |
255 | .state |
256 | .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) |
257 | .is_ok() |
258 | { |
259 | true |
260 | } else { |
261 | self.lock_exclusive_slow(Some(timeout)) |
262 | }; |
263 | if result { |
264 | self.deadlock_acquire(); |
265 | } |
266 | result |
267 | } |
268 | } |
269 | |
270 | unsafe impl lock_api::RawRwLockRecursive for RawRwLock { |
271 | #[inline ] |
272 | fn lock_shared_recursive(&self) { |
        if !self.try_lock_shared_fast(true) {
            let result = self.lock_shared_slow(true, None);
275 | debug_assert!(result); |
276 | } |
277 | self.deadlock_acquire(); |
278 | } |
279 | |
280 | #[inline ] |
281 | fn try_lock_shared_recursive(&self) -> bool { |
        let result = if self.try_lock_shared_fast(true) {
283 | true |
284 | } else { |
            self.try_lock_shared_slow(true)
286 | }; |
287 | if result { |
288 | self.deadlock_acquire(); |
289 | } |
290 | result |
291 | } |
292 | } |
293 | |
294 | unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock { |
295 | #[inline ] |
296 | fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool { |
297 | let result = if self.try_lock_shared_fast(true) { |
298 | true |
299 | } else { |
300 | self.lock_shared_slow(true, util::to_deadline(timeout)) |
301 | }; |
302 | if result { |
303 | self.deadlock_acquire(); |
304 | } |
305 | result |
306 | } |
307 | |
308 | #[inline ] |
309 | fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool { |
310 | let result = if self.try_lock_shared_fast(true) { |
311 | true |
312 | } else { |
313 | self.lock_shared_slow(true, Some(timeout)) |
314 | }; |
315 | if result { |
316 | self.deadlock_acquire(); |
317 | } |
318 | result |
319 | } |
320 | } |
321 | |
322 | unsafe impl lock_api::RawRwLockUpgrade for RawRwLock { |
323 | #[inline ] |
324 | fn lock_upgradable(&self) { |
325 | if !self.try_lock_upgradable_fast() { |
326 | let result = self.lock_upgradable_slow(None); |
327 | debug_assert!(result); |
328 | } |
329 | self.deadlock_acquire(); |
330 | } |
331 | |
332 | #[inline ] |
333 | fn try_lock_upgradable(&self) -> bool { |
334 | let result = if self.try_lock_upgradable_fast() { |
335 | true |
336 | } else { |
337 | self.try_lock_upgradable_slow() |
338 | }; |
339 | if result { |
340 | self.deadlock_acquire(); |
341 | } |
342 | result |
343 | } |
344 | |
345 | #[inline ] |
346 | unsafe fn unlock_upgradable(&self) { |
347 | self.deadlock_release(); |
348 | let state = self.state.load(Ordering::Relaxed); |
349 | #[allow (clippy::collapsible_if)] |
350 | if state & PARKED_BIT == 0 { |
351 | if self |
352 | .state |
353 | .compare_exchange_weak( |
354 | state, |
355 | state - (ONE_READER | UPGRADABLE_BIT), |
356 | Ordering::Release, |
357 | Ordering::Relaxed, |
358 | ) |
359 | .is_ok() |
360 | { |
361 | return; |
362 | } |
363 | } |
364 | self.unlock_upgradable_slow(false); |
365 | } |
366 | |
367 | #[inline ] |
368 | unsafe fn upgrade(&self) { |
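        // Atomically trade our reader reference and UPGRADABLE_BIT for
        // WRITER_BIT: subtracting `(ONE_READER | UPGRADABLE_BIT) - WRITER_BIT`
        // performs all three updates in one operation. If we were the only
        // reader the upgrade is complete, otherwise we must wait for the
        // remaining readers to exit.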
369 | let state = self.state.fetch_sub( |
370 | (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, |
371 | Ordering::Acquire, |
372 | ); |
373 | if state & READERS_MASK != ONE_READER { |
374 | let result = self.upgrade_slow(None); |
375 | debug_assert!(result); |
376 | } |
377 | } |
378 | |
379 | #[inline ] |
380 | unsafe fn try_upgrade(&self) -> bool { |
381 | if self |
382 | .state |
383 | .compare_exchange_weak( |
384 | ONE_READER | UPGRADABLE_BIT, |
385 | WRITER_BIT, |
386 | Ordering::Acquire, |
387 | Ordering::Relaxed, |
388 | ) |
389 | .is_ok() |
390 | { |
391 | true |
392 | } else { |
393 | self.try_upgrade_slow() |
394 | } |
395 | } |
396 | } |
397 | |
398 | unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock { |
399 | #[inline ] |
400 | unsafe fn unlock_upgradable_fair(&self) { |
401 | self.deadlock_release(); |
402 | let state = self.state.load(Ordering::Relaxed); |
403 | #[allow (clippy::collapsible_if)] |
404 | if state & PARKED_BIT == 0 { |
405 | if self |
406 | .state |
407 | .compare_exchange_weak( |
408 | state, |
409 | state - (ONE_READER | UPGRADABLE_BIT), |
410 | Ordering::Release, |
411 | Ordering::Relaxed, |
412 | ) |
413 | .is_ok() |
414 | { |
415 | return; |
416 | } |
417 | } |
418 | self.unlock_upgradable_slow(false); |
419 | } |
420 | |
421 | #[inline ] |
422 | unsafe fn bump_upgradable(&self) { |
423 | if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT { |
424 | self.bump_upgradable_slow(); |
425 | } |
426 | } |
427 | } |
428 | |
429 | unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock { |
430 | #[inline ] |
431 | unsafe fn downgrade_upgradable(&self) { |
        let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);
433 | |
434 | // Wake up parked upgradable threads if there are any |
435 | if state & PARKED_BIT != 0 { |
436 | self.downgrade_slow(); |
437 | } |
438 | } |
439 | |
440 | #[inline ] |
441 | unsafe fn downgrade_to_upgradable(&self) { |
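        // Atomically trade WRITER_BIT for an upgradable reader reference,
        // mirroring `upgrade` in reverse.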
        let state = self.state.fetch_add(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Release,
445 | ); |
446 | |
447 | // Wake up parked shared threads if there are any |
448 | if state & PARKED_BIT != 0 { |
449 | self.downgrade_to_upgradable_slow(); |
450 | } |
451 | } |
452 | } |
453 | |
454 | unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock { |
455 | #[inline ] |
456 | fn try_lock_upgradable_until(&self, timeout: Instant) -> bool { |
457 | let result = if self.try_lock_upgradable_fast() { |
458 | true |
459 | } else { |
460 | self.lock_upgradable_slow(Some(timeout)) |
461 | }; |
462 | if result { |
463 | self.deadlock_acquire(); |
464 | } |
465 | result |
466 | } |
467 | |
468 | #[inline ] |
469 | fn try_lock_upgradable_for(&self, timeout: Duration) -> bool { |
470 | let result = if self.try_lock_upgradable_fast() { |
471 | true |
472 | } else { |
473 | self.lock_upgradable_slow(util::to_deadline(timeout)) |
474 | }; |
475 | if result { |
476 | self.deadlock_acquire(); |
477 | } |
478 | result |
479 | } |
480 | |
481 | #[inline ] |
482 | unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool { |
483 | let state = self.state.fetch_sub( |
484 | (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, |
485 | Ordering::Relaxed, |
486 | ); |
487 | if state & READERS_MASK == ONE_READER { |
488 | true |
489 | } else { |
490 | self.upgrade_slow(Some(timeout)) |
491 | } |
492 | } |
493 | |
494 | #[inline ] |
495 | unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool { |
496 | let state = self.state.fetch_sub( |
497 | (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, |
498 | Ordering::Relaxed, |
499 | ); |
500 | if state & READERS_MASK == ONE_READER { |
501 | true |
502 | } else { |
503 | self.upgrade_slow(util::to_deadline(timeout)) |
504 | } |
505 | } |
506 | } |
507 | |
508 | impl RawRwLock { |
509 | #[inline (always)] |
510 | fn try_lock_shared_fast(&self, recursive: bool) -> bool { |
511 | let state = self.state.load(Ordering::Relaxed); |
512 | |
513 | // We can't allow grabbing a shared lock if there is a writer, even if |
514 | // the writer is still waiting for the remaining readers to exit. |
515 | if state & WRITER_BIT != 0 { |
516 | // To allow recursive locks, we make an exception and allow readers |
517 | // to skip ahead of a pending writer to avoid deadlocking, at the |
518 | // cost of breaking the fairness guarantees. |
519 | if !recursive || state & READERS_MASK == 0 { |
520 | return false; |
521 | } |
522 | } |
523 | |
524 | // Use hardware lock elision to avoid cache conflicts when multiple |
525 | // readers try to acquire the lock. We only do this if the lock is |
526 | // completely empty since elision handles conflicts poorly. |
527 | if have_elision() && state == 0 { |
528 | self.state |
529 | .elision_compare_exchange_acquire(0, ONE_READER) |
530 | .is_ok() |
531 | } else if let Some(new_state) = state.checked_add(ONE_READER) { |
532 | self.state |
533 | .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed) |
534 | .is_ok() |
535 | } else { |
536 | false |
537 | } |
538 | } |
539 | |
540 | #[cold ] |
541 | fn try_lock_shared_slow(&self, recursive: bool) -> bool { |
542 | let mut state = self.state.load(Ordering::Relaxed); |
543 | loop { |
544 | // This mirrors the condition in try_lock_shared_fast |
545 | #[allow (clippy::collapsible_if)] |
546 | if state & WRITER_BIT != 0 { |
547 | if !recursive || state & READERS_MASK == 0 { |
548 | return false; |
549 | } |
550 | } |
551 | if have_elision() && state == 0 { |
552 | match self.state.elision_compare_exchange_acquire(0, ONE_READER) { |
553 | Ok(_) => return true, |
554 | Err(x) => state = x, |
555 | } |
556 | } else { |
557 | match self.state.compare_exchange_weak( |
558 | state, |
559 | state |
560 | .checked_add(ONE_READER) |
561 | .expect("RwLock reader count overflow" ), |
562 | Ordering::Acquire, |
563 | Ordering::Relaxed, |
564 | ) { |
565 | Ok(_) => return true, |
566 | Err(x) => state = x, |
567 | } |
568 | } |
569 | } |
570 | } |
571 | |
572 | #[inline (always)] |
573 | fn try_lock_upgradable_fast(&self) -> bool { |
574 | let state = self.state.load(Ordering::Relaxed); |
575 | |
576 | // We can't grab an upgradable lock if there is already a writer or |
577 | // upgradable reader. |
578 | if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { |
579 | return false; |
580 | } |
581 | |
582 | if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) { |
583 | self.state |
584 | .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed) |
585 | .is_ok() |
586 | } else { |
587 | false |
588 | } |
589 | } |
590 | |
591 | #[cold ] |
592 | fn try_lock_upgradable_slow(&self) -> bool { |
593 | let mut state = self.state.load(Ordering::Relaxed); |
594 | loop { |
595 | // This mirrors the condition in try_lock_upgradable_fast |
596 | if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { |
597 | return false; |
598 | } |
599 | |
600 | match self.state.compare_exchange_weak( |
601 | state, |
602 | state |
603 | .checked_add(ONE_READER | UPGRADABLE_BIT) |
604 | .expect("RwLock reader count overflow" ), |
605 | Ordering::Acquire, |
606 | Ordering::Relaxed, |
607 | ) { |
608 | Ok(_) => return true, |
609 | Err(x) => state = x, |
610 | } |
611 | } |
612 | } |
613 | |
614 | #[cold ] |
615 | fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool { |
616 | let try_lock = |state: &mut usize| { |
617 | loop { |
618 | if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { |
619 | return false; |
620 | } |
621 | |
622 | // Grab WRITER_BIT if it isn't set, even if there are parked threads. |
623 | match self.state.compare_exchange_weak( |
624 | *state, |
625 | *state | WRITER_BIT, |
626 | Ordering::Acquire, |
627 | Ordering::Relaxed, |
628 | ) { |
629 | Ok(_) => return true, |
630 | Err(x) => *state = x, |
631 | } |
632 | } |
633 | }; |
634 | |
635 | // Step 1: grab exclusive ownership of WRITER_BIT |
636 | let timed_out = !self.lock_common( |
637 | timeout, |
638 | TOKEN_EXCLUSIVE, |
639 | try_lock, |
640 | WRITER_BIT | UPGRADABLE_BIT, |
641 | ); |
642 | if timed_out { |
643 | return false; |
644 | } |
645 | |
646 | // Step 2: wait for all remaining readers to exit the lock. |
647 | self.wait_for_readers(timeout, 0) |
648 | } |
649 | |
650 | #[cold ] |
651 | fn unlock_exclusive_slow(&self, force_fair: bool) { |
652 | // There are threads to unpark. Try to unpark as many as we can. |
653 | let callback = |mut new_state, result: UnparkResult| { |
654 | // If we are using a fair unlock then we should keep the |
655 | // rwlock locked and hand it off to the unparked threads. |
656 | if result.unparked_threads != 0 && (force_fair || result.be_fair) { |
657 | if result.have_more_threads { |
658 | new_state |= PARKED_BIT; |
659 | } |
660 | self.state.store(new_state, Ordering::Release); |
661 | TOKEN_HANDOFF |
662 | } else { |
663 | // Clear the parked bit if there are no more parked threads. |
664 | if result.have_more_threads { |
665 | self.state.store(PARKED_BIT, Ordering::Release); |
666 | } else { |
667 | self.state.store(0, Ordering::Release); |
668 | } |
669 | TOKEN_NORMAL |
670 | } |
671 | }; |
672 | // SAFETY: `callback` does not panic or call into any function of `parking_lot`. |
673 | unsafe { |
674 | self.wake_parked_threads(0, callback); |
675 | } |
676 | } |
677 | |
678 | #[cold ] |
679 | fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool { |
680 | let try_lock = |state: &mut usize| { |
681 | let mut spinwait_shared = SpinWait::new(); |
682 | loop { |
683 | // Use hardware lock elision to avoid cache conflicts when multiple |
684 | // readers try to acquire the lock. We only do this if the lock is |
685 | // completely empty since elision handles conflicts poorly. |
686 | if have_elision() && *state == 0 { |
687 | match self.state.elision_compare_exchange_acquire(0, ONE_READER) { |
688 | Ok(_) => return true, |
689 | Err(x) => *state = x, |
690 | } |
691 | } |
692 | |
693 | // This is the same condition as try_lock_shared_fast |
694 | #[allow (clippy::collapsible_if)] |
695 | if *state & WRITER_BIT != 0 { |
696 | if !recursive || *state & READERS_MASK == 0 { |
697 | return false; |
698 | } |
699 | } |
700 | |
701 | if self |
702 | .state |
703 | .compare_exchange_weak( |
704 | *state, |
705 | state |
706 | .checked_add(ONE_READER) |
707 | .expect("RwLock reader count overflow" ), |
708 | Ordering::Acquire, |
709 | Ordering::Relaxed, |
710 | ) |
711 | .is_ok() |
712 | { |
713 | return true; |
714 | } |
715 | |
716 | // If there is high contention on the reader count then we want |
717 | // to leave some time between attempts to acquire the lock to |
718 | // let other threads make progress. |
719 | spinwait_shared.spin_no_yield(); |
720 | *state = self.state.load(Ordering::Relaxed); |
721 | } |
722 | }; |
723 | self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT) |
724 | } |
725 | |
726 | #[cold ] |
727 | fn unlock_shared_slow(&self) { |
        // At this point WRITER_PARKED_BIT is set and READERS_MASK is empty. We
729 | // just need to wake up a potentially sleeping pending writer. |
730 | // Using the 2nd key at addr + 1 |
731 | let addr = self as *const _ as usize + 1; |
732 | let callback = |_result: UnparkResult| { |
733 | // Clear the WRITER_PARKED_BIT here since there can only be one |
734 | // parked writer thread. |
735 | self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed); |
736 | TOKEN_NORMAL |
737 | }; |
738 | // SAFETY: |
739 | // * `addr` is an address we control. |
740 | // * `callback` does not panic or call into any function of `parking_lot`. |
741 | unsafe { |
742 | parking_lot_core::unpark_one(addr, callback); |
743 | } |
744 | } |
745 | |
746 | #[cold ] |
747 | fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool { |
748 | let try_lock = |state: &mut usize| { |
749 | let mut spinwait_shared = SpinWait::new(); |
750 | loop { |
751 | if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { |
752 | return false; |
753 | } |
754 | |
755 | if self |
756 | .state |
757 | .compare_exchange_weak( |
758 | *state, |
759 | state |
760 | .checked_add(ONE_READER | UPGRADABLE_BIT) |
761 | .expect("RwLock reader count overflow" ), |
762 | Ordering::Acquire, |
763 | Ordering::Relaxed, |
764 | ) |
765 | .is_ok() |
766 | { |
767 | return true; |
768 | } |
769 | |
770 | // If there is high contention on the reader count then we want |
771 | // to leave some time between attempts to acquire the lock to |
772 | // let other threads make progress. |
773 | spinwait_shared.spin_no_yield(); |
774 | *state = self.state.load(Ordering::Relaxed); |
775 | } |
776 | }; |
777 | self.lock_common( |
778 | timeout, |
779 | TOKEN_UPGRADABLE, |
780 | try_lock, |
781 | WRITER_BIT | UPGRADABLE_BIT, |
782 | ) |
783 | } |
784 | |
785 | #[cold ] |
786 | fn unlock_upgradable_slow(&self, force_fair: bool) { |
787 | // Just release the lock if there are no parked threads. |
788 | let mut state = self.state.load(Ordering::Relaxed); |
789 | while state & PARKED_BIT == 0 { |
790 | match self.state.compare_exchange_weak( |
791 | state, |
792 | state - (ONE_READER | UPGRADABLE_BIT), |
793 | Ordering::Release, |
794 | Ordering::Relaxed, |
795 | ) { |
796 | Ok(_) => return, |
797 | Err(x) => state = x, |
798 | } |
799 | } |
800 | |
801 | // There are threads to unpark. Try to unpark as many as we can. |
802 | let callback = |new_state, result: UnparkResult| { |
803 | // If we are using a fair unlock then we should keep the |
804 | // rwlock locked and hand it off to the unparked threads. |
805 | let mut state = self.state.load(Ordering::Relaxed); |
806 | if force_fair || result.be_fair { |
807 | // Fall back to normal unpark on overflow. Panicking is |
808 | // not allowed in parking_lot callbacks. |
809 | while let Some(mut new_state) = |
810 | (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state) |
811 | { |
812 | if result.have_more_threads { |
813 | new_state |= PARKED_BIT; |
814 | } else { |
815 | new_state &= !PARKED_BIT; |
816 | } |
817 | match self.state.compare_exchange_weak( |
818 | state, |
819 | new_state, |
820 | Ordering::Relaxed, |
821 | Ordering::Relaxed, |
822 | ) { |
823 | Ok(_) => return TOKEN_HANDOFF, |
824 | Err(x) => state = x, |
825 | } |
826 | } |
827 | } |
828 | |
829 | // Otherwise just release the upgradable lock and update PARKED_BIT. |
830 | loop { |
831 | let mut new_state = state - (ONE_READER | UPGRADABLE_BIT); |
832 | if result.have_more_threads { |
833 | new_state |= PARKED_BIT; |
834 | } else { |
835 | new_state &= !PARKED_BIT; |
836 | } |
837 | match self.state.compare_exchange_weak( |
838 | state, |
839 | new_state, |
840 | Ordering::Relaxed, |
841 | Ordering::Relaxed, |
842 | ) { |
843 | Ok(_) => return TOKEN_NORMAL, |
844 | Err(x) => state = x, |
845 | } |
846 | } |
847 | }; |
848 | // SAFETY: `callback` does not panic or call into any function of `parking_lot`. |
849 | unsafe { |
850 | self.wake_parked_threads(0, callback); |
851 | } |
852 | } |
853 | |
854 | #[cold ] |
855 | fn try_upgrade_slow(&self) -> bool { |
856 | let mut state = self.state.load(Ordering::Relaxed); |
857 | loop { |
858 | if state & READERS_MASK != ONE_READER { |
859 | return false; |
860 | } |
861 | match self.state.compare_exchange_weak( |
862 | state, |
863 | state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT, |
864 | Ordering::Relaxed, |
865 | Ordering::Relaxed, |
866 | ) { |
867 | Ok(_) => return true, |
868 | Err(x) => state = x, |
869 | } |
870 | } |
871 | } |
872 | |
873 | #[cold ] |
874 | fn upgrade_slow(&self, timeout: Option<Instant>) -> bool { |
875 | self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT) |
876 | } |
877 | |
878 | #[cold ] |
879 | fn downgrade_slow(&self) { |
880 | // We only reach this point if PARKED_BIT is set. |
881 | let callback = |_, result: UnparkResult| { |
            // Clear the parked bit if there are no more parked threads
883 | if !result.have_more_threads { |
884 | self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); |
885 | } |
886 | TOKEN_NORMAL |
887 | }; |
888 | // SAFETY: `callback` does not panic or call into any function of `parking_lot`. |
889 | unsafe { |
890 | self.wake_parked_threads(ONE_READER, callback); |
891 | } |
892 | } |
893 | |
894 | #[cold ] |
895 | fn downgrade_to_upgradable_slow(&self) { |
896 | // We only reach this point if PARKED_BIT is set. |
897 | let callback = |_, result: UnparkResult| { |
            // Clear the parked bit if there are no more parked threads
899 | if !result.have_more_threads { |
900 | self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); |
901 | } |
902 | TOKEN_NORMAL |
903 | }; |
904 | // SAFETY: `callback` does not panic or call into any function of `parking_lot`. |
905 | unsafe { |
906 | self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback); |
907 | } |
908 | } |
909 | |
910 | #[cold ] |
911 | unsafe fn bump_shared_slow(&self) { |
912 | self.unlock_shared(); |
913 | self.lock_shared(); |
914 | } |
915 | |
916 | #[cold ] |
917 | fn bump_exclusive_slow(&self) { |
918 | self.deadlock_release(); |
919 | self.unlock_exclusive_slow(true); |
920 | self.lock_exclusive(); |
921 | } |
922 | |
923 | #[cold ] |
924 | fn bump_upgradable_slow(&self) { |
925 | self.deadlock_release(); |
926 | self.unlock_upgradable_slow(true); |
927 | self.lock_upgradable(); |
928 | } |
929 | |
930 | /// Common code for waking up parked threads after releasing `WRITER_BIT` or |
931 | /// `UPGRADABLE_BIT`. |
932 | /// |
933 | /// # Safety |
934 | /// |
    /// `callback` must uphold the requirements of the `callback` parameter to
    /// `parking_lot_core::unpark_filter`, meaning it must not panic or call
    /// into any function of `parking_lot`.
938 | #[inline ] |
939 | unsafe fn wake_parked_threads( |
940 | &self, |
941 | new_state: usize, |
942 | callback: impl FnOnce(usize, UnparkResult) -> UnparkToken, |
943 | ) { |
944 | // We must wake up at least one upgrader or writer if there is one, |
945 | // otherwise they may end up parked indefinitely since unlock_shared |
946 | // does not call wake_parked_threads. |
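        // Each queued thread's `ParkToken` encodes the state increment it
        // needs (e.g. `TOKEN_SHARED` is `ONE_READER`), so we accumulate the
        // tokens of the threads we unpark into `new_state` and let the caller
        // publish it when handing the lock off.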
947 | let new_state = Cell::new(new_state); |
948 | let addr = self as *const _ as usize; |
949 | let filter = |ParkToken(token)| { |
950 | let s = new_state.get(); |
951 | |
952 | // If we are waking up a writer, don't wake anything else. |
953 | if s & WRITER_BIT != 0 { |
954 | return FilterOp::Stop; |
955 | } |
956 | |
957 | // Otherwise wake *all* readers and one upgrader/writer. |
958 | if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 { |
959 | // Skip writers and upgradable readers if we already have |
960 | // a writer/upgradable reader. |
961 | FilterOp::Skip |
962 | } else { |
963 | new_state.set(s + token); |
964 | FilterOp::Unpark |
965 | } |
966 | }; |
967 | let callback = |result| callback(new_state.get(), result); |
968 | // SAFETY: |
969 | // * `addr` is an address we control. |
970 | // * `filter` does not panic or call into any function of `parking_lot`. |
971 | // * `callback` safety responsibility is on caller |
972 | parking_lot_core::unpark_filter(addr, filter, callback); |
973 | } |
974 | |
975 | // Common code for waiting for readers to exit the lock after acquiring |
976 | // WRITER_BIT. |
977 | #[inline ] |
978 | fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool { |
979 | // At this point WRITER_BIT is already set, we just need to wait for the |
980 | // remaining readers to exit the lock. |
981 | let mut spinwait = SpinWait::new(); |
982 | let mut state = self.state.load(Ordering::Acquire); |
983 | while state & READERS_MASK != 0 { |
984 | // Spin a few times to wait for readers to exit |
985 | if spinwait.spin() { |
986 | state = self.state.load(Ordering::Acquire); |
987 | continue; |
988 | } |
989 | |
990 | // Set the parked bit |
991 | if state & WRITER_PARKED_BIT == 0 { |
992 | if let Err(x) = self.state.compare_exchange_weak( |
993 | state, |
994 | state | WRITER_PARKED_BIT, |
995 | Ordering::Acquire, |
996 | Ordering::Acquire, |
997 | ) { |
998 | state = x; |
999 | continue; |
1000 | } |
1001 | } |
1002 | |
1003 | // Park our thread until we are woken up by an unlock |
1004 | // Using the 2nd key at addr + 1 |
1005 | let addr = self as *const _ as usize + 1; |
1006 | let validate = || { |
1007 | let state = self.state.load(Ordering::Relaxed); |
1008 | state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0 |
1009 | }; |
1010 | let before_sleep = || {}; |
1011 | let timed_out = |_, _| {}; |
1012 | // SAFETY: |
1013 | // * `addr` is an address we control. |
1014 | // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`. |
1015 | // * `before_sleep` does not call `park`, nor does it panic. |
1016 | let park_result = unsafe { |
1017 | parking_lot_core::park( |
1018 | addr, |
1019 | validate, |
1020 | before_sleep, |
1021 | timed_out, |
1022 | TOKEN_EXCLUSIVE, |
1023 | timeout, |
1024 | ) |
1025 | }; |
1026 | match park_result { |
1027 | // We still need to re-check the state if we are unparked |
1028 | // since a previous writer timing-out could have allowed |
1029 | // another reader to sneak in before we parked. |
1030 | ParkResult::Unparked(_) | ParkResult::Invalid => { |
1031 | state = self.state.load(Ordering::Acquire); |
1032 | continue; |
1033 | } |
1034 | |
1035 | // Timeout expired |
1036 | ParkResult::TimedOut => { |
1037 | // We need to release WRITER_BIT and revert back to |
1038 | // our previous value. We also wake up any threads that |
1039 | // might be waiting on WRITER_BIT. |
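                    // `prev_value` is 0 for an exclusive lock attempt and
                    // `ONE_READER | UPGRADABLE_BIT` for an upgrade, so this
                    // restores whatever we held before trying to write.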
1040 | let state = self.state.fetch_add( |
1041 | prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT), |
1042 | Ordering::Relaxed, |
1043 | ); |
1044 | if state & PARKED_BIT != 0 { |
1045 | let callback = |_, result: UnparkResult| { |
                            // Clear the parked bit if there are no more parked threads
1047 | if !result.have_more_threads { |
1048 | self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); |
1049 | } |
1050 | TOKEN_NORMAL |
1051 | }; |
1052 | // SAFETY: `callback` does not panic or call any function of `parking_lot`. |
1053 | unsafe { |
1054 | self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback); |
1055 | } |
1056 | } |
1057 | return false; |
1058 | } |
1059 | } |
1060 | } |
1061 | true |
1062 | } |
1063 | |
1064 | /// Common code for acquiring a lock |
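    ///
    /// `token` identifies the kind of lock being acquired, and `validate_flags`
    /// is the set of state bits that must still be set for parking to be
    /// worthwhile: if they have all been cleared by the time we would sleep,
    /// the park validation fails and we retry the lock instead.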
1065 | #[inline ] |
1066 | fn lock_common( |
1067 | &self, |
1068 | timeout: Option<Instant>, |
1069 | token: ParkToken, |
1070 | mut try_lock: impl FnMut(&mut usize) -> bool, |
1071 | validate_flags: usize, |
1072 | ) -> bool { |
1073 | let mut spinwait = SpinWait::new(); |
1074 | let mut state = self.state.load(Ordering::Relaxed); |
1075 | loop { |
1076 | // Attempt to grab the lock |
1077 | if try_lock(&mut state) { |
1078 | return true; |
1079 | } |
1080 | |
1081 | // If there are no parked threads, try spinning a few times. |
1082 | if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() { |
1083 | state = self.state.load(Ordering::Relaxed); |
1084 | continue; |
1085 | } |
1086 | |
1087 | // Set the parked bit |
1088 | if state & PARKED_BIT == 0 { |
1089 | if let Err(x) = self.state.compare_exchange_weak( |
1090 | state, |
1091 | state | PARKED_BIT, |
1092 | Ordering::Relaxed, |
1093 | Ordering::Relaxed, |
1094 | ) { |
1095 | state = x; |
1096 | continue; |
1097 | } |
1098 | } |
1099 | |
1100 | // Park our thread until we are woken up by an unlock |
1101 | let addr = self as *const _ as usize; |
1102 | let validate = || { |
1103 | let state = self.state.load(Ordering::Relaxed); |
1104 | state & PARKED_BIT != 0 && (state & validate_flags != 0) |
1105 | }; |
1106 | let before_sleep = || {}; |
1107 | let timed_out = |_, was_last_thread| { |
1108 | // Clear the parked bit if we were the last parked thread |
1109 | if was_last_thread { |
1110 | self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); |
1111 | } |
1112 | }; |
1113 | |
1114 | // SAFETY: |
1115 | // * `addr` is an address we control. |
1116 | // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`. |
1117 | // * `before_sleep` does not call `park`, nor does it panic. |
1118 | let park_result = unsafe { |
1119 | parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout) |
1120 | }; |
1121 | match park_result { |
1122 | // The thread that unparked us passed the lock on to us |
1123 | // directly without unlocking it. |
1124 | ParkResult::Unparked(TOKEN_HANDOFF) => return true, |
1125 | |
1126 | // We were unparked normally, try acquiring the lock again |
1127 | ParkResult::Unparked(_) => (), |
1128 | |
1129 | // The validation function failed, try locking again |
1130 | ParkResult::Invalid => (), |
1131 | |
1132 | // Timeout expired |
1133 | ParkResult::TimedOut => return false, |
1134 | } |
1135 | |
1136 | // Loop back and try locking again |
1137 | spinwait.reset(); |
1138 | state = self.state.load(Ordering::Relaxed); |
1139 | } |
1140 | } |
1141 | |
1142 | #[inline ] |
1143 | fn deadlock_acquire(&self) { |
1144 | unsafe { deadlock::acquire_resource(self as *const _ as usize) }; |
1145 | unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) }; |
1146 | } |
1147 | |
1148 | #[inline ] |
1149 | fn deadlock_release(&self) { |
1150 | unsafe { deadlock::release_resource(self as *const _ as usize) }; |
1151 | unsafe { deadlock::release_resource(self as *const _ as usize + 1) }; |
1152 | } |
1153 | } |
1154 | |