1use core::mem::MaybeUninit;
2
3use crate::sync::atomic::{AtomicUsize, Ordering};
4use crate::sync::cell::UnsafeCell;
5#[allow(unused_imports)]
6use crate::sync::prelude::*;
7use crate::{busy_wait, PopError, PushError};
8
/// Bit 0 of the state word: set while `push` is writing the slot or `pop` is
/// reading it, and cleared again once that access has finished.
const LOCKED: usize = 0b001;
/// Bit 1 of the state word: set by `push` when it claims the slot and cleared
/// by `pop` when it takes the value back out.
const PUSHED: usize = 0b010;
/// Bit 2 of the state word: set by `close` and never cleared in this file.
const CLOSED: usize = 0b100;
12
/// A queue that can hold at most one element.
///
/// All bookkeeping lives in a single atomic word (`state`) made of the
/// `LOCKED`, `PUSHED` and `CLOSED` bit flags; the element itself is stored
/// in `slot`.
pub struct Single<T> {
    /// Bitwise combination of `LOCKED`, `PUSHED` and `CLOSED`.
    state: AtomicUsize,
    /// Storage for the element; only initialized while a value is stored.
    slot: UnsafeCell<MaybeUninit<T>>,
}
18
19impl<T> Single<T> {
20 /// Creates a new single-element queue.
21 pub fn new() -> Single<T> {
22 Single {
23 state: AtomicUsize::new(0),
24 slot: UnsafeCell::new(MaybeUninit::uninit()),
25 }
26 }
27
28 /// Attempts to push an item into the queue.
29 pub fn push(&self, value: T) -> Result<(), PushError<T>> {
30 // Lock and fill the slot.
31 let state = self
32 .state
33 .compare_exchange(0, LOCKED | PUSHED, Ordering::SeqCst, Ordering::SeqCst)
34 .unwrap_or_else(|x| x);
35
36 if state == 0 {
37 // Write the value and unlock.
38 self.slot.with_mut(|slot| unsafe {
39 slot.write(MaybeUninit::new(value));
40 });
41 self.state.fetch_and(!LOCKED, Ordering::Release);
42 Ok(())
43 } else if state & CLOSED != 0 {
44 Err(PushError::Closed(value))
45 } else {
46 Err(PushError::Full(value))
47 }
48 }
49
50 /// Attempts to pop an item from the queue.
51 pub fn pop(&self) -> Result<T, PopError> {
52 let mut state = PUSHED;
53 loop {
54 // Lock and empty the slot.
55 let prev = self
56 .state
57 .compare_exchange(
58 state,
59 (state | LOCKED) & !PUSHED,
60 Ordering::SeqCst,
61 Ordering::SeqCst,
62 )
63 .unwrap_or_else(|x| x);
64
65 if prev == state {
66 // Read the value and unlock.
67 let value = self
68 .slot
69 .with_mut(|slot| unsafe { slot.read().assume_init() });
70 self.state.fetch_and(!LOCKED, Ordering::Release);
71 return Ok(value);
72 }
73
74 if prev & PUSHED == 0 {
75 if prev & CLOSED == 0 {
76 return Err(PopError::Empty);
77 } else {
78 return Err(PopError::Closed);
79 }
80 }
81
82 if prev & LOCKED == 0 {
83 state = prev;
84 } else {
85 busy_wait();
86 state = prev & !LOCKED;
87 }
88 }
89 }
90
91 /// Returns the number of items in the queue.
92 pub fn len(&self) -> usize {
93 usize::from(self.state.load(Ordering::SeqCst) & PUSHED != 0)
94 }
95
96 /// Returns `true` if the queue is empty.
97 pub fn is_empty(&self) -> bool {
98 self.len() == 0
99 }
100
101 /// Returns `true` if the queue is full.
102 pub fn is_full(&self) -> bool {
103 self.len() == 1
104 }
105
106 /// Closes the queue.
107 ///
108 /// Returns `true` if this call closed the queue.
109 pub fn close(&self) -> bool {
110 let state = self.state.fetch_or(CLOSED, Ordering::SeqCst);
111 state & CLOSED == 0
112 }
113
114 /// Returns `true` if the queue is closed.
115 pub fn is_closed(&self) -> bool {
116 self.state.load(Ordering::SeqCst) & CLOSED != 0
117 }
118}
119
120impl<T> Drop for Single<T> {
121 fn drop(&mut self) {
122 // Drop the value in the slot.
123 let Self { state: &mut AtomicUsize, slot: &mut UnsafeCell> } = self;
124 state.with_mut(|state: &mut usize| {
125 if *state & PUSHED != 0 {
126 slot.with_mut(|slot: *mut MaybeUninit| unsafe {
127 let value: &mut MaybeUninit = &mut *slot;
128 value.as_mut_ptr().drop_in_place();
129 });
130 }
131 });
132 }
133}
134