use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};

use std::alloc::Layout;
use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
pub(crate) struct Block<T> {
    /// The header fields.
    header: BlockHeader<T>,

    /// Array containing values pushed into the block. Values are stored in a
    /// contiguous array in order to improve cache line behavior when reading.
    /// The values must be manually dropped.
    values: Values<T>,
}
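// Note: `Block` can be large (it embeds `BLOCK_CAP` values inline), so it is
// allocated directly on the heap by `Block::new` rather than being built on
// the stack and moved into a `Box`; see the stack-overflow regression test at
// the bottom of this file.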

/// Extra fields for a `Block<T>`.
struct BlockHeader<T> {
    /// The start index of this block.
    ///
    /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
    start_index: usize,

    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Bitfield tracking slots that are ready to have their values consumed.
    ready_slots: AtomicUsize,

    /// The observed `tail_position` value *after* the block has been passed by
    /// `block_tail`.
    observed_tail_position: UnsafeCell<usize>,
}

pub(crate) enum Read<T> {
    Value(T),
    Closed,
}

#[repr(transparent)]
struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);

use super::BLOCK_CAP;

/// Masks an index to get the block identifier.
const BLOCK_MASK: usize = !(BLOCK_CAP - 1);

/// Masks an index to get the value offset in a block.
const SLOT_MASK: usize = BLOCK_CAP - 1;

/// Flag tracking that a block has gone through the sender's release routine.
///
/// When this is set, the receiver may consider freeing the block.
const RELEASED: usize = 1 << BLOCK_CAP;

/// Flag tracking all senders dropped.
///
/// When this flag is set, the send half of the channel has closed.
const TX_CLOSED: usize = RELEASED << 1;

/// Mask covering all bits used to track slot readiness.
const READY_MASK: usize = RELEASED - 1;
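
// Bit layout of `ready_slots`, as implied by the constants above (this assumes
// `BLOCK_CAP + 2` bits fit in a `usize`, which the channel's choice of
// `BLOCK_CAP` must guarantee):
//
//   bits 0 .. BLOCK_CAP     one `ready` bit per slot (covered by `READY_MASK`)
//   bit  BLOCK_CAP          `RELEASED`
//   bit  BLOCK_CAP + 1      `TX_CLOSED`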

/// Returns the index of the first slot in the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn start_index(slot_index: usize) -> usize {
    BLOCK_MASK & slot_index
}

/// Returns the offset into the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn offset(slot_index: usize) -> usize {
    SLOT_MASK & slot_index
}
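
// Both helpers rely on `BLOCK_CAP` being a power of two. As an illustration,
// if `BLOCK_CAP` were 32 (so `SLOT_MASK == 0b1_1111`), slot index 69 would
// decompose into `start_index(69) == 64` and `offset(69) == 5`.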

generate_addr_of_methods! {
    impl<T> Block<T> {
        unsafe fn addr_of_header(self: NonNull<Self>) -> NonNull<BlockHeader<T>> {
            &self.header
        }

        unsafe fn addr_of_values(self: NonNull<Self>) -> NonNull<Values<T>> {
            &self.values
        }
    }
}

impl<T> Block<T> {
    pub(crate) fn new(start_index: usize) -> Box<Block<T>> {
        unsafe {
            // Allocate the block on the heap.
            // SAFETY: The size of the Block<T> is non-zero, since it is at least the size of the header.
            let block = std::alloc::alloc(Layout::new::<Block<T>>()) as *mut Block<T>;
            let block = match NonNull::new(block) {
                Some(block) => block,
                None => std::alloc::handle_alloc_error(Layout::new::<Block<T>>()),
            };

            // Write the header to the block.
            Block::addr_of_header(block).as_ptr().write(BlockHeader {
                // The absolute index in the channel of the first slot in the block.
                start_index,

                // Pointer to the next block in the linked list.
                next: AtomicPtr::new(ptr::null_mut()),

                ready_slots: AtomicUsize::new(0),

                observed_tail_position: UnsafeCell::new(0),
            });

            // Initialize the values array.
            Values::initialize(Block::addr_of_values(block));

            // Convert the pointer to a `Box`.
            // Safety: The raw pointer was allocated using the global allocator, and with
            // the layout for a `Block<T>`, so it's valid to convert it to box.
            Box::from_raw(block.as_ptr())
        }
    }

    /// Returns `true` if the block matches the given index.
    pub(crate) fn is_at_index(&self, index: usize) -> bool {
        debug_assert!(offset(index) == 0);
        self.header.start_index == index
    }

    /// Returns the number of blocks between `self` and the block at the
    /// specified index.
    ///
    /// `other_index` must reference a block *after* `self`.
    pub(crate) fn distance(&self, other_index: usize) -> usize {
        debug_assert!(offset(other_index) == 0);
        other_index.wrapping_sub(self.header.start_index) / BLOCK_CAP
    }
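
    // For example, a block whose `start_index` is `0` is four blocks away from
    // the block starting at index `4 * BLOCK_CAP`, so `distance(4 * BLOCK_CAP)`
    // returns `4`.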

    /// Reads the value at the given offset.
    ///
    /// Returns `None` if the slot is empty.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
        let offset = offset(slot_index);

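        // The `Acquire` load below pairs with the `Release` writes in
        // `set_ready` and `tx_close`: observing a ready bit (or the closed
        // flag) here guarantees that the matching value write (or close)
        // happened before it.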
        let ready_bits = self.header.ready_slots.load(Acquire);

        if !is_ready(ready_bits, offset) {
            if is_tx_closed(ready_bits) {
                return Some(Read::Closed);
            }

            return None;
        }

        // Get the value
        let value = self.values[offset].with(|ptr| ptr::read(ptr));

        Some(Read::Value(value.assume_init()))
    }

    /// Writes a value to the block at the given offset.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The slot is empty.
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
        // Get the offset into the block
        let slot_offset = offset(slot_index);

        self.values[slot_offset].with_mut(|ptr| {
            ptr::write(ptr, MaybeUninit::new(value));
        });

        // Release the value. After this point, the slot ref may no longer
        // be used. It is possible for the receiver to free the memory at
        // any point.
        self.set_ready(slot_offset);
    }

    /// Signal to the receiver that the sender half of the list is closed.
    pub(crate) unsafe fn tx_close(&self) {
        self.header.ready_slots.fetch_or(TX_CLOSED, Release);
    }

    /// Resets the block to a blank state. This enables reusing blocks in the
    /// channel.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * All slots are empty.
    /// * The caller holds a unique pointer to the block.
    pub(crate) unsafe fn reclaim(&mut self) {
        self.header.start_index = 0;
        self.header.next = AtomicPtr::new(ptr::null_mut());
        self.header.ready_slots = AtomicUsize::new(0);
    }

    /// Releases the block to the rx half for freeing.
    ///
    /// This function is called by the tx half once it can be guaranteed that no
    /// more senders will attempt to access the block.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The block will no longer be accessed by any sender.
    pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
        // Track the observed tail_position. Any sender targeting a greater
        // tail_position is guaranteed to not access this block.
        self.header
            .observed_tail_position
            .with_mut(|ptr| *ptr = tail_position);

        // Set the released bit, signalling to the receiver that it is safe to
        // free the block's memory as soon as all slots **prior** to
        // `observed_tail_position` have been filled.
        self.header.ready_slots.fetch_or(RELEASED, Release);
    }

    /// Mark a slot as ready
    fn set_ready(&self, slot: usize) {
        let mask = 1 << slot;
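        // `Release` here pairs with the `Acquire` load in `read` (and in
        // `is_final`): once the receiver observes this bit, the value written
        // by `write` is guaranteed to be visible.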
        self.header.ready_slots.fetch_or(mask, Release);
    }

    /// Returns `true` when all slots have their `ready` bits set.
    ///
    /// This indicates that the block is in its final state and will no longer
    /// be mutated.
    ///
    /// # Implementation
    ///
    /// The ready flags for all slots are coalesced as bits in the single
    /// `ready_slots` atomic, so this check is one atomic load followed by a
    /// mask comparison rather than a walk over every slot.
    pub(crate) fn is_final(&self) -> bool {
        self.header.ready_slots.load(Acquire) & READY_MASK == READY_MASK
    }

    /// Returns the `observed_tail_position` value, if set
    pub(crate) fn observed_tail_position(&self) -> Option<usize> {
        if 0 == RELEASED & self.header.ready_slots.load(Acquire) {
            None
        } else {
            Some(
                self.header
                    .observed_tail_position
                    .with(|ptr| unsafe { *ptr }),
            )
        }
    }

    /// Loads the next block
    pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
        let ret = NonNull::new(self.header.next.load(ordering));

        debug_assert!(unsafe {
            ret.map(|block| {
                block.as_ref().header.start_index == self.header.start_index.wrapping_add(BLOCK_CAP)
            })
            .unwrap_or(true)
        });

        ret
    }

    /// Pushes `block` as the next block in the link.
    ///
    /// Returns `Ok(())` if successful; otherwise, a pointer to the block
    /// currently linked as `next` is returned in the `Err` variant.
    ///
    /// This requires that the next pointer is null.
    ///
    /// # Ordering
    ///
    /// This performs a compare-and-swap on `next` using the provided `success`
    /// and `failure` orderings.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * `block` is not freed until it has been removed from the list.
    pub(crate) unsafe fn try_push(
        &self,
        block: &mut NonNull<Block<T>>,
        success: Ordering,
        failure: Ordering,
    ) -> Result<(), NonNull<Block<T>>> {
        block.as_mut().header.start_index = self.header.start_index.wrapping_add(BLOCK_CAP);

        let next_ptr = self
            .header
            .next
            .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
            .unwrap_or_else(|x| x);

        match NonNull::new(next_ptr) {
            Some(next_ptr) => Err(next_ptr),
            None => Ok(()),
        }
    }

    /// Grows the `Block` linked list by allocating and appending a new block.
    ///
    /// The next block in the linked list is returned. This may or may not be
    /// the one allocated by the function call.
    ///
    /// # Implementation
    ///
    /// It is assumed that `self.next` is null. A new block is allocated with
    /// `start_index` set to the start index of the block that follows `self`.
    /// A compare-and-swap is performed with `AcqRel` memory ordering. If the
    /// compare-and-swap is successful, the newly allocated block is released
    /// to other threads walking the block linked list. If the compare-and-swap
    /// fails, the current thread acquires the next block in the linked list,
    /// allowing the current thread to access the slots.
    pub(crate) fn grow(&self) -> NonNull<Block<T>> {
        // Create the new block. It is assumed that the block will become the
        // next one after `&self`. If this turns out to not be the case,
        // `start_index` is updated accordingly.
        let new_block = Block::new(self.header.start_index + BLOCK_CAP);

        let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };

        // Attempt to store the block. The first compare-and-swap attempt is
        // "unrolled" due to minor differences in logic.
        //
        // `AcqRel` is used as the ordering **only** when attempting the
        // compare-and-swap on self.next.
        //
        // If the compare-and-swap fails, then the actual value of the cell is
        // returned from this function and accessed by the caller. Given this,
        // the memory must be acquired.
        //
        // `Release` ensures that the newly allocated block is available to
        // other threads acquiring the next pointer.
        let next = NonNull::new(
            self.header
                .next
                .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
                .unwrap_or_else(|x| x),
        );

        let next = match next {
            Some(next) => next,
            None => {
                // The compare-and-swap succeeded and the newly allocated block
                // is successfully pushed.
                return new_block;
            }
        };

        // There already is a next block in the linked list. The newly allocated
        // block could be dropped and the discovered next block returned;
        // however, that would be wasteful. Instead, the linked list is walked
        // by repeatedly attempting to compare-and-swap the pointer into the
        // `next` register until the compare-and-swap succeeds.
        //
        // Care is taken to update new_block's start_index field as appropriate.

        let mut curr = next;

        // TODO: Should this iteration be capped?
        loop {
            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };

            curr = match actual {
                Ok(_) => {
                    return next;
                }
                Err(curr) => curr,
            };

            crate::loom::thread::yield_now();
        }
    }
}

/// Returns `true` if the specified slot has a value ready to be consumed.
fn is_ready(bits: usize, slot: usize) -> bool {
    let mask: usize = 1 << slot;
    mask == mask & bits
}

/// Returns `true` if the closed flag has been set.
fn is_tx_closed(bits: usize) -> bool {
    TX_CLOSED == bits & TX_CLOSED
}

impl<T> Values<T> {
    /// Initialize a `Values` struct from a pointer.
    ///
    /// # Safety
    ///
    /// The raw pointer must be valid for writing a `Values<T>`.
    unsafe fn initialize(_value: NonNull<Values<T>>) {
        // When running under loom, the `UnsafeCell`s need to be explicitly
        // initialized.
        if_loom! {
            let p = _value.as_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
            for i in 0..BLOCK_CAP {
                p.add(i)
                    .write(UnsafeCell::new(MaybeUninit::uninit()));
            }
        }
    }
}

impl<T> ops::Index<usize> for Values<T> {
    type Output = UnsafeCell<MaybeUninit<T>>;

    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}

#[cfg(all(test, not(loom)))]
#[test]
fn assert_no_stack_overflow() {
    // https://github.com/tokio-rs/tokio/issues/5293

    struct Foo {
        _a: [u8; 2_000_000],
    }

    assert_eq!(
        Layout::new::<MaybeUninit<Block<Foo>>>(),
        Layout::new::<Block<Foo>>()
    );

    let _block = Block::<Foo>::new(0);
}
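
// Illustrative sketches: the two tests below exercise the index helpers and
// the single-threaded write/read path using only items defined in this file.
// They assume `BLOCK_CAP` is a power of two of at least 2, which the masks
// above already require.

#[cfg(all(test, not(loom)))]
#[test]
fn start_index_and_offset_decompose_slot_index() {
    // `start_index` and `offset` split an arbitrary slot index into its
    // block-aligned part and its in-block offset.
    let index = BLOCK_CAP * 3 + 1;
    assert_eq!(start_index(index), BLOCK_CAP * 3);
    assert_eq!(offset(index), 1);
    assert_eq!(start_index(index) + offset(index), index);
}

#[cfg(all(test, not(loom)))]
#[test]
fn write_then_read_roundtrip() {
    // A freshly allocated block reports no ready slots; after a value is
    // written to slot 0 and its ready bit is set, `read` returns it. This is
    // single-threaded, so the safety requirements of `write`/`read` (empty
    // slot, no concurrent access) trivially hold.
    let block = Block::<u32>::new(0);

    assert!(unsafe { block.read(0) }.is_none());

    unsafe { block.write(0, 42) };
    match unsafe { block.read(0) } {
        Some(Read::Value(value)) => assert_eq!(value, 42),
        _ => panic!("expected a value in slot 0"),
    }
}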
449 | |