use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};

use std::alloc::Layout;
use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
pub(crate) struct Block<T> {
    /// The header fields.
    header: BlockHeader<T>,

    /// Array containing values pushed into the block. Values are stored in a
    /// contiguous array in order to improve cache line behavior when reading.
    /// The values must be manually dropped.
    values: Values<T>,
}

/// Extra fields for a `Block<T>`.
struct BlockHeader<T> {
    /// The start index of this block.
    ///
    /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
    start_index: usize,

    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Bitfield tracking slots that are ready to have their values consumed.
    ready_slots: AtomicUsize,

    /// The observed `tail_position` value *after* the block has been passed by
    /// `block_tail`.
    observed_tail_position: UnsafeCell<usize>,
}

pub(crate) enum Read<T> {
    Value(T),
    Closed,
}

#[repr(transparent)]
struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);

use super::BLOCK_CAP;

/// Masks an index to get the block identifier.
const BLOCK_MASK: usize = !(BLOCK_CAP - 1);

/// Masks an index to get the value offset in a block.
const SLOT_MASK: usize = BLOCK_CAP - 1;

/// Flag tracking that a block has gone through the sender's release routine.
///
/// When this is set, the receiver may consider freeing the block.
const RELEASED: usize = 1 << BLOCK_CAP;

/// Flag tracking all senders dropped.
///
/// When this flag is set, the send half of the channel has closed.
const TX_CLOSED: usize = RELEASED << 1;

/// Mask covering all bits used to track slot readiness.
const READY_MASK: usize = RELEASED - 1;

/// Returns the index of the first slot in the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn start_index(slot_index: usize) -> usize {
    BLOCK_MASK & slot_index
}

/// Returns the offset into the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn offset(slot_index: usize) -> usize {
    SLOT_MASK & slot_index
}

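// A small sanity check of the index math above, assuming only that `BLOCK_CAP`
// is a power of two (which the masks above already require): `start_index`
// rounds an absolute slot index down to its block boundary and `offset` yields
// the position within that block.
#[cfg(all(test, not(loom)))]
#[test]
fn index_math_splits_block_and_offset() {
    // An index three blocks in, at the last slot of that block.
    let index = 3 * BLOCK_CAP + (BLOCK_CAP - 1);

    assert_eq!(start_index(index), 3 * BLOCK_CAP);
    assert_eq!(offset(index), BLOCK_CAP - 1);

    // The two halves recombine into the original index.
    assert_eq!(start_index(index) + offset(index), index);
}
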
generate_addr_of_methods! {
    impl<T> Block<T> {
        unsafe fn addr_of_header(self: NonNull<Self>) -> NonNull<BlockHeader<T>> {
            &self.header
        }

        unsafe fn addr_of_values(self: NonNull<Self>) -> NonNull<Values<T>> {
            &self.values
        }
    }
}

impl<T> Block<T> {
    pub(crate) fn new(start_index: usize) -> Box<Block<T>> {
        unsafe {
            // Allocate the block on the heap.
            // SAFETY: The size of the Block<T> is non-zero, since it is at least the size of the header.
            let block = std::alloc::alloc(Layout::new::<Block<T>>()) as *mut Block<T>;
            let block = match NonNull::new(block) {
                Some(block) => block,
                None => std::alloc::handle_alloc_error(Layout::new::<Block<T>>()),
            };

            // Write the header to the block.
            Block::addr_of_header(block).as_ptr().write(BlockHeader {
                // The absolute index in the channel of the first slot in the block.
                start_index,

                // Pointer to the next block in the linked list.
                next: AtomicPtr::new(ptr::null_mut()),

                ready_slots: AtomicUsize::new(0),

                observed_tail_position: UnsafeCell::new(0),
            });

            // Initialize the values array.
            Values::initialize(Block::addr_of_values(block));

            // Convert the pointer to a `Box`.
            // Safety: The raw pointer was allocated using the global allocator, and with
            // the layout for a `Block<T>`, so it's valid to convert it to a `Box`.
            Box::from_raw(block.as_ptr())
        }
    }

    /// Returns `true` if the block matches the given index.
    pub(crate) fn is_at_index(&self, index: usize) -> bool {
        debug_assert!(offset(index) == 0);
        self.header.start_index == index
    }

    /// Returns the number of blocks between `self` and the block at the
    /// specified index.
    ///
    /// `other_index` must represent a block *after* `self`.
    pub(crate) fn distance(&self, other_index: usize) -> usize {
        debug_assert!(offset(other_index) == 0);
        other_index.wrapping_sub(self.header.start_index) / BLOCK_CAP
    }

    /// Reads the value at the given offset.
    ///
    /// Returns `None` if the slot is empty.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
        let offset = offset(slot_index);

        let ready_bits = self.header.ready_slots.load(Acquire);

        if !is_ready(ready_bits, offset) {
            if is_tx_closed(ready_bits) {
                return Some(Read::Closed);
            }

            return None;
        }

        // Get the value
        let value = self.values[offset].with(|ptr| ptr::read(ptr));

        Some(Read::Value(value.assume_init()))
    }

    /// Writes a value to the block at the given offset.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The slot is empty.
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
        // Get the offset into the block
        let slot_offset = offset(slot_index);

        self.values[slot_offset].with_mut(|ptr| {
            ptr::write(ptr, MaybeUninit::new(value));
        });

        // Release the value. After this point, the slot ref may no longer
        // be used. It is possible for the receiver to free the memory at
        // any point.
        self.set_ready(slot_offset);
    }

    /// Signal to the receiver that the sender half of the list is closed.
    pub(crate) unsafe fn tx_close(&self) {
        self.header.ready_slots.fetch_or(TX_CLOSED, Release);
    }

    /// Resets the block to a blank state. This enables reusing blocks in the
    /// channel.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * All slots are empty.
    /// * The caller holds a unique pointer to the block.
    pub(crate) unsafe fn reclaim(&mut self) {
        self.header.start_index = 0;
        self.header.next = AtomicPtr::new(ptr::null_mut());
        self.header.ready_slots = AtomicUsize::new(0);
    }

    /// Releases the block to the rx half for freeing.
    ///
    /// This function is called by the tx half once it can be guaranteed that no
    /// more senders will attempt to access the block.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The block will no longer be accessed by any sender.
    pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
        // Track the observed tail_position. Any sender targeting a greater
        // tail_position is guaranteed to not access this block.
        self.header
            .observed_tail_position
            .with_mut(|ptr| *ptr = tail_position);

        // Set the released bit, signalling to the receiver that it is safe to
        // free the block's memory as soon as all slots **prior** to
        // `observed_tail_position` have been filled.
        self.header.ready_slots.fetch_or(RELEASED, Release);
    }

    /// Mark a slot as ready
    fn set_ready(&self, slot: usize) {
        let mask = 1 << slot;
        self.header.ready_slots.fetch_or(mask, Release);
    }

    /// Returns `true` when all slots have their `ready` bits set.
    ///
    /// This indicates that the block is in its final state and will no longer
    /// be mutated.
    ///
    /// # Implementation
    ///
    /// The `ready` flags for all slots are coalesced as bits in the single
    /// `ready_slots` atomic, so this check is a single `Acquire` load followed
    /// by a comparison against `READY_MASK`.
    pub(crate) fn is_final(&self) -> bool {
        self.header.ready_slots.load(Acquire) & READY_MASK == READY_MASK
    }

    /// Returns the `observed_tail_position` value, if set
    pub(crate) fn observed_tail_position(&self) -> Option<usize> {
        if 0 == RELEASED & self.header.ready_slots.load(Acquire) {
            None
        } else {
            Some(
                self.header
                    .observed_tail_position
                    .with(|ptr| unsafe { *ptr }),
            )
        }
    }

    /// Loads the next block
    pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
        let ret = NonNull::new(self.header.next.load(ordering));

        debug_assert!(unsafe {
            ret.map(|block| {
                block.as_ref().header.start_index == self.header.start_index.wrapping_add(BLOCK_CAP)
            })
            .unwrap_or(true)
        });

        ret
    }

    /// Pushes `block` as the next block in the linked list.
    ///
    /// Returns `Ok(())` if successful. Otherwise, a pointer to the block that
    /// is currently linked as the next one in the list is returned.
    ///
    /// This requires that the next pointer is null.
    ///
    /// # Ordering
    ///
    /// This performs a compare-and-swap on `next` using the provided `success`
    /// and `failure` orderings.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * `block` is not freed until it has been removed from the list.
    pub(crate) unsafe fn try_push(
        &self,
        block: &mut NonNull<Block<T>>,
        success: Ordering,
        failure: Ordering,
    ) -> Result<(), NonNull<Block<T>>> {
        block.as_mut().header.start_index = self.header.start_index.wrapping_add(BLOCK_CAP);

        let next_ptr = self
            .header
            .next
            .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
            .unwrap_or_else(|x| x);

        match NonNull::new(next_ptr) {
            Some(next_ptr) => Err(next_ptr),
            None => Ok(()),
        }
    }

    /// Grows the `Block` linked list by allocating and appending a new block.
    ///
    /// The next block in the linked list is returned. This may or may not be
    /// the one allocated by the function call.
    ///
    /// # Implementation
    ///
    /// It is assumed that `self.next` is null. A new block is allocated with
    /// `start_index` set to be the next block. A compare-and-swap is performed
    /// with AcqRel memory ordering. If the compare-and-swap is successful, the
    /// newly allocated block is released to other threads walking the block
    /// linked list. If the compare-and-swap fails, the current thread acquires
    /// the next block in the linked list, allowing the current thread to access
    /// the slots.
    pub(crate) fn grow(&self) -> NonNull<Block<T>> {
        // Create the new block. It is assumed that the block will become the
        // next one after `&self`. If this turns out to not be the case,
        // `start_index` is updated accordingly.
        let new_block = Block::new(self.header.start_index + BLOCK_CAP);

        let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };

        // Attempt to store the block. The first compare-and-swap attempt is
        // "unrolled" due to minor differences in logic
        //
        // `AcqRel` is used as the ordering **only** when attempting the
        // compare-and-swap on self.next.
        //
        // If the compare-and-swap fails, then the actual value of the cell is
        // returned from this function and accessed by the caller. Given this,
        // the memory must be acquired.
        //
        // `Release` ensures that the newly allocated block is available to
        // other threads acquiring the next pointer.
        let next = NonNull::new(
            self.header
                .next
                .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
                .unwrap_or_else(|x| x),
        );

        let next = match next {
            Some(next) => next,
            None => {
                // The compare-and-swap succeeded and the newly allocated block
                // is successfully pushed.
                return new_block;
            }
        };

        // There already is a next block in the linked list. The newly allocated
        // block could be dropped and the discovered next block returned;
        // however, that would be wasteful. Instead, the linked list is walked
        // by repeatedly attempting to compare-and-swap the pointer into the
        // `next` register until the compare-and-swap succeeds.
        //
        // Care is taken to update new_block's start_index field as appropriate.

        let mut curr = next;

        // TODO: Should this iteration be capped?
        loop {
            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };

            curr = match actual {
                Ok(_) => {
                    return next;
                }
                Err(curr) => curr,
            };

            crate::loom::thread::yield_now();
        }
    }
}
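
// A single-threaded sketch of `grow` and `load_next`: growing a block links a
// freshly allocated block whose `start_index` lies `BLOCK_CAP` past the current
// one. The grown block is reclaimed manually at the end because this test
// bypasses the channel's normal free path.
#[cfg(all(test, not(loom)))]
#[test]
fn grow_links_next_block() {
    let first = Block::<u32>::new(0);
    assert!(first.load_next(Acquire).is_none());

    let next = first.grow();
    let loaded = first.load_next(Acquire).expect("grow should link a next block");
    assert_eq!(loaded.as_ptr(), next.as_ptr());
    assert!(unsafe { loaded.as_ref() }.is_at_index(BLOCK_CAP));

    // Safety: the grown block was allocated by `Block::new` and is no longer
    // referenced once `first` is dropped.
    drop(first);
    drop(unsafe { Box::from_raw(next.as_ptr()) });
}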

/// Returns `true` if the specified slot has a value ready to be consumed.
fn is_ready(bits: usize, slot: usize) -> bool {
    let mask: usize = 1 << slot;
    mask == mask & bits
}

/// Returns `true` if the closed flag has been set.
fn is_tx_closed(bits: usize) -> bool {
    TX_CLOSED == bits & TX_CLOSED
}
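
// A small sketch of the `ready_slots` bit layout implied by the constants
// above: the low `BLOCK_CAP` bits are per-slot ready flags, with `RELEASED`
// and `TX_CLOSED` packed above them.
#[cfg(all(test, not(loom)))]
#[test]
fn ready_slots_bit_layout() {
    // Setting bit 0 marks only slot 0 as ready.
    assert!(is_ready(1, 0));
    assert!(!is_ready(1, 1));

    // The control flags live above the ready bits and never overlap them.
    assert_eq!(RELEASED & READY_MASK, 0);
    assert_eq!(TX_CLOSED & READY_MASK, 0);
    assert!(!is_tx_closed(RELEASED));
    assert!(is_tx_closed(TX_CLOSED | RELEASED));
}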

impl<T> Values<T> {
    /// Initialize a `Values` struct from a pointer.
    ///
    /// # Safety
    ///
    /// The raw pointer must be valid for writing a `Values<T>`.
    unsafe fn initialize(_value: NonNull<Values<T>>) {
        // When fuzzing, `UnsafeCell` needs to be initialized.
        if_loom! {
            let p = _value.as_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
            for i in 0..BLOCK_CAP {
                p.add(i)
                    .write(UnsafeCell::new(MaybeUninit::uninit()));
            }
        }
    }
}

impl<T> ops::Index<usize> for Values<T> {
    type Output = UnsafeCell<MaybeUninit<T>>;

    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}
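
// A single-threaded sketch of the release bookkeeping: `tx_release` records the
// observed tail position and sets `RELEASED`, after which
// `observed_tail_position` reports the recorded value. Running single-threaded
// satisfies the requirement that no sender accesses the block afterwards.
#[cfg(all(test, not(loom)))]
#[test]
fn tx_release_records_observed_tail_position() {
    let block = Block::<u8>::new(0);

    // Nothing has been released or written yet.
    assert!(block.observed_tail_position().is_none());
    assert!(!block.is_final());

    unsafe { block.tx_release(123) };
    assert_eq!(block.observed_tail_position(), Some(123));

    // `RELEASED` sits outside `READY_MASK`, so releasing alone does not make
    // the block final; every slot's ready bit is still required for that.
    assert!(!block.is_final());
}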

#[cfg(all(test, not(loom)))]
#[test]
fn assert_no_stack_overflow() {
    // https://github.com/tokio-rs/tokio/issues/5293

    struct Foo {
        _a: [u8; 2_000_000],
    }

    assert_eq!(
        Layout::new::<MaybeUninit<Block<Foo>>>(),
        Layout::new::<Block<Foo>>()
    );

    let _block = Block::<Foo>::new(0);
}
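
// A minimal single-threaded sketch of a slot's life cycle: write a value,
// observe it via `read`, then close the send half and observe `Read::Closed`
// for a slot that was never written. Single-threaded access satisfies the
// "no concurrent access" safety requirements of `write` and `read`.
#[cfg(all(test, not(loom)))]
#[test]
fn write_read_and_close() {
    let block = Block::<String>::new(0);

    unsafe {
        // An unwritten slot reads as empty while senders are still around.
        assert!(block.read(0).is_none());

        block.write(0, "hello".to_string());
        match block.read(0) {
            Some(Read::Value(value)) => assert_eq!(value, "hello"),
            _ => panic!("expected a value in slot 0"),
        }

        // After the send half closes, empty slots report `Closed`.
        block.tx_close();
        assert!(matches!(block.read(1), Some(Read::Closed)));
    }
}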