1 | //! A concurrent, lock-free, FIFO list. |
2 | |
3 | use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize}; |
4 | use crate::loom::thread; |
5 | use crate::sync::mpsc::block::{self, Block}; |
6 | |
7 | use std::fmt; |
8 | use std::ptr::NonNull; |
9 | use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; |
10 | |
11 | /// List queue transmit handle. |
12 | pub(crate) struct Tx<T> { |
13 | /// Tail in the `Block` mpmc list. |
14 | block_tail: AtomicPtr<Block<T>>, |
15 | |
16 | /// Position to push the next message. This references a block and offset |
17 | /// into the block. |
18 | tail_position: AtomicUsize, |
19 | } |
20 | |
21 | /// List queue receive handle |
22 | pub(crate) struct Rx<T> { |
23 | /// Pointer to the block being processed. |
24 | head: NonNull<Block<T>>, |
25 | |
26 | /// Next slot index to process. |
27 | index: usize, |
28 | |
29 | /// Pointer to the next block pending release. |
30 | free_head: NonNull<Block<T>>, |
31 | } |
32 | |
33 | /// Return value of `Rx::try_pop`. |
34 | pub(crate) enum TryPopResult<T> { |
35 | /// Successfully popped a value. |
36 | Ok(T), |
37 | /// The channel is empty. |
38 | Empty, |
39 | /// The channel is empty and closed. |
40 | Closed, |
41 | /// The channel is not empty, but the first value is being written. |
42 | Busy, |
43 | } |
44 | |
45 | pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) { |
46 | // Create the initial block shared between the tx and rx halves. |
47 | let initial_block: Box> = Block::new(start_index:0); |
48 | let initial_block_ptr: *mut Block = Box::into_raw(initial_block); |
49 | |
50 | let tx: Tx = Tx { |
51 | block_tail: AtomicPtr::new(initial_block_ptr), |
52 | tail_position: AtomicUsize::new(val:0), |
53 | }; |
54 | |
55 | let head: NonNull> = NonNull::new(initial_block_ptr).unwrap(); |
56 | |
57 | let rx: Rx = Rx { |
58 | head, |
59 | index: 0, |
60 | free_head: head, |
61 | }; |
62 | |
63 | (tx, rx) |
64 | } |
65 | |
66 | impl<T> Tx<T> { |
67 | /// Pushes a value into the list. |
68 | pub(crate) fn push(&self, value: T) { |
69 | // First, claim a slot for the value. `Acquire` is used here to |
70 | // synchronize with the `fetch_add` in `reclaim_blocks`. |
71 | let slot_index = self.tail_position.fetch_add(1, Acquire); |
72 | |
73 | // Load the current block and write the value |
74 | let block = self.find_block(slot_index); |
75 | |
76 | unsafe { |
77 | // Write the value to the block |
78 | block.as_ref().write(slot_index, value); |
79 | } |
80 | } |
81 | |
82 | /// Closes the send half of the list. |
83 | /// |
84 | /// Similar process as pushing a value, but instead of writing the value & |
85 | /// setting the ready flag, the `TX_CLOSED` flag is set on the block. |
86 | pub(crate) fn close(&self) { |
87 | // First, claim a slot for the value. This is the last slot that will be |
88 | // claimed. |
89 | let slot_index = self.tail_position.fetch_add(1, Acquire); |
90 | |
91 | let block = self.find_block(slot_index); |
92 | |
93 | unsafe { block.as_ref().tx_close() } |
94 | } |
95 | |
96 | fn find_block(&self, slot_index: usize) -> NonNull<Block<T>> { |
97 | // The start index of the block that contains `index`. |
98 | let start_index = block::start_index(slot_index); |
99 | |
100 | // The index offset into the block |
101 | let offset = block::offset(slot_index); |
102 | |
103 | // Load the current head of the block |
104 | let mut block_ptr = self.block_tail.load(Acquire); |
105 | |
106 | let block = unsafe { &*block_ptr }; |
107 | |
108 | // Calculate the distance between the tail ptr and the target block |
109 | let distance = block.distance(start_index); |
110 | |
111 | // Decide if this call to `find_block` should attempt to update the |
112 | // `block_tail` pointer. |
113 | // |
114 | // Updating `block_tail` is not always performed in order to reduce |
115 | // contention. |
116 | // |
117 | // When set, as the routine walks the linked list, it attempts to update |
118 | // `block_tail`. If the update cannot be performed, `try_updating_tail` |
119 | // is unset. |
120 | let mut try_updating_tail = distance > offset; |
121 | |
122 | // Walk the linked list of blocks until the block with `start_index` is |
123 | // found. |
124 | loop { |
125 | let block = unsafe { &(*block_ptr) }; |
126 | |
127 | if block.is_at_index(start_index) { |
128 | return unsafe { NonNull::new_unchecked(block_ptr) }; |
129 | } |
130 | |
131 | let next_block = block |
132 | .load_next(Acquire) |
133 | // There is no allocated next block, grow the linked list. |
134 | .unwrap_or_else(|| block.grow()); |
135 | |
136 | // If the block is **not** final, then the tail pointer cannot be |
137 | // advanced any more. |
138 | try_updating_tail &= block.is_final(); |
139 | |
140 | if try_updating_tail { |
141 | // Advancing `block_tail` must happen when walking the linked |
142 | // list. `block_tail` may not advance passed any blocks that are |
143 | // not "final". At the point a block is finalized, it is unknown |
144 | // if there are any prior blocks that are unfinalized, which |
145 | // makes it impossible to advance `block_tail`. |
146 | // |
147 | // While walking the linked list, `block_tail` can be advanced |
148 | // as long as finalized blocks are traversed. |
149 | // |
150 | // Release ordering is used to ensure that any subsequent reads |
151 | // are able to see the memory pointed to by `block_tail`. |
152 | // |
153 | // Acquire is not needed as any "actual" value is not accessed. |
154 | // At this point, the linked list is walked to acquire blocks. |
155 | if self |
156 | .block_tail |
157 | .compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed) |
158 | .is_ok() |
159 | { |
160 | // Synchronize with any senders |
161 | let tail_position = self.tail_position.fetch_add(0, Release); |
162 | |
163 | unsafe { |
164 | block.tx_release(tail_position); |
165 | } |
166 | } else { |
167 | // A concurrent sender is also working on advancing |
168 | // `block_tail` and this thread is falling behind. |
169 | // |
170 | // Stop trying to advance the tail pointer |
171 | try_updating_tail = false; |
172 | } |
173 | } |
174 | |
175 | block_ptr = next_block.as_ptr(); |
176 | |
177 | thread::yield_now(); |
178 | } |
179 | } |
180 | |
181 | pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) { |
182 | // The block has been removed from the linked list and ownership |
183 | // is reclaimed. |
184 | // |
185 | // Before dropping the block, see if it can be reused by |
186 | // inserting it back at the end of the linked list. |
187 | // |
188 | // First, reset the data |
189 | block.as_mut().reclaim(); |
190 | |
191 | let mut reused = false; |
192 | |
193 | // Attempt to insert the block at the end |
194 | // |
195 | // Walk at most three times |
196 | // |
197 | let curr_ptr = self.block_tail.load(Acquire); |
198 | |
199 | // The pointer can never be null |
200 | debug_assert!(!curr_ptr.is_null()); |
201 | |
202 | let mut curr = NonNull::new_unchecked(curr_ptr); |
203 | |
204 | // TODO: Unify this logic with Block::grow |
205 | for _ in 0..3 { |
206 | match curr.as_ref().try_push(&mut block, AcqRel, Acquire) { |
207 | Ok(()) => { |
208 | reused = true; |
209 | break; |
210 | } |
211 | Err(next) => { |
212 | curr = next; |
213 | } |
214 | } |
215 | } |
216 | |
217 | if !reused { |
218 | let _ = Box::from_raw(block.as_ptr()); |
219 | } |
220 | } |
221 | } |
222 | |
223 | impl<T> fmt::Debug for Tx<T> { |
224 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
225 | fmt&mut DebugStruct<'_, '_>.debug_struct("Tx" ) |
226 | .field("block_tail" , &self.block_tail.load(Relaxed)) |
227 | .field(name:"tail_position" , &self.tail_position.load(order:Relaxed)) |
228 | .finish() |
229 | } |
230 | } |
231 | |
232 | impl<T> Rx<T> { |
233 | /// Pops the next value off the queue. |
234 | pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> { |
235 | // Advance `head`, if needed |
236 | if !self.try_advancing_head() { |
237 | return None; |
238 | } |
239 | |
240 | self.reclaim_blocks(tx); |
241 | |
242 | unsafe { |
243 | let block = self.head.as_ref(); |
244 | |
245 | let ret = block.read(self.index); |
246 | |
247 | if let Some(block::Read::Value(..)) = ret { |
248 | self.index = self.index.wrapping_add(1); |
249 | } |
250 | |
251 | ret |
252 | } |
253 | } |
254 | |
255 | /// Pops the next value off the queue, detecting whether the block |
256 | /// is busy or empty on failure. |
257 | /// |
258 | /// This function exists because `Rx::pop` can return `None` even if the |
259 | /// channel's queue contains a message that has been completely written. |
260 | /// This can happen if the fully delivered message is behind another message |
261 | /// that is in the middle of being written to the block, since the channel |
262 | /// can't return the messages out of order. |
263 | pub(crate) fn try_pop(&mut self, tx: &Tx<T>) -> TryPopResult<T> { |
264 | let tail_position = tx.tail_position.load(Acquire); |
265 | let result = self.pop(tx); |
266 | |
267 | match result { |
268 | Some(block::Read::Value(t)) => TryPopResult::Ok(t), |
269 | Some(block::Read::Closed) => TryPopResult::Closed, |
270 | None if tail_position == self.index => TryPopResult::Empty, |
271 | None => TryPopResult::Busy, |
272 | } |
273 | } |
274 | |
275 | /// Tries advancing the block pointer to the block referenced by `self.index`. |
276 | /// |
277 | /// Returns `true` if successful, `false` if there is no next block to load. |
278 | fn try_advancing_head(&mut self) -> bool { |
279 | let block_index = block::start_index(self.index); |
280 | |
281 | loop { |
282 | let next_block = { |
283 | let block = unsafe { self.head.as_ref() }; |
284 | |
285 | if block.is_at_index(block_index) { |
286 | return true; |
287 | } |
288 | |
289 | block.load_next(Acquire) |
290 | }; |
291 | |
292 | let next_block = match next_block { |
293 | Some(next_block) => next_block, |
294 | None => { |
295 | return false; |
296 | } |
297 | }; |
298 | |
299 | self.head = next_block; |
300 | |
301 | thread::yield_now(); |
302 | } |
303 | } |
304 | |
305 | fn reclaim_blocks(&mut self, tx: &Tx<T>) { |
306 | while self.free_head != self.head { |
307 | unsafe { |
308 | // Get a handle to the block that will be freed and update |
309 | // `free_head` to point to the next block. |
310 | let block = self.free_head; |
311 | |
312 | let observed_tail_position = block.as_ref().observed_tail_position(); |
313 | |
314 | let required_index = match observed_tail_position { |
315 | Some(i) => i, |
316 | None => return, |
317 | }; |
318 | |
319 | if required_index > self.index { |
320 | return; |
321 | } |
322 | |
323 | // We may read the next pointer with `Relaxed` ordering as it is |
324 | // guaranteed that the `reclaim_blocks` routine trails the `recv` |
325 | // routine. Any memory accessed by `reclaim_blocks` has already |
326 | // been acquired by `recv`. |
327 | let next_block = block.as_ref().load_next(Relaxed); |
328 | |
329 | // Update the free list head |
330 | self.free_head = next_block.unwrap(); |
331 | |
332 | // Push the emptied block onto the back of the queue, making it |
333 | // available to senders. |
334 | tx.reclaim_block(block); |
335 | } |
336 | |
337 | thread::yield_now(); |
338 | } |
339 | } |
340 | |
341 | /// Effectively `Drop` all the blocks. Should only be called once, when |
342 | /// the list is dropping. |
343 | pub(super) unsafe fn free_blocks(&mut self) { |
344 | debug_assert_ne!(self.free_head, NonNull::dangling()); |
345 | |
346 | let mut cur = Some(self.free_head); |
347 | |
348 | #[cfg (debug_assertions)] |
349 | { |
350 | // to trigger the debug assert above so as to catch that we |
351 | // don't call `free_blocks` more than once. |
352 | self.free_head = NonNull::dangling(); |
353 | self.head = NonNull::dangling(); |
354 | } |
355 | |
356 | while let Some(block) = cur { |
357 | cur = block.as_ref().load_next(Relaxed); |
358 | drop(Box::from_raw(block.as_ptr())); |
359 | } |
360 | } |
361 | } |
362 | |
363 | impl<T> fmt::Debug for Rx<T> { |
364 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
365 | fmt&mut DebugStruct<'_, '_>.debug_struct("Rx" ) |
366 | .field("head" , &self.head) |
367 | .field("index" , &self.index) |
368 | .field(name:"free_head" , &self.free_head) |
369 | .finish() |
370 | } |
371 | } |
372 | |