use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};

use super::BLOCK_CAP;

use std::alloc::Layout;
use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
pub(crate) struct Block<T> {
    /// The header fields.
    header: BlockHeader<T>,

    /// Array containing values pushed into the block. Values are stored in a
    /// contiguous array in order to improve cache line behavior when reading.
    /// The values must be manually dropped.
    values: Values<T>,
}

/// Extra fields for a `Block<T>`.
struct BlockHeader<T> {
    /// The start index of this block.
    ///
    /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
    start_index: usize,

    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Bitfield tracking slots that are ready to have their values consumed.
    ready_slots: AtomicUsize,

    /// The observed `tail_position` value *after* the block has been passed by
    /// `block_tail`.
    observed_tail_position: UnsafeCell<usize>,
}

/// The result of reading a slot.
pub(crate) enum Read<T> {
    /// A value was stored in the slot and has been read out.
    Value(T),
    /// The send half of the channel is closed; no value will arrive.
    Closed,
}

#[repr(transparent)]
struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);

/// Masks an index to get the block identifier.
const BLOCK_MASK: usize = !(BLOCK_CAP - 1);

/// Masks an index to get the value offset in a block.
const SLOT_MASK: usize = BLOCK_CAP - 1;

/// Flag tracking that a block has gone through the sender's release routine.
///
/// When this is set, the receiver may consider freeing the block.
const RELEASED: usize = 1 << BLOCK_CAP;

/// Flag tracking that all senders have dropped.
///
/// When this flag is set, the send half of the channel has closed.
const TX_CLOSED: usize = RELEASED << 1;

/// Mask covering all bits used to track slot readiness.
const READY_MASK: usize = RELEASED - 1;
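
// Layout of `ready_slots`: bits `0 .. BLOCK_CAP` are per-slot ready flags
// (covered by `READY_MASK`), bit `BLOCK_CAP` is `RELEASED`, and bit
// `BLOCK_CAP + 1` is `TX_CLOSED`. This assumes `BLOCK_CAP + 2` does not
// exceed the bit width of `usize`.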

/// Returns the index of the first slot in the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn start_index(slot_index: usize) -> usize {
    BLOCK_MASK & slot_index
}

/// Returns the offset into the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn offset(slot_index: usize) -> usize {
    SLOT_MASK & slot_index
}
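
// For illustration, assuming `BLOCK_CAP` is 32 (so `SLOT_MASK` is 0b1_1111):
// a slot index of 70 decomposes into `start_index(70) == 64` and
// `offset(70) == 6`, i.e. the seventh slot of the third block. This relies
// on `BLOCK_CAP` being a power of two.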

generate_addr_of_methods! {
    impl<T> Block<T> {
        unsafe fn addr_of_header(self: NonNull<Self>) -> NonNull<BlockHeader<T>> {
            &self.header
        }

        unsafe fn addr_of_values(self: NonNull<Self>) -> NonNull<Values<T>> {
            &self.values
        }
    }
}
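
// The generated `addr_of_header`/`addr_of_values` methods above are raw
// pointer projections to the fields. `Block::new` uses them to write the
// header and initialize the values through a `NonNull<Block<T>>` before the
// allocation is fully initialized, avoiding the creation of a reference to
// uninitialized memory.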

impl<T> Block<T> {
    pub(crate) fn new(start_index: usize) -> Box<Block<T>> {
        unsafe {
            // Allocate the block on the heap.
            // SAFETY: The size of `Block<T>` is non-zero, since it is at least
            // the size of the header.
            let block = std::alloc::alloc(Layout::new::<Block<T>>()) as *mut Block<T>;
            let block = match NonNull::new(block) {
                Some(block) => block,
                None => std::alloc::handle_alloc_error(Layout::new::<Block<T>>()),
            };

            // Write the header to the block.
            Block::addr_of_header(block).as_ptr().write(BlockHeader {
                // The absolute index in the channel of the first slot in the block.
                start_index,

                // Pointer to the next block in the linked list.
                next: AtomicPtr::new(ptr::null_mut()),

                ready_slots: AtomicUsize::new(0),

                observed_tail_position: UnsafeCell::new(0),
            });

            // Initialize the values array.
            Values::initialize(Block::addr_of_values(block));

            // Convert the pointer to a `Box`.
            // SAFETY: The raw pointer was allocated using the global allocator
            // with the layout for a `Block<T>`, so it is valid to convert it
            // to a `Box`.
            Box::from_raw(block.as_ptr())
        }
    }

    /// Returns `true` if the block matches the given index.
    pub(crate) fn is_at_index(&self, index: usize) -> bool {
        debug_assert!(offset(index) == 0);
        self.header.start_index == index
    }

    /// Returns the number of blocks between `self` and the block at the
    /// specified index.
    ///
    /// `other_index` must represent a block *after* `self`.
    pub(crate) fn distance(&self, other_index: usize) -> usize {
        debug_assert!(offset(other_index) == 0);
        other_index.wrapping_sub(self.header.start_index) / BLOCK_CAP
    }

    /// Reads the value at the given offset.
    ///
    /// Returns `None` if the slot is empty.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
        let offset = offset(slot_index);

        let ready_bits = self.header.ready_slots.load(Acquire);

        if !is_ready(ready_bits, offset) {
            if is_tx_closed(ready_bits) {
                return Some(Read::Closed);
            }

            return None;
        }

        // Get the value.
        let value = self.values[offset].with(|ptr| ptr::read(ptr));

        Some(Read::Value(value.assume_init()))
    }

    /// Returns `true` if *this* block has a value in the given slot.
    ///
    /// Always returns `false` when given an index from a different block.
    pub(crate) fn has_value(&self, slot_index: usize) -> bool {
        if slot_index < self.header.start_index {
            return false;
        }
        if slot_index >= self.header.start_index + BLOCK_CAP {
            return false;
        }

        let offset = offset(slot_index);
        let ready_bits = self.header.ready_slots.load(Acquire);
        is_ready(ready_bits, offset)
    }

    /// Writes a value to the block at the given offset.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The slot is empty.
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
        // Get the offset into the block.
        let slot_offset = offset(slot_index);

        self.values[slot_offset].with_mut(|ptr| {
            ptr::write(ptr, MaybeUninit::new(value));
        });

        // Release the value. After this point, the slot ref may no longer
        // be used. It is possible for the receiver to free the memory at
        // any point.
        self.set_ready(slot_offset);
    }

    /// Signals to the receiver that the send half of the channel is closed.
    pub(crate) unsafe fn tx_close(&self) {
        self.header.ready_slots.fetch_or(TX_CLOSED, Release);
    }

    /// Returns `true` if the send half of the channel has closed.
    pub(crate) unsafe fn is_closed(&self) -> bool {
        let ready_bits = self.header.ready_slots.load(Acquire);
        is_tx_closed(ready_bits)
    }

    /// Resets the block to a blank state. This enables reusing blocks in the
    /// channel.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * All slots are empty.
    /// * The caller holds a unique pointer to the block.
    pub(crate) unsafe fn reclaim(&mut self) {
        self.header.start_index = 0;
        self.header.next = AtomicPtr::new(ptr::null_mut());
        self.header.ready_slots = AtomicUsize::new(0);
    }

    /// Releases the block to the rx half for freeing.
    ///
    /// This function is called by the tx half once it can be guaranteed that
    /// no more senders will attempt to access the block.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The block will no longer be accessed by any sender.
    pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
        // Track the observed tail_position. Any sender targeting a greater
        // tail_position is guaranteed to not access this block.
        self.header
            .observed_tail_position
            .with_mut(|ptr| *ptr = tail_position);

        // Set the released bit, signalling to the receiver that it is safe to
        // free the block's memory as soon as all slots **prior** to
        // `observed_tail_position` have been filled.
        self.header.ready_slots.fetch_or(RELEASED, Release);
    }

    /// Marks a slot as ready to be consumed.
    ///
    /// The `Release` store pairs with the `Acquire` load in `read` and
    /// `has_value`, ensuring the written value is visible by the time the
    /// ready bit is observed.
    fn set_ready(&self, slot: usize) {
        let mask = 1 << slot;
        self.header.ready_slots.fetch_or(mask, Release);
    }

    /// Returns `true` when all slots have their `ready` bits set.
    ///
    /// This indicates that the block is in its final state and will no longer
    /// be mutated.
    pub(crate) fn is_final(&self) -> bool {
        self.header.ready_slots.load(Acquire) & READY_MASK == READY_MASK
    }

    /// Returns the `observed_tail_position` value, if set.
    pub(crate) fn observed_tail_position(&self) -> Option<usize> {
        if 0 == RELEASED & self.header.ready_slots.load(Acquire) {
            None
        } else {
            Some(
                self.header
                    .observed_tail_position
                    .with(|ptr| unsafe { *ptr }),
            )
        }
    }

    /// Loads the next block.
    pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
        let ret = NonNull::new(self.header.next.load(ordering));

        debug_assert!(unsafe {
            ret.map_or(true, |block| {
                block.as_ref().header.start_index == self.header.start_index.wrapping_add(BLOCK_CAP)
            })
        });

        ret
    }

    /// Pushes `block` as the next block in the linked list.
    ///
    /// Returns `Ok` if successful; otherwise, a pointer to the next block in
    /// the list is returned.
    ///
    /// This requires that the next pointer is null.
    ///
    /// # Ordering
    ///
    /// This performs a compare-and-swap on `next` using the provided
    /// `success` and `failure` orderings.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * `block` is not freed until it has been removed from the list.
    pub(crate) unsafe fn try_push(
        &self,
        block: &mut NonNull<Block<T>>,
        success: Ordering,
        failure: Ordering,
    ) -> Result<(), NonNull<Block<T>>> {
        block.as_mut().header.start_index = self.header.start_index.wrapping_add(BLOCK_CAP);

        let next_ptr = self
            .header
            .next
            .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
            .unwrap_or_else(|x| x);

        match NonNull::new(next_ptr) {
            Some(next_ptr) => Err(next_ptr),
            None => Ok(()),
        }
    }

    /// Grows the `Block` linked list by allocating and appending a new block.
    ///
    /// The next block in the linked list is returned. This may or may not be
    /// the one allocated by the function call.
    ///
    /// # Implementation
    ///
    /// It is assumed that `self.next` is null. A new block is allocated with
    /// `start_index` set to be the next block. A compare-and-swap is performed
    /// with `AcqRel` memory ordering. If the compare-and-swap is successful,
    /// the newly allocated block is released to other threads walking the
    /// block linked list. If the compare-and-swap fails, the current thread
    /// acquires the next block in the linked list, allowing the current
    /// thread to access the slots.
    pub(crate) fn grow(&self) -> NonNull<Block<T>> {
        // Create the new block. It is assumed that the block will become the
        // next one after `&self`. If this turns out to not be the case,
        // `start_index` is updated accordingly.
        let new_block = Block::new(self.header.start_index + BLOCK_CAP);

        let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };

        // Attempt to store the block. The first compare-and-swap attempt is
        // "unrolled" due to minor differences in logic.
        //
        // `AcqRel` is used as the ordering **only** when attempting the
        // compare-and-swap on `self.next`.
        //
        // If the compare-and-swap fails, then the actual value of the cell is
        // returned from this function and accessed by the caller. Given this,
        // the memory must be acquired.
        //
        // `Release` ensures that the newly allocated block is available to
        // other threads acquiring the next pointer.
        let next = NonNull::new(
            self.header
                .next
                .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
                .unwrap_or_else(|x| x),
        );

        let next = match next {
            Some(next) => next,
            None => {
                // The compare-and-swap succeeded and the newly allocated block
                // is successfully pushed.
                return new_block;
            }
        };

        // There already is a next block in the linked list. The newly
        // allocated block could be dropped and the discovered next block
        // returned; however, that would be wasteful. Instead, the linked list
        // is walked by repeatedly attempting to compare-and-swap the pointer
        // into the `next` register until the compare-and-swap succeeds.
        //
        // Care is taken to update `new_block`'s `start_index` field as
        // appropriate.

        let mut curr = next;

        // TODO: Should this iteration be capped?
        loop {
            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };

            curr = match actual {
                Ok(()) => {
                    return next;
                }
                Err(curr) => curr,
            };

            crate::loom::thread::yield_now();
        }
    }
}

/// Returns `true` if the specified slot has a value ready to be consumed.
fn is_ready(bits: usize, slot: usize) -> bool {
    let mask: usize = 1 << slot;
    mask == mask & bits
}

/// Returns `true` if the closed flag has been set.
fn is_tx_closed(bits: usize) -> bool {
    TX_CLOSED == bits & TX_CLOSED
}

impl<T> Values<T> {
    /// Initializes a `Values` struct from a pointer.
    ///
    /// # Safety
    ///
    /// The raw pointer must be valid for writing a `Values<T>`.
    unsafe fn initialize(_value: NonNull<Values<T>>) {
        // When running under loom, the `UnsafeCell` wrappers need to be
        // initialized explicitly.
        if_loom! {
            let p = _value.as_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
            for i in 0..BLOCK_CAP {
                p.add(i)
                    .write(UnsafeCell::new(MaybeUninit::uninit()));
            }
        }
    }
}

impl<T> ops::Index<usize> for Values<T> {
    type Output = UnsafeCell<MaybeUninit<T>>;

    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}

#[cfg(all(test, not(loom)))]
#[test]
fn assert_no_stack_overflow() {
    // https://github.com/tokio-rs/tokio/issues/5293

    struct Foo {
        _a: [u8; 2_000_000],
    }

    assert_eq!(
        Layout::new::<MaybeUninit<Block<Foo>>>(),
        Layout::new::<Block<Foo>>()
    );

    let _block = Block::<Foo>::new(0);
}
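
#[cfg(all(test, not(loom)))]
#[test]
fn index_decomposition() {
    // Illustrative sanity check for the mask arithmetic: a slot index splits
    // into a block start and an in-block offset. Assumes `BLOCK_CAP` is a
    // power of two (which `BLOCK_MASK`/`SLOT_MASK` already rely on) of at
    // least 4.
    let index = BLOCK_CAP + 3;
    assert_eq!(start_index(index), BLOCK_CAP);
    assert_eq!(offset(index), 3);
    // The two masks partition the bits of the index.
    assert_eq!(start_index(index) | offset(index), index);
}

#[cfg(all(test, not(loom)))]
#[test]
fn write_then_read() {
    // Illustrative sanity check for the slot protocol: a written slot becomes
    // readable, while an untouched slot reads as `None` while the send half
    // is still open. The single-threaded test upholds the "no concurrent
    // access" safety requirements of `write` and `read`.
    let block = Block::<usize>::new(0);
    unsafe {
        block.write(0, 42);
        match block.read(0) {
            Some(Read::Value(v)) => assert_eq!(v, 42),
            _ => panic!("expected a value in slot 0"),
        }
        assert!(block.read(1).is_none());
    }
}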