1// Copyright 2016 Amanieu d'Antras
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use crate::spinwait::SpinWait;
9use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
10use core::{
11 cell::Cell,
12 mem, ptr,
13 sync::atomic::{fence, AtomicUsize, Ordering},
14};
15
// Per-thread node used to enqueue a waiting thread on a `WordLock`.
//
// The link fields are `Cell`s holding raw pointers, so nodes are mutated
// through shared references; see the comment on the link fields for which
// accesses happen under the queue lock.
struct ThreadData {
    // Primitive used to put this thread to sleep and to wake it up again.
    parker: ThreadParker,

    // Linked list of threads in the queue. The queue is split into two parts:
    // the processed part and the unprocessed part. When new nodes are added to
    // the list, they only have the next pointer set, and queue_tail is null.
    //
    // Nodes are processed with the queue lock held, which consists of setting
    // the prev pointer for each node and setting the queue_tail pointer on the
    // first processed node of the list.
    //
    // This setup allows nodes to be added to the queue without a lock, while
    // still allowing O(1) removal of nodes from the processed part of the list.
    // The only cost is the O(n) processing, but this only needs to be done
    // once for each node, and therefore isn't too expensive.

    // Tail of the queue as seen from this node; null until the node has been
    // processed (a node that starts a new queue points at itself).
    queue_tail: Cell<*const ThreadData>,
    // Neighbour towards the head of the list; filled in during processing.
    prev: Cell<*const ThreadData>,
    // Neighbour towards the tail of the list; set when the node is pushed in
    // front of an existing head.
    next: Cell<*const ThreadData>,
}
35
36impl ThreadData {
37 #[inline]
38 fn new() -> ThreadData {
39 assert!(mem::align_of::<ThreadData>() > !QUEUE_MASK);
40 ThreadData {
41 parker: ThreadParker::new(),
42 queue_tail: Cell::new(ptr::null()),
43 prev: Cell::new(ptr::null()),
44 next: Cell::new(ptr::null()),
45 }
46 }
47}
48
49// Invokes the given closure with a reference to the current thread `ThreadData`.
50#[inline]
51fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
52 let mut thread_data_ptr: *const ThreadData = ptr::null();
53 // If ThreadData is expensive to construct, then we want to use a cached
54 // version in thread-local storage if possible.
55 if !ThreadParker::IS_CHEAP_TO_CONSTRUCT {
56 thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
57 if let Ok(tls_thread_data: *const ThreadData) = THREAD_DATA.try_with(|x: &ThreadData| x as *const ThreadData) {
58 thread_data_ptr = tls_thread_data;
59 }
60 }
61 // Otherwise just create a ThreadData on the stack
62 let mut thread_data_storage: Option = None;
63 if thread_data_ptr.is_null() {
64 thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new);
65 }
66
67 f(unsafe { &*thread_data_ptr })
68}
69
// Layout of the `WordLock` state word: the two lowest bits are flags and the
// remaining bits hold the address of the `ThreadData` at the head of the
// queue (null when the queue is empty).

// Flag bit tested by `is_locked`: set while the lock itself is held.
const LOCKED_BIT: usize = 1;
// Flag bit tested by `is_queue_locked`: set while a thread holds the queue lock.
const QUEUE_LOCKED_BIT: usize = 2;
// Mask selecting the queue-head pointer bits (everything except the two flags).
const QUEUE_MASK: usize = !3;
73
// Word-sized lock that is used to implement the parking_lot API. Since this
// can't use parking_lot, it instead manages its own queue of waiting threads.
pub struct WordLock {
    // Packed state word: `LOCKED_BIT` and `QUEUE_LOCKED_BIT` in the two low
    // bits, and the address of the queue-head `ThreadData` in the bits
    // selected by `QUEUE_MASK`. Zero means unlocked with no waiters.
    state: AtomicUsize,
}
79
impl WordLock {
    /// Returns a new, unlocked, WordLock.
    pub const fn new() -> Self {
        WordLock {
            state: AtomicUsize::new(0),
        }
    }

    /// Acquires the lock, putting the calling thread to sleep if necessary.
    ///
    /// The fast path is a single compare-exchange from the fully idle state
    /// (`0`: unlocked, no queue) to `LOCKED_BIT`. Any other outcome, including
    /// a spurious failure of the weak compare-exchange, is handled by
    /// `lock_slow`.
    #[inline]
    pub fn lock(&self) {
        if self
            .state
            .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.lock_slow();
    }

    /// Releases the lock and, if needed, wakes up one queued thread.
    ///
    /// # Safety
    ///
    /// Must not be called on an already unlocked `WordLock`! The lock bit is
    /// cleared with a subtraction, so doing this while the bit is not set
    /// would corrupt the other bits of the state word.
    #[inline]
    pub unsafe fn unlock(&self) {
        // `fetch_sub` returns the state as it was *before* the lock bit was cleared.
        let state = self.state.fetch_sub(LOCKED_BIT, Ordering::Release);
        // Nothing more to do if another thread already holds the queue lock
        // or if nobody is waiting.
        if state.is_queue_locked() || state.queue_head().is_null() {
            return;
        }
        self.unlock_slow();
    }

    // Slow path of `lock`: retries the acquisition, spins while there is no
    // queue, and otherwise pushes this thread onto the front of the queue and
    // parks it until an unlocker wakes it up. Returns once the lock is held.
    #[cold]
    fn lock_slow(&self) {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Grab the lock if it isn't locked, even if there is a queue on it
            if !state.is_locked() {
                match self.state.compare_exchange_weak(
                    state,
                    state | LOCKED_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    // Retry with the freshly observed state.
                    Err(x) => state = x,
                }
                continue;
            }

            // If there is no queue, try spinning a few times
            // (presumably `spin()` returns false once spinning should stop —
            // see `SpinWait`).
            if state.queue_head().is_null() && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Get our thread data and prepare it for parking
            // The closure evaluates to the state to use for the next loop iteration.
            state = with_thread_data(|thread_data| {
                // The pthread implementation is still unsafe, so we need to surround `prepare_park`
                // with `unsafe {}`.
                #[allow(unused_unsafe)]
                unsafe {
                    thread_data.parker.prepare_park();
                }

                // Add our thread to the front of the queue
                let queue_head = state.queue_head();
                if queue_head.is_null() {
                    // We start a new queue: this node is its own tail, which
                    // also marks it as already processed. `next` is left
                    // untouched; the scan in `unlock_slow` only follows `next`
                    // on nodes whose `queue_tail` is null.
                    thread_data.queue_tail.set(thread_data);
                    thread_data.prev.set(ptr::null());
                } else {
                    // Unprocessed node: only `next` is meaningful until an
                    // unlocker fills in `prev`/`queue_tail`.
                    thread_data.queue_tail.set(ptr::null());
                    thread_data.prev.set(ptr::null());
                    thread_data.next.set(queue_head);
                }
                // Publish the node as the new queue head, keeping the flag bits.
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state.with_queue_head(thread_data),
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                ) {
                    // The state changed under us (or the weak CAS failed
                    // spuriously): we were not enqueued, so go around the
                    // outer loop again with the observed state.
                    return x;
                }

                // Sleep until we are woken up by an unlock
                // Ignoring unused unsafe, since it's only a few platforms where this is unsafe.
                #[allow(unused_unsafe)]
                unsafe {
                    thread_data.parker.park();
                }

                // Loop back and try locking again
                spinwait.reset();
                self.state.load(Ordering::Relaxed)
            });
        }
    }

    // Slow path of `unlock`: takes the queue lock, finishes linking any newly
    // queued nodes, removes the node at the tail of the queue and wakes its
    // thread. If the lock has been re-acquired in the meantime, the wake-up
    // is left to the next unlocker.
    #[cold]
    fn unlock_slow(&self) {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // We just unlocked the WordLock. Just check if there is a thread
            // to wake up. If the queue is locked then another thread is already
            // taking care of waking up a thread.
            if state.is_queue_locked() || state.queue_head().is_null() {
                return;
            }

            // Try to grab the queue lock
            match self.state.compare_exchange_weak(
                state,
                state | QUEUE_LOCKED_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(x) => state = x,
            }
        }

        // Now we have the queue lock and the queue is non-empty
        'outer: loop {
            // First, we need to fill in the prev pointers for any newly added
            // threads. We do this until we reach a node that we previously
            // processed, which has a non-null queue_tail pointer.
            let queue_head = state.queue_head();
            let mut queue_tail;
            let mut current = queue_head;
            loop {
                // `current` starts at the queue head, which is non-null here;
                // subsequent values come from `next` of unprocessed nodes.
                queue_tail = unsafe { (*current).queue_tail.get() };
                if !queue_tail.is_null() {
                    break;
                }
                unsafe {
                    let next = (*current).next.get();
                    (*next).prev.set(current);
                    current = next;
                }
            }

            // Set queue_tail on the queue head to indicate that the whole list
            // has prev pointers set correctly.
            unsafe {
                (*queue_head).queue_tail.set(queue_tail);
            }

            // If the WordLock is locked, then there is no point waking up a
            // thread now. Instead we let the next unlocker take care of waking
            // up a thread.
            if state.is_locked() {
                // Just drop the queue lock, leaving the queue as it is.
                match self.state.compare_exchange_weak(
                    state,
                    state & !QUEUE_LOCKED_BIT,
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }

                // Need an acquire fence before reading the new queue
                fence_acquire(&self.state);
                continue;
            }

            // Remove the last thread from the queue and unlock the queue
            let new_tail = unsafe { (*queue_tail).prev.get() };
            if new_tail.is_null() {
                // The tail has no predecessor, so it is the only node we know
                // of: clear the queue head and the queue lock in one step,
                // keeping only the lock bit.
                loop {
                    match self.state.compare_exchange_weak(
                        state,
                        state & LOCKED_BIT,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(x) => state = x,
                    }

                    // If the compare_exchange failed because a new thread was
                    // added to the queue then we need to re-scan the queue to
                    // find the previous element.
                    if state.queue_head().is_null() {
                        continue;
                    } else {
                        // Need an acquire fence before reading the new queue
                        fence_acquire(&self.state);
                        continue 'outer;
                    }
                }
            } else {
                // Record the new tail on the head node (where the scan above
                // looks for it), then release the queue lock.
                unsafe {
                    (*queue_head).queue_tail.set(new_tail);
                }
                self.state.fetch_and(!QUEUE_LOCKED_BIT, Ordering::Release);
            }

            // Finally, wake up the thread we removed from the queue. Note that
            // we don't need to worry about any races here since the thread is
            // guaranteed to be sleeping right now and we are the only one who
            // can wake it up.
            unsafe {
                (*queue_tail).parker.unpark_lock().unpark();
            }
            break;
        }
    }
}
288
// Establishes acquire ordering after a relaxed observation of `a`.
//
// Thread-Sanitizer only has partial fence support, so when running under it, we
// try and avoid false positives by using a discarded acquire load instead.
// In all other builds a plain acquire fence is issued and `a` is not touched.
#[inline]
fn fence_acquire(a: &AtomicUsize) {
    if cfg!(tsan_enabled) {
        // The loaded value is irrelevant; only the acquire ordering matters.
        let _ = a.load(Ordering::Acquire);
    } else {
        fence(Ordering::Acquire);
    }
}
299
// Accessors for the individual parts packed into a `WordLock` state word.
// All methods take the word by value.
trait LockState {
    // Whether the `LOCKED_BIT` flag is set.
    fn is_locked(self) -> bool;
    // Whether the `QUEUE_LOCKED_BIT` flag is set.
    fn is_queue_locked(self) -> bool;
    // The queue-head pointer stored in the `QUEUE_MASK` bits (null if none).
    fn queue_head(self) -> *const ThreadData;
    // A copy of the state with the queue-head pointer replaced by
    // `thread_data` and the flag bits left as they are.
    fn with_queue_head(self, thread_data: *const ThreadData) -> Self;
}
306
307impl LockState for usize {
308 #[inline]
309 fn is_locked(self) -> bool {
310 self & LOCKED_BIT != 0
311 }
312
313 #[inline]
314 fn is_queue_locked(self) -> bool {
315 self & QUEUE_LOCKED_BIT != 0
316 }
317
318 #[inline]
319 fn queue_head(self) -> *const ThreadData {
320 (self & QUEUE_MASK) as *const ThreadData
321 }
322
323 #[inline]
324 fn with_queue_head(self, thread_data: *const ThreadData) -> Self {
325 (self & !QUEUE_MASK) | thread_data as *const _ as usize
326 }
327}
328