1 | // Copyright 2016 Amanieu d'Antras |
2 | // |
3 | // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or |
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 | // http://opensource.org/licenses/MIT>, at your option. This file may not be |
6 | // copied, modified, or distributed except according to those terms. |
7 | use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT}; |
8 | use crate::util::UncheckedOptionExt; |
9 | use crate::word_lock::WordLock; |
10 | use core::{ |
11 | cell::{Cell, UnsafeCell}, |
12 | ptr, |
13 | sync::atomic::{AtomicPtr, AtomicUsize, Ordering}, |
14 | }; |
15 | use smallvec::SmallVec; |
16 | use std::time::{Duration, Instant}; |
17 | |
18 | // Don't use Instant on wasm32-unknown-unknown, it just panics. |
19 | cfg_if::cfg_if! { |
20 | if #[cfg(all( |
target_family = "wasm",
target_os = "unknown",
23 | target_vendor = "unknown" |
24 | ))] { |
25 | #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] |
26 | struct TimeoutInstant; |
27 | impl TimeoutInstant { |
28 | fn now() -> TimeoutInstant { |
29 | TimeoutInstant |
30 | } |
31 | } |
32 | impl core::ops::Add<Duration> for TimeoutInstant { |
33 | type Output = Self; |
34 | fn add(self, _rhs: Duration) -> Self::Output { |
35 | TimeoutInstant |
36 | } |
37 | } |
38 | } else { |
39 | use std::time::Instant as TimeoutInstant; |
40 | } |
41 | } |
42 | |
43 | static NUM_THREADS: AtomicUsize = AtomicUsize::new(0); |
44 | |
45 | /// Holds the pointer to the currently active `HashTable`. |
46 | /// |
47 | /// # Safety |
48 | /// |
49 | /// Except for the initial value of null, it must always point to a valid `HashTable` instance. |
50 | /// Any `HashTable` this global static has ever pointed to must never be freed. |
51 | static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut()); |
52 | |
// Even with 3x more buckets than threads, the memory overhead is still only
// a few hundred bytes per thread.
55 | const LOAD_FACTOR: usize = 3; |
56 | |
57 | struct HashTable { |
58 | // Hash buckets for the table |
59 | entries: Box<[Bucket]>, |
60 | |
61 | // Number of bits used for the hash function |
62 | hash_bits: u32, |
63 | |
64 | // Previous table. This is only kept to keep leak detectors happy. |
65 | _prev: *const HashTable, |
66 | } |
67 | |
68 | impl HashTable { |
#[inline]
70 | fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> { |
71 | let new_size = (num_threads * LOAD_FACTOR).next_power_of_two(); |
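// new_size is a power of two, so this computes log2(new_size), i.e. the
// number of bits needed to index the table.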
72 | let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1; |
73 | |
74 | let now = TimeoutInstant::now(); |
75 | let mut entries = Vec::with_capacity(new_size); |
76 | for i in 0..new_size { |
77 | // We must ensure the seed is not zero |
78 | entries.push(Bucket::new(now, i as u32 + 1)); |
79 | } |
80 | |
81 | Box::new(HashTable { |
82 | entries: entries.into_boxed_slice(), |
83 | hash_bits, |
84 | _prev: prev, |
85 | }) |
86 | } |
87 | } |
88 | |
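// Align each bucket to a typical cache line (64 bytes) so that adjacent
// buckets do not share a cache line, avoiding false sharing between them.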
#[repr(align(64))]
90 | struct Bucket { |
91 | // Lock protecting the queue |
92 | mutex: WordLock, |
93 | |
94 | // Linked list of threads waiting on this bucket |
95 | queue_head: Cell<*const ThreadData>, |
96 | queue_tail: Cell<*const ThreadData>, |
97 | |
// Next time at which be_fair should be set
99 | fair_timeout: UnsafeCell<FairTimeout>, |
100 | } |
101 | |
102 | impl Bucket { |
#[inline]
104 | pub fn new(timeout: TimeoutInstant, seed: u32) -> Self { |
105 | Self { |
106 | mutex: WordLock::new(), |
107 | queue_head: Cell::new(ptr::null()), |
108 | queue_tail: Cell::new(ptr::null()), |
109 | fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)), |
110 | } |
111 | } |
112 | } |
113 | |
114 | struct FairTimeout { |
// Next time at which be_fair should be set
116 | timeout: TimeoutInstant, |
117 | |
118 | // the PRNG state for calculating the next timeout |
119 | seed: u32, |
120 | } |
121 | |
122 | impl FairTimeout { |
#[inline]
124 | fn new(timeout: TimeoutInstant, seed: u32) -> FairTimeout { |
125 | FairTimeout { timeout, seed } |
126 | } |
127 | |
128 | // Determine whether we should force a fair unlock, and update the timeout |
#[inline]
130 | fn should_timeout(&mut self) -> bool { |
131 | let now = TimeoutInstant::now(); |
132 | if now > self.timeout { |
133 | // Time between 0 and 1ms. |
134 | let nanos = self.gen_u32() % 1_000_000; |
135 | self.timeout = now + Duration::new(0, nanos); |
136 | true |
137 | } else { |
138 | false |
139 | } |
140 | } |
141 | |
142 | // Pseudorandom number generator from the "Xorshift RNGs" paper by George Marsaglia. |
143 | fn gen_u32(&mut self) -> u32 { |
144 | self.seed ^= self.seed << 13; |
145 | self.seed ^= self.seed >> 17; |
146 | self.seed ^= self.seed << 5; |
147 | self.seed |
148 | } |
149 | } |
150 | |
151 | struct ThreadData { |
152 | parker: ThreadParker, |
153 | |
154 | // Key that this thread is sleeping on. This may change if the thread is |
155 | // requeued to a different key. |
156 | key: AtomicUsize, |
157 | |
158 | // Linked list of parked threads in a bucket |
159 | next_in_queue: Cell<*const ThreadData>, |
160 | |
161 | // UnparkToken passed to this thread when it is unparked |
162 | unpark_token: Cell<UnparkToken>, |
163 | |
164 | // ParkToken value set by the thread when it was parked |
165 | park_token: Cell<ParkToken>, |
166 | |
167 | // Is the thread parked with a timeout? |
168 | parked_with_timeout: Cell<bool>, |
169 | |
170 | // Extra data for deadlock detection |
#[cfg(feature = "deadlock_detection")]
172 | deadlock_data: deadlock::DeadlockData, |
173 | } |
174 | |
175 | impl ThreadData { |
176 | fn new() -> ThreadData { |
177 | // Keep track of the total number of live ThreadData objects and resize |
178 | // the hash table accordingly. |
179 | let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1; |
180 | grow_hashtable(num_threads); |
181 | |
182 | ThreadData { |
183 | parker: ThreadParker::new(), |
184 | key: AtomicUsize::new(0), |
185 | next_in_queue: Cell::new(ptr::null()), |
186 | unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN), |
187 | park_token: Cell::new(DEFAULT_PARK_TOKEN), |
188 | parked_with_timeout: Cell::new(false), |
#[cfg(feature = "deadlock_detection")]
190 | deadlock_data: deadlock::DeadlockData::new(), |
191 | } |
192 | } |
193 | } |
194 | |
195 | // Invokes the given closure with a reference to the current thread `ThreadData`. |
#[inline(always)]
197 | fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T { |
198 | // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive |
199 | // to construct. Try to use a thread-local version if possible. Otherwise just |
200 | // create a ThreadData on the stack |
201 | let mut thread_data_storage = None; |
202 | thread_local!(static THREAD_DATA: ThreadData = ThreadData::new()); |
203 | let thread_data_ptr = THREAD_DATA |
204 | .try_with(|x| x as *const ThreadData) |
205 | .unwrap_or_else(|_| thread_data_storage.get_or_insert_with(ThreadData::new)); |
206 | |
207 | f(unsafe { &*thread_data_ptr }) |
208 | } |
209 | |
210 | impl Drop for ThreadData { |
211 | fn drop(&mut self) { |
212 | NUM_THREADS.fetch_sub(1, Ordering::Relaxed); |
213 | } |
214 | } |
215 | |
216 | /// Returns a reference to the latest hash table, creating one if it doesn't exist yet. |
217 | /// The reference is valid forever. However, the `HashTable` it references might become stale |
218 | /// at any point. Meaning it still exists, but it is not the instance in active use. |
#[inline]
220 | fn get_hashtable() -> &'static HashTable { |
221 | let table = HASHTABLE.load(Ordering::Acquire); |
222 | |
223 | // If there is no table, create one |
224 | if table.is_null() { |
225 | create_hashtable() |
226 | } else { |
227 | // SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed. |
228 | unsafe { &*table } |
229 | } |
230 | } |
231 | |
232 | /// Returns a reference to the latest hash table, creating one if it doesn't exist yet. |
233 | /// The reference is valid forever. However, the `HashTable` it references might become stale |
234 | /// at any point. Meaning it still exists, but it is not the instance in active use. |
#[cold]
236 | fn create_hashtable() -> &'static HashTable { |
237 | let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null())); |
238 | |
239 | // If this fails then it means some other thread created the hash table first. |
240 | let table = match HASHTABLE.compare_exchange( |
241 | ptr::null_mut(), |
242 | new_table, |
243 | Ordering::AcqRel, |
244 | Ordering::Acquire, |
245 | ) { |
246 | Ok(_) => new_table, |
247 | Err(old_table) => { |
248 | // Free the table we created |
249 | // SAFETY: `new_table` is created from `Box::into_raw` above and only freed here. |
250 | unsafe { |
251 | let _ = Box::from_raw(new_table); |
252 | } |
253 | old_table |
254 | } |
255 | }; |
256 | // SAFETY: The `HashTable` behind `table` is never freed. It is either the table pointer we |
257 | // created here, or it is one loaded from `HASHTABLE`. |
258 | unsafe { &*table } |
259 | } |
260 | |
261 | // Grow the hash table so that it is big enough for the given number of threads. |
262 | // This isn't performance-critical since it is only done when a ThreadData is |
263 | // created, which only happens once per thread. |
264 | fn grow_hashtable(num_threads: usize) { |
265 | // Lock all buckets in the existing table and get a reference to it |
266 | let old_table = loop { |
267 | let table = get_hashtable(); |
268 | |
269 | // Check if we need to resize the existing table |
270 | if table.entries.len() >= LOAD_FACTOR * num_threads { |
271 | return; |
272 | } |
273 | |
274 | // Lock all buckets in the old table |
275 | for bucket in &table.entries[..] { |
276 | bucket.mutex.lock(); |
277 | } |
278 | |
279 | // Now check if our table is still the latest one. Another thread could |
280 | // have grown the hash table between us reading HASHTABLE and locking |
281 | // the buckets. |
282 | if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ { |
283 | break table; |
284 | } |
285 | |
286 | // Unlock buckets and try again |
287 | for bucket in &table.entries[..] { |
288 | // SAFETY: We hold the lock here, as required |
289 | unsafe { bucket.mutex.unlock() }; |
290 | } |
291 | }; |
292 | |
293 | // Create the new table |
294 | let mut new_table = HashTable::new(num_threads, old_table); |
295 | |
296 | // Move the entries from the old table to the new one |
297 | for bucket in &old_table.entries[..] { |
298 | // SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked |
299 | // lists. All `ThreadData` instances in these lists will remain valid as long as they are |
300 | // present in the lists, meaning as long as their threads are parked. |
301 | unsafe { rehash_bucket_into(bucket, &mut new_table) }; |
302 | } |
303 | |
304 | // Publish the new table. No races are possible at this point because |
305 | // any other thread trying to grow the hash table is blocked on the bucket |
306 | // locks in the old table. |
307 | HASHTABLE.store(Box::into_raw(new_table), Ordering::Release); |
308 | |
309 | // Unlock all buckets in the old table |
310 | for bucket in &old_table.entries[..] { |
311 | // SAFETY: We hold the lock here, as required |
312 | unsafe { bucket.mutex.unlock() }; |
313 | } |
314 | } |
315 | |
316 | /// Iterate through all `ThreadData` objects in the bucket and insert them into the given table |
/// in the bucket their key corresponds to in that table.
318 | /// |
319 | /// # Safety |
320 | /// |
321 | /// The given `bucket` must have a correctly constructed linked list under `queue_head`, containing |
322 | /// `ThreadData` instances that must stay valid at least as long as the given `table` is in use. |
323 | /// |
324 | /// The given `table` must only contain buckets with correctly constructed linked lists. |
325 | unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) { |
326 | let mut current: *const ThreadData = bucket.queue_head.get(); |
327 | while !current.is_null() { |
328 | let next = (*current).next_in_queue.get(); |
329 | let hash = hash((*current).key.load(Ordering::Relaxed), table.hash_bits); |
330 | if table.entries[hash].queue_tail.get().is_null() { |
331 | table.entries[hash].queue_head.set(current); |
332 | } else { |
333 | (*table.entries[hash].queue_tail.get()) |
334 | .next_in_queue |
335 | .set(current); |
336 | } |
337 | table.entries[hash].queue_tail.set(current); |
338 | (*current).next_in_queue.set(ptr::null()); |
339 | current = next; |
340 | } |
341 | } |
342 | |
343 | // Hash function for addresses |
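// (Fibonacci hashing: the multipliers below are 2^32/phi and 2^64/phi, and the
// shift keeps only the top `bits` bits of the product.)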
#[cfg(target_pointer_width = "32")]
#[inline]
346 | fn hash(key: usize, bits: u32) -> usize { |
347 | key.wrapping_mul(0x9E3779B9) >> (32 - bits) |
348 | } |
#[cfg(target_pointer_width = "64")]
#[inline]
351 | fn hash(key: usize, bits: u32) -> usize { |
352 | key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits) |
353 | } |
354 | |
355 | /// Locks the bucket for the given key and returns a reference to it. |
356 | /// The returned bucket must be unlocked again in order to not cause deadlocks. |
#[inline]
358 | fn lock_bucket(key: usize) -> &'static Bucket { |
359 | loop { |
360 | let hashtable = get_hashtable(); |
361 | |
362 | let hash = hash(key, hashtable.hash_bits); |
363 | let bucket = &hashtable.entries[hash]; |
364 | |
365 | // Lock the bucket |
366 | bucket.mutex.lock(); |
367 | |
368 | // If no other thread has rehashed the table before we grabbed the lock |
369 | // then we are good to go! The lock we grabbed prevents any rehashes. |
370 | if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ { |
371 | return bucket; |
372 | } |
373 | |
374 | // Unlock the bucket and try again |
375 | // SAFETY: We hold the lock here, as required |
376 | unsafe { bucket.mutex.unlock() }; |
377 | } |
378 | } |
379 | |
380 | /// Locks the bucket for the given key and returns a reference to it. But checks that the key |
381 | /// hasn't been changed in the meantime due to a requeue. |
382 | /// The returned bucket must be unlocked again in order to not cause deadlocks. |
#[inline]
384 | fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) { |
385 | loop { |
386 | let hashtable = get_hashtable(); |
387 | let current_key = key.load(Ordering::Relaxed); |
388 | |
389 | let hash = hash(current_key, hashtable.hash_bits); |
390 | let bucket = &hashtable.entries[hash]; |
391 | |
392 | // Lock the bucket |
393 | bucket.mutex.lock(); |
394 | |
395 | // Check that both the hash table and key are correct while the bucket |
396 | // is locked. Note that the key can't change once we locked the proper |
397 | // bucket for it, so we just keep trying until we have the correct key. |
398 | if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ |
399 | && key.load(Ordering::Relaxed) == current_key |
400 | { |
401 | return (current_key, bucket); |
402 | } |
403 | |
404 | // Unlock the bucket and try again |
405 | // SAFETY: We hold the lock here, as required |
406 | unsafe { bucket.mutex.unlock() }; |
407 | } |
408 | } |
409 | |
410 | /// Locks the two buckets for the given pair of keys and returns references to them. |
411 | /// The returned buckets must be unlocked again in order to not cause deadlocks. |
412 | /// |
413 | /// If both keys hash to the same value, both returned references will be to the same bucket. Be |
414 | /// careful to only unlock it once in this case, always use `unlock_bucket_pair`. |
#[inline]
416 | fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) { |
417 | loop { |
418 | let hashtable = get_hashtable(); |
419 | |
420 | let hash1 = hash(key1, hashtable.hash_bits); |
421 | let hash2 = hash(key2, hashtable.hash_bits); |
422 | |
423 | // Get the bucket at the lowest hash/index first |
424 | let bucket1 = if hash1 <= hash2 { |
425 | &hashtable.entries[hash1] |
426 | } else { |
427 | &hashtable.entries[hash2] |
428 | }; |
429 | |
430 | // Lock the first bucket |
431 | bucket1.mutex.lock(); |
432 | |
433 | // If no other thread has rehashed the table before we grabbed the lock |
434 | // then we are good to go! The lock we grabbed prevents any rehashes. |
435 | if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ { |
436 | // Now lock the second bucket and return the two buckets |
437 | if hash1 == hash2 { |
438 | return (bucket1, bucket1); |
439 | } else if hash1 < hash2 { |
440 | let bucket2 = &hashtable.entries[hash2]; |
441 | bucket2.mutex.lock(); |
442 | return (bucket1, bucket2); |
443 | } else { |
444 | let bucket2 = &hashtable.entries[hash1]; |
445 | bucket2.mutex.lock(); |
446 | return (bucket2, bucket1); |
447 | } |
448 | } |
449 | |
450 | // Unlock the bucket and try again |
451 | // SAFETY: We hold the lock here, as required |
452 | unsafe { bucket1.mutex.unlock() }; |
453 | } |
454 | } |
455 | |
456 | /// Unlock a pair of buckets |
457 | /// |
458 | /// # Safety |
459 | /// |
460 | /// Both buckets must be locked |
#[inline]
462 | unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) { |
463 | bucket1.mutex.unlock(); |
464 | if !ptr::eq(bucket1, bucket2) { |
465 | bucket2.mutex.unlock(); |
466 | } |
467 | } |
468 | |
469 | /// Result of a park operation. |
470 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] |
471 | pub enum ParkResult { |
472 | /// We were unparked by another thread with the given token. |
473 | Unparked(UnparkToken), |
474 | |
475 | /// The validation callback returned false. |
476 | Invalid, |
477 | |
478 | /// The timeout expired. |
479 | TimedOut, |
480 | } |
481 | |
482 | impl ParkResult { |
483 | /// Returns true if we were unparked by another thread. |
#[inline]
485 | pub fn is_unparked(self) -> bool { |
486 | if let ParkResult::Unparked(_) = self { |
487 | true |
488 | } else { |
489 | false |
490 | } |
491 | } |
492 | } |
493 | |
494 | /// Result of an unpark operation. |
495 | #[derive(Copy, Clone, Default, Eq, PartialEq, Debug)] |
496 | pub struct UnparkResult { |
497 | /// The number of threads that were unparked. |
498 | pub unparked_threads: usize, |
499 | |
500 | /// The number of threads that were requeued. |
501 | pub requeued_threads: usize, |
502 | |
503 | /// Whether there are any threads remaining in the queue. This only returns |
504 | /// true if a thread was unparked. |
505 | pub have_more_threads: bool, |
506 | |
507 | /// This is set to true on average once every 0.5ms for any given key. It |
508 | /// should be used to switch to a fair unlocking mechanism for a particular |
509 | /// unlock. |
510 | pub be_fair: bool, |
511 | |
512 | /// Private field so new fields can be added without breakage. |
513 | _sealed: (), |
514 | } |
515 | |
516 | /// Operation that `unpark_requeue` should perform. |
517 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] |
518 | pub enum RequeueOp { |
519 | /// Abort the operation without doing anything. |
520 | Abort, |
521 | |
522 | /// Unpark one thread and requeue the rest onto the target queue. |
523 | UnparkOneRequeueRest, |
524 | |
525 | /// Requeue all threads onto the target queue. |
526 | RequeueAll, |
527 | |
528 | /// Unpark one thread and leave the rest parked. No requeuing is done. |
529 | UnparkOne, |
530 | |
531 | /// Requeue one thread and leave the rest parked on the original queue. |
532 | RequeueOne, |
533 | } |
534 | |
535 | /// Operation that `unpark_filter` should perform for each thread. |
536 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] |
537 | pub enum FilterOp { |
538 | /// Unpark the thread and continue scanning the list of parked threads. |
539 | Unpark, |
540 | |
541 | /// Don't unpark the thread and continue scanning the list of parked threads. |
542 | Skip, |
543 | |
544 | /// Don't unpark the thread and stop scanning the list of parked threads. |
545 | Stop, |
546 | } |
547 | |
548 | /// A value which is passed from an unparker to a parked thread. |
549 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] |
550 | pub struct UnparkToken(pub usize); |
551 | |
552 | /// A value associated with a parked thread which can be used by `unpark_filter`. |
553 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] |
554 | pub struct ParkToken(pub usize); |
555 | |
556 | /// A default unpark token to use. |
557 | pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0); |
558 | |
559 | /// A default park token to use. |
560 | pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0); |
561 | |
562 | /// Parks the current thread in the queue associated with the given key. |
563 | /// |
564 | /// The `validate` function is called while the queue is locked and can abort |
565 | /// the operation by returning false. If `validate` returns true then the |
566 | /// current thread is appended to the queue and the queue is unlocked. |
567 | /// |
568 | /// The `before_sleep` function is called after the queue is unlocked but before |
569 | /// the thread is put to sleep. The thread will then sleep until it is unparked |
570 | /// or the given timeout is reached. |
571 | /// |
572 | /// The `timed_out` function is also called while the queue is locked, but only |
573 | /// if the timeout was reached. It is passed the key of the queue it was in when |
574 | /// it timed out, which may be different from the original key if |
575 | /// `unpark_requeue` was called. It is also passed a bool which indicates |
576 | /// whether it was the last thread in the queue. |
577 | /// |
578 | /// # Safety |
579 | /// |
580 | /// You should only call this function with an address that you control, since |
581 | /// you could otherwise interfere with the operation of other synchronization |
582 | /// primitives. |
583 | /// |
584 | /// The `validate` and `timed_out` functions are called while the queue is |
585 | /// locked and must not panic or call into any function in `parking_lot`. |
586 | /// |
587 | /// The `before_sleep` function is called outside the queue lock and is allowed |
588 | /// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but |
589 | /// it is not allowed to call `park` or panic. |
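///
/// # Example
///
/// A minimal sketch, not taken from the crate documentation, of a one-shot
/// event built on top of `park` and `unpark_all`. The `Event` type, its field
/// name and the `SeqCst` orderings are illustrative assumptions; the paths
/// assume the usual crate-root re-exports.
///
/// ```ignore
/// use std::sync::atomic::{AtomicBool, Ordering};
/// use parking_lot_core::{park, unpark_all, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
///
/// struct Event {
///     set: AtomicBool,
/// }
///
/// impl Event {
///     // The queue key is simply the address of the flag.
///     fn key(&self) -> usize {
///         &self.set as *const _ as usize
///     }
///
///     fn wait(&self) {
///         while !self.set.load(Ordering::SeqCst) {
///             unsafe {
///                 park(
///                     self.key(),
///                     // Re-check the flag while the queue is locked so a
///                     // concurrent `signal` cannot be missed.
///                     || !self.set.load(Ordering::SeqCst),
///                     || {},
///                     |_key, _was_last| {},
///                     DEFAULT_PARK_TOKEN,
///                     None,
///                 );
///             }
///         }
///     }
///
///     fn signal(&self) {
///         self.set.store(true, Ordering::SeqCst);
///         unsafe { unpark_all(self.key(), DEFAULT_UNPARK_TOKEN) };
///     }
/// }
/// ```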
#[inline]
591 | pub unsafe fn park( |
592 | key: usize, |
593 | validate: impl FnOnce() -> bool, |
594 | before_sleep: impl FnOnce(), |
595 | timed_out: impl FnOnce(usize, bool), |
596 | park_token: ParkToken, |
597 | timeout: Option<Instant>, |
598 | ) -> ParkResult { |
599 | // Grab our thread data, this also ensures that the hash table exists |
600 | with_thread_data(|thread_data| { |
601 | // Lock the bucket for the given key |
602 | let bucket = lock_bucket(key); |
603 | |
604 | // If the validation function fails, just return |
605 | if !validate() { |
606 | // SAFETY: We hold the lock here, as required |
607 | bucket.mutex.unlock(); |
608 | return ParkResult::Invalid; |
609 | } |
610 | |
611 | // Append our thread data to the queue and unlock the bucket |
612 | thread_data.parked_with_timeout.set(timeout.is_some()); |
613 | thread_data.next_in_queue.set(ptr::null()); |
614 | thread_data.key.store(key, Ordering::Relaxed); |
615 | thread_data.park_token.set(park_token); |
616 | thread_data.parker.prepare_park(); |
617 | if !bucket.queue_head.get().is_null() { |
618 | (*bucket.queue_tail.get()).next_in_queue.set(thread_data); |
619 | } else { |
620 | bucket.queue_head.set(thread_data); |
621 | } |
622 | bucket.queue_tail.set(thread_data); |
623 | // SAFETY: We hold the lock here, as required |
624 | bucket.mutex.unlock(); |
625 | |
626 | // Invoke the pre-sleep callback |
627 | before_sleep(); |
628 | |
629 | // Park our thread and determine whether we were woken up by an unpark |
630 | // or by our timeout. Note that this isn't precise: we can still be |
631 | // unparked since we are still in the queue. |
632 | let unparked = match timeout { |
633 | Some(timeout) => thread_data.parker.park_until(timeout), |
634 | None => { |
635 | thread_data.parker.park(); |
636 | // call deadlock detection on_unpark hook |
637 | deadlock::on_unpark(thread_data); |
638 | true |
639 | } |
640 | }; |
641 | |
642 | // If we were unparked, return now |
643 | if unparked { |
644 | return ParkResult::Unparked(thread_data.unpark_token.get()); |
645 | } |
646 | |
647 | // Lock our bucket again. Note that the hashtable may have been rehashed in |
648 | // the meantime. Our key may also have changed if we were requeued. |
649 | let (key, bucket) = lock_bucket_checked(&thread_data.key); |
650 | |
651 | // Now we need to check again if we were unparked or timed out. Unlike the |
652 | // last check this is precise because we hold the bucket lock. |
653 | if !thread_data.parker.timed_out() { |
654 | // SAFETY: We hold the lock here, as required |
655 | bucket.mutex.unlock(); |
656 | return ParkResult::Unparked(thread_data.unpark_token.get()); |
657 | } |
658 | |
659 | // We timed out, so we now need to remove our thread from the queue |
660 | let mut link = &bucket.queue_head; |
661 | let mut current = bucket.queue_head.get(); |
662 | let mut previous = ptr::null(); |
663 | let mut was_last_thread = true; |
664 | while !current.is_null() { |
665 | if current == thread_data { |
666 | let next = (*current).next_in_queue.get(); |
667 | link.set(next); |
668 | if bucket.queue_tail.get() == current { |
669 | bucket.queue_tail.set(previous); |
670 | } else { |
671 | // Scan the rest of the queue to see if there are any other |
672 | // entries with the given key. |
673 | let mut scan = next; |
674 | while !scan.is_null() { |
675 | if (*scan).key.load(Ordering::Relaxed) == key { |
676 | was_last_thread = false; |
677 | break; |
678 | } |
679 | scan = (*scan).next_in_queue.get(); |
680 | } |
681 | } |
682 | |
683 | // Callback to indicate that we timed out, and whether we were the |
684 | // last thread on the queue. |
685 | timed_out(key, was_last_thread); |
686 | break; |
687 | } else { |
688 | if (*current).key.load(Ordering::Relaxed) == key { |
689 | was_last_thread = false; |
690 | } |
691 | link = &(*current).next_in_queue; |
692 | previous = current; |
693 | current = link.get(); |
694 | } |
695 | } |
696 | |
697 | // There should be no way for our thread to have been removed from the queue |
698 | // if we timed out. |
699 | debug_assert!(!current.is_null()); |
700 | |
701 | // Unlock the bucket, we are done |
702 | // SAFETY: We hold the lock here, as required |
703 | bucket.mutex.unlock(); |
704 | ParkResult::TimedOut |
705 | }) |
706 | } |
707 | |
708 | /// Unparks one thread from the queue associated with the given key. |
709 | /// |
710 | /// The `callback` function is called while the queue is locked and before the |
711 | /// target thread is woken up. The `UnparkResult` argument to the function |
712 | /// indicates whether a thread was found in the queue and whether this was the |
713 | /// last thread in the queue. This value is also returned by `unpark_one`. |
714 | /// |
715 | /// The `callback` function should return an `UnparkToken` value which will be |
716 | /// passed to the thread that is unparked. If no thread is unparked then the |
717 | /// returned value is ignored. |
718 | /// |
719 | /// # Safety |
720 | /// |
721 | /// You should only call this function with an address that you control, since |
722 | /// you could otherwise interfere with the operation of other synchronization |
723 | /// primitives. |
724 | /// |
725 | /// The `callback` function is called while the queue is locked and must not |
726 | /// panic or call into any function in `parking_lot`. |
727 | /// |
728 | /// The `parking_lot` functions are not re-entrant and calling this method |
729 | /// from the context of an asynchronous signal handler may result in undefined |
730 | /// behavior, including corruption of internal state and/or deadlocks. |
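///
/// # Example
///
/// A rough sketch, mirroring the semaphore used in this file's tests: release
/// one permit and hand it directly to a single parked waiter. `semaphore_addr`
/// is a placeholder for the queue key.
///
/// ```ignore
/// use parking_lot_core::{unpark_one, DEFAULT_UNPARK_TOKEN};
///
/// let result = unsafe {
///     // The callback runs while the queue is still locked and chooses the
///     // token handed to the woken thread (ignored if nobody was parked).
///     unpark_one(semaphore_addr, |_unpark_result| DEFAULT_UNPARK_TOKEN)
/// };
/// assert!(result.unparked_threads <= 1);
/// ```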
#[inline]
732 | pub unsafe fn unpark_one( |
733 | key: usize, |
734 | callback: impl FnOnce(UnparkResult) -> UnparkToken, |
735 | ) -> UnparkResult { |
736 | // Lock the bucket for the given key |
737 | let bucket = lock_bucket(key); |
738 | |
739 | // Find a thread with a matching key and remove it from the queue |
740 | let mut link = &bucket.queue_head; |
741 | let mut current = bucket.queue_head.get(); |
742 | let mut previous = ptr::null(); |
743 | let mut result = UnparkResult::default(); |
744 | while !current.is_null() { |
745 | if (*current).key.load(Ordering::Relaxed) == key { |
746 | // Remove the thread from the queue |
747 | let next = (*current).next_in_queue.get(); |
748 | link.set(next); |
749 | if bucket.queue_tail.get() == current { |
750 | bucket.queue_tail.set(previous); |
751 | } else { |
752 | // Scan the rest of the queue to see if there are any other |
753 | // entries with the given key. |
754 | let mut scan = next; |
755 | while !scan.is_null() { |
756 | if (*scan).key.load(Ordering::Relaxed) == key { |
757 | result.have_more_threads = true; |
758 | break; |
759 | } |
760 | scan = (*scan).next_in_queue.get(); |
761 | } |
762 | } |
763 | |
764 | // Invoke the callback before waking up the thread |
765 | result.unparked_threads = 1; |
766 | result.be_fair = (*bucket.fair_timeout.get()).should_timeout(); |
767 | let token = callback(result); |
768 | |
769 | // Set the token for the target thread |
770 | (*current).unpark_token.set(token); |
771 | |
772 | // This is a bit tricky: we first lock the ThreadParker to prevent |
773 | // the thread from exiting and freeing its ThreadData if its wait |
774 | // times out. Then we unlock the queue since we don't want to keep |
775 | // the queue locked while we perform a system call. Finally we wake |
776 | // up the parked thread. |
777 | let handle = (*current).parker.unpark_lock(); |
778 | // SAFETY: We hold the lock here, as required |
779 | bucket.mutex.unlock(); |
780 | handle.unpark(); |
781 | |
782 | return result; |
783 | } else { |
784 | link = &(*current).next_in_queue; |
785 | previous = current; |
786 | current = link.get(); |
787 | } |
788 | } |
789 | |
790 | // No threads with a matching key were found in the bucket |
791 | callback(result); |
792 | // SAFETY: We hold the lock here, as required |
793 | bucket.mutex.unlock(); |
794 | result |
795 | } |
796 | |
797 | /// Unparks all threads in the queue associated with the given key. |
798 | /// |
799 | /// The given `UnparkToken` is passed to all unparked threads. |
800 | /// |
801 | /// This function returns the number of threads that were unparked. |
802 | /// |
803 | /// # Safety |
804 | /// |
805 | /// You should only call this function with an address that you control, since |
806 | /// you could otherwise interfere with the operation of other synchronization |
807 | /// primitives. |
808 | /// |
809 | /// The `parking_lot` functions are not re-entrant and calling this method |
810 | /// from the context of an asynchronous signal handler may result in undefined |
811 | /// behavior, including corruption of internal state and/or deadlocks. |
#[inline]
813 | pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize { |
814 | // Lock the bucket for the given key |
815 | let bucket = lock_bucket(key); |
816 | |
817 | // Remove all threads with the given key in the bucket |
818 | let mut link = &bucket.queue_head; |
819 | let mut current = bucket.queue_head.get(); |
820 | let mut previous = ptr::null(); |
821 | let mut threads = SmallVec::<[_; 8]>::new(); |
822 | while !current.is_null() { |
823 | if (*current).key.load(Ordering::Relaxed) == key { |
824 | // Remove the thread from the queue |
825 | let next = (*current).next_in_queue.get(); |
826 | link.set(next); |
827 | if bucket.queue_tail.get() == current { |
828 | bucket.queue_tail.set(previous); |
829 | } |
830 | |
831 | // Set the token for the target thread |
832 | (*current).unpark_token.set(unpark_token); |
833 | |
834 | // Don't wake up threads while holding the queue lock. See comment |
835 | // in unpark_one. For now just record which threads we need to wake |
836 | // up. |
837 | threads.push((*current).parker.unpark_lock()); |
838 | current = next; |
839 | } else { |
840 | link = &(*current).next_in_queue; |
841 | previous = current; |
842 | current = link.get(); |
843 | } |
844 | } |
845 | |
846 | // Unlock the bucket |
847 | // SAFETY: We hold the lock here, as required |
848 | bucket.mutex.unlock(); |
849 | |
850 | // Now that we are outside the lock, wake up all the threads that we removed |
851 | // from the queue. |
852 | let num_threads = threads.len(); |
853 | for handle in threads.into_iter() { |
854 | handle.unpark(); |
855 | } |
856 | |
857 | num_threads |
858 | } |
859 | |
860 | /// Removes all threads from the queue associated with `key_from`, optionally |
861 | /// unparks the first one and requeues the rest onto the queue associated with |
862 | /// `key_to`. |
863 | /// |
864 | /// The `validate` function is called while both queues are locked. Its return |
865 | /// value will determine which operation is performed, or whether the operation |
866 | /// should be aborted. See `RequeueOp` for details about the different possible |
867 | /// return values. |
868 | /// |
869 | /// The `callback` function is also called while both queues are locked. It is |
870 | /// passed the `RequeueOp` returned by `validate` and an `UnparkResult` |
871 | /// indicating whether a thread was unparked and whether there are threads still |
872 | /// parked in the new queue. This `UnparkResult` value is also returned by |
873 | /// `unpark_requeue`. |
874 | /// |
875 | /// The `callback` function should return an `UnparkToken` value which will be |
876 | /// passed to the thread that is unparked. If no thread is unparked then the |
877 | /// returned value is ignored. |
878 | /// |
879 | /// # Safety |
880 | /// |
881 | /// You should only call this function with an address that you control, since |
882 | /// you could otherwise interfere with the operation of other synchronization |
883 | /// primitives. |
884 | /// |
885 | /// The `validate` and `callback` functions are called while the queue is locked |
886 | /// and must not panic or call into any function in `parking_lot`. |
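///
/// # Example
///
/// A rough sketch of the condition-variable style use of this function: wake
/// one waiter parked on `condvar_addr` and move the rest onto the queue of
/// `mutex_addr`. Both addresses are placeholders.
///
/// ```ignore
/// use parking_lot_core::{unpark_requeue, RequeueOp, DEFAULT_UNPARK_TOKEN};
///
/// let result = unsafe {
///     unpark_requeue(
///         condvar_addr,
///         mutex_addr,
///         // Called with both queues locked; decide what to do (or abort).
///         || RequeueOp::UnparkOneRequeueRest,
///         // Also called with both queues locked; pick the token for the
///         // thread being woken (ignored if none was).
///         |_op, _unpark_result| DEFAULT_UNPARK_TOKEN,
///     )
/// };
/// ```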
#[inline]
888 | pub unsafe fn unpark_requeue( |
889 | key_from: usize, |
890 | key_to: usize, |
891 | validate: impl FnOnce() -> RequeueOp, |
892 | callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken, |
893 | ) -> UnparkResult { |
894 | // Lock the two buckets for the given key |
895 | let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to); |
896 | |
897 | // If the validation function fails, just return |
898 | let mut result = UnparkResult::default(); |
899 | let op = validate(); |
900 | if op == RequeueOp::Abort { |
901 | // SAFETY: Both buckets are locked, as required. |
902 | unlock_bucket_pair(bucket_from, bucket_to); |
903 | return result; |
904 | } |
905 | |
906 | // Remove all threads with the given key in the source bucket |
907 | let mut link = &bucket_from.queue_head; |
908 | let mut current = bucket_from.queue_head.get(); |
909 | let mut previous = ptr::null(); |
910 | let mut requeue_threads: *const ThreadData = ptr::null(); |
911 | let mut requeue_threads_tail: *const ThreadData = ptr::null(); |
912 | let mut wakeup_thread = None; |
913 | while !current.is_null() { |
914 | if (*current).key.load(Ordering::Relaxed) == key_from { |
915 | // Remove the thread from the queue |
916 | let next = (*current).next_in_queue.get(); |
917 | link.set(next); |
918 | if bucket_from.queue_tail.get() == current { |
919 | bucket_from.queue_tail.set(previous); |
920 | } |
921 | |
922 | // Prepare the first thread for wakeup and requeue the rest. |
923 | if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne) |
924 | && wakeup_thread.is_none() |
925 | { |
926 | wakeup_thread = Some(current); |
927 | result.unparked_threads = 1; |
928 | } else { |
929 | if !requeue_threads.is_null() { |
930 | (*requeue_threads_tail).next_in_queue.set(current); |
931 | } else { |
932 | requeue_threads = current; |
933 | } |
934 | requeue_threads_tail = current; |
935 | (*current).key.store(key_to, Ordering::Relaxed); |
936 | result.requeued_threads += 1; |
937 | } |
938 | if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne { |
939 | // Scan the rest of the queue to see if there are any other |
940 | // entries with the given key. |
941 | let mut scan = next; |
942 | while !scan.is_null() { |
943 | if (*scan).key.load(Ordering::Relaxed) == key_from { |
944 | result.have_more_threads = true; |
945 | break; |
946 | } |
947 | scan = (*scan).next_in_queue.get(); |
948 | } |
949 | break; |
950 | } |
951 | current = next; |
952 | } else { |
953 | link = &(*current).next_in_queue; |
954 | previous = current; |
955 | current = link.get(); |
956 | } |
957 | } |
958 | |
959 | // Add the requeued threads to the destination bucket |
960 | if !requeue_threads.is_null() { |
961 | (*requeue_threads_tail).next_in_queue.set(ptr::null()); |
962 | if !bucket_to.queue_head.get().is_null() { |
963 | (*bucket_to.queue_tail.get()) |
964 | .next_in_queue |
965 | .set(requeue_threads); |
966 | } else { |
967 | bucket_to.queue_head.set(requeue_threads); |
968 | } |
969 | bucket_to.queue_tail.set(requeue_threads_tail); |
970 | } |
971 | |
972 | // Invoke the callback before waking up the thread |
973 | if result.unparked_threads != 0 { |
974 | result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout(); |
975 | } |
976 | let token = callback(op, result); |
977 | |
978 | // See comment in unpark_one for why we mess with the locking |
979 | if let Some(wakeup_thread) = wakeup_thread { |
980 | (*wakeup_thread).unpark_token.set(token); |
981 | let handle = (*wakeup_thread).parker.unpark_lock(); |
982 | // SAFETY: Both buckets are locked, as required. |
983 | unlock_bucket_pair(bucket_from, bucket_to); |
984 | handle.unpark(); |
985 | } else { |
986 | // SAFETY: Both buckets are locked, as required. |
987 | unlock_bucket_pair(bucket_from, bucket_to); |
988 | } |
989 | |
990 | result |
991 | } |
992 | |
993 | /// Unparks a number of threads from the front of the queue associated with |
994 | /// `key` depending on the results of a filter function which inspects the |
995 | /// `ParkToken` associated with each thread. |
996 | /// |
997 | /// The `filter` function is called for each thread in the queue or until |
998 | /// `FilterOp::Stop` is returned. This function is passed the `ParkToken` |
999 | /// associated with a particular thread, which is unparked if `FilterOp::Unpark` |
1000 | /// is returned. |
1001 | /// |
1002 | /// The `callback` function is also called while both queues are locked. It is |
1003 | /// passed an `UnparkResult` indicating the number of threads that were unparked |
1004 | /// and whether there are still parked threads in the queue. This `UnparkResult` |
1005 | /// value is also returned by `unpark_filter`. |
1006 | /// |
1007 | /// The `callback` function should return an `UnparkToken` value which will be |
1008 | /// passed to all threads that are unparked. If no thread is unparked then the |
1009 | /// returned value is ignored. |
1010 | /// |
1011 | /// # Safety |
1012 | /// |
1013 | /// You should only call this function with an address that you control, since |
1014 | /// you could otherwise interfere with the operation of other synchronization |
1015 | /// primitives. |
1016 | /// |
1017 | /// The `filter` and `callback` functions are called while the queue is locked |
1018 | /// and must not panic or call into any function in `parking_lot`. |
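///
/// # Example
///
/// A rough sketch that wakes at most two waiters, deciding per thread based on
/// its `ParkToken`. `key` is a placeholder for the queue address.
///
/// ```ignore
/// use parking_lot_core::{unpark_filter, FilterOp, DEFAULT_UNPARK_TOKEN};
///
/// let mut budget = 2;
/// let result = unsafe {
///     unpark_filter(
///         key,
///         |_park_token| {
///             if budget > 0 {
///                 budget -= 1;
///                 FilterOp::Unpark
///             } else {
///                 // Leave the remaining threads parked and stop scanning.
///                 FilterOp::Stop
///             }
///         },
///         |_unpark_result| DEFAULT_UNPARK_TOKEN,
///     )
/// };
/// ```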
#[inline]
1020 | pub unsafe fn unpark_filter( |
1021 | key: usize, |
1022 | mut filter: impl FnMut(ParkToken) -> FilterOp, |
1023 | callback: impl FnOnce(UnparkResult) -> UnparkToken, |
1024 | ) -> UnparkResult { |
1025 | // Lock the bucket for the given key |
1026 | let bucket = lock_bucket(key); |
1027 | |
1028 | // Go through the queue looking for threads with a matching key |
1029 | let mut link = &bucket.queue_head; |
1030 | let mut current = bucket.queue_head.get(); |
1031 | let mut previous = ptr::null(); |
1032 | let mut threads = SmallVec::<[_; 8]>::new(); |
1033 | let mut result = UnparkResult::default(); |
1034 | while !current.is_null() { |
1035 | if (*current).key.load(Ordering::Relaxed) == key { |
1036 | // Call the filter function with the thread's ParkToken |
1037 | let next = (*current).next_in_queue.get(); |
1038 | match filter((*current).park_token.get()) { |
1039 | FilterOp::Unpark => { |
1040 | // Remove the thread from the queue |
1041 | link.set(next); |
1042 | if bucket.queue_tail.get() == current { |
1043 | bucket.queue_tail.set(previous); |
1044 | } |
1045 | |
1046 | // Add the thread to our list of threads to unpark |
1047 | threads.push((current, None)); |
1048 | |
1049 | current = next; |
1050 | } |
1051 | FilterOp::Skip => { |
1052 | result.have_more_threads = true; |
1053 | link = &(*current).next_in_queue; |
1054 | previous = current; |
1055 | current = link.get(); |
1056 | } |
1057 | FilterOp::Stop => { |
1058 | result.have_more_threads = true; |
1059 | break; |
1060 | } |
1061 | } |
1062 | } else { |
1063 | link = &(*current).next_in_queue; |
1064 | previous = current; |
1065 | current = link.get(); |
1066 | } |
1067 | } |
1068 | |
1069 | // Invoke the callback before waking up the threads |
1070 | result.unparked_threads = threads.len(); |
1071 | if result.unparked_threads != 0 { |
1072 | result.be_fair = (*bucket.fair_timeout.get()).should_timeout(); |
1073 | } |
1074 | let token = callback(result); |
1075 | |
1076 | // Pass the token to all threads that are going to be unparked and prepare |
1077 | // them for unparking. |
1078 | for t in threads.iter_mut() { |
1079 | (*t.0).unpark_token.set(token); |
1080 | t.1 = Some((*t.0).parker.unpark_lock()); |
1081 | } |
1082 | |
1083 | // SAFETY: We hold the lock here, as required |
1084 | bucket.mutex.unlock(); |
1085 | |
1086 | // Now that we are outside the lock, wake up all the threads that we removed |
1087 | // from the queue. |
1088 | for (_, handle) in threads.into_iter() { |
1089 | handle.unchecked_unwrap().unpark(); |
1090 | } |
1091 | |
1092 | result |
1093 | } |
1094 | |
1095 | /// \[Experimental\] Deadlock detection |
1096 | /// |
1097 | /// Enabled via the `deadlock_detection` feature flag. |
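///
/// A rough sketch of how a custom lock might feed the detector; `key` is a
/// placeholder for the lock's address and the watchdog loop is only an
/// illustration (checking requires the `deadlock_detection` feature):
///
/// ```ignore
/// // After the lock at address `key` has been acquired:
/// unsafe { parking_lot_core::deadlock::acquire_resource(key) };
/// // ... critical section ...
/// // Just before the lock is released again:
/// unsafe { parking_lot_core::deadlock::release_resource(key) };
///
/// // Somewhere else, e.g. in a watchdog thread:
/// for cycle in parking_lot_core::deadlock::check_deadlock() {
///     for thread in cycle {
///         eprintln!("deadlocked thread: {}", thread.thread_id());
///     }
/// }
/// ```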
1098 | pub mod deadlock { |
#[cfg(feature = "deadlock_detection")]
1100 | use super::deadlock_impl; |
1101 | |
#[cfg(feature = "deadlock_detection")]
1103 | pub(super) use super::deadlock_impl::DeadlockData; |
1104 | |
1105 | /// Acquire a resource identified by key in the deadlock detector |
1106 | /// Noop if deadlock_detection feature isn't enabled. |
1107 | /// |
1108 | /// # Safety |
1109 | /// |
1110 | /// Call after the resource is acquired |
#[inline]
1112 | pub unsafe fn acquire_resource(_key: usize) { |
#[cfg(feature = "deadlock_detection")]
1114 | deadlock_impl::acquire_resource(_key); |
1115 | } |
1116 | |
1117 | /// Release a resource identified by key in the deadlock detector. |
1118 | /// Noop if deadlock_detection feature isn't enabled. |
1119 | /// |
1120 | /// # Panics |
1121 | /// |
1122 | /// Panics if the resource was already released or wasn't acquired in this thread. |
1123 | /// |
1124 | /// # Safety |
1125 | /// |
1126 | /// Call before the resource is released |
#[inline]
1128 | pub unsafe fn release_resource(_key: usize) { |
#[cfg(feature = "deadlock_detection")]
1130 | deadlock_impl::release_resource(_key); |
1131 | } |
1132 | |
1133 | /// Returns all deadlocks detected *since* the last call. |
/// Each cycle consists of a vector of `DeadlockedThread`.
#[cfg(feature = "deadlock_detection")]
#[inline]
1137 | pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> { |
1138 | deadlock_impl::check_deadlock() |
1139 | } |
1140 | |
#[inline]
1142 | pub(super) unsafe fn on_unpark(_td: &super::ThreadData) { |
#[cfg(feature = "deadlock_detection")]
1144 | deadlock_impl::on_unpark(_td); |
1145 | } |
1146 | } |
1147 | |
#[cfg(feature = "deadlock_detection")]
1149 | mod deadlock_impl { |
1150 | use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS}; |
1151 | use crate::thread_parker::{ThreadParkerT, UnparkHandleT}; |
1152 | use crate::word_lock::WordLock; |
1153 | use backtrace::Backtrace; |
1154 | use petgraph; |
1155 | use petgraph::graphmap::DiGraphMap; |
1156 | use std::cell::{Cell, UnsafeCell}; |
1157 | use std::collections::HashSet; |
1158 | use std::sync::atomic::Ordering; |
1159 | use std::sync::mpsc; |
1160 | use thread_id; |
1161 | |
1162 | /// Representation of a deadlocked thread |
1163 | pub struct DeadlockedThread { |
1164 | thread_id: usize, |
1165 | backtrace: Backtrace, |
1166 | } |
1167 | |
1168 | impl DeadlockedThread { |
1169 | /// The system thread id |
1170 | pub fn thread_id(&self) -> usize { |
1171 | self.thread_id |
1172 | } |
1173 | |
1174 | /// The thread backtrace |
1175 | pub fn backtrace(&self) -> &Backtrace { |
1176 | &self.backtrace |
1177 | } |
1178 | } |
1179 | |
1180 | pub struct DeadlockData { |
1181 | // Currently owned resources (keys) |
1182 | resources: UnsafeCell<Vec<usize>>, |
1183 | |
1184 | // Set when there's a pending callstack request |
1185 | deadlocked: Cell<bool>, |
1186 | |
1187 | // Sender used to report the backtrace |
1188 | backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>, |
1189 | |
1190 | // System thread id |
1191 | thread_id: usize, |
1192 | } |
1193 | |
1194 | impl DeadlockData { |
1195 | pub fn new() -> Self { |
1196 | DeadlockData { |
1197 | resources: UnsafeCell::new(Vec::new()), |
1198 | deadlocked: Cell::new(false), |
1199 | backtrace_sender: UnsafeCell::new(None), |
1200 | thread_id: thread_id::get(), |
1201 | } |
1202 | } |
1203 | } |
1204 | |
1205 | pub(super) unsafe fn on_unpark(td: &ThreadData) { |
1206 | if td.deadlock_data.deadlocked.get() { |
1207 | let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap(); |
1208 | sender |
1209 | .send(DeadlockedThread { |
1210 | thread_id: td.deadlock_data.thread_id, |
1211 | backtrace: Backtrace::new(), |
1212 | }) |
1213 | .unwrap(); |
1214 | // make sure to close this sender |
1215 | drop(sender); |
1216 | |
1217 | // park until the end of the time |
1218 | td.parker.prepare_park(); |
1219 | td.parker.park(); |
unreachable!("unparked deadlocked thread!");
1221 | } |
1222 | } |
1223 | |
1224 | pub unsafe fn acquire_resource(key: usize) { |
1225 | with_thread_data(|thread_data| { |
1226 | (*thread_data.deadlock_data.resources.get()).push(key); |
1227 | }); |
1228 | } |
1229 | |
1230 | pub unsafe fn release_resource(key: usize) { |
1231 | with_thread_data(|thread_data| { |
1232 | let resources = &mut (*thread_data.deadlock_data.resources.get()); |
1233 | |
1234 | // There is only one situation where we can fail to find the |
1235 | // resource: we are currently running TLS destructors and our |
1236 | // ThreadData has already been freed. There isn't much we can do |
1237 | // about it at this point, so just ignore it. |
1238 | if let Some(p) = resources.iter().rposition(|x| *x == key) { |
1239 | resources.swap_remove(p); |
1240 | } |
1241 | }); |
1242 | } |
1243 | |
1244 | pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> { |
1245 | unsafe { |
1246 | // fast pass |
1247 | if check_wait_graph_fast() { |
1248 | // double check |
1249 | check_wait_graph_slow() |
1250 | } else { |
1251 | Vec::new() |
1252 | } |
1253 | } |
1254 | } |
1255 | |
// Simple algorithm that builds a wait graph of the threads and the resources,
1257 | // then checks for the presence of cycles (deadlocks). |
1258 | // This variant isn't precise as it doesn't lock the entire table before checking |
1259 | unsafe fn check_wait_graph_fast() -> bool { |
1260 | let table = get_hashtable(); |
1261 | let thread_count = NUM_THREADS.load(Ordering::Relaxed); |
1262 | let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2); |
1263 | |
1264 | for b in &(*table).entries[..] { |
1265 | b.mutex.lock(); |
1266 | let mut current = b.queue_head.get(); |
1267 | while !current.is_null() { |
1268 | if !(*current).parked_with_timeout.get() |
1269 | && !(*current).deadlock_data.deadlocked.get() |
1270 | { |
1271 | // .resources are waiting for their owner |
1272 | for &resource in &(*(*current).deadlock_data.resources.get()) { |
1273 | graph.add_edge(resource, current as usize, ()); |
1274 | } |
1275 | // owner waits for resource .key |
1276 | graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ()); |
1277 | } |
1278 | current = (*current).next_in_queue.get(); |
1279 | } |
1280 | // SAFETY: We hold the lock here, as required |
1281 | b.mutex.unlock(); |
1282 | } |
1283 | |
1284 | petgraph::algo::is_cyclic_directed(&graph) |
1285 | } |
1286 | |
1287 | #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] |
1288 | enum WaitGraphNode { |
1289 | Thread(*const ThreadData), |
1290 | Resource(usize), |
1291 | } |
1292 | |
1293 | use self::WaitGraphNode::*; |
1294 | |
1295 | // Contrary to the _fast variant this locks the entries table before looking for cycles. |
1296 | // Returns all detected thread wait cycles. |
1297 | // Note that once a cycle is reported it's never reported again. |
1298 | unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> { |
1299 | static DEADLOCK_DETECTION_LOCK: WordLock = WordLock::new(); |
1300 | DEADLOCK_DETECTION_LOCK.lock(); |
1301 | |
1302 | let mut table = get_hashtable(); |
1303 | loop { |
1304 | // Lock all buckets in the old table |
1305 | for b in &table.entries[..] { |
1306 | b.mutex.lock(); |
1307 | } |
1308 | |
1309 | // Now check if our table is still the latest one. Another thread could |
1310 | // have grown the hash table between us getting and locking the hash table. |
1311 | let new_table = get_hashtable(); |
1312 | if new_table as *const _ == table as *const _ { |
1313 | break; |
1314 | } |
1315 | |
1316 | // Unlock buckets and try again |
1317 | for b in &table.entries[..] { |
1318 | // SAFETY: We hold the lock here, as required |
1319 | b.mutex.unlock(); |
1320 | } |
1321 | |
1322 | table = new_table; |
1323 | } |
1324 | |
1325 | let thread_count = NUM_THREADS.load(Ordering::Relaxed); |
1326 | let mut graph = |
1327 | DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2); |
1328 | |
1329 | for b in &table.entries[..] { |
1330 | let mut current = b.queue_head.get(); |
1331 | while !current.is_null() { |
1332 | if !(*current).parked_with_timeout.get() |
1333 | && !(*current).deadlock_data.deadlocked.get() |
1334 | { |
1335 | // .resources are waiting for their owner |
1336 | for &resource in &(*(*current).deadlock_data.resources.get()) { |
1337 | graph.add_edge(Resource(resource), Thread(current), ()); |
1338 | } |
1339 | // owner waits for resource .key |
1340 | graph.add_edge( |
1341 | Thread(current), |
1342 | Resource((*current).key.load(Ordering::Relaxed)), |
1343 | (), |
1344 | ); |
1345 | } |
1346 | current = (*current).next_in_queue.get(); |
1347 | } |
1348 | } |
1349 | |
1350 | for b in &table.entries[..] { |
1351 | // SAFETY: We hold the lock here, as required |
1352 | b.mutex.unlock(); |
1353 | } |
1354 | |
1355 | // find cycles |
1356 | let cycles = graph_cycles(&graph); |
1357 | |
1358 | let mut results = Vec::with_capacity(cycles.len()); |
1359 | |
1360 | for cycle in cycles { |
1361 | let (sender, receiver) = mpsc::channel(); |
1362 | for td in cycle { |
1363 | let bucket = lock_bucket((*td).key.load(Ordering::Relaxed)); |
1364 | (*td).deadlock_data.deadlocked.set(true); |
1365 | *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone()); |
1366 | let handle = (*td).parker.unpark_lock(); |
1367 | // SAFETY: We hold the lock here, as required |
1368 | bucket.mutex.unlock(); |
1369 | // unpark the deadlocked thread! |
1370 | // on unpark it'll notice the deadlocked flag and report back |
1371 | handle.unpark(); |
1372 | } |
1373 | // make sure to drop our sender before collecting results |
1374 | drop(sender); |
1375 | results.push(receiver.iter().collect()); |
1376 | } |
1377 | |
1378 | DEADLOCK_DETECTION_LOCK.unlock(); |
1379 | |
1380 | results |
1381 | } |
1382 | |
1383 | // normalize a cycle to start with the "smallest" node |
1384 | fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> { |
1385 | let min_pos = input |
1386 | .iter() |
1387 | .enumerate() |
1388 | .min_by_key(|&(_, &t)| t) |
1389 | .map(|(p, _)| p) |
1390 | .unwrap_or(0); |
1391 | input |
1392 | .iter() |
1393 | .cycle() |
1394 | .skip(min_pos) |
1395 | .take(input.len()) |
1396 | .cloned() |
1397 | .collect() |
1398 | } |
1399 | |
1400 | // returns all thread cycles in the wait graph |
1401 | fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> { |
1402 | use petgraph::visit::depth_first_search; |
1403 | use petgraph::visit::DfsEvent; |
1404 | use petgraph::visit::NodeIndexable; |
1405 | |
1406 | let mut cycles = HashSet::new(); |
1407 | let mut path = Vec::with_capacity(g.node_bound()); |
1408 | // start from threads to get the correct threads cycle |
1409 | let threads = g |
1410 | .nodes() |
1411 | .filter(|n| if let &Thread(_) = n { true } else { false }); |
1412 | |
1413 | depth_first_search(g, threads, |e| match e { |
1414 | DfsEvent::Discover(Thread(n), _) => path.push(n), |
1415 | DfsEvent::Finish(Thread(_), _) => { |
1416 | path.pop(); |
1417 | } |
1418 | DfsEvent::BackEdge(_, Thread(n)) => { |
1419 | let from = path.iter().rposition(|&i| i == n).unwrap(); |
1420 | cycles.insert(normalize_cycle(&path[from..])); |
1421 | } |
1422 | _ => (), |
1423 | }); |
1424 | |
1425 | cycles.iter().cloned().collect() |
1426 | } |
1427 | } |
1428 | |
#[cfg(test)]
1430 | mod tests { |
1431 | use super::{ThreadData, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN}; |
1432 | use std::{ |
1433 | ptr, |
1434 | sync::{ |
1435 | atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering}, |
1436 | Arc, |
1437 | }, |
1438 | thread, |
1439 | time::Duration, |
1440 | }; |
1441 | |
1442 | /// Calls a closure for every `ThreadData` currently parked on a given key |
1443 | fn for_each(key: usize, mut f: impl FnMut(&ThreadData)) { |
1444 | let bucket = super::lock_bucket(key); |
1445 | |
1446 | let mut current: *const ThreadData = bucket.queue_head.get(); |
1447 | while !current.is_null() { |
1448 | let current_ref = unsafe { &*current }; |
1449 | if current_ref.key.load(Ordering::Relaxed) == key { |
1450 | f(current_ref); |
1451 | } |
1452 | current = current_ref.next_in_queue.get(); |
1453 | } |
1454 | |
1455 | // SAFETY: We hold the lock here, as required |
1456 | unsafe { bucket.mutex.unlock() }; |
1457 | } |
1458 | |
1459 | macro_rules! test { |
1460 | ( $( $name:ident( |
1461 | repeats: $repeats:expr, |
1462 | latches: $latches:expr, |
1463 | delay: $delay:expr, |
1464 | threads: $threads:expr, |
1465 | single_unparks: $single_unparks:expr); |
1466 | )* ) => { |
1467 | $(#[test] |
1468 | fn $name() { |
1469 | let delay = Duration::from_micros($delay); |
1470 | for _ in 0..$repeats { |
1471 | run_parking_test($latches, delay, $threads, $single_unparks); |
1472 | } |
1473 | })* |
1474 | }; |
1475 | } |
1476 | |
1477 | test! { |
1478 | unpark_all_one_fast( |
1479 | repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 0 |
1480 | ); |
1481 | unpark_all_hundred_fast( |
1482 | repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0 |
1483 | ); |
1484 | unpark_one_one_fast( |
1485 | repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1 |
1486 | ); |
1487 | unpark_one_hundred_fast( |
1488 | repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100 |
1489 | ); |
1490 | unpark_one_fifty_then_fifty_all_fast( |
1491 | repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50 |
1492 | ); |
1493 | unpark_all_one( |
1494 | repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0 |
1495 | ); |
1496 | unpark_all_hundred( |
1497 | repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0 |
1498 | ); |
1499 | unpark_one_one( |
1500 | repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1 |
1501 | ); |
1502 | unpark_one_fifty( |
1503 | repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50 |
1504 | ); |
1505 | unpark_one_fifty_then_fifty_all( |
1506 | repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50 |
1507 | ); |
1508 | hundred_unpark_all_one_fast( |
1509 | repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0 |
1510 | ); |
1511 | hundred_unpark_all_one( |
1512 | repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0 |
1513 | ); |
1514 | } |
1515 | |
1516 | fn run_parking_test( |
1517 | num_latches: usize, |
1518 | delay: Duration, |
1519 | num_threads: usize, |
1520 | num_single_unparks: usize, |
1521 | ) { |
1522 | let mut tests = Vec::with_capacity(num_latches); |
1523 | |
1524 | for _ in 0..num_latches { |
1525 | let test = Arc::new(SingleLatchTest::new(num_threads)); |
1526 | let mut threads = Vec::with_capacity(num_threads); |
1527 | for _ in 0..num_threads { |
1528 | let test = test.clone(); |
1529 | threads.push(thread::spawn(move || test.run())); |
1530 | } |
1531 | tests.push((test, threads)); |
1532 | } |
1533 | |
1534 | for unpark_index in 0..num_single_unparks { |
1535 | thread::sleep(delay); |
1536 | for (test, _) in &tests { |
1537 | test.unpark_one(unpark_index); |
1538 | } |
1539 | } |
1540 | |
1541 | for (test, threads) in tests { |
1542 | test.finish(num_single_unparks); |
1543 | for thread in threads { |
thread.join().expect("Test thread panic");
1545 | } |
1546 | } |
1547 | } |
1548 | |
1549 | struct SingleLatchTest { |
1550 | semaphore: AtomicIsize, |
1551 | num_awake: AtomicUsize, |
1552 | /// Holds the pointer to the last *unprocessed* woken up thread. |
1553 | last_awoken: AtomicPtr<ThreadData>, |
1554 | /// Total number of threads participating in this test. |
1555 | num_threads: usize, |
1556 | } |
1557 | |
1558 | impl SingleLatchTest { |
1559 | pub fn new(num_threads: usize) -> Self { |
1560 | Self { |
1561 | // This implements a fair (FIFO) semaphore, and it starts out unavailable. |
1562 | semaphore: AtomicIsize::new(0), |
1563 | num_awake: AtomicUsize::new(0), |
1564 | last_awoken: AtomicPtr::new(ptr::null_mut()), |
1565 | num_threads, |
1566 | } |
1567 | } |
1568 | |
1569 | pub fn run(&self) { |
1570 | // Get one slot from the semaphore |
1571 | self.down(); |
1572 | |
1573 | // Report back to the test verification code that this thread woke up |
1574 | let this_thread_ptr = super::with_thread_data(|t| t as *const _ as *mut _); |
1575 | self.last_awoken.store(this_thread_ptr, Ordering::SeqCst); |
1576 | self.num_awake.fetch_add(1, Ordering::SeqCst); |
1577 | } |
1578 | |
1579 | pub fn unpark_one(&self, single_unpark_index: usize) { |
1580 | // last_awoken should be null at all times except between self.up() and at the bottom |
1581 | // of this method where it's reset to null again |
1582 | assert!(self.last_awoken.load(Ordering::SeqCst).is_null()); |
1583 | |
1584 | let mut queue: Vec<*mut ThreadData> = Vec::with_capacity(self.num_threads); |
1585 | for_each(self.semaphore_addr(), |thread_data| { |
1586 | queue.push(thread_data as *const _ as *mut _); |
1587 | }); |
1588 | assert!(queue.len() <= self.num_threads - single_unpark_index); |
1589 | |
1590 | let num_awake_before_up = self.num_awake.load(Ordering::SeqCst); |
1591 | |
1592 | self.up(); |
1593 | |
1594 | // Wait for a parked thread to wake up and update num_awake + last_awoken. |
1595 | while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 { |
1596 | thread::yield_now(); |
1597 | } |
1598 | |
1599 | // At this point the other thread should have set last_awoken inside the run() method |
1600 | let last_awoken = self.last_awoken.load(Ordering::SeqCst); |
1601 | assert!(!last_awoken.is_null()); |
1602 | if !queue.is_empty() && queue[0] != last_awoken { |
1603 | panic!( |
"Woke up wrong thread:\n\tqueue: {:?}\n\tlast awoken: {:?}",
1605 | queue, last_awoken |
1606 | ); |
1607 | } |
1608 | self.last_awoken.store(ptr::null_mut(), Ordering::SeqCst); |
1609 | } |
1610 | |
1611 | pub fn finish(&self, num_single_unparks: usize) { |
1612 | // The amount of threads not unparked via unpark_one |
1613 | let mut num_threads_left = self.num_threads.checked_sub(num_single_unparks).unwrap(); |
1614 | |
1615 | // Wake remaining threads up with unpark_all. Has to be in a loop, because there might |
// still be threads that have not yet parked.
1617 | while num_threads_left > 0 { |
1618 | let mut num_waiting_on_address = 0; |
1619 | for_each(self.semaphore_addr(), |_thread_data| { |
1620 | num_waiting_on_address += 1; |
1621 | }); |
1622 | assert!(num_waiting_on_address <= num_threads_left); |
1623 | |
1624 | let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst); |
1625 | |
1626 | let num_unparked = |
1627 | unsafe { super::unpark_all(self.semaphore_addr(), DEFAULT_UNPARK_TOKEN) }; |
1628 | assert!(num_unparked >= num_waiting_on_address); |
1629 | assert!(num_unparked <= num_threads_left); |
1630 | |
1631 | // Wait for all unparked threads to wake up and update num_awake + last_awoken. |
1632 | while self.num_awake.load(Ordering::SeqCst) |
1633 | != num_awake_before_unpark + num_unparked |
1634 | { |
1635 | thread::yield_now() |
1636 | } |
1637 | |
1638 | num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap(); |
1639 | } |
1640 | // By now, all threads should have been woken up |
1641 | assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads); |
1642 | |
1643 | // Make sure no thread is parked on our semaphore address |
1644 | let mut num_waiting_on_address = 0; |
1645 | for_each(self.semaphore_addr(), |_thread_data| { |
1646 | num_waiting_on_address += 1; |
1647 | }); |
1648 | assert_eq!(num_waiting_on_address, 0); |
1649 | } |
1650 | |
1651 | pub fn down(&self) { |
1652 | let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst); |
1653 | |
1654 | if old_semaphore_value > 0 { |
1655 | // We acquired the semaphore. Done. |
1656 | return; |
1657 | } |
1658 | |
1659 | // We need to wait. |
1660 | let validate = || true; |
1661 | let before_sleep = || {}; |
1662 | let timed_out = |_, _| {}; |
1663 | unsafe { |
1664 | super::park( |
1665 | self.semaphore_addr(), |
1666 | validate, |
1667 | before_sleep, |
1668 | timed_out, |
1669 | DEFAULT_PARK_TOKEN, |
1670 | None, |
1671 | ); |
1672 | } |
1673 | } |
1674 | |
1675 | pub fn up(&self) { |
1676 | let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst); |
1677 | |
1678 | // Check if anyone was waiting on the semaphore. If they were, then pass ownership to them. |
1679 | if old_semaphore_value < 0 { |
1680 | // We need to continue until we have actually unparked someone. It might be that |
1681 | // the thread we want to pass ownership to has decremented the semaphore counter, |
1682 | // but not yet parked. |
1683 | loop { |
1684 | match unsafe { |
1685 | super::unpark_one(self.semaphore_addr(), |_| DEFAULT_UNPARK_TOKEN) |
1686 | .unparked_threads |
1687 | } { |
1688 | 1 => break, |
1689 | 0 => (), |
i => panic!("Should not wake up {} threads", i),
1691 | } |
1692 | } |
1693 | } |
1694 | } |
1695 | |
1696 | fn semaphore_addr(&self) -> usize { |
1697 | &self.semaphore as *const _ as usize |
1698 | } |
1699 | } |
1700 | } |
1701 | |