1use crate::sync::atomic::{
2 AtomicU32,
3 Ordering::{Acquire, Relaxed, Release},
4};
5use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};
6
/// Reader-writer lock made of two 32-bit atomic words that are used as futexes.
pub struct RwLock {
    /// The lock word: a 30-bit reader counter plus a 'readers waiting' flag
    /// and a 'writers waiting' flag.
    ///
    /// Bits 0..30:
    /// - `0`: unlocked
    /// - `1..=0x3FFF_FFFE`: locked by that many readers
    /// - `0x3FFF_FFFF`: write locked
    ///
    /// Bit 30: readers are waiting on this futex.
    ///
    /// Bit 31: writers are waiting on the `writer_notify` futex.
    state: AtomicU32,
    /// The 'condition variable' through which writers are notified.
    /// It is incremented on every signal.
    writer_notify: AtomicU32,
}
20
/// Amount by which one read lock changes the state word.
const READ_LOCKED: u32 = 1;
/// Flag (bit 30): readers are waiting.
const READERS_WAITING: u32 = 1 << 30;
/// Flag (bit 31): writers are waiting.
const WRITERS_WAITING: u32 = 1 << 31;
/// Selects the lock counter, i.e. every bit below the two waiting flags.
const MASK: u32 = READERS_WAITING - 1;
/// Counter value that means "write locked": all counter bits set.
const WRITE_LOCKED: u32 = MASK;
/// Largest reader count; one less than the write-locked value.
const MAX_READERS: u32 = WRITE_LOCKED - 1;
27
28#[inline]
29fn is_unlocked(state: u32) -> bool {
30 state & MASK == 0
31}
32
33#[inline]
34fn is_write_locked(state: u32) -> bool {
35 state & MASK == WRITE_LOCKED
36}
37
38#[inline]
39fn has_readers_waiting(state: u32) -> bool {
40 state & READERS_WAITING != 0
41}
42
43#[inline]
44fn has_writers_waiting(state: u32) -> bool {
45 state & WRITERS_WAITING != 0
46}
47
48#[inline]
49fn is_read_lockable(state: u32) -> bool {
50 // This also returns false if the counter could overflow if we tried to read lock it.
51 //
52 // We don't allow read-locking if there's readers waiting, even if the lock is unlocked
53 // and there's no writers waiting. The only situation when this happens is after unlocking,
54 // at which point the unlocking thread might be waking up writers, which have priority over readers.
55 // The unlocking thread will clear the readers waiting bit and wake up readers, if necessary.
56 state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
57}
58
59#[inline]
60fn has_reached_max_readers(state: u32) -> bool {
61 state & MASK == MAX_READERS
62}
63
64impl RwLock {
65 #[inline]
66 pub const fn new() -> Self {
67 Self { state: AtomicU32::new(0), writer_notify: AtomicU32::new(0) }
68 }
69
70 #[inline]
71 pub fn try_read(&self) -> bool {
72 self.state
73 .fetch_update(Acquire, Relaxed, |s| is_read_lockable(s).then(|| s + READ_LOCKED))
74 .is_ok()
75 }
76
77 #[inline]
78 pub fn read(&self) {
79 let state = self.state.load(Relaxed);
80 if !is_read_lockable(state)
81 || self
82 .state
83 .compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
84 .is_err()
85 {
86 self.read_contended();
87 }
88 }
89
90 #[inline]
91 pub unsafe fn read_unlock(&self) {
92 let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED;
93
94 // It's impossible for a reader to be waiting on a read-locked RwLock,
95 // except if there is also a writer waiting.
96 debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));
97
98 // Wake up a writer if we were the last reader and there's a writer waiting.
99 if is_unlocked(state) && has_writers_waiting(state) {
100 self.wake_writer_or_readers(state);
101 }
102 }
103
104 #[cold]
105 fn read_contended(&self) {
106 let mut state = self.spin_read();
107
108 loop {
109 // If we can lock it, lock it.
110 if is_read_lockable(state) {
111 match self.state.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
112 {
113 Ok(_) => return, // Locked!
114 Err(s) => {
115 state = s;
116 continue;
117 }
118 }
119 }
120
121 // Check for overflow.
122 if has_reached_max_readers(state) {
123 panic!("too many active read locks on RwLock");
124 }
125
126 // Make sure the readers waiting bit is set before we go to sleep.
127 if !has_readers_waiting(state) {
128 if let Err(s) =
129 self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
130 {
131 state = s;
132 continue;
133 }
134 }
135
136 // Wait for the state to change.
137 futex_wait(&self.state, state | READERS_WAITING, None);
138
139 // Spin again after waking up.
140 state = self.spin_read();
141 }
142 }
143
144 #[inline]
145 pub fn try_write(&self) -> bool {
146 self.state
147 .fetch_update(Acquire, Relaxed, |s| is_unlocked(s).then(|| s + WRITE_LOCKED))
148 .is_ok()
149 }
150
151 #[inline]
152 pub fn write(&self) {
153 if self.state.compare_exchange_weak(0, WRITE_LOCKED, Acquire, Relaxed).is_err() {
154 self.write_contended();
155 }
156 }
157
158 #[inline]
159 pub unsafe fn write_unlock(&self) {
160 let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;
161
162 debug_assert!(is_unlocked(state));
163
164 if has_writers_waiting(state) || has_readers_waiting(state) {
165 self.wake_writer_or_readers(state);
166 }
167 }
168
169 #[cold]
170 fn write_contended(&self) {
171 let mut state = self.spin_write();
172
173 let mut other_writers_waiting = 0;
174
175 loop {
176 // If it's unlocked, we try to lock it.
177 if is_unlocked(state) {
178 match self.state.compare_exchange_weak(
179 state,
180 state | WRITE_LOCKED | other_writers_waiting,
181 Acquire,
182 Relaxed,
183 ) {
184 Ok(_) => return, // Locked!
185 Err(s) => {
186 state = s;
187 continue;
188 }
189 }
190 }
191
192 // Set the waiting bit indicating that we're waiting on it.
193 if !has_writers_waiting(state) {
194 if let Err(s) =
195 self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
196 {
197 state = s;
198 continue;
199 }
200 }
201
202 // Other writers might be waiting now too, so we should make sure
203 // we keep that bit on once we manage lock it.
204 other_writers_waiting = WRITERS_WAITING;
205
206 // Examine the notification counter before we check if `state` has changed,
207 // to make sure we don't miss any notifications.
208 let seq = self.writer_notify.load(Acquire);
209
210 // Don't go to sleep if the lock has become available,
211 // or if the writers waiting bit is no longer set.
212 state = self.state.load(Relaxed);
213 if is_unlocked(state) || !has_writers_waiting(state) {
214 continue;
215 }
216
217 // Wait for the state to change.
218 futex_wait(&self.writer_notify, seq, None);
219
220 // Spin again after waking up.
221 state = self.spin_write();
222 }
223 }
224
225 /// Wake up waiting threads after unlocking.
226 ///
227 /// If both are waiting, this will wake up only one writer, but will fall
228 /// back to waking up readers if there was no writer to wake up.
229 #[cold]
230 fn wake_writer_or_readers(&self, mut state: u32) {
231 assert!(is_unlocked(state));
232
233 // The readers waiting bit might be turned on at any point now,
234 // since readers will block when there's anything waiting.
235 // Writers will just lock the lock though, regardless of the waiting bits,
236 // so we don't have to worry about the writer waiting bit.
237 //
238 // If the lock gets locked in the meantime, we don't have to do
239 // anything, because then the thread that locked the lock will take
240 // care of waking up waiters when it unlocks.
241
242 // If only writers are waiting, wake one of them up.
243 if state == WRITERS_WAITING {
244 match self.state.compare_exchange(state, 0, Relaxed, Relaxed) {
245 Ok(_) => {
246 self.wake_writer();
247 return;
248 }
249 Err(s) => {
250 // Maybe some readers are now waiting too. So, continue to the next `if`.
251 state = s;
252 }
253 }
254 }
255
256 // If both writers and readers are waiting, leave the readers waiting
257 // and only wake up one writer.
258 if state == READERS_WAITING + WRITERS_WAITING {
259 if self.state.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed).is_err() {
260 // The lock got locked. Not our problem anymore.
261 return;
262 }
263 if self.wake_writer() {
264 return;
265 }
266 // No writers were actually blocked on futex_wait, so we continue
267 // to wake up readers instead, since we can't be sure if we notified a writer.
268 state = READERS_WAITING;
269 }
270
271 // If readers are waiting, wake them all up.
272 if state == READERS_WAITING {
273 if self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok() {
274 futex_wake_all(&self.state);
275 }
276 }
277 }
278
279 /// This wakes one writer and returns true if we woke up a writer that was
280 /// blocked on futex_wait.
281 ///
282 /// If this returns false, it might still be the case that we notified a
283 /// writer that was about to go to sleep.
284 fn wake_writer(&self) -> bool {
285 self.writer_notify.fetch_add(1, Release);
286 futex_wake(&self.writer_notify)
287 // Note that FreeBSD and DragonFlyBSD don't tell us whether they woke
288 // up any threads or not, and always return `false` here. That still
289 // results in correct behaviour: it just means readers get woken up as
290 // well in case both readers and writers were waiting.
291 }
292
293 /// Spin for a while, but stop directly at the given condition.
294 #[inline]
295 fn spin_until(&self, f: impl Fn(u32) -> bool) -> u32 {
296 let mut spin = 100; // Chosen by fair dice roll.
297 loop {
298 let state = self.state.load(Relaxed);
299 if f(state) || spin == 0 {
300 return state;
301 }
302 crate::hint::spin_loop();
303 spin -= 1;
304 }
305 }
306
307 #[inline]
308 fn spin_write(&self) -> u32 {
309 // Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair.
310 self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
311 }
312
313 #[inline]
314 fn spin_read(&self) -> u32 {
315 // Stop spinning when it's unlocked or read locked, or when there's waiting threads.
316 self.spin_until(|state| {
317 !is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
318 })
319 }
320}
321