1 | use core::mem::MaybeUninit; |
2 | |
3 | use crate::sync::atomic::{AtomicUsize, Ordering}; |
4 | use crate::sync::cell::UnsafeCell; |
5 | #[allow (unused_imports)] |
6 | use crate::sync::prelude::*; |
7 | use crate::{busy_wait, PopError, PushError}; |
8 | |
/// State bit held by `push`/`pop` while they are reading or writing the slot.
const LOCKED: usize = 1 << 0;
/// State bit set by `push` when it claims the slot and cleared by `pop` when it takes the item.
const PUSHED: usize = 1 << 1;
/// State bit set by `close`; it is never cleared by any method in this file.
const CLOSED: usize = 1 << 2;
12 | |
/// A single-element queue.
pub struct Single<T> {
    /// Bit set combining the `LOCKED`, `PUSHED` and `CLOSED` flags.
    state: AtomicUsize,
    /// Storage for the one item. `push` writes it after setting `PUSHED`,
    /// and `pop` reads it out after clearing `PUSHED`.
    slot: UnsafeCell<MaybeUninit<T>>,
}
18 | |
19 | impl<T> Single<T> { |
20 | /// Creates a new single-element queue. |
21 | pub fn new() -> Single<T> { |
22 | Single { |
23 | state: AtomicUsize::new(0), |
24 | slot: UnsafeCell::new(MaybeUninit::uninit()), |
25 | } |
26 | } |
27 | |
28 | /// Attempts to push an item into the queue. |
29 | pub fn push(&self, value: T) -> Result<(), PushError<T>> { |
30 | // Lock and fill the slot. |
31 | let state = self |
32 | .state |
33 | .compare_exchange(0, LOCKED | PUSHED, Ordering::SeqCst, Ordering::SeqCst) |
34 | .unwrap_or_else(|x| x); |
35 | |
36 | if state == 0 { |
37 | // Write the value and unlock. |
38 | self.slot.with_mut(|slot| unsafe { |
39 | slot.write(MaybeUninit::new(value)); |
40 | }); |
41 | self.state.fetch_and(!LOCKED, Ordering::Release); |
42 | Ok(()) |
43 | } else if state & CLOSED != 0 { |
44 | Err(PushError::Closed(value)) |
45 | } else { |
46 | Err(PushError::Full(value)) |
47 | } |
48 | } |
49 | |
50 | /// Attempts to pop an item from the queue. |
51 | pub fn pop(&self) -> Result<T, PopError> { |
52 | let mut state = PUSHED; |
53 | loop { |
54 | // Lock and empty the slot. |
55 | let prev = self |
56 | .state |
57 | .compare_exchange( |
58 | state, |
59 | (state | LOCKED) & !PUSHED, |
60 | Ordering::SeqCst, |
61 | Ordering::SeqCst, |
62 | ) |
63 | .unwrap_or_else(|x| x); |
64 | |
65 | if prev == state { |
66 | // Read the value and unlock. |
67 | let value = self |
68 | .slot |
69 | .with_mut(|slot| unsafe { slot.read().assume_init() }); |
70 | self.state.fetch_and(!LOCKED, Ordering::Release); |
71 | return Ok(value); |
72 | } |
73 | |
74 | if prev & PUSHED == 0 { |
75 | if prev & CLOSED == 0 { |
76 | return Err(PopError::Empty); |
77 | } else { |
78 | return Err(PopError::Closed); |
79 | } |
80 | } |
81 | |
82 | if prev & LOCKED == 0 { |
83 | state = prev; |
84 | } else { |
85 | busy_wait(); |
86 | state = prev & !LOCKED; |
87 | } |
88 | } |
89 | } |
90 | |
91 | /// Returns the number of items in the queue. |
92 | pub fn len(&self) -> usize { |
93 | usize::from(self.state.load(Ordering::SeqCst) & PUSHED != 0) |
94 | } |
95 | |
96 | /// Returns `true` if the queue is empty. |
97 | pub fn is_empty(&self) -> bool { |
98 | self.len() == 0 |
99 | } |
100 | |
101 | /// Returns `true` if the queue is full. |
102 | pub fn is_full(&self) -> bool { |
103 | self.len() == 1 |
104 | } |
105 | |
106 | /// Closes the queue. |
107 | /// |
108 | /// Returns `true` if this call closed the queue. |
109 | pub fn close(&self) -> bool { |
110 | let state = self.state.fetch_or(CLOSED, Ordering::SeqCst); |
111 | state & CLOSED == 0 |
112 | } |
113 | |
114 | /// Returns `true` if the queue is closed. |
115 | pub fn is_closed(&self) -> bool { |
116 | self.state.load(Ordering::SeqCst) & CLOSED != 0 |
117 | } |
118 | } |
119 | |
120 | impl<T> Drop for Single<T> { |
121 | fn drop(&mut self) { |
122 | // Drop the value in the slot. |
123 | let Self { state: &mut AtomicUsize, slot: &mut UnsafeCell> } = self; |
124 | state.with_mut(|state: &mut usize| { |
125 | if *state & PUSHED != 0 { |
126 | slot.with_mut(|slot: *mut MaybeUninit| unsafe { |
127 | let value: &mut MaybeUninit = &mut *slot; |
128 | value.as_mut_ptr().drop_in_place(); |
129 | }); |
130 | } |
131 | }); |
132 | } |
133 | } |
134 | |