//! Run-queue structures to support a work-stealing scheduler

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread::{Overflow, Stats};
use crate::runtime::task;

use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

// Use wider integers when possible to increase ABA resilience.
//
// See issue #5041: <https://github.com/tokio-rs/tokio/issues/5041>.
cfg_has_atomic_u64! {
    type UnsignedShort = u32;
    type UnsignedLong = u64;
    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU32;
    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU64;
}
cfg_not_has_atomic_u64! {
    type UnsignedShort = u16;
    type UnsignedLong = u32;
    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU16;
    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU32;
}

/// Producer handle. May only be used from a single thread.
pub(crate) struct Local<T: 'static> {
    inner: Arc<Inner<T>>,
}

/// Consumer handle. May be used from many threads.
pub(crate) struct Steal<T: 'static>(Arc<Inner<T>>);

pub(crate) struct Inner<T: 'static> {
    /// Concurrently updated by many threads.
    ///
    /// Contains two `UnsignedShort` values. The low-order `UnsignedShort` is
    /// the "real" head of the queue. The high-order `UnsignedShort` is set by
    /// a stealer in the process of stealing values. It represents the first
    /// value being stolen in the batch. The `UnsignedShort` indices are
    /// intentionally wider than strictly required for buffer indexing in order
    /// to provide ABA mitigation and make it possible to distinguish between
    /// full and empty buffers.
    ///
    /// When both `UnsignedShort` values are the same, there is no active
    /// stealer.
    ///
    /// Tracking an in-progress stealer prevents a wrapping scenario.
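    ///
    /// For example, with the 64-bit configuration above (`UnsignedShort = u32`),
    /// the packed value is `(steal << 32) | real`; the `pack` and `unpack`
    /// functions at the bottom of this file implement that layout.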
    head: AtomicUnsignedLong,

    /// Only updated by producer thread but read by many threads.
    tail: AtomicUnsignedShort,

    /// Elements
    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
}

unsafe impl<T> Send for Inner<T> {}
unsafe impl<T> Sync for Inner<T> {}

#[cfg(not(loom))]
const LOCAL_QUEUE_CAPACITY: usize = 256;

// Shrink the size of the local queue when using loom. This shouldn't impact
// logic, but allows loom to test more edge cases in a reasonable amount of
// time.
#[cfg(loom)]
const LOCAL_QUEUE_CAPACITY: usize = 4;

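// Bitmask used to map a queue position to a buffer slot index. This relies on
// `LOCAL_QUEUE_CAPACITY` being a power of two.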
const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;

// Constructing the fixed size array directly is very awkward. The only way to
// do it is to repeat `UnsafeCell::new(MaybeUninit::uninit())` 256 times, as
// the contents are not Copy. The trick with defining a const doesn't work for
// generic types.
fn make_fixed_size<T>(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> {
    assert_eq!(buffer.len(), LOCAL_QUEUE_CAPACITY);

    // safety: We check that the length is correct.
    unsafe { Box::from_raw(Box::into_raw(buffer).cast()) }
}

/// Creates a new local run-queue.
pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
    let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);

    for _ in 0..LOCAL_QUEUE_CAPACITY {
        buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
    }

    let inner = Arc::new(Inner {
        head: AtomicUnsignedLong::new(0),
        tail: AtomicUnsignedShort::new(0),
        buffer: make_fixed_size(buffer.into_boxed_slice()),
    });

    let local = Local {
        inner: inner.clone(),
    };

    let remote = Steal(inner);

    (remote, local)
}

impl<T> Local<T> {
    /// Returns the number of entries in the queue.
    pub(crate) fn len(&self) -> usize {
        self.inner.len() as usize
    }

    /// How many tasks can be pushed into the queue.
    pub(crate) fn remaining_slots(&self) -> usize {
        self.inner.remaining_slots()
    }

    pub(crate) fn max_capacity(&self) -> usize {
        LOCAL_QUEUE_CAPACITY
    }

    /// Returns `true` if there are entries in the queue.
    ///
    /// Kept separate from `is_stealable` so that refactors of `is_stealable`
    /// to "protect" some tasks from stealing won't affect this.
    pub(crate) fn has_tasks(&self) -> bool {
        !self.inner.is_empty()
    }

    /// Pushes a batch of tasks to the back of the queue. All tasks must fit in
    /// the local queue.
    ///
    /// # Panics
    ///
    /// The method panics if there is not enough capacity to fit in the queue.
    pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator<Item = task::Notified<T>>) {
        let len = tasks.len();
        assert!(len <= LOCAL_QUEUE_CAPACITY);

        if len == 0 {
            // Nothing to do
            return;
        }

        let head = self.inner.head.load(Acquire);
        let (steal, _) = unpack(head);

        // safety: this is the **only** thread that updates this cell.
        let mut tail = unsafe { self.inner.tail.unsync_load() };

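        // `tail.wrapping_sub(steal)` is the number of buffer slots currently
        // in use (including any being stolen), even when the indices have
        // wrapped, so the batch fits as long as `len` more slots remain.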
        if tail.wrapping_sub(steal) <= (LOCAL_QUEUE_CAPACITY - len) as UnsignedShort {
            // Yes, this if condition is structured a bit weird (first block
            // does nothing, second panics). It is this way to match
            // `push_back_or_overflow`.
        } else {
            panic!("not enough capacity to push the batch to the local queue");
        }

        for task in tasks {
            let idx = tail as usize & MASK;

            self.inner.buffer[idx].with_mut(|ptr| {
                // Write the task to the slot
                //
                // Safety: There is only one producer and the above `if`
                // condition ensures we don't touch a cell if there is a
                // value, thus no consumer.
                unsafe {
                    ptr::write((*ptr).as_mut_ptr(), task);
                }
            });

            tail = tail.wrapping_add(1);
        }

        self.inner.tail.store(tail, Release);
    }

    /// Pushes a task to the back of the local queue. If there is not enough
    /// capacity in the queue, this triggers the overflow operation.
    ///
    /// When the queue overflows, half of the current contents of the queue is
    /// moved to the given injection queue. This frees up capacity for more
    /// tasks to be pushed into the local queue.
    pub(crate) fn push_back_or_overflow<O: Overflow<T>>(
        &mut self,
        mut task: task::Notified<T>,
        overflow: &O,
        stats: &mut Stats,
    ) {
        let tail = loop {
            let head = self.inner.head.load(Acquire);
            let (steal, real) = unpack(head);

            // safety: this is the **only** thread that updates this cell.
            let tail = unsafe { self.inner.tail.unsync_load() };

            if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as UnsignedShort {
                // There is capacity for the task
                break tail;
            } else if steal != real {
                // Concurrently stealing, this will free up capacity, so only
                // push the task onto the inject queue
                overflow.push(task);
                return;
            } else {
                // Push the current task and half of the queue into the
                // inject queue.
                match self.push_overflow(task, real, tail, overflow, stats) {
                    Ok(_) => return,
                    // Lost the race, try again
                    Err(v) => {
                        task = v;
                    }
                }
            }
        };

        self.push_back_finish(task, tail);
    }

    // Second half of `push_back_or_overflow`.
    fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
        // Map the position to a slot index.
        let idx = tail as usize & MASK;

        self.inner.buffer[idx].with_mut(|ptr| {
            // Write the task to the slot
            //
            // Safety: There is only one producer and the capacity check in
            // `push_back_or_overflow` ensures we don't touch a cell if there
            // is a value, thus no consumer.
            unsafe {
                ptr::write((*ptr).as_mut_ptr(), task);
            }
        });

        // Make the task available. Synchronizes with a load in
        // `steal_into2`.
        self.inner.tail.store(tail.wrapping_add(1), Release);
    }

    /// Moves a batch of tasks into the inject queue.
    ///
    /// This will temporarily make some of the tasks unavailable to stealers.
    /// Once `push_overflow` is done, a notification is sent out, so if other
    /// workers "missed" some of the tasks during a steal, they will get
    /// another opportunity.
    #[inline(never)]
    fn push_overflow<O: Overflow<T>>(
        &mut self,
        task: task::Notified<T>,
        head: UnsignedShort,
        tail: UnsignedShort,
        overflow: &O,
        stats: &mut Stats,
    ) -> Result<(), task::Notified<T>> {
        /// How many elements we take from the local queue.
        ///
        /// This is one less than the number of tasks pushed to the inject
        /// queue as we are also inserting the `task` argument.
        const NUM_TASKS_TAKEN: UnsignedShort = (LOCAL_QUEUE_CAPACITY / 2) as UnsignedShort;

        assert_eq!(
            tail.wrapping_sub(head) as usize,
            LOCAL_QUEUE_CAPACITY,
            "queue is not full; tail = {tail}; head = {head}"
        );

        let prev = pack(head, head);

        // Claim a bunch of tasks.
        //
        // We are claiming the tasks **before** reading them out of the buffer.
        // This is safe because only the **current** thread is able to push new
        // tasks.
        //
        // There isn't really any need for memory ordering... Relaxed would
        // work. This is because all tasks are pushed into the queue from the
        // current thread (or memory has been acquired if the local queue handle
        // moved).
        if self
            .inner
            .head
            .compare_exchange(
                prev,
                pack(
                    head.wrapping_add(NUM_TASKS_TAKEN),
                    head.wrapping_add(NUM_TASKS_TAKEN),
                ),
                Release,
                Relaxed,
            )
            .is_err()
        {
            // We failed to claim the tasks, losing the race. Return out of
            // this function and try the full `push` routine again. The queue
            // may not be full anymore.
            return Err(task);
        }

        /// An iterator that takes elements out of the run queue.
        struct BatchTaskIter<'a, T: 'static> {
            buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY],
            head: UnsignedLong,
            i: UnsignedLong,
        }
        impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
            type Item = task::Notified<T>;

            #[inline]
            fn next(&mut self) -> Option<task::Notified<T>> {
                if self.i == UnsignedLong::from(NUM_TASKS_TAKEN) {
                    None
                } else {
                    let i_idx = self.i.wrapping_add(self.head) as usize & MASK;
                    let slot = &self.buffer[i_idx];

                    // safety: The CAS above claimed exclusive ownership of the
                    // task pointers in this range.
                    let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

                    self.i += 1;
                    Some(task)
                }
            }
        }

        // safety: The CAS above ensures that no consumer will look at these
        // values again, and we are the only producer.
        let batch_iter = BatchTaskIter {
            buffer: &self.inner.buffer,
            head: head as UnsignedLong,
            i: 0,
        };
        overflow.push_batch(batch_iter.chain(std::iter::once(task)));

        // Add 1 to factor in the task currently being scheduled.
        stats.incr_overflow_count();

        Ok(())
    }

    /// Pops a task from the local queue.
    pub(crate) fn pop(&mut self) -> Option<task::Notified<T>> {
        let mut head = self.inner.head.load(Acquire);

        let idx = loop {
            let (steal, real) = unpack(head);

            // safety: this is the **only** thread that updates this cell.
            let tail = unsafe { self.inner.tail.unsync_load() };

            if real == tail {
                // queue is empty
                return None;
            }

            let next_real = real.wrapping_add(1);

            // If `steal == real` there are no concurrent stealers. Both `steal`
            // and `real` are updated.
            let next = if steal == real {
                pack(next_real, next_real)
            } else {
                assert_ne!(steal, next_real);
                pack(steal, next_real)
            };

            // Attempt to claim a task.
            let res = self
                .inner
                .head
                .compare_exchange(head, next, AcqRel, Acquire);

            match res {
                Ok(_) => break real as usize & MASK,
                Err(actual) => head = actual,
            }
        };

        Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
    }
}

impl<T> Steal<T> {
    pub(crate) fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Steals half the tasks from `self` and places them into `dst`.
    pub(crate) fn steal_into(
        &self,
        dst: &mut Local<T>,
        dst_stats: &mut Stats,
    ) -> Option<task::Notified<T>> {
        // Safety: the caller is the only thread that mutates `dst.tail` and
        // holds a mutable reference.
        let dst_tail = unsafe { dst.inner.tail.unsync_load() };

        // To the caller, `dst` may **look** empty but still have values
        // contained in the buffer. If another thread is concurrently stealing
        // from `dst` there may not be enough capacity to steal.
        let (steal, _) = unpack(dst.inner.head.load(Acquire));

        if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as UnsignedShort / 2 {
            // we *could* try to steal less here, but for simplicity, we're just
            // going to abort.
            return None;
        }

        // Steal the tasks into `dst`'s buffer. This does not yet expose the
        // tasks in `dst`.
        let mut n = self.steal_into2(dst, dst_tail);

        if n == 0 {
            // No tasks were stolen
            return None;
        }

        dst_stats.incr_steal_count(n as u16);
        dst_stats.incr_steal_operations();

        // We are returning a task here
        n -= 1;

        let ret_pos = dst_tail.wrapping_add(n);
        let ret_idx = ret_pos as usize & MASK;

        // safety: the value was written as part of `steal_into2` and not
        // exposed to stealers, so no other thread can access it.
        let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

        if n == 0 {
            // The `dst` queue is empty, but a single task was stolen
            return Some(ret);
        }

        // Make the stolen items available to consumers
        dst.inner.tail.store(dst_tail.wrapping_add(n), Release);

        Some(ret)
    }

    // Steal tasks from `self`, placing them into `dst`. Returns the number of
    // tasks that were stolen.
    fn steal_into2(&self, dst: &mut Local<T>, dst_tail: UnsignedShort) -> UnsignedShort {
        let mut prev_packed = self.0.head.load(Acquire);
        let mut next_packed;

        let n = loop {
            let (src_head_steal, src_head_real) = unpack(prev_packed);
            let src_tail = self.0.tail.load(Acquire);

            // If these two do not match, another thread is concurrently
            // stealing from the queue.
            if src_head_steal != src_head_real {
                return 0;
            }

            // Number of available tasks to steal
            let n = src_tail.wrapping_sub(src_head_real);
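            // Steal roughly half of them, rounding up: `n - n / 2 == ceil(n / 2)`.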
            let n = n - n / 2;

            if n == 0 {
                // No tasks available to steal
                return 0;
            }

            // Update the real head index to acquire the tasks.
            let steal_to = src_head_real.wrapping_add(n);
            assert_ne!(src_head_steal, steal_to);
            next_packed = pack(src_head_steal, steal_to);

            // Claim all those tasks. This is done by incrementing the "real"
            // head but not the steal. By doing this, no other thread is able to
            // steal from this queue until the current thread completes.
            let res = self
                .0
                .head
                .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);

            match res {
                Ok(_) => break n,
                Err(actual) => prev_packed = actual,
            }
        };

        assert!(
            n <= LOCAL_QUEUE_CAPACITY as UnsignedShort / 2,
            "actual = {n}"
        );

        let (first, _) = unpack(next_packed);

        // Take all the tasks
        for i in 0..n {
            // Compute the positions
            let src_pos = first.wrapping_add(i);
            let dst_pos = dst_tail.wrapping_add(i);

            // Map to slots
            let src_idx = src_pos as usize & MASK;
            let dst_idx = dst_pos as usize & MASK;

            // Read the task
            //
            // safety: We acquired the task with the atomic exchange above.
            let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

            // Write the task to the new slot
            //
            // safety: `dst` queue is empty and we are the only producer to
            // this queue.
            dst.inner.buffer[dst_idx]
                .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
        }

        let mut prev_packed = next_packed;

        // Update `src_head_steal` to match `src_head_real`, signalling that the
        // stealing routine is complete.
        loop {
            let head = unpack(prev_packed).1;
            next_packed = pack(head, head);

            let res = self
                .0
                .head
                .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);

            match res {
                Ok(_) => return n,
                Err(actual) => {
                    let (actual_steal, actual_real) = unpack(actual);

                    assert_ne!(actual_steal, actual_real);

                    prev_packed = actual;
                }
            }
        }
    }
}

cfg_unstable_metrics! {
    impl<T> Steal<T> {
        pub(crate) fn len(&self) -> usize {
            self.0.len() as _
        }
    }
}

impl<T> Clone for Steal<T> {
    fn clone(&self) -> Steal<T> {
        Steal(self.0.clone())
    }
}

impl<T> Drop for Local<T> {
    fn drop(&mut self) {
        if !std::thread::panicking() {
            assert!(self.pop().is_none(), "queue not empty");
        }
    }
}

impl<T> Inner<T> {
    fn remaining_slots(&self) -> usize {
        let (steal, _) = unpack(self.head.load(Acquire));
        let tail = self.tail.load(Acquire);

        LOCAL_QUEUE_CAPACITY - (tail.wrapping_sub(steal) as usize)
    }

    fn len(&self) -> UnsignedShort {
        let (_, head) = unpack(self.head.load(Acquire));
        let tail = self.tail.load(Acquire);

        tail.wrapping_sub(head)
    }

    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

/// Split the head value into the real head and the index a stealer is working
/// on.
fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
    let real = n & UnsignedShort::MAX as UnsignedLong;
    let steal = n >> (mem::size_of::<UnsignedShort>() * 8);

    (steal as UnsignedShort, real as UnsignedShort)
}

/// Join the two head values.
fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
    (real as UnsignedLong) | ((steal as UnsignedLong) << (mem::size_of::<UnsignedShort>() * 8))
}

#[test]
fn test_local_queue_capacity() {
    assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::MAX as usize);
}
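
// A small sanity check of the packed head layout sketched in the docs on
// `Inner::head`: `pack` stores `real` in the low half of the word and `steal`
// in the high half, and `unpack` recovers both values. The test name and the
// sample values are illustrative only.
#[test]
fn test_pack_unpack_roundtrip() {
    let steal: UnsignedShort = 3;
    let real: UnsignedShort = 7;

    let packed = pack(steal, real);

    // The low bits hold the "real" head...
    assert_eq!(packed & UnsignedShort::MAX as UnsignedLong, real as UnsignedLong);
    // ...and both halves round-trip through `unpack`.
    assert_eq!(unpack(packed), (steal, real));
}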