| 1 | // Copyright 2016 Amanieu d'Antras |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or |
| 4 | // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or |
| 5 | // http://opensource.org/licenses/MIT>, at your option. This file may not be |
| 6 | // copied, modified, or distributed except according to those terms. |
| 7 | use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT}; |
| 8 | use crate::util::UncheckedOptionExt; |
| 9 | use crate::word_lock::WordLock; |
| 10 | use core::{ |
| 11 | cell::{Cell, UnsafeCell}, |
| 12 | ptr, |
| 13 | sync::atomic::{AtomicPtr, AtomicUsize, Ordering}, |
| 14 | }; |
| 15 | use smallvec::SmallVec; |
| 16 | use std::time::{Duration, Instant}; |
| 17 | |
| 18 | // Don't use Instant on wasm32-unknown-unknown, it just panics. |
| 19 | cfg_if::cfg_if! { |
| 20 | if #[cfg(all( |
| 21 | target_family = "wasm", |
| 22 | target_os = "unknown", |
| 23 | target_vendor = "unknown" |
| 24 | ))] { |
| 25 | #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] |
| 26 | struct TimeoutInstant; |
| 27 | impl TimeoutInstant { |
| 28 | fn now() -> TimeoutInstant { |
| 29 | TimeoutInstant |
| 30 | } |
| 31 | } |
| 32 | impl core::ops::Add<Duration> for TimeoutInstant { |
| 33 | type Output = Self; |
| 34 | fn add(self, _rhs: Duration) -> Self::Output { |
| 35 | TimeoutInstant |
| 36 | } |
| 37 | } |
| 38 | } else { |
| 39 | use std::time::Instant as TimeoutInstant; |
| 40 | } |
| 41 | } |
| 42 | |
| 43 | static NUM_THREADS: AtomicUsize = AtomicUsize::new(0); |
| 44 | |
| 45 | /// Holds the pointer to the currently active `HashTable`. |
| 46 | /// |
| 47 | /// # Safety |
| 48 | /// |
| 49 | /// Except for the initial value of null, it must always point to a valid `HashTable` instance. |
| 50 | /// Any `HashTable` this global static has ever pointed to must never be freed. |
| 51 | static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut()); |
| 52 | |
| 53 | // Even with 3x more buckets than threads, the memory overhead is still only |
| 54 | // a few hundred bytes per thread. |
| 55 | const LOAD_FACTOR: usize = 3; |
| 56 | |
| 57 | struct HashTable { |
| 58 | // Hash buckets for the table |
| 59 | entries: Box<[Bucket]>, |
| 60 | |
| 61 | // Number of bits used for the hash function |
| 62 | hash_bits: u32, |
| 63 | |
| 64 | // Previous table. This is only kept to keep leak detectors happy. |
| 65 | _prev: *const HashTable, |
| 66 | } |
| 67 | |
| 68 | impl HashTable { |
| 69 | #[inline] |
| 70 | fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> { |
| 71 | let new_size: usize = (num_threads * LOAD_FACTOR).next_power_of_two(); |
| 72 | let hash_bits: u32 = 0usize.leading_zeros() - new_size.leading_zeros() - 1; |
| 73 | |
| 74 | let now = TimeoutInstant::now(); |
| 75 | let mut entries: Vec<Bucket> = Vec::with_capacity(new_size); |
| 76 | for i in 0..new_size { |
| 77 | // We must ensure the seed is not zero |
| 78 | entries.push(Bucket::new(now, i as u32 + 1)); |
| 79 | } |
| 80 | |
| 81 | Box::new(HashTable { |
| 82 | entries: entries.into_boxed_slice(), |
| 83 | hash_bits, |
| 84 | _prev: prev, |
| 85 | }) |
| 86 | } |
| 87 | } |
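|  | // Worked example of the sizing above (illustrative values, not from the source): |
|  | // with `num_threads = 20`, `20 * LOAD_FACTOR = 60` rounds up to `new_size = 64 = 2^6`, |
|  | // so `hash_bits = 6` and `hash()` below always yields an index in `0..64`: |
|  | // let table = HashTable::new(20, ptr::null()); |
|  | // assert_eq!(table.entries.len(), 64); |
|  | // assert_eq!(table.hash_bits, 6); |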
| 88 | |
| 89 | #[repr(align(64))] |
| 90 | struct Bucket { |
| 91 | // Lock protecting the queue |
| 92 | mutex: WordLock, |
| 93 | |
| 94 | // Linked list of threads waiting on this bucket |
| 95 | queue_head: Cell<*const ThreadData>, |
| 96 | queue_tail: Cell<*const ThreadData>, |
| 97 | |
| 98 | // Next time at which be_fair should be set |
| 99 | fair_timeout: UnsafeCell<FairTimeout>, |
| 100 | } |
| 101 | |
| 102 | impl Bucket { |
| 103 | #[inline] |
| 104 | pub fn new(timeout: TimeoutInstant, seed: u32) -> Self { |
| 105 | Self { |
| 106 | mutex: WordLock::new(), |
| 107 | queue_head: Cell::new(ptr::null()), |
| 108 | queue_tail: Cell::new(ptr::null()), |
| 109 | fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)), |
| 110 | } |
| 111 | } |
| 112 | } |
| 113 | |
| 114 | struct FairTimeout { |
| 115 | // Next time at which be_fair should be set |
| 116 | timeout: TimeoutInstant, |
| 117 | |
| 118 | // the PRNG state for calculating the next timeout |
| 119 | seed: u32, |
| 120 | } |
| 121 | |
| 122 | impl FairTimeout { |
| 123 | #[inline] |
| 124 | fn new(timeout: TimeoutInstant, seed: u32) -> FairTimeout { |
| 125 | FairTimeout { timeout, seed } |
| 126 | } |
| 127 | |
| 128 | // Determine whether we should force a fair unlock, and update the timeout |
| 129 | #[inline] |
| 130 | fn should_timeout(&mut self) -> bool { |
| 131 | let now = TimeoutInstant::now(); |
| 132 | if now > self.timeout { |
| 133 | // Time between 0 and 1ms. |
| 134 | let nanos = self.gen_u32() % 1_000_000; |
| 135 | self.timeout = now + Duration::new(0, nanos); |
| 136 | true |
| 137 | } else { |
| 138 | false |
| 139 | } |
| 140 | } |
| 141 | |
| 142 | // Pseudorandom number generator from the "Xorshift RNGs" paper by George Marsaglia. |
| 143 | fn gen_u32(&mut self) -> u32 { |
| 144 | self.seed ^= self.seed << 13; |
| 145 | self.seed ^= self.seed >> 17; |
| 146 | self.seed ^= self.seed << 5; |
| 147 | self.seed |
| 148 | } |
| 149 | } |
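|  | // Note: `gen_u32` is the 32-bit xorshift generator (13/17/5 shifts) from the paper |
|  | // cited above. Zero is a fixed point of the update, which is why `HashTable::new` |
|  | // seeds every bucket with `i as u32 + 1` rather than `i`; any non-zero seed stays |
|  | // non-zero. A minimal check (illustrative, not part of the crate): |
|  | // let mut ft = FairTimeout::new(TimeoutInstant::now(), 1); |
|  | // assert_ne!(ft.gen_u32(), 0); |
|  | // Combined with the uniform draw in `0..1_000_000` ns, `should_timeout` forces a |
|  | // fair unlock on average once every ~0.5ms per bucket. |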
| 150 | |
| 151 | struct ThreadData { |
| 152 | parker: ThreadParker, |
| 153 | |
| 154 | // Key that this thread is sleeping on. This may change if the thread is |
| 155 | // requeued to a different key. |
| 156 | key: AtomicUsize, |
| 157 | |
| 158 | // Linked list of parked threads in a bucket |
| 159 | next_in_queue: Cell<*const ThreadData>, |
| 160 | |
| 161 | // UnparkToken passed to this thread when it is unparked |
| 162 | unpark_token: Cell<UnparkToken>, |
| 163 | |
| 164 | // ParkToken value set by the thread when it was parked |
| 165 | park_token: Cell<ParkToken>, |
| 166 | |
| 167 | // Is the thread parked with a timeout? |
| 168 | parked_with_timeout: Cell<bool>, |
| 169 | |
| 170 | // Extra data for deadlock detection |
| 171 | #[cfg(feature = "deadlock_detection")] |
| 172 | deadlock_data: deadlock::DeadlockData, |
| 173 | } |
| 174 | |
| 175 | impl ThreadData { |
| 176 | fn new() -> ThreadData { |
| 177 | // Keep track of the total number of live ThreadData objects and resize |
| 178 | // the hash table accordingly. |
| 179 | let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1; |
| 180 | grow_hashtable(num_threads); |
| 181 | |
| 182 | ThreadData { |
| 183 | parker: ThreadParker::new(), |
| 184 | key: AtomicUsize::new(0), |
| 185 | next_in_queue: Cell::new(ptr::null()), |
| 186 | unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN), |
| 187 | park_token: Cell::new(DEFAULT_PARK_TOKEN), |
| 188 | parked_with_timeout: Cell::new(false), |
| 189 | #[cfg(feature = "deadlock_detection")] |
| 190 | deadlock_data: deadlock::DeadlockData::new(), |
| 191 | } |
| 192 | } |
| 193 | } |
| 194 | |
| 195 | // Invokes the given closure with a reference to the current thread `ThreadData`. |
| 196 | #[inline(always)] |
| 197 | fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T { |
| 198 | // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive |
| 199 | // to construct. Try to use a thread-local version if possible. Otherwise just |
| 200 | // create a ThreadData on the stack |
| 201 | let mut thread_data_storage: Option<ThreadData> = None; |
| 202 | thread_local!(static THREAD_DATA: ThreadData = ThreadData::new()); |
| 203 | let thread_data_ptr: *const ThreadData = THREAD_DATA |
| 204 | .try_with(|x| x as *const ThreadData) |
| 205 | .unwrap_or_else(|_| thread_data_storage.get_or_insert_with(ThreadData::new)); |
| 206 | |
| 207 | f(unsafe { &*thread_data_ptr }) |
| 208 | } |
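|  | // (`try_with` can fail once the thread-local has been dropped, e.g. when parking is |
|  | // attempted from another thread-local's destructor during thread shutdown; the |
|  | // stack-allocated fallback keeps parking usable in that window.) |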
| 209 | |
| 210 | impl Drop for ThreadData { |
| 211 | fn drop(&mut self) { |
| 212 | NUM_THREADS.fetch_sub(1, Ordering::Relaxed); |
| 213 | } |
| 214 | } |
| 215 | |
| 216 | /// Returns a reference to the latest hash table, creating one if it doesn't exist yet. |
| 217 | /// The reference is valid forever. However, the `HashTable` it references might become stale |
| 218 | /// at any point, meaning it still exists but is no longer the instance in active use. |
| 219 | #[inline] |
| 220 | fn get_hashtable() -> &'static HashTable { |
| 221 | let table: *mut HashTable = HASHTABLE.load(Ordering::Acquire); |
| 222 | |
| 223 | // If there is no table, create one |
| 224 | if table.is_null() { |
| 225 | create_hashtable() |
| 226 | } else { |
| 227 | // SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed. |
| 228 | unsafe { &*table } |
| 229 | } |
| 230 | } |
| 231 | |
| 232 | /// Returns a reference to the latest hash table, creating one if it doesn't exist yet. |
| 233 | /// The reference is valid forever. However, the `HashTable` it references might become stale |
| 234 | /// at any point, meaning it still exists but is no longer the instance in active use. |
| 235 | #[cold] |
| 236 | fn create_hashtable() -> &'static HashTable { |
| 237 | let new_table: *mut HashTable = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null())); |
| 238 | |
| 239 | // If this fails then it means some other thread created the hash table first. |
| 240 | let table: *mut HashTable = match HASHTABLE.compare_exchange( |
| 241 | ptr::null_mut(), |
| 242 | new_table, |
| 243 | Ordering::AcqRel, |
| 244 | Ordering::Acquire, |
| 245 | ) { |
| 246 | Ok(_) => new_table, |
| 247 | Err(old_table) => { |
| 248 | // Free the table we created |
| 249 | // SAFETY: `new_table` is created from `Box::into_raw` above and only freed here. |
| 250 | unsafe { |
| 251 | let _ = Box::from_raw(new_table); |
| 252 | } |
| 253 | old_table |
| 254 | } |
| 255 | }; |
| 256 | // SAFETY: The `HashTable` behind `table` is never freed. It is either the table pointer we |
| 257 | // created here, or it is one loaded from `HASHTABLE`. |
| 258 | unsafe { &*table } |
| 259 | } |
| 260 | |
| 261 | // Grow the hash table so that it is big enough for the given number of threads. |
| 262 | // This isn't performance-critical since it is only done when a ThreadData is |
| 263 | // created, which only happens once per thread. |
| 264 | fn grow_hashtable(num_threads: usize) { |
| 265 | // Lock all buckets in the existing table and get a reference to it |
| 266 | let old_table = loop { |
| 267 | let table = get_hashtable(); |
| 268 | |
| 269 | // Check if we need to resize the existing table |
| 270 | if table.entries.len() >= LOAD_FACTOR * num_threads { |
| 271 | return; |
| 272 | } |
| 273 | |
| 274 | // Lock all buckets in the old table |
| 275 | for bucket in &table.entries[..] { |
| 276 | bucket.mutex.lock(); |
| 277 | } |
| 278 | |
| 279 | // Now check if our table is still the latest one. Another thread could |
| 280 | // have grown the hash table between us reading HASHTABLE and locking |
| 281 | // the buckets. |
| 282 | if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ { |
| 283 | break table; |
| 284 | } |
| 285 | |
| 286 | // Unlock buckets and try again |
| 287 | for bucket in &table.entries[..] { |
| 288 | // SAFETY: We hold the lock here, as required |
| 289 | unsafe { bucket.mutex.unlock() }; |
| 290 | } |
| 291 | }; |
| 292 | |
| 293 | // Create the new table |
| 294 | let mut new_table = HashTable::new(num_threads, old_table); |
| 295 | |
| 296 | // Move the entries from the old table to the new one |
| 297 | for bucket in &old_table.entries[..] { |
| 298 | // SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked |
| 299 | // lists. All `ThreadData` instances in these lists will remain valid as long as they are |
| 300 | // present in the lists, meaning as long as their threads are parked. |
| 301 | unsafe { rehash_bucket_into(bucket, &mut new_table) }; |
| 302 | } |
| 303 | |
| 304 | // Publish the new table. No races are possible at this point because |
| 305 | // any other thread trying to grow the hash table is blocked on the bucket |
| 306 | // locks in the old table. |
| 307 | HASHTABLE.store(Box::into_raw(new_table), Ordering::Release); |
| 308 | |
| 309 | // Unlock all buckets in the old table |
| 310 | for bucket in &old_table.entries[..] { |
| 311 | // SAFETY: We hold the lock here, as required |
| 312 | unsafe { bucket.mutex.unlock() }; |
| 313 | } |
| 314 | } |
| 315 | |
| 316 | /// Iterates through all `ThreadData` objects in the bucket and inserts each of them into the |
| 317 | /// given table, in the bucket its key corresponds to for that table. |
| 318 | /// |
| 319 | /// # Safety |
| 320 | /// |
| 321 | /// The given `bucket` must have a correctly constructed linked list under `queue_head`, containing |
| 322 | /// `ThreadData` instances that must stay valid at least as long as the given `table` is in use. |
| 323 | /// |
| 324 | /// The given `table` must only contain buckets with correctly constructed linked lists. |
| 325 | unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) { |
| 326 | let mut current: *const ThreadData = bucket.queue_head.get(); |
| 327 | while !current.is_null() { |
| 328 | let next: *const ThreadData = (*current).next_in_queue.get(); |
| 329 | let hash: usize = hash((*current).key.load(Ordering::Relaxed), table.hash_bits); |
| 330 | if table.entries[hash].queue_tail.get().is_null() { |
| 331 | table.entries[hash].queue_head.set(current); |
| 332 | } else { |
| 333 | (*table.entries[hash].queue_tail.get()) |
| 334 | .next_in_queue |
| 335 | .set(current); |
| 336 | } |
| 337 | table.entries[hash].queue_tail.set(current); |
| 338 | (*current).next_in_queue.set(ptr::null()); |
| 339 | current = next; |
| 340 | } |
| 341 | } |
| 342 | |
| 343 | // Hash function for addresses |
| 344 | #[cfg(target_pointer_width = "32")] |
| 345 | #[inline] |
| 346 | fn hash(key: usize, bits: u32) -> usize { |
| 347 | key.wrapping_mul(0x9E3779B9) >> (32 - bits) |
| 348 | } |
| 349 | #[cfg(target_pointer_width = "64")] |
| 350 | #[inline] |
| 351 | fn hash(key: usize, bits: u32) -> usize { |
| 352 | key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits) |
| 353 | } |
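|  | // This is Fibonacci hashing: the multiplier is an odd constant close to 2^64 / phi |
|  | // (2^32 / phi on 32-bit targets) and the shift keeps only the top `hash_bits` bits |
|  | // of the product, which is exactly a bucket index. Illustrative check (assuming a |
|  | // 64-bit target and a 64-entry table, i.e. `bits = 6`): |
|  | // let key = &NUM_THREADS as *const _ as usize; // any address we control |
|  | // assert!(hash(key, 6) < 64); |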
| 354 | |
| 355 | /// Locks the bucket for the given key and returns a reference to it. |
| 356 | /// The returned bucket must be unlocked again in order to not cause deadlocks. |
| 357 | #[inline] |
| 358 | fn lock_bucket(key: usize) -> &'static Bucket { |
| 359 | loop { |
| 360 | let hashtable: &'static HashTable = get_hashtable(); |
| 361 | |
| 362 | let hash: usize = hash(key, hashtable.hash_bits); |
| 363 | let bucket: &Bucket = &hashtable.entries[hash]; |
| 364 | |
| 365 | // Lock the bucket |
| 366 | bucket.mutex.lock(); |
| 367 | |
| 368 | // If no other thread has rehashed the table before we grabbed the lock |
| 369 | // then we are good to go! The lock we grabbed prevents any rehashes. |
| 370 | if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ { |
| 371 | return bucket; |
| 372 | } |
| 373 | |
| 374 | // Unlock the bucket and try again |
| 375 | // SAFETY: We hold the lock here, as required |
| 376 | unsafe { bucket.mutex.unlock() }; |
| 377 | } |
| 378 | } |
| 379 | |
| 380 | /// Locks the bucket for the given key and returns a reference to it. But checks that the key |
| 381 | /// hasn't been changed in the meantime due to a requeue. |
| 382 | /// The returned bucket must be unlocked again in order to not cause deadlocks. |
| 383 | #[inline] |
| 384 | fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) { |
| 385 | loop { |
| 386 | let hashtable = get_hashtable(); |
| 387 | let current_key = key.load(Ordering::Relaxed); |
| 388 | |
| 389 | let hash = hash(current_key, hashtable.hash_bits); |
| 390 | let bucket = &hashtable.entries[hash]; |
| 391 | |
| 392 | // Lock the bucket |
| 393 | bucket.mutex.lock(); |
| 394 | |
| 395 | // Check that both the hash table and key are correct while the bucket |
| 396 | // is locked. Note that the key can't change once we locked the proper |
| 397 | // bucket for it, so we just keep trying until we have the correct key. |
| 398 | if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ |
| 399 | && key.load(Ordering::Relaxed) == current_key |
| 400 | { |
| 401 | return (current_key, bucket); |
| 402 | } |
| 403 | |
| 404 | // Unlock the bucket and try again |
| 405 | // SAFETY: We hold the lock here, as required |
| 406 | unsafe { bucket.mutex.unlock() }; |
| 407 | } |
| 408 | } |
| 409 | |
| 410 | /// Locks the two buckets for the given pair of keys and returns references to them. |
| 411 | /// The returned buckets must be unlocked again in order to not cause deadlocks. |
| 412 | /// |
| 413 | /// If both keys hash to the same value, both returned references will be to the same bucket. Be |
| 414 | /// careful to only unlock it once in this case, always use `unlock_bucket_pair`. |
| 415 | #[inline] |
| 416 | fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) { |
| 417 | loop { |
| 418 | let hashtable = get_hashtable(); |
| 419 | |
| 420 | let hash1 = hash(key1, hashtable.hash_bits); |
| 421 | let hash2 = hash(key2, hashtable.hash_bits); |
| 422 | |
| 423 | // Get the bucket at the lowest hash/index first |
| 424 | let bucket1 = if hash1 <= hash2 { |
| 425 | &hashtable.entries[hash1] |
| 426 | } else { |
| 427 | &hashtable.entries[hash2] |
| 428 | }; |
| 429 | |
| 430 | // Lock the first bucket |
| 431 | bucket1.mutex.lock(); |
| 432 | |
| 433 | // If no other thread has rehashed the table before we grabbed the lock |
| 434 | // then we are good to go! The lock we grabbed prevents any rehashes. |
| 435 | if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ { |
| 436 | // Now lock the second bucket and return the two buckets |
| 437 | if hash1 == hash2 { |
| 438 | return (bucket1, bucket1); |
| 439 | } else if hash1 < hash2 { |
| 440 | let bucket2 = &hashtable.entries[hash2]; |
| 441 | bucket2.mutex.lock(); |
| 442 | return (bucket1, bucket2); |
| 443 | } else { |
| 444 | let bucket2 = &hashtable.entries[hash1]; |
| 445 | bucket2.mutex.lock(); |
| 446 | return (bucket2, bucket1); |
| 447 | } |
| 448 | } |
| 449 | |
| 450 | // Unlock the bucket and try again |
| 451 | // SAFETY: We hold the lock here, as required |
| 452 | unsafe { bucket1.mutex.unlock() }; |
| 453 | } |
| 454 | } |
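|  | // The "lowest hash first" locking order above is what makes concurrent calls with |
|  | // swapped keys deadlock-free. Illustrative scenario (hypothetical keys): thread A |
|  | // calls lock_bucket_pair(k1, k2) while thread B calls lock_bucket_pair(k2, k1); if |
|  | // hash(k1) < hash(k2), both lock bucket[hash(k1)] first, so one simply waits there |
|  | // instead of each holding one bucket while waiting for the other. |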
| 455 | |
| 456 | /// Unlock a pair of buckets |
| 457 | /// |
| 458 | /// # Safety |
| 459 | /// |
| 460 | /// Both buckets must be locked |
| 461 | #[inline] |
| 462 | unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) { |
| 463 | bucket1.mutex.unlock(); |
| 464 | if !ptr::eq(bucket1, bucket2) { |
| 465 | bucket2.mutex.unlock(); |
| 466 | } |
| 467 | } |
| 468 | |
| 469 | /// Result of a park operation. |
| 470 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] |
| 471 | pub enum ParkResult { |
| 472 | /// We were unparked by another thread with the given token. |
| 473 | Unparked(UnparkToken), |
| 474 | |
| 475 | /// The validation callback returned false. |
| 476 | Invalid, |
| 477 | |
| 478 | /// The timeout expired. |
| 479 | TimedOut, |
| 480 | } |
| 481 | |
| 482 | impl ParkResult { |
| 483 | /// Returns true if we were unparked by another thread. |
| 484 | #[inline] |
| 485 | pub fn is_unparked(self) -> bool { |
| 486 | if let ParkResult::Unparked(_) = self { |
| 487 | true |
| 488 | } else { |
| 489 | false |
| 490 | } |
| 491 | } |
| 492 | } |
| 493 | |
| 494 | /// Result of an unpark operation. |
| 495 | #[derive(Copy, Clone, Default, Eq, PartialEq, Debug)] |
| 496 | pub struct UnparkResult { |
| 497 | /// The number of threads that were unparked. |
| 498 | pub unparked_threads: usize, |
| 499 | |
| 500 | /// The number of threads that were requeued. |
| 501 | pub requeued_threads: usize, |
| 502 | |
| 503 | /// Whether there are any threads remaining in the queue. This only returns |
| 504 | /// true if a thread was unparked. |
| 505 | pub have_more_threads: bool, |
| 506 | |
| 507 | /// This is set to true on average once every 0.5ms for any given key. It |
| 508 | /// should be used to switch to a fair unlocking mechanism for a particular |
| 509 | /// unlock. |
| 510 | pub be_fair: bool, |
| 511 | |
| 512 | /// Private field so new fields can be added without breakage. |
| 513 | _sealed: (), |
| 514 | } |
| 515 | |
| 516 | /// Operation that `unpark_requeue` should perform. |
| 517 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] |
| 518 | pub enum RequeueOp { |
| 519 | /// Abort the operation without doing anything. |
| 520 | Abort, |
| 521 | |
| 522 | /// Unpark one thread and requeue the rest onto the target queue. |
| 523 | UnparkOneRequeueRest, |
| 524 | |
| 525 | /// Requeue all threads onto the target queue. |
| 526 | RequeueAll, |
| 527 | |
| 528 | /// Unpark one thread and leave the rest parked. No requeuing is done. |
| 529 | UnparkOne, |
| 530 | |
| 531 | /// Requeue one thread and leave the rest parked on the original queue. |
| 532 | RequeueOne, |
| 533 | } |
| 534 | |
| 535 | /// Operation that `unpark_filter` should perform for each thread. |
| 536 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] |
| 537 | pub enum FilterOp { |
| 538 | /// Unpark the thread and continue scanning the list of parked threads. |
| 539 | Unpark, |
| 540 | |
| 541 | /// Don't unpark the thread and continue scanning the list of parked threads. |
| 542 | Skip, |
| 543 | |
| 544 | /// Don't unpark the thread and stop scanning the list of parked threads. |
| 545 | Stop, |
| 546 | } |
| 547 | |
| 548 | /// A value which is passed from an unparker to a parked thread. |
| 549 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] |
| 550 | pub struct UnparkToken(pub usize); |
| 551 | |
| 552 | /// A value associated with a parked thread which can be used by `unpark_filter`. |
| 553 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] |
| 554 | pub struct ParkToken(pub usize); |
| 555 | |
| 556 | /// A default unpark token to use. |
| 557 | pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0); |
| 558 | |
| 559 | /// A default park token to use. |
| 560 | pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0); |
| 561 | |
| 562 | /// Parks the current thread in the queue associated with the given key. |
| 563 | /// |
| 564 | /// The `validate` function is called while the queue is locked and can abort |
| 565 | /// the operation by returning false. If `validate` returns true then the |
| 566 | /// current thread is appended to the queue and the queue is unlocked. |
| 567 | /// |
| 568 | /// The `before_sleep` function is called after the queue is unlocked but before |
| 569 | /// the thread is put to sleep. The thread will then sleep until it is unparked |
| 570 | /// or the given timeout is reached. |
| 571 | /// |
| 572 | /// The `timed_out` function is also called while the queue is locked, but only |
| 573 | /// if the timeout was reached. It is passed the key of the queue it was in when |
| 574 | /// it timed out, which may be different from the original key if |
| 575 | /// `unpark_requeue` was called. It is also passed a bool which indicates |
| 576 | /// whether it was the last thread in the queue. |
| 577 | /// |
| 578 | /// # Safety |
| 579 | /// |
| 580 | /// You should only call this function with an address that you control, since |
| 581 | /// you could otherwise interfere with the operation of other synchronization |
| 582 | /// primitives. |
| 583 | /// |
| 584 | /// The `validate` and `timed_out` functions are called while the queue is |
| 585 | /// locked and must not panic or call into any function in `parking_lot`. |
| 586 | /// |
| 587 | /// The `before_sleep` function is called outside the queue lock and is allowed |
| 588 | /// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but |
| 589 | /// it is not allowed to call `park` or panic. |
| 590 | #[inline] |
| 591 | pub unsafe fn park( |
| 592 | key: usize, |
| 593 | validate: impl FnOnce() -> bool, |
| 594 | before_sleep: impl FnOnce(), |
| 595 | timed_out: impl FnOnce(usize, bool), |
| 596 | park_token: ParkToken, |
| 597 | timeout: Option<Instant>, |
| 598 | ) -> ParkResult { |
| 599 | // Grab our thread data, this also ensures that the hash table exists |
| 600 | with_thread_data(|thread_data| { |
| 601 | // Lock the bucket for the given key |
| 602 | let bucket = lock_bucket(key); |
| 603 | |
| 604 | // If the validation function fails, just return |
| 605 | if !validate() { |
| 606 | // SAFETY: We hold the lock here, as required |
| 607 | bucket.mutex.unlock(); |
| 608 | return ParkResult::Invalid; |
| 609 | } |
| 610 | |
| 611 | // Append our thread data to the queue and unlock the bucket |
| 612 | thread_data.parked_with_timeout.set(timeout.is_some()); |
| 613 | thread_data.next_in_queue.set(ptr::null()); |
| 614 | thread_data.key.store(key, Ordering::Relaxed); |
| 615 | thread_data.park_token.set(park_token); |
| 616 | thread_data.parker.prepare_park(); |
| 617 | if !bucket.queue_head.get().is_null() { |
| 618 | (*bucket.queue_tail.get()).next_in_queue.set(thread_data); |
| 619 | } else { |
| 620 | bucket.queue_head.set(thread_data); |
| 621 | } |
| 622 | bucket.queue_tail.set(thread_data); |
| 623 | // SAFETY: We hold the lock here, as required |
| 624 | bucket.mutex.unlock(); |
| 625 | |
| 626 | // Invoke the pre-sleep callback |
| 627 | before_sleep(); |
| 628 | |
| 629 | // Park our thread and determine whether we were woken up by an unpark |
| 630 | // or by our timeout. Note that this isn't precise: we can still be |
| 631 | // unparked since we are still in the queue. |
| 632 | let unparked = match timeout { |
| 633 | Some(timeout) => thread_data.parker.park_until(timeout), |
| 634 | None => { |
| 635 | thread_data.parker.park(); |
| 636 | // call deadlock detection on_unpark hook |
| 637 | deadlock::on_unpark(thread_data); |
| 638 | true |
| 639 | } |
| 640 | }; |
| 641 | |
| 642 | // If we were unparked, return now |
| 643 | if unparked { |
| 644 | return ParkResult::Unparked(thread_data.unpark_token.get()); |
| 645 | } |
| 646 | |
| 647 | // Lock our bucket again. Note that the hashtable may have been rehashed in |
| 648 | // the meantime. Our key may also have changed if we were requeued. |
| 649 | let (key, bucket) = lock_bucket_checked(&thread_data.key); |
| 650 | |
| 651 | // Now we need to check again if we were unparked or timed out. Unlike the |
| 652 | // last check this is precise because we hold the bucket lock. |
| 653 | if !thread_data.parker.timed_out() { |
| 654 | // SAFETY: We hold the lock here, as required |
| 655 | bucket.mutex.unlock(); |
| 656 | return ParkResult::Unparked(thread_data.unpark_token.get()); |
| 657 | } |
| 658 | |
| 659 | // We timed out, so we now need to remove our thread from the queue |
| 660 | let mut link = &bucket.queue_head; |
| 661 | let mut current = bucket.queue_head.get(); |
| 662 | let mut previous = ptr::null(); |
| 663 | let mut was_last_thread = true; |
| 664 | while !current.is_null() { |
| 665 | if current == thread_data { |
| 666 | let next = (*current).next_in_queue.get(); |
| 667 | link.set(next); |
| 668 | if bucket.queue_tail.get() == current { |
| 669 | bucket.queue_tail.set(previous); |
| 670 | } else { |
| 671 | // Scan the rest of the queue to see if there are any other |
| 672 | // entries with the given key. |
| 673 | let mut scan = next; |
| 674 | while !scan.is_null() { |
| 675 | if (*scan).key.load(Ordering::Relaxed) == key { |
| 676 | was_last_thread = false; |
| 677 | break; |
| 678 | } |
| 679 | scan = (*scan).next_in_queue.get(); |
| 680 | } |
| 681 | } |
| 682 | |
| 683 | // Callback to indicate that we timed out, and whether we were the |
| 684 | // last thread on the queue. |
| 685 | timed_out(key, was_last_thread); |
| 686 | break; |
| 687 | } else { |
| 688 | if (*current).key.load(Ordering::Relaxed) == key { |
| 689 | was_last_thread = false; |
| 690 | } |
| 691 | link = &(*current).next_in_queue; |
| 692 | previous = current; |
| 693 | current = link.get(); |
| 694 | } |
| 695 | } |
| 696 | |
| 697 | // There should be no way for our thread to have been removed from the queue |
| 698 | // if we timed out. |
| 699 | debug_assert!(!current.is_null()); |
| 700 | |
| 701 | // Unlock the bucket, we are done |
| 702 | // SAFETY: We hold the lock here, as required |
| 703 | bucket.mutex.unlock(); |
| 704 | ParkResult::TimedOut |
| 705 | }) |
| 706 | } |
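|  | // Illustrative sketch of the intended calling pattern for `park` (this `Event` type |
|  | // is hypothetical, not part of this module): a one-shot flag that parks waiters on |
|  | // its own address and wakes them all when set. |
|  | // use core::sync::atomic::AtomicBool; |
|  | // struct Event { set: AtomicBool } |
|  | // fn wait(event: &Event) { |
|  | //     let key = event as *const Event as usize; // an address we control |
|  | //     while !event.set.load(Ordering::Acquire) { |
|  | //         unsafe { |
|  | //             park( |
|  | //                 key, |
|  | //                 || !event.set.load(Ordering::Acquire), // validate: still need to sleep? |
|  | //                 || {},                                 // before_sleep: nothing to do |
|  | //                 |_key, _was_last| {},                  // timed_out: unused, no timeout |
|  | //                 DEFAULT_PARK_TOKEN, |
|  | //                 None, |
|  | //             ); |
|  | //         } |
|  | //     } |
|  | // } |
|  | // fn set(event: &Event) { |
|  | //     event.set.store(true, Ordering::Release); |
|  | //     unsafe { unpark_all(event as *const Event as usize, DEFAULT_UNPARK_TOKEN) }; |
|  | // } |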
| 707 | |
| 708 | /// Unparks one thread from the queue associated with the given key. |
| 709 | /// |
| 710 | /// The `callback` function is called while the queue is locked and before the |
| 711 | /// target thread is woken up. The `UnparkResult` argument to the function |
| 712 | /// indicates whether a thread was found in the queue and whether this was the |
| 713 | /// last thread in the queue. This value is also returned by `unpark_one`. |
| 714 | /// |
| 715 | /// The `callback` function should return an `UnparkToken` value which will be |
| 716 | /// passed to the thread that is unparked. If no thread is unparked then the |
| 717 | /// returned value is ignored. |
| 718 | /// |
| 719 | /// # Safety |
| 720 | /// |
| 721 | /// You should only call this function with an address that you control, since |
| 722 | /// you could otherwise interfere with the operation of other synchronization |
| 723 | /// primitives. |
| 724 | /// |
| 725 | /// The `callback` function is called while the queue is locked and must not |
| 726 | /// panic or call into any function in `parking_lot`. |
| 727 | /// |
| 728 | /// The `parking_lot` functions are not re-entrant and calling this method |
| 729 | /// from the context of an asynchronous signal handler may result in undefined |
| 730 | /// behavior, including corruption of internal state and/or deadlocks. |
| 731 | #[inline] |
| 732 | pub unsafe fn unpark_one( |
| 733 | key: usize, |
| 734 | callback: impl FnOnce(UnparkResult) -> UnparkToken, |
| 735 | ) -> UnparkResult { |
| 736 | // Lock the bucket for the given key |
| 737 | let bucket = lock_bucket(key); |
| 738 | |
| 739 | // Find a thread with a matching key and remove it from the queue |
| 740 | let mut link = &bucket.queue_head; |
| 741 | let mut current = bucket.queue_head.get(); |
| 742 | let mut previous = ptr::null(); |
| 743 | let mut result = UnparkResult::default(); |
| 744 | while !current.is_null() { |
| 745 | if (*current).key.load(Ordering::Relaxed) == key { |
| 746 | // Remove the thread from the queue |
| 747 | let next = (*current).next_in_queue.get(); |
| 748 | link.set(next); |
| 749 | if bucket.queue_tail.get() == current { |
| 750 | bucket.queue_tail.set(previous); |
| 751 | } else { |
| 752 | // Scan the rest of the queue to see if there are any other |
| 753 | // entries with the given key. |
| 754 | let mut scan = next; |
| 755 | while !scan.is_null() { |
| 756 | if (*scan).key.load(Ordering::Relaxed) == key { |
| 757 | result.have_more_threads = true; |
| 758 | break; |
| 759 | } |
| 760 | scan = (*scan).next_in_queue.get(); |
| 761 | } |
| 762 | } |
| 763 | |
| 764 | // Invoke the callback before waking up the thread |
| 765 | result.unparked_threads = 1; |
| 766 | result.be_fair = (*bucket.fair_timeout.get()).should_timeout(); |
| 767 | let token = callback(result); |
| 768 | |
| 769 | // Set the token for the target thread |
| 770 | (*current).unpark_token.set(token); |
| 771 | |
| 772 | // This is a bit tricky: we first lock the ThreadParker to prevent |
| 773 | // the thread from exiting and freeing its ThreadData if its wait |
| 774 | // times out. Then we unlock the queue since we don't want to keep |
| 775 | // the queue locked while we perform a system call. Finally we wake |
| 776 | // up the parked thread. |
| 777 | let handle = (*current).parker.unpark_lock(); |
| 778 | // SAFETY: We hold the lock here, as required |
| 779 | bucket.mutex.unlock(); |
| 780 | handle.unpark(); |
| 781 | |
| 782 | return result; |
| 783 | } else { |
| 784 | link = &(*current).next_in_queue; |
| 785 | previous = current; |
| 786 | current = link.get(); |
| 787 | } |
| 788 | } |
| 789 | |
| 790 | // No threads with a matching key were found in the bucket |
| 791 | callback(result); |
| 792 | // SAFETY: We hold the lock here, as required |
| 793 | bucket.mutex.unlock(); |
| 794 | result |
| 795 | } |
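|  | // Illustrative sketch of the usual `unpark_one` call (hedged; `addr` and the TOKEN_* |
|  | // values are assumptions, not definitions from this module): the unlocker inspects |
|  | // `UnparkResult` while the queue is still locked and picks the token handed to the |
|  | // woken thread, e.g. to implement fair handoff when `be_fair` is set. |
|  | // let result = unsafe { |
|  | //     unpark_one(addr, |result| { |
|  | //         if result.unparked_threads != 0 && result.be_fair { |
|  | //             TOKEN_HANDOFF // hypothetical: keep the lock and hand it to the woken thread |
|  | //         } else { |
|  | //             TOKEN_NORMAL // hypothetical: let the woken thread re-acquire normally |
|  | //         } |
|  | //     }) |
|  | // }; |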
| 796 | |
| 797 | /// Unparks all threads in the queue associated with the given key. |
| 798 | /// |
| 799 | /// The given `UnparkToken` is passed to all unparked threads. |
| 800 | /// |
| 801 | /// This function returns the number of threads that were unparked. |
| 802 | /// |
| 803 | /// # Safety |
| 804 | /// |
| 805 | /// You should only call this function with an address that you control, since |
| 806 | /// you could otherwise interfere with the operation of other synchronization |
| 807 | /// primitives. |
| 808 | /// |
| 809 | /// The `parking_lot` functions are not re-entrant and calling this method |
| 810 | /// from the context of an asynchronous signal handler may result in undefined |
| 811 | /// behavior, including corruption of internal state and/or deadlocks. |
| 812 | #[inline] |
| 813 | pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize { |
| 814 | // Lock the bucket for the given key |
| 815 | let bucket = lock_bucket(key); |
| 816 | |
| 817 | // Remove all threads with the given key in the bucket |
| 818 | let mut link = &bucket.queue_head; |
| 819 | let mut current = bucket.queue_head.get(); |
| 820 | let mut previous = ptr::null(); |
| 821 | let mut threads = SmallVec::<[_; 8]>::new(); |
| 822 | while !current.is_null() { |
| 823 | if (*current).key.load(Ordering::Relaxed) == key { |
| 824 | // Remove the thread from the queue |
| 825 | let next = (*current).next_in_queue.get(); |
| 826 | link.set(next); |
| 827 | if bucket.queue_tail.get() == current { |
| 828 | bucket.queue_tail.set(previous); |
| 829 | } |
| 830 | |
| 831 | // Set the token for the target thread |
| 832 | (*current).unpark_token.set(unpark_token); |
| 833 | |
| 834 | // Don't wake up threads while holding the queue lock. See comment |
| 835 | // in unpark_one. For now just record which threads we need to wake |
| 836 | // up. |
| 837 | threads.push((*current).parker.unpark_lock()); |
| 838 | current = next; |
| 839 | } else { |
| 840 | link = &(*current).next_in_queue; |
| 841 | previous = current; |
| 842 | current = link.get(); |
| 843 | } |
| 844 | } |
| 845 | |
| 846 | // Unlock the bucket |
| 847 | // SAFETY: We hold the lock here, as required |
| 848 | bucket.mutex.unlock(); |
| 849 | |
| 850 | // Now that we are outside the lock, wake up all the threads that we removed |
| 851 | // from the queue. |
| 852 | let num_threads = threads.len(); |
| 853 | for handle in threads.into_iter() { |
| 854 | handle.unpark(); |
| 855 | } |
| 856 | |
| 857 | num_threads |
| 858 | } |
| 859 | |
| 860 | /// Removes all threads from the queue associated with `key_from`, optionally |
| 861 | /// unparks the first one and requeues the rest onto the queue associated with |
| 862 | /// `key_to`. |
| 863 | /// |
| 864 | /// The `validate` function is called while both queues are locked. Its return |
| 865 | /// value will determine which operation is performed, or whether the operation |
| 866 | /// should be aborted. See `RequeueOp` for details about the different possible |
| 867 | /// return values. |
| 868 | /// |
| 869 | /// The `callback` function is also called while both queues are locked. It is |
| 870 | /// passed the `RequeueOp` returned by `validate` and an `UnparkResult` |
| 871 | /// indicating whether a thread was unparked and whether there are threads still |
| 872 | /// parked in the new queue. This `UnparkResult` value is also returned by |
| 873 | /// `unpark_requeue`. |
| 874 | /// |
| 875 | /// The `callback` function should return an `UnparkToken` value which will be |
| 876 | /// passed to the thread that is unparked. If no thread is unparked then the |
| 877 | /// returned value is ignored. |
| 878 | /// |
| 879 | /// # Safety |
| 880 | /// |
| 881 | /// You should only call this function with an address that you control, since |
| 882 | /// you could otherwise interfere with the operation of other synchronization |
| 883 | /// primitives. |
| 884 | /// |
| 885 | /// The `validate` and `callback` functions are called while the queue is locked |
| 886 | /// and must not panic or call into any function in `parking_lot`. |
| 887 | #[inline] |
| 888 | pub unsafe fn unpark_requeue( |
| 889 | key_from: usize, |
| 890 | key_to: usize, |
| 891 | validate: impl FnOnce() -> RequeueOp, |
| 892 | callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken, |
| 893 | ) -> UnparkResult { |
| 894 | // Lock the two buckets for the given key |
| 895 | let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to); |
| 896 | |
| 897 | // If the validation function fails, just return |
| 898 | let mut result = UnparkResult::default(); |
| 899 | let op = validate(); |
| 900 | if op == RequeueOp::Abort { |
| 901 | // SAFETY: Both buckets are locked, as required. |
| 902 | unlock_bucket_pair(bucket_from, bucket_to); |
| 903 | return result; |
| 904 | } |
| 905 | |
| 906 | // Remove all threads with the given key in the source bucket |
| 907 | let mut link = &bucket_from.queue_head; |
| 908 | let mut current = bucket_from.queue_head.get(); |
| 909 | let mut previous = ptr::null(); |
| 910 | let mut requeue_threads: *const ThreadData = ptr::null(); |
| 911 | let mut requeue_threads_tail: *const ThreadData = ptr::null(); |
| 912 | let mut wakeup_thread = None; |
| 913 | while !current.is_null() { |
| 914 | if (*current).key.load(Ordering::Relaxed) == key_from { |
| 915 | // Remove the thread from the queue |
| 916 | let next = (*current).next_in_queue.get(); |
| 917 | link.set(next); |
| 918 | if bucket_from.queue_tail.get() == current { |
| 919 | bucket_from.queue_tail.set(previous); |
| 920 | } |
| 921 | |
| 922 | // Prepare the first thread for wakeup and requeue the rest. |
| 923 | if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne) |
| 924 | && wakeup_thread.is_none() |
| 925 | { |
| 926 | wakeup_thread = Some(current); |
| 927 | result.unparked_threads = 1; |
| 928 | } else { |
| 929 | if !requeue_threads.is_null() { |
| 930 | (*requeue_threads_tail).next_in_queue.set(current); |
| 931 | } else { |
| 932 | requeue_threads = current; |
| 933 | } |
| 934 | requeue_threads_tail = current; |
| 935 | (*current).key.store(key_to, Ordering::Relaxed); |
| 936 | result.requeued_threads += 1; |
| 937 | } |
| 938 | if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne { |
| 939 | // Scan the rest of the queue to see if there are any other |
| 940 | // entries with the given key. |
| 941 | let mut scan = next; |
| 942 | while !scan.is_null() { |
| 943 | if (*scan).key.load(Ordering::Relaxed) == key_from { |
| 944 | result.have_more_threads = true; |
| 945 | break; |
| 946 | } |
| 947 | scan = (*scan).next_in_queue.get(); |
| 948 | } |
| 949 | break; |
| 950 | } |
| 951 | current = next; |
| 952 | } else { |
| 953 | link = &(*current).next_in_queue; |
| 954 | previous = current; |
| 955 | current = link.get(); |
| 956 | } |
| 957 | } |
| 958 | |
| 959 | // Add the requeued threads to the destination bucket |
| 960 | if !requeue_threads.is_null() { |
| 961 | (*requeue_threads_tail).next_in_queue.set(ptr::null()); |
| 962 | if !bucket_to.queue_head.get().is_null() { |
| 963 | (*bucket_to.queue_tail.get()) |
| 964 | .next_in_queue |
| 965 | .set(requeue_threads); |
| 966 | } else { |
| 967 | bucket_to.queue_head.set(requeue_threads); |
| 968 | } |
| 969 | bucket_to.queue_tail.set(requeue_threads_tail); |
| 970 | } |
| 971 | |
| 972 | // Invoke the callback before waking up the thread |
| 973 | if result.unparked_threads != 0 { |
| 974 | result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout(); |
| 975 | } |
| 976 | let token = callback(op, result); |
| 977 | |
| 978 | // See comment in unpark_one for why we mess with the locking |
| 979 | if let Some(wakeup_thread) = wakeup_thread { |
| 980 | (*wakeup_thread).unpark_token.set(token); |
| 981 | let handle = (*wakeup_thread).parker.unpark_lock(); |
| 982 | // SAFETY: Both buckets are locked, as required. |
| 983 | unlock_bucket_pair(bucket_from, bucket_to); |
| 984 | handle.unpark(); |
| 985 | } else { |
| 986 | // SAFETY: Both buckets are locked, as required. |
| 987 | unlock_bucket_pair(bucket_from, bucket_to); |
| 988 | } |
| 989 | |
| 990 | result |
| 991 | } |
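|  | // Illustrative requeue-style notify (hedged sketch; `cond_addr` and `mutex_addr` are |
|  | // assumptions): wake one waiter on `cond_addr` and move the rest onto `mutex_addr` |
|  | // so they are woken later by the mutex unlock path instead of all at once. |
|  | // let result = unsafe { |
|  | //     unpark_requeue( |
|  | //         cond_addr, |
|  | //         mutex_addr, |
|  | //         || RequeueOp::UnparkOneRequeueRest, |
|  | //         |_op, _result| DEFAULT_UNPARK_TOKEN, |
|  | //     ) |
|  | // }; |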
| 992 | |
| 993 | /// Unparks a number of threads from the front of the queue associated with |
| 994 | /// `key` depending on the results of a filter function which inspects the |
| 995 | /// `ParkToken` associated with each thread. |
| 996 | /// |
| 997 | /// The `filter` function is called for each thread in the queue or until |
| 998 | /// `FilterOp::Stop` is returned. This function is passed the `ParkToken` |
| 999 | /// associated with a particular thread, which is unparked if `FilterOp::Unpark` |
| 1000 | /// is returned. |
| 1001 | /// |
| 1002 | /// The `callback` function is also called while the queue is locked. It is |
| 1003 | /// passed an `UnparkResult` indicating the number of threads that were unparked |
| 1004 | /// and whether there are still parked threads in the queue. This `UnparkResult` |
| 1005 | /// value is also returned by `unpark_filter`. |
| 1006 | /// |
| 1007 | /// The `callback` function should return an `UnparkToken` value which will be |
| 1008 | /// passed to all threads that are unparked. If no thread is unparked then the |
| 1009 | /// returned value is ignored. |
| 1010 | /// |
| 1011 | /// # Safety |
| 1012 | /// |
| 1013 | /// You should only call this function with an address that you control, since |
| 1014 | /// you could otherwise interfere with the operation of other synchronization |
| 1015 | /// primitives. |
| 1016 | /// |
| 1017 | /// The `filter` and `callback` functions are called while the queue is locked |
| 1018 | /// and must not panic or call into any function in `parking_lot`. |
| 1019 | #[inline] |
| 1020 | pub unsafe fn unpark_filter( |
| 1021 | key: usize, |
| 1022 | mut filter: impl FnMut(ParkToken) -> FilterOp, |
| 1023 | callback: impl FnOnce(UnparkResult) -> UnparkToken, |
| 1024 | ) -> UnparkResult { |
| 1025 | // Lock the bucket for the given key |
| 1026 | let bucket = lock_bucket(key); |
| 1027 | |
| 1028 | // Go through the queue looking for threads with a matching key |
| 1029 | let mut link = &bucket.queue_head; |
| 1030 | let mut current = bucket.queue_head.get(); |
| 1031 | let mut previous = ptr::null(); |
| 1032 | let mut threads = SmallVec::<[_; 8]>::new(); |
| 1033 | let mut result = UnparkResult::default(); |
| 1034 | while !current.is_null() { |
| 1035 | if (*current).key.load(Ordering::Relaxed) == key { |
| 1036 | // Call the filter function with the thread's ParkToken |
| 1037 | let next = (*current).next_in_queue.get(); |
| 1038 | match filter((*current).park_token.get()) { |
| 1039 | FilterOp::Unpark => { |
| 1040 | // Remove the thread from the queue |
| 1041 | link.set(next); |
| 1042 | if bucket.queue_tail.get() == current { |
| 1043 | bucket.queue_tail.set(previous); |
| 1044 | } |
| 1045 | |
| 1046 | // Add the thread to our list of threads to unpark |
| 1047 | threads.push((current, None)); |
| 1048 | |
| 1049 | current = next; |
| 1050 | } |
| 1051 | FilterOp::Skip => { |
| 1052 | result.have_more_threads = true; |
| 1053 | link = &(*current).next_in_queue; |
| 1054 | previous = current; |
| 1055 | current = link.get(); |
| 1056 | } |
| 1057 | FilterOp::Stop => { |
| 1058 | result.have_more_threads = true; |
| 1059 | break; |
| 1060 | } |
| 1061 | } |
| 1062 | } else { |
| 1063 | link = &(*current).next_in_queue; |
| 1064 | previous = current; |
| 1065 | current = link.get(); |
| 1066 | } |
| 1067 | } |
| 1068 | |
| 1069 | // Invoke the callback before waking up the threads |
| 1070 | result.unparked_threads = threads.len(); |
| 1071 | if result.unparked_threads != 0 { |
| 1072 | result.be_fair = (*bucket.fair_timeout.get()).should_timeout(); |
| 1073 | } |
| 1074 | let token = callback(result); |
| 1075 | |
| 1076 | // Pass the token to all threads that are going to be unparked and prepare |
| 1077 | // them for unparking. |
| 1078 | for t in threads.iter_mut() { |
| 1079 | (*t.0).unpark_token.set(token); |
| 1080 | t.1 = Some((*t.0).parker.unpark_lock()); |
| 1081 | } |
| 1082 | |
| 1083 | // SAFETY: We hold the lock here, as required |
| 1084 | bucket.mutex.unlock(); |
| 1085 | |
| 1086 | // Now that we are outside the lock, wake up all the threads that we removed |
| 1087 | // from the queue. |
| 1088 | for (_, handle) in threads.into_iter() { |
| 1089 | handle.unchecked_unwrap().unpark(); |
| 1090 | } |
| 1091 | |
| 1092 | result |
| 1093 | } |
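|  | // Illustrative use of the filter (hedged; `addr` and `budget` are assumptions made |
|  | // by the caller): unpark at most `budget` waiters and stop scanning once it is spent. |
|  | // let mut budget = 2usize; |
|  | // let result = unsafe { |
|  | //     unpark_filter( |
|  | //         addr, |
|  | //         |_token| { |
|  | //             if budget > 0 { |
|  | //                 budget -= 1; |
|  | //                 FilterOp::Unpark |
|  | //             } else { |
|  | //                 FilterOp::Stop |
|  | //             } |
|  | //         }, |
|  | //         |_result| DEFAULT_UNPARK_TOKEN, |
|  | //     ) |
|  | // }; |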
| 1094 | |
| 1095 | /// \[Experimental\] Deadlock detection |
| 1096 | /// |
| 1097 | /// Enabled via the `deadlock_detection` feature flag. |
| 1098 | pub mod deadlock { |
| 1099 | #[cfg(feature = "deadlock_detection")] |
| 1100 | use super::deadlock_impl; |
| 1101 | |
| 1102 | #[cfg(feature = "deadlock_detection")] |
| 1103 | pub(super) use super::deadlock_impl::DeadlockData; |
| 1104 | |
| 1105 | /// Acquire a resource identified by key in the deadlock detector |
| 1106 | /// Noop if `deadlock_detection` feature isn't enabled. |
| 1107 | /// |
| 1108 | /// # Safety |
| 1109 | /// |
| 1110 | /// Call after the resource is acquired |
| 1111 | #[inline] |
| 1112 | pub unsafe fn acquire_resource(_key: usize) { |
| 1113 | #[cfg(feature = "deadlock_detection")] |
| 1114 | deadlock_impl::acquire_resource(_key); |
| 1115 | } |
| 1116 | |
| 1117 | /// Release a resource identified by key in the deadlock detector. |
| 1118 | /// Noop if `deadlock_detection` feature isn't enabled. |
| 1119 | /// |
| 1120 | /// # Panics |
| 1121 | /// |
| 1122 | /// Panics if the resource was already released or wasn't acquired in this thread. |
| 1123 | /// |
| 1124 | /// # Safety |
| 1125 | /// |
| 1126 | /// Call before the resource is released |
| 1127 | #[inline] |
| 1128 | pub unsafe fn release_resource(_key: usize) { |
| 1129 | #[cfg(feature = "deadlock_detection")] |
| 1130 | deadlock_impl::release_resource(_key); |
| 1131 | } |
| 1132 | |
| 1133 | /// Returns all deadlocks detected *since* the last call. |
| 1134 | /// Each cycle consists of a vector of `DeadlockedThread`. |
| 1135 | #[cfg(feature = "deadlock_detection")] |
| 1136 | #[inline] |
| 1137 | pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> { |
| 1138 | deadlock_impl::check_deadlock() |
| 1139 | } |
| 1140 | |
| 1141 | #[inline] |
| 1142 | pub(super) unsafe fn on_unpark(_td: &super::ThreadData) { |
| 1143 | #[cfg(feature = "deadlock_detection")] |
| 1144 | deadlock_impl::on_unpark(_td); |
| 1145 | } |
| 1146 | } |
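|  | // Illustrative sketch of how a user of the `deadlock_detection` feature might poll |
|  | // for cycles from a background thread (the polling loop is an assumption; only |
|  | // `check_deadlock` itself is provided by this module): |
|  | // #[cfg(feature = "deadlock_detection")] |
|  | // std::thread::spawn(|| loop { |
|  | //     std::thread::sleep(std::time::Duration::from_secs(10)); |
|  | //     for (i, cycle) in deadlock::check_deadlock().iter().enumerate() { |
|  | //         eprintln!("deadlock cycle #{}", i); |
|  | //         for t in cycle { |
|  | //             eprintln!("thread {}: {:?}", t.thread_id(), t.backtrace()); |
|  | //         } |
|  | //     } |
|  | // }); |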
| 1147 | |
| 1148 | #[cfg(feature = "deadlock_detection")] |
| 1149 | mod deadlock_impl { |
| 1150 | use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS}; |
| 1151 | use crate::thread_parker::{ThreadParkerT, UnparkHandleT}; |
| 1152 | use crate::word_lock::WordLock; |
| 1153 | use backtrace::Backtrace; |
| 1154 | use petgraph; |
| 1155 | use petgraph::graphmap::DiGraphMap; |
| 1156 | use std::cell::{Cell, UnsafeCell}; |
| 1157 | use std::collections::HashSet; |
| 1158 | use std::sync::atomic::Ordering; |
| 1159 | use std::sync::mpsc; |
| 1160 | use thread_id; |
| 1161 | |
| 1162 | /// Representation of a deadlocked thread |
| 1163 | pub struct DeadlockedThread { |
| 1164 | thread_id: usize, |
| 1165 | backtrace: Backtrace, |
| 1166 | } |
| 1167 | |
| 1168 | impl DeadlockedThread { |
| 1169 | /// The system thread id |
| 1170 | pub fn thread_id(&self) -> usize { |
| 1171 | self.thread_id |
| 1172 | } |
| 1173 | |
| 1174 | /// The thread backtrace |
| 1175 | pub fn backtrace(&self) -> &Backtrace { |
| 1176 | &self.backtrace |
| 1177 | } |
| 1178 | } |
| 1179 | |
| 1180 | pub struct DeadlockData { |
| 1181 | // Currently owned resources (keys) |
| 1182 | resources: UnsafeCell<Vec<usize>>, |
| 1183 | |
| 1184 | // Set when there's a pending callstack request |
| 1185 | deadlocked: Cell<bool>, |
| 1186 | |
| 1187 | // Sender used to report the backtrace |
| 1188 | backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>, |
| 1189 | |
| 1190 | // System thread id |
| 1191 | thread_id: usize, |
| 1192 | } |
| 1193 | |
| 1194 | impl DeadlockData { |
| 1195 | pub fn new() -> Self { |
| 1196 | DeadlockData { |
| 1197 | resources: UnsafeCell::new(Vec::new()), |
| 1198 | deadlocked: Cell::new(false), |
| 1199 | backtrace_sender: UnsafeCell::new(None), |
| 1200 | thread_id: thread_id::get(), |
| 1201 | } |
| 1202 | } |
| 1203 | } |
| 1204 | |
| 1205 | pub(super) unsafe fn on_unpark(td: &ThreadData) { |
| 1206 | if td.deadlock_data.deadlocked.get() { |
| 1207 | let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap(); |
| 1208 | sender |
| 1209 | .send(DeadlockedThread { |
| 1210 | thread_id: td.deadlock_data.thread_id, |
| 1211 | backtrace: Backtrace::new(), |
| 1212 | }) |
| 1213 | .unwrap(); |
| 1214 | // make sure to close this sender |
| 1215 | drop(sender); |
| 1216 | |
| 1217 | // park until the end of time |
| 1218 | td.parker.prepare_park(); |
| 1219 | td.parker.park(); |
| 1220 | unreachable!("unparked deadlocked thread!"); |
| 1221 | } |
| 1222 | } |
| 1223 | |
| 1224 | pub unsafe fn acquire_resource(key: usize) { |
| 1225 | with_thread_data(|thread_data| { |
| 1226 | (*thread_data.deadlock_data.resources.get()).push(key); |
| 1227 | }); |
| 1228 | } |
| 1229 | |
| 1230 | pub unsafe fn release_resource(key: usize) { |
| 1231 | with_thread_data(|thread_data| { |
| 1232 | let resources = &mut (*thread_data.deadlock_data.resources.get()); |
| 1233 | |
| 1234 | // There is only one situation where we can fail to find the |
| 1235 | // resource: we are currently running TLS destructors and our |
| 1236 | // ThreadData has already been freed. There isn't much we can do |
| 1237 | // about it at this point, so just ignore it. |
| 1238 | if let Some(p) = resources.iter().rposition(|x| *x == key) { |
| 1239 | resources.swap_remove(p); |
| 1240 | } |
| 1241 | }); |
| 1242 | } |
| 1243 | |
| 1244 | pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> { |
| 1245 | unsafe { |
| 1246 | // fast pass |
| 1247 | if check_wait_graph_fast() { |
| 1248 | // double check |
| 1249 | check_wait_graph_slow() |
| 1250 | } else { |
| 1251 | Vec::new() |
| 1252 | } |
| 1253 | } |
| 1254 | } |
| 1255 | |
| 1256 | // Simple algorithm that builds a wait graph of the threads and the resources, |
| 1257 | // then checks for the presence of cycles (deadlocks). |
| 1258 | // This variant isn't precise as it doesn't lock the entire table before checking |
| 1259 | unsafe fn check_wait_graph_fast() -> bool { |
| 1260 | let table = get_hashtable(); |
| 1261 | let thread_count = NUM_THREADS.load(Ordering::Relaxed); |
| 1262 | let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2); |
| 1263 | |
| 1264 | for b in &(*table).entries[..] { |
| 1265 | b.mutex.lock(); |
| 1266 | let mut current = b.queue_head.get(); |
| 1267 | while !current.is_null() { |
| 1268 | if !(*current).parked_with_timeout.get() |
| 1269 | && !(*current).deadlock_data.deadlocked.get() |
| 1270 | { |
| 1271 | // .resources are waiting for their owner |
| 1272 | for &resource in &(*(*current).deadlock_data.resources.get()) { |
| 1273 | graph.add_edge(resource, current as usize, ()); |
| 1274 | } |
| 1275 | // owner waits for resource .key |
| 1276 | graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ()); |
| 1277 | } |
| 1278 | current = (*current).next_in_queue.get(); |
| 1279 | } |
| 1280 | // SAFETY: We hold the lock here, as required |
| 1281 | b.mutex.unlock(); |
| 1282 | } |
| 1283 | |
| 1284 | petgraph::algo::is_cyclic_directed(&graph) |
| 1285 | } |
| 1286 | |
| 1287 | #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] |
| 1288 | enum WaitGraphNode { |
| 1289 | Thread(*const ThreadData), |
| 1290 | Resource(usize), |
| 1291 | } |
| 1292 | |
| 1293 | use self::WaitGraphNode::*; |
| 1294 | |
| 1295 | // Contrary to the _fast variant this locks the entries table before looking for cycles. |
| 1296 | // Returns all detected thread wait cycles. |
| 1297 | // Note that once a cycle is reported it's never reported again. |
| 1298 | unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> { |
| 1299 | static DEADLOCK_DETECTION_LOCK: WordLock = WordLock::new(); |
| 1300 | DEADLOCK_DETECTION_LOCK.lock(); |
| 1301 | |
| 1302 | let mut table = get_hashtable(); |
| 1303 | loop { |
| 1304 | // Lock all buckets in the old table |
| 1305 | for b in &table.entries[..] { |
| 1306 | b.mutex.lock(); |
| 1307 | } |
| 1308 | |
| 1309 | // Now check if our table is still the latest one. Another thread could |
| 1310 | // have grown the hash table between us getting and locking the hash table. |
| 1311 | let new_table = get_hashtable(); |
| 1312 | if new_table as *const _ == table as *const _ { |
| 1313 | break; |
| 1314 | } |
| 1315 | |
| 1316 | // Unlock buckets and try again |
| 1317 | for b in &table.entries[..] { |
| 1318 | // SAFETY: We hold the lock here, as required |
| 1319 | b.mutex.unlock(); |
| 1320 | } |
| 1321 | |
| 1322 | table = new_table; |
| 1323 | } |
| 1324 | |
| 1325 | let thread_count = NUM_THREADS.load(Ordering::Relaxed); |
| 1326 | let mut graph = |
| 1327 | DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2); |
| 1328 | |
| 1329 | for b in &table.entries[..] { |
| 1330 | let mut current = b.queue_head.get(); |
| 1331 | while !current.is_null() { |
| 1332 | if !(*current).parked_with_timeout.get() |
| 1333 | && !(*current).deadlock_data.deadlocked.get() |
| 1334 | { |
| 1335 | // .resources are waiting for their owner |
| 1336 | for &resource in &(*(*current).deadlock_data.resources.get()) { |
| 1337 | graph.add_edge(Resource(resource), Thread(current), ()); |
| 1338 | } |
| 1339 | // owner waits for resource .key |
| 1340 | graph.add_edge( |
| 1341 | Thread(current), |
| 1342 | Resource((*current).key.load(Ordering::Relaxed)), |
| 1343 | (), |
| 1344 | ); |
| 1345 | } |
| 1346 | current = (*current).next_in_queue.get(); |
| 1347 | } |
| 1348 | } |
| 1349 | |
| 1350 | for b in &table.entries[..] { |
| 1351 | // SAFETY: We hold the lock here, as required |
| 1352 | b.mutex.unlock(); |
| 1353 | } |
| 1354 | |
| 1355 | // find cycles |
| 1356 | let cycles = graph_cycles(&graph); |
| 1357 | |
| 1358 | let mut results = Vec::with_capacity(cycles.len()); |
| 1359 | |
| 1360 | for cycle in cycles { |
| 1361 | let (sender, receiver) = mpsc::channel(); |
| 1362 | for td in cycle { |
| 1363 | let bucket = lock_bucket((*td).key.load(Ordering::Relaxed)); |
| 1364 | (*td).deadlock_data.deadlocked.set(true); |
| 1365 | *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone()); |
| 1366 | let handle = (*td).parker.unpark_lock(); |
| 1367 | // SAFETY: We hold the lock here, as required |
| 1368 | bucket.mutex.unlock(); |
| 1369 | // unpark the deadlocked thread! |
| 1370 | // on unpark it'll notice the deadlocked flag and report back |
| 1371 | handle.unpark(); |
| 1372 | } |
| 1373 | // make sure to drop our sender before collecting results |
| 1374 | drop(sender); |
| 1375 | results.push(receiver.iter().collect()); |
| 1376 | } |
| 1377 | |
| 1378 | DEADLOCK_DETECTION_LOCK.unlock(); |
| 1379 | |
| 1380 | results |
| 1381 | } |
| 1382 | |
| 1383 | // normalize a cycle to start with the "smallest" node |
| 1384 | fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> { |
| 1385 | let min_pos = input |
| 1386 | .iter() |
| 1387 | .enumerate() |
| 1388 | .min_by_key(|&(_, &t)| t) |
| 1389 | .map(|(p, _)| p) |
| 1390 | .unwrap_or(0); |
| 1391 | input |
| 1392 | .iter() |
| 1393 | .cycle() |
| 1394 | .skip(min_pos) |
| 1395 | .take(input.len()) |
| 1396 | .cloned() |
| 1397 | .collect() |
| 1398 | } |
| 1399 | |
| 1400 | // returns all thread cycles in the wait graph |
| 1401 | fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> { |
| 1402 | use petgraph::visit::depth_first_search; |
| 1403 | use petgraph::visit::DfsEvent; |
| 1404 | use petgraph::visit::NodeIndexable; |
| 1405 | |
| 1406 | let mut cycles = HashSet::new(); |
| 1407 | let mut path = Vec::with_capacity(g.node_bound()); |
| 1408 | // start from threads to get the correct threads cycle |
| 1409 | let threads = g |
| 1410 | .nodes() |
| 1411 | .filter(|n| if let &Thread(_) = n { true } else { false }); |
| 1412 | |
        depth_first_search(g, threads, |e| match e {
            DfsEvent::Discover(Thread(n), _) => path.push(n),
            DfsEvent::Finish(Thread(_), _) => {
                path.pop();
            }
            DfsEvent::BackEdge(_, Thread(n)) => {
                let from = path.iter().rposition(|&i| i == n).unwrap();
                cycles.insert(normalize_cycle(&path[from..]));
            }
            _ => (),
        });

        cycles.iter().cloned().collect()
    }
}

#[cfg(test)]
mod tests {
    use super::{ThreadData, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
    use std::{
        ptr,
        sync::{
            atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering},
            Arc,
        },
        thread,
        time::Duration,
    };

    /// Calls a closure for every `ThreadData` currently parked on a given key
    fn for_each(key: usize, mut f: impl FnMut(&ThreadData)) {
        let bucket = super::lock_bucket(key);

        let mut current: *const ThreadData = bucket.queue_head.get();
        while !current.is_null() {
            let current_ref = unsafe { &*current };
            if current_ref.key.load(Ordering::Relaxed) == key {
                f(current_ref);
            }
            current = current_ref.next_in_queue.get();
        }

        // SAFETY: We hold the lock here, as required
        unsafe { bucket.mutex.unlock() };
    }

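    // Generates one #[test] function per entry. Each test repeats `repeats` times; every
    // repeat runs `latches` independent `SingleLatchTest`s with `threads` parked threads
    // each, performs `single_unparks` unpark_one calls (sleeping `delay` before each), and
    // then wakes any remaining threads with unpark_all (see `run_parking_test` below).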
    macro_rules! test {
        ( $( $name:ident(
            repeats: $repeats:expr,
            latches: $latches:expr,
            delay: $delay:expr,
            threads: $threads:expr,
            single_unparks: $single_unparks:expr);
        )* ) => {
            $(#[test]
            fn $name() {
                let delay = Duration::from_micros($delay);
                for _ in 0..$repeats {
                    run_parking_test($latches, delay, $threads, $single_unparks);
                }
            })*
        };
    }

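    // For example, the `unpark_one_one_fast` entry below expands to roughly:
    //
    //     #[test]
    //     fn unpark_one_one_fast() {
    //         let delay = Duration::from_micros(0);
    //         for _ in 0..1000 {
    //             run_parking_test(1, delay, 1, 1);
    //         }
    //     }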
    test! {
        unpark_all_one_fast(
            repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 0
        );
        unpark_all_hundred_fast(
            repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
        );
        unpark_one_one_fast(
            repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
        );
        unpark_one_hundred_fast(
            repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
        );
        unpark_one_fifty_then_fifty_all_fast(
            repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
        );
        unpark_all_one(
            repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
        );
        unpark_all_hundred(
            repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
        );
        unpark_one_one(
            repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
        );
        unpark_one_fifty(
            repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
        );
        unpark_one_fifty_then_fifty_all(
            repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
        );
        hundred_unpark_all_one_fast(
            repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
        );
        hundred_unpark_all_one(
            repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
        );
    }

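    // Spawns `num_threads` threads per latch that all park on the latch's semaphore,
    // issues `num_single_unparks` unpark_one calls (verifying FIFO wake-up order), and
    // finally drains the remaining parked threads with unpark_all before joining.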
    fn run_parking_test(
        num_latches: usize,
        delay: Duration,
        num_threads: usize,
        num_single_unparks: usize,
    ) {
        let mut tests = Vec::with_capacity(num_latches);

        for _ in 0..num_latches {
            let test = Arc::new(SingleLatchTest::new(num_threads));
            let mut threads = Vec::with_capacity(num_threads);
            for _ in 0..num_threads {
                let test = test.clone();
                threads.push(thread::spawn(move || test.run()));
            }
            tests.push((test, threads));
        }

        for unpark_index in 0..num_single_unparks {
            thread::sleep(delay);
            for (test, _) in &tests {
                test.unpark_one(unpark_index);
            }
        }

        for (test, threads) in tests {
            test.finish(num_single_unparks);
            for thread in threads {
                thread.join().expect("Test thread panic");
            }
        }
    }

    struct SingleLatchTest {
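        /// Semaphore counter: `up` increments it and `down` decrements it. A positive
        /// value means free slots; a value of `-n` means `n` threads are waiting (or are
        /// about to park).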
        semaphore: AtomicIsize,
        num_awake: AtomicUsize,
        /// Holds the pointer to the last *unprocessed* woken up thread.
        last_awoken: AtomicPtr<ThreadData>,
        /// Total number of threads participating in this test.
        num_threads: usize,
    }

    impl SingleLatchTest {
        pub fn new(num_threads: usize) -> Self {
            Self {
                // This implements a fair (FIFO) semaphore, and it starts out unavailable.
                semaphore: AtomicIsize::new(0),
                num_awake: AtomicUsize::new(0),
                last_awoken: AtomicPtr::new(ptr::null_mut()),
                num_threads,
            }
        }

        pub fn run(&self) {
            // Get one slot from the semaphore
            self.down();

            // Report back to the test verification code that this thread woke up
            let this_thread_ptr = super::with_thread_data(|t| t as *const _ as *mut _);
            self.last_awoken.store(this_thread_ptr, Ordering::SeqCst);
            self.num_awake.fetch_add(1, Ordering::SeqCst);
        }

        pub fn unpark_one(&self, single_unpark_index: usize) {
            // last_awoken should be null at all times except between self.up() and the
            // bottom of this method, where it is reset to null again
            assert!(self.last_awoken.load(Ordering::SeqCst).is_null());

            let mut queue: Vec<*mut ThreadData> = Vec::with_capacity(self.num_threads);
            for_each(self.semaphore_addr(), |thread_data| {
                queue.push(thread_data as *const _ as *mut _);
            });
            assert!(queue.len() <= self.num_threads - single_unpark_index);

            let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);

            self.up();

            // Wait for a parked thread to wake up and update num_awake + last_awoken.
            while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
                thread::yield_now();
            }

            // At this point the other thread should have set last_awoken inside the run() method
            let last_awoken = self.last_awoken.load(Ordering::SeqCst);
            assert!(!last_awoken.is_null());
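            // Fairness check: if any thread was already parked when we snapshotted the
            // queue above, the thread that woke up must be the one at its head (FIFO).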
            if !queue.is_empty() && queue[0] != last_awoken {
                panic!(
                    "Woke up wrong thread:\n\tqueue: {:?}\n\tlast awoken: {:?}",
                    queue, last_awoken
                );
            }
            self.last_awoken.store(ptr::null_mut(), Ordering::SeqCst);
        }

        pub fn finish(&self, num_single_unparks: usize) {
            // The number of threads not unparked via unpark_one
            let mut num_threads_left = self.num_threads.checked_sub(num_single_unparks).unwrap();

            // Wake remaining threads up with unpark_all. Has to be in a loop, because there might
            // still be threads that have not yet parked.
            while num_threads_left > 0 {
                let mut num_waiting_on_address = 0;
                for_each(self.semaphore_addr(), |_thread_data| {
                    num_waiting_on_address += 1;
                });
                assert!(num_waiting_on_address <= num_threads_left);

                let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);

                let num_unparked =
                    unsafe { super::unpark_all(self.semaphore_addr(), DEFAULT_UNPARK_TOKEN) };
                assert!(num_unparked >= num_waiting_on_address);
                assert!(num_unparked <= num_threads_left);

                // Wait for all unparked threads to wake up and update num_awake + last_awoken.
                while self.num_awake.load(Ordering::SeqCst)
                    != num_awake_before_unpark + num_unparked
                {
                    thread::yield_now()
                }

                num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
            }
            // By now, all threads should have been woken up
            assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);

            // Make sure no thread is parked on our semaphore address
            let mut num_waiting_on_address = 0;
            for_each(self.semaphore_addr(), |_thread_data| {
                num_waiting_on_address += 1;
            });
            assert_eq!(num_waiting_on_address, 0);
        }

        pub fn down(&self) {
            let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);

            if old_semaphore_value > 0 {
                // We acquired the semaphore. Done.
                return;
            }

            // We need to wait.
            let validate = || true;
            let before_sleep = || {};
            let timed_out = |_, _| {};
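            // `validate` can unconditionally return true: if an up() races with us between
            // the fetch_sub above and the park, up() keeps retrying unpark_one until it has
            // actually woken a thread, so the wake-up is not lost.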
            unsafe {
                super::park(
                    self.semaphore_addr(),
                    validate,
                    before_sleep,
                    timed_out,
                    DEFAULT_PARK_TOKEN,
                    None,
                );
            }
        }

        pub fn up(&self) {
            let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst);

            // Check if anyone was waiting on the semaphore. If they were, then pass ownership to them.
            if old_semaphore_value < 0 {
                // We need to continue until we have actually unparked someone. It might be that
                // the thread we want to pass ownership to has decremented the semaphore counter,
                // but not yet parked.
                loop {
                    match unsafe {
                        super::unpark_one(self.semaphore_addr(), |_| DEFAULT_UNPARK_TOKEN)
                            .unparked_threads
                    } {
                        1 => break,
                        0 => (),
                        i => panic!("Should not wake up {} threads", i),
                    }
                }
            }
        }

        fn semaphore_addr(&self) -> usize {
            &self.semaphore as *const _ as usize
        }
    }
}