1use alloc::boxed::Box;
2use core::mem::MaybeUninit;
3use core::ptr;
4
5use crossbeam_utils::CachePadded;
6
7use crate::const_fn;
8use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
9use crate::sync::cell::UnsafeCell;
10#[allow(unused_imports)]
11use crate::sync::prelude::*;
12use crate::{busy_wait, PopError, PushError};
13
14// Bits indicating the state of a slot:
15// * If a value has been written into the slot, `WRITE` is set.
16// * If a value has been read from the slot, `READ` is set.
17// * If the block is being destroyed, `DESTROY` is set.
18const WRITE: usize = 1;
19const READ: usize = 2;
20const DESTROY: usize = 4;
21
22// Each block covers one "lap" of indices.
23const LAP: usize = 32;
24// The maximum number of items a block can hold.
25const BLOCK_CAP: usize = LAP - 1;
26// How many lower bits are reserved for metadata.
27const SHIFT: usize = 1;
28// Has two different purposes:
29// * If set in head, indicates that the block is not the last one.
30// * If set in tail, indicates that the queue is closed.
31const MARK_BIT: usize = 1;
32
33/// A slot in a block.
34struct Slot<T> {
35 /// The value.
36 value: UnsafeCell<MaybeUninit<T>>,
37
38 /// The state of the slot.
39 state: AtomicUsize,
40}
41
42impl<T> Slot<T> {
43 #[cfg(not(loom))]
44 const UNINIT: Slot<T> = Slot {
45 value: UnsafeCell::new(MaybeUninit::uninit()),
46 state: AtomicUsize::new(0),
47 };
48
49 #[cfg(not(loom))]
50 fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
51 [Self::UNINIT; BLOCK_CAP]
52 }
53
54 #[cfg(loom)]
55 fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
56 // Repeat this expression 31 times.
57 // Update if we change BLOCK_CAP
58 macro_rules! repeat_31 {
59 ($e: expr) => {
60 [
61 $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
62 $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
63 ]
64 };
65 }
66
67 repeat_31!(Slot {
68 value: UnsafeCell::new(MaybeUninit::uninit()),
69 state: AtomicUsize::new(0),
70 })
71 }
72
73 /// Waits until a value is written into the slot.
74 fn wait_write(&self) {
75 while self.state.load(Ordering::Acquire) & WRITE == 0 {
76 busy_wait();
77 }
78 }
79}
80
81/// A block in a linked list.
82///
83/// Each block in the list can hold up to `BLOCK_CAP` values.
84struct Block<T> {
85 /// The next block in the linked list.
86 next: AtomicPtr<Block<T>>,
87
88 /// Slots for values.
89 slots: [Slot<T>; BLOCK_CAP],
90}
91
92impl<T> Block<T> {
93 /// Creates an empty block.
94 fn new() -> Block<T> {
95 Block {
96 next: AtomicPtr::new(ptr::null_mut()),
97 slots: Slot::uninit_block(),
98 }
99 }
100
101 /// Waits until the next pointer is set.
102 fn wait_next(&self) -> *mut Block<T> {
103 loop {
104 let next = self.next.load(Ordering::Acquire);
105 if !next.is_null() {
106 return next;
107 }
108 busy_wait();
109 }
110 }
111
112 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
113 unsafe fn destroy(this: *mut Block<T>, start: usize) {
114 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
115 // begun destruction of the block.
116 for i in start..BLOCK_CAP - 1 {
117 let slot = (*this).slots.get_unchecked(i);
118
119 // Mark the `DESTROY` bit if a thread is still using the slot.
120 if slot.state.load(Ordering::Acquire) & READ == 0
121 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
122 {
123 // If a thread is still using the slot, it will continue destruction of the block.
124 return;
125 }
126 }
127
128 // No thread is using the block, now it is safe to destroy it.
129 drop(Box::from_raw(this));
130 }
131}
132
133/// A position in a queue.
134struct Position<T> {
135 /// The index in the queue.
136 index: AtomicUsize,
137
138 /// The block in the linked list.
139 block: AtomicPtr<Block<T>>,
140}
141
142/// An unbounded queue.
143pub struct Unbounded<T> {
144 /// The head of the queue.
145 head: CachePadded<Position<T>>,
146
147 /// The tail of the queue.
148 tail: CachePadded<Position<T>>,
149}
150
151impl<T> Unbounded<T> {
152 const_fn!(
153 const_if: #[cfg(not(loom))];
154 /// Creates a new unbounded queue.
155 pub const fn new() -> Unbounded<T> {
156 Unbounded {
157 head: CachePadded::new(Position {
158 block: AtomicPtr::new(ptr::null_mut()),
159 index: AtomicUsize::new(0),
160 }),
161 tail: CachePadded::new(Position {
162 block: AtomicPtr::new(ptr::null_mut()),
163 index: AtomicUsize::new(0),
164 }),
165 }
166 }
167 );
168
169 /// Pushes an item into the queue.
170 pub fn push(&self, value: T) -> Result<(), PushError<T>> {
171 let mut tail = self.tail.index.load(Ordering::Acquire);
172 let mut block = self.tail.block.load(Ordering::Acquire);
173 let mut next_block = None;
174
175 loop {
176 // Check if the queue is closed.
177 if tail & MARK_BIT != 0 {
178 return Err(PushError::Closed(value));
179 }
180
181 // Calculate the offset of the index into the block.
182 let offset = (tail >> SHIFT) % LAP;
183
184 // If we reached the end of the block, wait until the next one is installed.
185 if offset == BLOCK_CAP {
186 busy_wait();
187 tail = self.tail.index.load(Ordering::Acquire);
188 block = self.tail.block.load(Ordering::Acquire);
189 continue;
190 }
191
192 // If we're going to have to install the next block, allocate it in advance in order to
193 // make the wait for other threads as short as possible.
194 if offset + 1 == BLOCK_CAP && next_block.is_none() {
195 next_block = Some(Box::new(Block::<T>::new()));
196 }
197
198 // If this is the first value to be pushed into the queue, we need to allocate the
199 // first block and install it.
200 if block.is_null() {
201 let new = Box::into_raw(Box::new(Block::<T>::new()));
202
203 if self
204 .tail
205 .block
206 .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
207 .is_ok()
208 {
209 self.head.block.store(new, Ordering::Release);
210 block = new;
211 } else {
212 next_block = unsafe { Some(Box::from_raw(new)) };
213 tail = self.tail.index.load(Ordering::Acquire);
214 block = self.tail.block.load(Ordering::Acquire);
215 continue;
216 }
217 }
218
219 let new_tail = tail + (1 << SHIFT);
220
221 // Try advancing the tail forward.
222 match self.tail.index.compare_exchange_weak(
223 tail,
224 new_tail,
225 Ordering::SeqCst,
226 Ordering::Acquire,
227 ) {
228 Ok(_) => unsafe {
229 // If we've reached the end of the block, install the next one.
230 if offset + 1 == BLOCK_CAP {
231 let next_block = Box::into_raw(next_block.unwrap());
232 self.tail.block.store(next_block, Ordering::Release);
233 self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
234 (*block).next.store(next_block, Ordering::Release);
235 }
236
237 // Write the value into the slot.
238 let slot = (*block).slots.get_unchecked(offset);
239 slot.value.with_mut(|slot| {
240 slot.write(MaybeUninit::new(value));
241 });
242 slot.state.fetch_or(WRITE, Ordering::Release);
243 return Ok(());
244 },
245 Err(t) => {
246 tail = t;
247 block = self.tail.block.load(Ordering::Acquire);
248 }
249 }
250 }
251 }
252
253 /// Pops an item from the queue.
254 pub fn pop(&self) -> Result<T, PopError> {
255 let mut head = self.head.index.load(Ordering::Acquire);
256 let mut block = self.head.block.load(Ordering::Acquire);
257
258 loop {
259 // Calculate the offset of the index into the block.
260 let offset = (head >> SHIFT) % LAP;
261
262 // If we reached the end of the block, wait until the next one is installed.
263 if offset == BLOCK_CAP {
264 busy_wait();
265 head = self.head.index.load(Ordering::Acquire);
266 block = self.head.block.load(Ordering::Acquire);
267 continue;
268 }
269
270 let mut new_head = head + (1 << SHIFT);
271
272 if new_head & MARK_BIT == 0 {
273 crate::full_fence();
274 let tail = self.tail.index.load(Ordering::Relaxed);
275
276 // If the tail equals the head, that means the queue is empty.
277 if head >> SHIFT == tail >> SHIFT {
278 // Check if the queue is closed.
279 if tail & MARK_BIT != 0 {
280 return Err(PopError::Closed);
281 } else {
282 return Err(PopError::Empty);
283 }
284 }
285
286 // If head and tail are not in the same block, set `MARK_BIT` in head.
287 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
288 new_head |= MARK_BIT;
289 }
290 }
291
292 // The block can be null here only if the first push operation is in progress.
293 if block.is_null() {
294 busy_wait();
295 head = self.head.index.load(Ordering::Acquire);
296 block = self.head.block.load(Ordering::Acquire);
297 continue;
298 }
299
300 // Try moving the head index forward.
301 match self.head.index.compare_exchange_weak(
302 head,
303 new_head,
304 Ordering::SeqCst,
305 Ordering::Acquire,
306 ) {
307 Ok(_) => unsafe {
308 // If we've reached the end of the block, move to the next one.
309 if offset + 1 == BLOCK_CAP {
310 let next = (*block).wait_next();
311 let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
312 if !(*next).next.load(Ordering::Relaxed).is_null() {
313 next_index |= MARK_BIT;
314 }
315
316 self.head.block.store(next, Ordering::Release);
317 self.head.index.store(next_index, Ordering::Release);
318 }
319
320 // Read the value.
321 let slot = (*block).slots.get_unchecked(offset);
322 slot.wait_write();
323 let value = slot.value.with_mut(|slot| slot.read().assume_init());
324
325 // Destroy the block if we've reached the end, or if another thread wanted to
326 // destroy but couldn't because we were busy reading from the slot.
327 if offset + 1 == BLOCK_CAP {
328 Block::destroy(block, 0);
329 } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
330 Block::destroy(block, offset + 1);
331 }
332
333 return Ok(value);
334 },
335 Err(h) => {
336 head = h;
337 block = self.head.block.load(Ordering::Acquire);
338 }
339 }
340 }
341 }
342
343 /// Returns the number of items in the queue.
344 pub fn len(&self) -> usize {
345 loop {
346 // Load the tail index, then load the head index.
347 let mut tail = self.tail.index.load(Ordering::SeqCst);
348 let mut head = self.head.index.load(Ordering::SeqCst);
349
350 // If the tail index didn't change, we've got consistent indices to work with.
351 if self.tail.index.load(Ordering::SeqCst) == tail {
352 // Erase the lower bits.
353 tail &= !((1 << SHIFT) - 1);
354 head &= !((1 << SHIFT) - 1);
355
356 // Fix up indices if they fall onto block ends.
357 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
358 tail = tail.wrapping_add(1 << SHIFT);
359 }
360 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
361 head = head.wrapping_add(1 << SHIFT);
362 }
363
364 // Rotate indices so that head falls into the first block.
365 let lap = (head >> SHIFT) / LAP;
366 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
367 head = head.wrapping_sub((lap * LAP) << SHIFT);
368
369 // Remove the lower bits.
370 tail >>= SHIFT;
371 head >>= SHIFT;
372
373 // Return the difference minus the number of blocks between tail and head.
374 return tail - head - tail / LAP;
375 }
376 }
377 }
378
379 /// Returns `true` if the queue is empty.
380 pub fn is_empty(&self) -> bool {
381 let head = self.head.index.load(Ordering::SeqCst);
382 let tail = self.tail.index.load(Ordering::SeqCst);
383 head >> SHIFT == tail >> SHIFT
384 }
385
386 /// Returns `true` if the queue is full.
387 pub fn is_full(&self) -> bool {
388 false
389 }
390
391 /// Closes the queue.
392 ///
393 /// Returns `true` if this call closed the queue.
394 pub fn close(&self) -> bool {
395 let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
396 tail & MARK_BIT == 0
397 }
398
399 /// Returns `true` if the queue is closed.
400 pub fn is_closed(&self) -> bool {
401 self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
402 }
403}
404
405impl<T> Drop for Unbounded<T> {
406 fn drop(&mut self) {
407 let Self { head, tail } = self;
408 let Position { index: head, block } = &mut **head;
409
410 head.with_mut(|&mut mut head| {
411 tail.index.with_mut(|&mut mut tail| {
412 // Erase the lower bits.
413 head &= !((1 << SHIFT) - 1);
414 tail &= !((1 << SHIFT) - 1);
415
416 unsafe {
417 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
418 while head != tail {
419 let offset = (head >> SHIFT) % LAP;
420
421 if offset < BLOCK_CAP {
422 // Drop the value in the slot.
423 block.with_mut(|block| {
424 let slot = (**block).slots.get_unchecked(offset);
425 slot.value.with_mut(|slot| {
426 let value = &mut *slot;
427 value.as_mut_ptr().drop_in_place();
428 });
429 });
430 } else {
431 // Deallocate the block and move to the next one.
432 block.with_mut(|block| {
433 let next_block = (**block).next.with_mut(|next| *next);
434 drop(Box::from_raw(*block));
435 *block = next_block;
436 });
437 }
438
439 head = head.wrapping_add(1 << SHIFT);
440 }
441
442 // Deallocate the last remaining block.
443 block.with_mut(|block| {
444 if !block.is_null() {
445 drop(Box::from_raw(*block));
446 }
447 });
448 }
449 });
450 });
451 }
452}
453