1 | // Copyright 2016 Amanieu d'Antras |
2 | // |
3 | // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or |
4 | // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or |
5 | // http://opensource.org/licenses/MIT>, at your option. This file may not be |
6 | // copied, modified, or distributed except according to those terms. |
7 | |
8 | use crate::spinwait::SpinWait; |
9 | use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT}; |
10 | use core::{ |
11 | cell::Cell, |
12 | mem, ptr, |
13 | sync::atomic::{fence, AtomicUsize, Ordering}, |
14 | }; |
15 | |
16 | struct ThreadData { |
17 | parker: ThreadParker, |
18 | |
19 | // Linked list of threads in the queue. The queue is split into two parts: |
20 | // the processed part and the unprocessed part. When new nodes are added to |
21 | // the list, they only have the next pointer set, and queue_tail is null. |
22 | // |
23 | // Nodes are processed with the queue lock held, which consists of setting |
24 | // the prev pointer for each node and setting the queue_tail pointer on the |
25 | // first processed node of the list. |
26 | // |
27 | // This setup allows nodes to be added to the queue without a lock, while |
28 | // still allowing O(1) removal of nodes from the processed part of the list. |
29 | // The only cost is the O(n) processing, but this only needs to be done |
30 | // once for each node, and therefore isn't too expensive. |
31 | queue_tail: Cell<*const ThreadData>, |
32 | prev: Cell<*const ThreadData>, |
33 | next: Cell<*const ThreadData>, |
34 | } |
35 | |
36 | impl ThreadData { |
37 | #[inline ] |
38 | fn new() -> ThreadData { |
39 | assert!(mem::align_of::<ThreadData>() > !QUEUE_MASK); |
40 | ThreadData { |
41 | parker: ThreadParker::new(), |
42 | queue_tail: Cell::new(ptr::null()), |
43 | prev: Cell::new(ptr::null()), |
44 | next: Cell::new(ptr::null()), |
45 | } |
46 | } |
47 | } |
48 | |
49 | // Invokes the given closure with a reference to the current thread `ThreadData`. |
50 | #[inline ] |
51 | fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T { |
52 | let mut thread_data_ptr: *const ThreadData = ptr::null(); |
53 | // If ThreadData is expensive to construct, then we want to use a cached |
54 | // version in thread-local storage if possible. |
55 | if !ThreadParker::IS_CHEAP_TO_CONSTRUCT { |
56 | thread_local!(static THREAD_DATA: ThreadData = ThreadData::new()); |
57 | if let Ok(tls_thread_data: *const ThreadData) = THREAD_DATA.try_with(|x: &ThreadData| x as *const ThreadData) { |
58 | thread_data_ptr = tls_thread_data; |
59 | } |
60 | } |
61 | // Otherwise just create a ThreadData on the stack |
62 | let mut thread_data_storage: Option = None; |
63 | if thread_data_ptr.is_null() { |
64 | thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new); |
65 | } |
66 | |
67 | f(unsafe { &*thread_data_ptr }) |
68 | } |
69 | |
70 | const LOCKED_BIT: usize = 1; |
71 | const QUEUE_LOCKED_BIT: usize = 2; |
72 | const QUEUE_MASK: usize = !3; |
73 | |
74 | // Word-sized lock that is used to implement the parking_lot API. Since this |
75 | // can't use parking_lot, it instead manages its own queue of waiting threads. |
76 | pub struct WordLock { |
77 | state: AtomicUsize, |
78 | } |
79 | |
80 | impl WordLock { |
81 | /// Returns a new, unlocked, WordLock. |
82 | pub const fn new() -> Self { |
83 | WordLock { |
84 | state: AtomicUsize::new(0), |
85 | } |
86 | } |
87 | |
88 | #[inline ] |
89 | pub fn lock(&self) { |
90 | if self |
91 | .state |
92 | .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed) |
93 | .is_ok() |
94 | { |
95 | return; |
96 | } |
97 | self.lock_slow(); |
98 | } |
99 | |
100 | /// Must not be called on an already unlocked `WordLock`! |
101 | #[inline ] |
102 | pub unsafe fn unlock(&self) { |
103 | let state = self.state.fetch_sub(LOCKED_BIT, Ordering::Release); |
104 | if state.is_queue_locked() || state.queue_head().is_null() { |
105 | return; |
106 | } |
107 | self.unlock_slow(); |
108 | } |
109 | |
110 | #[cold ] |
111 | fn lock_slow(&self) { |
112 | let mut spinwait = SpinWait::new(); |
113 | let mut state = self.state.load(Ordering::Relaxed); |
114 | loop { |
115 | // Grab the lock if it isn't locked, even if there is a queue on it |
116 | if !state.is_locked() { |
117 | match self.state.compare_exchange_weak( |
118 | state, |
119 | state | LOCKED_BIT, |
120 | Ordering::Acquire, |
121 | Ordering::Relaxed, |
122 | ) { |
123 | Ok(_) => return, |
124 | Err(x) => state = x, |
125 | } |
126 | continue; |
127 | } |
128 | |
129 | // If there is no queue, try spinning a few times |
130 | if state.queue_head().is_null() && spinwait.spin() { |
131 | state = self.state.load(Ordering::Relaxed); |
132 | continue; |
133 | } |
134 | |
135 | // Get our thread data and prepare it for parking |
136 | state = with_thread_data(|thread_data| { |
137 | // The pthread implementation is still unsafe, so we need to surround `prepare_park` |
138 | // with `unsafe {}`. |
139 | #[allow (unused_unsafe)] |
140 | unsafe { |
141 | thread_data.parker.prepare_park(); |
142 | } |
143 | |
144 | // Add our thread to the front of the queue |
145 | let queue_head = state.queue_head(); |
146 | if queue_head.is_null() { |
147 | thread_data.queue_tail.set(thread_data); |
148 | thread_data.prev.set(ptr::null()); |
149 | } else { |
150 | thread_data.queue_tail.set(ptr::null()); |
151 | thread_data.prev.set(ptr::null()); |
152 | thread_data.next.set(queue_head); |
153 | } |
154 | if let Err(x) = self.state.compare_exchange_weak( |
155 | state, |
156 | state.with_queue_head(thread_data), |
157 | Ordering::AcqRel, |
158 | Ordering::Relaxed, |
159 | ) { |
160 | return x; |
161 | } |
162 | |
163 | // Sleep until we are woken up by an unlock |
164 | // Ignoring unused unsafe, since it's only a few platforms where this is unsafe. |
165 | #[allow (unused_unsafe)] |
166 | unsafe { |
167 | thread_data.parker.park(); |
168 | } |
169 | |
170 | // Loop back and try locking again |
171 | spinwait.reset(); |
172 | self.state.load(Ordering::Relaxed) |
173 | }); |
174 | } |
175 | } |
176 | |
177 | #[cold ] |
178 | fn unlock_slow(&self) { |
179 | let mut state = self.state.load(Ordering::Relaxed); |
180 | loop { |
181 | // We just unlocked the WordLock. Just check if there is a thread |
182 | // to wake up. If the queue is locked then another thread is already |
183 | // taking care of waking up a thread. |
184 | if state.is_queue_locked() || state.queue_head().is_null() { |
185 | return; |
186 | } |
187 | |
188 | // Try to grab the queue lock |
189 | match self.state.compare_exchange_weak( |
190 | state, |
191 | state | QUEUE_LOCKED_BIT, |
192 | Ordering::Acquire, |
193 | Ordering::Relaxed, |
194 | ) { |
195 | Ok(_) => break, |
196 | Err(x) => state = x, |
197 | } |
198 | } |
199 | |
200 | // Now we have the queue lock and the queue is non-empty |
201 | 'outer: loop { |
202 | // First, we need to fill in the prev pointers for any newly added |
203 | // threads. We do this until we reach a node that we previously |
204 | // processed, which has a non-null queue_tail pointer. |
205 | let queue_head = state.queue_head(); |
206 | let mut queue_tail; |
207 | let mut current = queue_head; |
208 | loop { |
209 | queue_tail = unsafe { (*current).queue_tail.get() }; |
210 | if !queue_tail.is_null() { |
211 | break; |
212 | } |
213 | unsafe { |
214 | let next = (*current).next.get(); |
215 | (*next).prev.set(current); |
216 | current = next; |
217 | } |
218 | } |
219 | |
220 | // Set queue_tail on the queue head to indicate that the whole list |
221 | // has prev pointers set correctly. |
222 | unsafe { |
223 | (*queue_head).queue_tail.set(queue_tail); |
224 | } |
225 | |
226 | // If the WordLock is locked, then there is no point waking up a |
227 | // thread now. Instead we let the next unlocker take care of waking |
228 | // up a thread. |
229 | if state.is_locked() { |
230 | match self.state.compare_exchange_weak( |
231 | state, |
232 | state & !QUEUE_LOCKED_BIT, |
233 | Ordering::Release, |
234 | Ordering::Relaxed, |
235 | ) { |
236 | Ok(_) => return, |
237 | Err(x) => state = x, |
238 | } |
239 | |
240 | // Need an acquire fence before reading the new queue |
241 | fence_acquire(&self.state); |
242 | continue; |
243 | } |
244 | |
245 | // Remove the last thread from the queue and unlock the queue |
246 | let new_tail = unsafe { (*queue_tail).prev.get() }; |
247 | if new_tail.is_null() { |
248 | loop { |
249 | match self.state.compare_exchange_weak( |
250 | state, |
251 | state & LOCKED_BIT, |
252 | Ordering::Release, |
253 | Ordering::Relaxed, |
254 | ) { |
255 | Ok(_) => break, |
256 | Err(x) => state = x, |
257 | } |
258 | |
259 | // If the compare_exchange failed because a new thread was |
260 | // added to the queue then we need to re-scan the queue to |
261 | // find the previous element. |
262 | if state.queue_head().is_null() { |
263 | continue; |
264 | } else { |
265 | // Need an acquire fence before reading the new queue |
266 | fence_acquire(&self.state); |
267 | continue 'outer; |
268 | } |
269 | } |
270 | } else { |
271 | unsafe { |
272 | (*queue_head).queue_tail.set(new_tail); |
273 | } |
274 | self.state.fetch_and(!QUEUE_LOCKED_BIT, Ordering::Release); |
275 | } |
276 | |
277 | // Finally, wake up the thread we removed from the queue. Note that |
278 | // we don't need to worry about any races here since the thread is |
279 | // guaranteed to be sleeping right now and we are the only one who |
280 | // can wake it up. |
281 | unsafe { |
282 | (*queue_tail).parker.unpark_lock().unpark(); |
283 | } |
284 | break; |
285 | } |
286 | } |
287 | } |
288 | |
289 | // Thread-Sanitizer only has partial fence support, so when running under it, we |
290 | // try and avoid false positives by using a discarded acquire load instead. |
291 | #[inline ] |
292 | fn fence_acquire(a: &AtomicUsize) { |
293 | if cfg!(tsan_enabled) { |
294 | let _ = a.load(order:Ordering::Acquire); |
295 | } else { |
296 | fence(order:Ordering::Acquire); |
297 | } |
298 | } |
299 | |
300 | trait LockState { |
301 | fn is_locked(self) -> bool; |
302 | fn is_queue_locked(self) -> bool; |
303 | fn queue_head(self) -> *const ThreadData; |
304 | fn with_queue_head(self, thread_data: *const ThreadData) -> Self; |
305 | } |
306 | |
307 | impl LockState for usize { |
308 | #[inline ] |
309 | fn is_locked(self) -> bool { |
310 | self & LOCKED_BIT != 0 |
311 | } |
312 | |
313 | #[inline ] |
314 | fn is_queue_locked(self) -> bool { |
315 | self & QUEUE_LOCKED_BIT != 0 |
316 | } |
317 | |
318 | #[inline ] |
319 | fn queue_head(self) -> *const ThreadData { |
320 | (self & QUEUE_MASK) as *const ThreadData |
321 | } |
322 | |
323 | #[inline ] |
324 | fn with_queue_head(self, thread_data: *const ThreadData) -> Self { |
325 | (self & !QUEUE_MASK) | thread_data as *const _ as usize |
326 | } |
327 | } |
328 | |