1//! A concurrent, lock-free, FIFO list.
2
3use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
4use crate::loom::thread;
5use crate::sync::mpsc::block::{self, Block};
6
7use std::fmt;
8use std::ptr::NonNull;
9use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
10
11/// List queue transmit handle.
12pub(crate) struct Tx<T> {
13 /// Tail in the `Block` mpmc list.
14 block_tail: AtomicPtr<Block<T>>,
15
16 /// Position to push the next message. This references a block and offset
17 /// into the block.
18 tail_position: AtomicUsize,
19}
20
21/// List queue receive handle
22pub(crate) struct Rx<T> {
23 /// Pointer to the block being processed.
24 head: NonNull<Block<T>>,
25
26 /// Next slot index to process.
27 index: usize,
28
29 /// Pointer to the next block pending release.
30 free_head: NonNull<Block<T>>,
31}
32
33/// Return value of `Rx::try_pop`.
34pub(crate) enum TryPopResult<T> {
35 /// Successfully popped a value.
36 Ok(T),
37 /// The channel is empty.
38 Empty,
39 /// The channel is empty and closed.
40 Closed,
41 /// The channel is not empty, but the first value is being written.
42 Busy,
43}
44
45pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
46 // Create the initial block shared between the tx and rx halves.
47 let initial_block = Block::new(0);
48 let initial_block_ptr = Box::into_raw(initial_block);
49
50 let tx = Tx {
51 block_tail: AtomicPtr::new(initial_block_ptr),
52 tail_position: AtomicUsize::new(0),
53 };
54
55 let head = NonNull::new(initial_block_ptr).unwrap();
56
57 let rx = Rx {
58 head,
59 index: 0,
60 free_head: head,
61 };
62
63 (tx, rx)
64}
65
66impl<T> Tx<T> {
67 /// Pushes a value into the list.
68 pub(crate) fn push(&self, value: T) {
69 // First, claim a slot for the value. `Acquire` is used here to
70 // synchronize with the `fetch_add` in `reclaim_blocks`.
71 let slot_index = self.tail_position.fetch_add(1, Acquire);
72
73 // Load the current block and write the value
74 let block = self.find_block(slot_index);
75
76 unsafe {
77 // Write the value to the block
78 block.as_ref().write(slot_index, value);
79 }
80 }
81
82 /// Closes the send half of the list.
83 ///
84 /// Similar process as pushing a value, but instead of writing the value &
85 /// setting the ready flag, the `TX_CLOSED` flag is set on the block.
86 pub(crate) fn close(&self) {
87 // First, claim a slot for the value. This is the last slot that will be
88 // claimed.
89 let slot_index = self.tail_position.fetch_add(1, Acquire);
90
91 let block = self.find_block(slot_index);
92
93 unsafe { block.as_ref().tx_close() }
94 }
95
96 fn find_block(&self, slot_index: usize) -> NonNull<Block<T>> {
97 // The start index of the block that contains `index`.
98 let start_index = block::start_index(slot_index);
99
100 // The index offset into the block
101 let offset = block::offset(slot_index);
102
103 // Load the current head of the block
104 let mut block_ptr = self.block_tail.load(Acquire);
105
106 let block = unsafe { &*block_ptr };
107
108 // Calculate the distance between the tail ptr and the target block
109 let distance = block.distance(start_index);
110
111 // Decide if this call to `find_block` should attempt to update the
112 // `block_tail` pointer.
113 //
114 // Updating `block_tail` is not always performed in order to reduce
115 // contention.
116 //
117 // When set, as the routine walks the linked list, it attempts to update
118 // `block_tail`. If the update cannot be performed, `try_updating_tail`
119 // is unset.
120 let mut try_updating_tail = distance > offset;
121
122 // Walk the linked list of blocks until the block with `start_index` is
123 // found.
124 loop {
125 let block = unsafe { &(*block_ptr) };
126
127 if block.is_at_index(start_index) {
128 return unsafe { NonNull::new_unchecked(block_ptr) };
129 }
130
131 let next_block = block
132 .load_next(Acquire)
133 // There is no allocated next block, grow the linked list.
134 .unwrap_or_else(|| block.grow());
135
136 // If the block is **not** final, then the tail pointer cannot be
137 // advanced any more.
138 try_updating_tail &= block.is_final();
139
140 if try_updating_tail {
141 // Advancing `block_tail` must happen when walking the linked
142 // list. `block_tail` may not advance passed any blocks that are
143 // not "final". At the point a block is finalized, it is unknown
144 // if there are any prior blocks that are unfinalized, which
145 // makes it impossible to advance `block_tail`.
146 //
147 // While walking the linked list, `block_tail` can be advanced
148 // as long as finalized blocks are traversed.
149 //
150 // Release ordering is used to ensure that any subsequent reads
151 // are able to see the memory pointed to by `block_tail`.
152 //
153 // Acquire is not needed as any "actual" value is not accessed.
154 // At this point, the linked list is walked to acquire blocks.
155 if self
156 .block_tail
157 .compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed)
158 .is_ok()
159 {
160 // Synchronize with any senders
161 let tail_position = self.tail_position.fetch_add(0, Release);
162
163 unsafe {
164 block.tx_release(tail_position);
165 }
166 } else {
167 // A concurrent sender is also working on advancing
168 // `block_tail` and this thread is falling behind.
169 //
170 // Stop trying to advance the tail pointer
171 try_updating_tail = false;
172 }
173 }
174
175 block_ptr = next_block.as_ptr();
176
177 thread::yield_now();
178 }
179 }
180
181 pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
182 // The block has been removed from the linked list and ownership
183 // is reclaimed.
184 //
185 // Before dropping the block, see if it can be reused by
186 // inserting it back at the end of the linked list.
187 //
188 // First, reset the data
189 block.as_mut().reclaim();
190
191 let mut reused = false;
192
193 // Attempt to insert the block at the end
194 //
195 // Walk at most three times
196 //
197 let curr_ptr = self.block_tail.load(Acquire);
198
199 // The pointer can never be null
200 debug_assert!(!curr_ptr.is_null());
201
202 let mut curr = NonNull::new_unchecked(curr_ptr);
203
204 // TODO: Unify this logic with Block::grow
205 for _ in 0..3 {
206 match curr.as_ref().try_push(&mut block, AcqRel, Acquire) {
207 Ok(()) => {
208 reused = true;
209 break;
210 }
211 Err(next) => {
212 curr = next;
213 }
214 }
215 }
216
217 if !reused {
218 let _ = Box::from_raw(block.as_ptr());
219 }
220 }
221}
222
223impl<T> fmt::Debug for Tx<T> {
224 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
225 fmt.debug_struct("Tx")
226 .field("block_tail", &self.block_tail.load(Relaxed))
227 .field("tail_position", &self.tail_position.load(Relaxed))
228 .finish()
229 }
230}
231
232impl<T> Rx<T> {
233 /// Pops the next value off the queue.
234 pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
235 // Advance `head`, if needed
236 if !self.try_advancing_head() {
237 return None;
238 }
239
240 self.reclaim_blocks(tx);
241
242 unsafe {
243 let block = self.head.as_ref();
244
245 let ret = block.read(self.index);
246
247 if let Some(block::Read::Value(..)) = ret {
248 self.index = self.index.wrapping_add(1);
249 }
250
251 ret
252 }
253 }
254
255 /// Pops the next value off the queue, detecting whether the block
256 /// is busy or empty on failure.
257 ///
258 /// This function exists because `Rx::pop` can return `None` even if the
259 /// channel's queue contains a message that has been completely written.
260 /// This can happen if the fully delivered message is behind another message
261 /// that is in the middle of being written to the block, since the channel
262 /// can't return the messages out of order.
263 pub(crate) fn try_pop(&mut self, tx: &Tx<T>) -> TryPopResult<T> {
264 let tail_position = tx.tail_position.load(Acquire);
265 let result = self.pop(tx);
266
267 match result {
268 Some(block::Read::Value(t)) => TryPopResult::Ok(t),
269 Some(block::Read::Closed) => TryPopResult::Closed,
270 None if tail_position == self.index => TryPopResult::Empty,
271 None => TryPopResult::Busy,
272 }
273 }
274
275 /// Tries advancing the block pointer to the block referenced by `self.index`.
276 ///
277 /// Returns `true` if successful, `false` if there is no next block to load.
278 fn try_advancing_head(&mut self) -> bool {
279 let block_index = block::start_index(self.index);
280
281 loop {
282 let next_block = {
283 let block = unsafe { self.head.as_ref() };
284
285 if block.is_at_index(block_index) {
286 return true;
287 }
288
289 block.load_next(Acquire)
290 };
291
292 let next_block = match next_block {
293 Some(next_block) => next_block,
294 None => {
295 return false;
296 }
297 };
298
299 self.head = next_block;
300
301 thread::yield_now();
302 }
303 }
304
305 fn reclaim_blocks(&mut self, tx: &Tx<T>) {
306 while self.free_head != self.head {
307 unsafe {
308 // Get a handle to the block that will be freed and update
309 // `free_head` to point to the next block.
310 let block = self.free_head;
311
312 let observed_tail_position = block.as_ref().observed_tail_position();
313
314 let required_index = match observed_tail_position {
315 Some(i) => i,
316 None => return,
317 };
318
319 if required_index > self.index {
320 return;
321 }
322
323 // We may read the next pointer with `Relaxed` ordering as it is
324 // guaranteed that the `reclaim_blocks` routine trails the `recv`
325 // routine. Any memory accessed by `reclaim_blocks` has already
326 // been acquired by `recv`.
327 let next_block = block.as_ref().load_next(Relaxed);
328
329 // Update the free list head
330 self.free_head = next_block.unwrap();
331
332 // Push the emptied block onto the back of the queue, making it
333 // available to senders.
334 tx.reclaim_block(block);
335 }
336
337 thread::yield_now();
338 }
339 }
340
341 /// Effectively `Drop` all the blocks. Should only be called once, when
342 /// the list is dropping.
343 pub(super) unsafe fn free_blocks(&mut self) {
344 debug_assert_ne!(self.free_head, NonNull::dangling());
345
346 let mut cur = Some(self.free_head);
347
348 #[cfg(debug_assertions)]
349 {
350 // to trigger the debug assert above so as to catch that we
351 // don't call `free_blocks` more than once.
352 self.free_head = NonNull::dangling();
353 self.head = NonNull::dangling();
354 }
355
356 while let Some(block) = cur {
357 cur = block.as_ref().load_next(Relaxed);
358 drop(Box::from_raw(block.as_ptr()));
359 }
360 }
361}
362
363impl<T> fmt::Debug for Rx<T> {
364 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
365 fmt.debug_struct("Rx")
366 .field("head", &self.head)
367 .field("index", &self.index)
368 .field("free_head", &self.free_head)
369 .finish()
370 }
371}
372