1use alloc::boxed::Box;
2use core::mem::MaybeUninit;
3use core::ptr;
4
5use crossbeam_utils::CachePadded;
6
7use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
8use crate::sync::cell::UnsafeCell;
9#[allow(unused_imports)]
10use crate::sync::prelude::*;
11use crate::{busy_wait, PopError, PushError};
12
13// Bits indicating the state of a slot:
14// * If a value has been written into the slot, `WRITE` is set.
15// * If a value has been read from the slot, `READ` is set.
16// * If the block is being destroyed, `DESTROY` is set.
17const WRITE: usize = 1;
18const READ: usize = 2;
19const DESTROY: usize = 4;
20
21// Each block covers one "lap" of indices.
22const LAP: usize = 32;
23// The maximum number of items a block can hold.
24const BLOCK_CAP: usize = LAP - 1;
25// How many lower bits are reserved for metadata.
26const SHIFT: usize = 1;
27// Has two different purposes:
28// * If set in head, indicates that the block is not the last one.
29// * If set in tail, indicates that the queue is closed.
30const MARK_BIT: usize = 1;
31
32/// A slot in a block.
33struct Slot<T> {
34 /// The value.
35 value: UnsafeCell<MaybeUninit<T>>,
36
37 /// The state of the slot.
38 state: AtomicUsize,
39}
40
41impl<T> Slot<T> {
42 #[cfg(not(loom))]
43 const UNINIT: Slot<T> = Slot {
44 value: UnsafeCell::new(MaybeUninit::uninit()),
45 state: AtomicUsize::new(0),
46 };
47
48 #[cfg(not(loom))]
49 fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
50 [Self::UNINIT; BLOCK_CAP]
51 }
52
53 #[cfg(loom)]
54 fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
55 // Repeat this expression 31 times.
56 // Update if we change BLOCK_CAP
57 macro_rules! repeat_31 {
58 ($e: expr) => {
59 [
60 $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
61 $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
62 ]
63 };
64 }
65
66 repeat_31!(Slot {
67 value: UnsafeCell::new(MaybeUninit::uninit()),
68 state: AtomicUsize::new(0),
69 })
70 }
71
72 /// Waits until a value is written into the slot.
73 fn wait_write(&self) {
74 while self.state.load(Ordering::Acquire) & WRITE == 0 {
75 busy_wait();
76 }
77 }
78}
79
80/// A block in a linked list.
81///
82/// Each block in the list can hold up to `BLOCK_CAP` values.
83struct Block<T> {
84 /// The next block in the linked list.
85 next: AtomicPtr<Block<T>>,
86
87 /// Slots for values.
88 slots: [Slot<T>; BLOCK_CAP],
89}
90
91impl<T> Block<T> {
92 /// Creates an empty block.
93 fn new() -> Block<T> {
94 Block {
95 next: AtomicPtr::new(ptr::null_mut()),
96 slots: Slot::uninit_block(),
97 }
98 }
99
100 /// Waits until the next pointer is set.
101 fn wait_next(&self) -> *mut Block<T> {
102 loop {
103 let next = self.next.load(Ordering::Acquire);
104 if !next.is_null() {
105 return next;
106 }
107 busy_wait();
108 }
109 }
110
111 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
112 unsafe fn destroy(this: *mut Block<T>, start: usize) {
113 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
114 // begun destruction of the block.
115 for i in start..BLOCK_CAP - 1 {
116 let slot = (*this).slots.get_unchecked(i);
117
118 // Mark the `DESTROY` bit if a thread is still using the slot.
119 if slot.state.load(Ordering::Acquire) & READ == 0
120 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
121 {
122 // If a thread is still using the slot, it will continue destruction of the block.
123 return;
124 }
125 }
126
127 // No thread is using the block, now it is safe to destroy it.
128 drop(Box::from_raw(this));
129 }
130}
131
132/// A position in a queue.
133struct Position<T> {
134 /// The index in the queue.
135 index: AtomicUsize,
136
137 /// The block in the linked list.
138 block: AtomicPtr<Block<T>>,
139}
140
141/// An unbounded queue.
142pub struct Unbounded<T> {
143 /// The head of the queue.
144 head: CachePadded<Position<T>>,
145
146 /// The tail of the queue.
147 tail: CachePadded<Position<T>>,
148}
149
150impl<T> Unbounded<T> {
151 /// Creates a new unbounded queue.
152 pub fn new() -> Unbounded<T> {
153 Unbounded {
154 head: CachePadded::new(Position {
155 block: AtomicPtr::new(ptr::null_mut()),
156 index: AtomicUsize::new(0),
157 }),
158 tail: CachePadded::new(Position {
159 block: AtomicPtr::new(ptr::null_mut()),
160 index: AtomicUsize::new(0),
161 }),
162 }
163 }
164
165 /// Pushes an item into the queue.
166 pub fn push(&self, value: T) -> Result<(), PushError<T>> {
167 let mut tail = self.tail.index.load(Ordering::Acquire);
168 let mut block = self.tail.block.load(Ordering::Acquire);
169 let mut next_block = None;
170
171 loop {
172 // Check if the queue is closed.
173 if tail & MARK_BIT != 0 {
174 return Err(PushError::Closed(value));
175 }
176
177 // Calculate the offset of the index into the block.
178 let offset = (tail >> SHIFT) % LAP;
179
180 // If we reached the end of the block, wait until the next one is installed.
181 if offset == BLOCK_CAP {
182 busy_wait();
183 tail = self.tail.index.load(Ordering::Acquire);
184 block = self.tail.block.load(Ordering::Acquire);
185 continue;
186 }
187
188 // If we're going to have to install the next block, allocate it in advance in order to
189 // make the wait for other threads as short as possible.
190 if offset + 1 == BLOCK_CAP && next_block.is_none() {
191 next_block = Some(Box::new(Block::<T>::new()));
192 }
193
194 // If this is the first value to be pushed into the queue, we need to allocate the
195 // first block and install it.
196 if block.is_null() {
197 let new = Box::into_raw(Box::new(Block::<T>::new()));
198
199 if self
200 .tail
201 .block
202 .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
203 .is_ok()
204 {
205 self.head.block.store(new, Ordering::Release);
206 block = new;
207 } else {
208 next_block = unsafe { Some(Box::from_raw(new)) };
209 tail = self.tail.index.load(Ordering::Acquire);
210 block = self.tail.block.load(Ordering::Acquire);
211 continue;
212 }
213 }
214
215 let new_tail = tail + (1 << SHIFT);
216
217 // Try advancing the tail forward.
218 match self.tail.index.compare_exchange_weak(
219 tail,
220 new_tail,
221 Ordering::SeqCst,
222 Ordering::Acquire,
223 ) {
224 Ok(_) => unsafe {
225 // If we've reached the end of the block, install the next one.
226 if offset + 1 == BLOCK_CAP {
227 let next_block = Box::into_raw(next_block.unwrap());
228 self.tail.block.store(next_block, Ordering::Release);
229 self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
230 (*block).next.store(next_block, Ordering::Release);
231 }
232
233 // Write the value into the slot.
234 let slot = (*block).slots.get_unchecked(offset);
235 slot.value.with_mut(|slot| {
236 slot.write(MaybeUninit::new(value));
237 });
238 slot.state.fetch_or(WRITE, Ordering::Release);
239 return Ok(());
240 },
241 Err(t) => {
242 tail = t;
243 block = self.tail.block.load(Ordering::Acquire);
244 }
245 }
246 }
247 }
248
249 /// Pops an item from the queue.
250 pub fn pop(&self) -> Result<T, PopError> {
251 let mut head = self.head.index.load(Ordering::Acquire);
252 let mut block = self.head.block.load(Ordering::Acquire);
253
254 loop {
255 // Calculate the offset of the index into the block.
256 let offset = (head >> SHIFT) % LAP;
257
258 // If we reached the end of the block, wait until the next one is installed.
259 if offset == BLOCK_CAP {
260 busy_wait();
261 head = self.head.index.load(Ordering::Acquire);
262 block = self.head.block.load(Ordering::Acquire);
263 continue;
264 }
265
266 let mut new_head = head + (1 << SHIFT);
267
268 if new_head & MARK_BIT == 0 {
269 crate::full_fence();
270 let tail = self.tail.index.load(Ordering::Relaxed);
271
272 // If the tail equals the head, that means the queue is empty.
273 if head >> SHIFT == tail >> SHIFT {
274 // Check if the queue is closed.
275 if tail & MARK_BIT != 0 {
276 return Err(PopError::Closed);
277 } else {
278 return Err(PopError::Empty);
279 }
280 }
281
282 // If head and tail are not in the same block, set `MARK_BIT` in head.
283 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
284 new_head |= MARK_BIT;
285 }
286 }
287
288 // The block can be null here only if the first push operation is in progress.
289 if block.is_null() {
290 busy_wait();
291 head = self.head.index.load(Ordering::Acquire);
292 block = self.head.block.load(Ordering::Acquire);
293 continue;
294 }
295
296 // Try moving the head index forward.
297 match self.head.index.compare_exchange_weak(
298 head,
299 new_head,
300 Ordering::SeqCst,
301 Ordering::Acquire,
302 ) {
303 Ok(_) => unsafe {
304 // If we've reached the end of the block, move to the next one.
305 if offset + 1 == BLOCK_CAP {
306 let next = (*block).wait_next();
307 let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
308 if !(*next).next.load(Ordering::Relaxed).is_null() {
309 next_index |= MARK_BIT;
310 }
311
312 self.head.block.store(next, Ordering::Release);
313 self.head.index.store(next_index, Ordering::Release);
314 }
315
316 // Read the value.
317 let slot = (*block).slots.get_unchecked(offset);
318 slot.wait_write();
319 let value = slot.value.with_mut(|slot| slot.read().assume_init());
320
321 // Destroy the block if we've reached the end, or if another thread wanted to
322 // destroy but couldn't because we were busy reading from the slot.
323 if offset + 1 == BLOCK_CAP {
324 Block::destroy(block, 0);
325 } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
326 Block::destroy(block, offset + 1);
327 }
328
329 return Ok(value);
330 },
331 Err(h) => {
332 head = h;
333 block = self.head.block.load(Ordering::Acquire);
334 }
335 }
336 }
337 }
338
339 /// Returns the number of items in the queue.
340 pub fn len(&self) -> usize {
341 loop {
342 // Load the tail index, then load the head index.
343 let mut tail = self.tail.index.load(Ordering::SeqCst);
344 let mut head = self.head.index.load(Ordering::SeqCst);
345
346 // If the tail index didn't change, we've got consistent indices to work with.
347 if self.tail.index.load(Ordering::SeqCst) == tail {
348 // Erase the lower bits.
349 tail &= !((1 << SHIFT) - 1);
350 head &= !((1 << SHIFT) - 1);
351
352 // Fix up indices if they fall onto block ends.
353 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
354 tail = tail.wrapping_add(1 << SHIFT);
355 }
356 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
357 head = head.wrapping_add(1 << SHIFT);
358 }
359
360 // Rotate indices so that head falls into the first block.
361 let lap = (head >> SHIFT) / LAP;
362 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
363 head = head.wrapping_sub((lap * LAP) << SHIFT);
364
365 // Remove the lower bits.
366 tail >>= SHIFT;
367 head >>= SHIFT;
368
369 // Return the difference minus the number of blocks between tail and head.
370 return tail - head - tail / LAP;
371 }
372 }
373 }
374
375 /// Returns `true` if the queue is empty.
376 pub fn is_empty(&self) -> bool {
377 let head = self.head.index.load(Ordering::SeqCst);
378 let tail = self.tail.index.load(Ordering::SeqCst);
379 head >> SHIFT == tail >> SHIFT
380 }
381
382 /// Returns `true` if the queue is full.
383 pub fn is_full(&self) -> bool {
384 false
385 }
386
387 /// Closes the queue.
388 ///
389 /// Returns `true` if this call closed the queue.
390 pub fn close(&self) -> bool {
391 let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
392 tail & MARK_BIT == 0
393 }
394
395 /// Returns `true` if the queue is closed.
396 pub fn is_closed(&self) -> bool {
397 self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
398 }
399}
400
401impl<T> Drop for Unbounded<T> {
402 fn drop(&mut self) {
403 let Self { head, tail } = self;
404 let Position { index: head, block } = &mut **head;
405
406 head.with_mut(|&mut mut head| {
407 tail.index.with_mut(|&mut mut tail| {
408 // Erase the lower bits.
409 head &= !((1 << SHIFT) - 1);
410 tail &= !((1 << SHIFT) - 1);
411
412 unsafe {
413 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
414 while head != tail {
415 let offset = (head >> SHIFT) % LAP;
416
417 if offset < BLOCK_CAP {
418 // Drop the value in the slot.
419 block.with_mut(|block| {
420 let slot = (**block).slots.get_unchecked(offset);
421 slot.value.with_mut(|slot| {
422 let value = &mut *slot;
423 value.as_mut_ptr().drop_in_place();
424 });
425 });
426 } else {
427 // Deallocate the block and move to the next one.
428 block.with_mut(|block| {
429 let next_block = (**block).next.with_mut(|next| *next);
430 drop(Box::from_raw(*block));
431 *block = next_block;
432 });
433 }
434
435 head = head.wrapping_add(1 << SHIFT);
436 }
437
438 // Deallocate the last remaining block.
439 block.with_mut(|block| {
440 if !block.is_null() {
441 drop(Box::from_raw(*block));
442 }
443 });
444 }
445 });
446 });
447 }
448}
449