1use alloc::{boxed::Box, vec::Vec};
2use core::mem::MaybeUninit;
3
4use crossbeam_utils::CachePadded;
5
6use crate::sync::atomic::{AtomicUsize, Ordering};
7use crate::sync::cell::UnsafeCell;
8#[allow(unused_imports)]
9use crate::sync::prelude::*;
10use crate::{busy_wait, PopError, PushError};
11
/// A slot in a queue.
struct Slot<T> {
    /// The current stamp.
    ///
    /// When the stamp equals the tail stamp, the slot is vacant and a pusher may claim it.
    /// When it equals the head stamp plus one, the slot holds a value ready to be popped.
    stamp: AtomicUsize,

    /// The value in this slot.
    ///
    /// Only initialized while the stamp indicates the slot holds a value; accessed through
    /// `UnsafeCell` because ownership of the slot is mediated by the stamp, not by `&mut`.
    value: UnsafeCell<MaybeUninit<T>>,
}
20
/// A bounded queue.
pub struct Bounded<T> {
    /// The head of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
    /// represent the lap. The mark bit in the head is always zero.
    ///
    /// Values are popped from the head of the queue.
    ///
    /// Cache-padded so that head and tail updates don't contend on the same cache line.
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
    /// represent the lap. The mark bit indicates that the queue is closed.
    ///
    /// Values are pushed into the tail of the queue.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    buffer: Box<[Slot<T>]>,

    /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
    ///
    /// Equal to `mark_bit * 2`; adding it to a stamp advances the lap without disturbing the
    /// index or mark bits.
    one_lap: usize,

    /// If this bit is set in the tail, that means the queue is closed.
    ///
    /// The smallest power of two strictly greater than the capacity, so it can never collide
    /// with an index value (`mark_bit - 1` therefore masks out the index).
    mark_bit: usize,
}
50
impl<T> Bounded<T> {
    /// Creates a new bounded queue.
    ///
    /// # Panics
    ///
    /// Panics if `cap` is zero.
    pub fn new(cap: usize) -> Bounded<T> {
        assert!(cap > 0, "capacity must be positive");

        // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
        let head = 0;
        // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
        let tail = 0;

        // Allocate a buffer of `cap` slots initialized with stamps.
        let mut buffer = Vec::with_capacity(cap);
        for i in 0..cap {
            // Set the stamp to `{ lap: 0, mark: 0, index: i }`, marking the slot as vacant
            // for the first lap of pushes.
            buffer.push(Slot {
                stamp: AtomicUsize::new(i),
                value: UnsafeCell::new(MaybeUninit::uninit()),
            });
        }

        // Compute constants `mark_bit` and `one_lap`.
        //
        // `mark_bit` is the smallest power of two strictly greater than the capacity, so it
        // never overlaps an index; `one_lap` sits one bit above the mark bit.
        let mark_bit = (cap + 1).next_power_of_two();
        let one_lap = mark_bit * 2;

        Bounded {
            buffer: buffer.into(),
            one_lap,
            mark_bit,
            head: CachePadded::new(AtomicUsize::new(head)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
        }
    }

    /// Attempts to push an item into the queue.
    ///
    /// # Errors
    ///
    /// Returns `PushError::Full` if the queue is at capacity and `PushError::Closed` if it has
    /// been closed; in both cases ownership of `value` is handed back to the caller.
    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Check if the queue is closed.
            if tail & self.mark_bit != 0 {
                return Err(PushError::Closed(value));
            }

            // Deconstruct the tail.
            let index = tail & (self.mark_bit - 1);
            let lap = tail & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = &self.buffer[index];
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, the slot is vacant and we may attempt to push.
            if tail == stamp {
                let new_tail = if index + 1 < self.buffer.len() {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                    tail + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the tail; success claims exclusive ownership of the slot.
                match self.tail.compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Write the value into the slot, then publish it by bumping the stamp
                        // to `tail + 1` — exactly the value `pop` waits to observe. The
                        // Release store makes the write above visible to the popper's Acquire
                        // load of the stamp.
                        slot.value.with_mut(|slot| unsafe {
                            slot.write(MaybeUninit::new(value));
                        });
                        slot.stamp.store(tail + 1, Ordering::Release);
                        return Ok(());
                    }
                    Err(t) => {
                        // Another thread moved the tail; retry with the updated value.
                        tail = t;
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
                // The slot still holds the value written one lap ago, so the queue may be
                // full. The fence orders the stamp load above before the head load below.
                crate::full_fence();
                let head = self.head.load(Ordering::Relaxed);

                // If the head lags one lap behind the tail as well...
                if head.wrapping_add(self.one_lap) == tail {
                    // ...then the queue is full.
                    return Err(PushError::Full(value));
                }

                // Loom complains if there isn't an explicit busy wait here.
                #[cfg(loom)]
                busy_wait();

                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Yield because we need to wait for the stamp to get updated.
                busy_wait();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }

    /// Attempts to pop an item from the queue.
    ///
    /// # Errors
    ///
    /// Returns `PopError::Empty` if the queue holds no items, and `PopError::Closed` if it is
    /// both empty and closed.
    pub fn pop(&self) -> Result<T, PopError> {
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head.
            let index = head & (self.mark_bit - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = &self.buffer[index];
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, the slot holds a value and we may
            // attempt to pop.
            if head + 1 == stamp {
                let new = if index + 1 < self.buffer.len() {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head; success claims exclusive ownership of the slot.
                match self.head.compare_exchange_weak(
                    head,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Read the value out, then mark the slot vacant for the next lap by
                        // storing `head + one_lap` (the tail stamp a future pusher expects).
                        let value = slot
                            .value
                            .with_mut(|slot| unsafe { slot.read().assume_init() });
                        slot.stamp
                            .store(head.wrapping_add(self.one_lap), Ordering::Release);
                        return Ok(value);
                    }
                    Err(h) => {
                        // Another thread moved the head; retry with the updated value.
                        head = h;
                    }
                }
            } else if stamp == head {
                // The slot is vacant, so the queue may be empty. The fence orders the stamp
                // load above before the tail load below.
                crate::full_fence();
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail equals the head (ignoring the closed bit), the queue is empty.
                if (tail & !self.mark_bit) == head {
                    // Check if the queue is closed.
                    if tail & self.mark_bit != 0 {
                        return Err(PopError::Closed);
                    } else {
                        return Err(PopError::Empty);
                    }
                }

                // Loom complains if there isn't a busy-wait here.
                #[cfg(loom)]
                busy_wait();

                head = self.head.load(Ordering::Relaxed);
            } else {
                // Yield because we need to wait for the stamp to get updated.
                busy_wait();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }

    /// Returns the number of items in the queue.
    pub fn len(&self) -> usize {
        loop {
            // Load the tail, then load the head.
            let tail = self.tail.load(Ordering::SeqCst);
            let head = self.head.load(Ordering::SeqCst);

            // If the tail didn't change while we loaded the head, we've got a consistent
            // snapshot of both counters to work with; otherwise retry.
            if self.tail.load(Ordering::SeqCst) == tail {
                let hix = head & (self.mark_bit - 1);
                let tix = tail & (self.mark_bit - 1);

                return if hix < tix {
                    tix - hix
                } else if hix > tix {
                    // The tail index wrapped around past the end of the buffer.
                    self.buffer.len() - hix + tix
                } else if (tail & !self.mark_bit) == head {
                    // Equal indices and equal laps: empty.
                    0
                } else {
                    // Equal indices but different laps: full.
                    self.buffer.len()
                };
            }
        }
    }

    /// Returns `true` if the queue is empty.
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);

        // Is the tail equal to the head (ignoring the closed bit)?
        //
        // Note: If the head changes just before we load the tail, that means there was a moment
        // when the queue was not empty, so it is safe to just return `false`.
        (tail & !self.mark_bit) == head
    }

    /// Returns `true` if the queue is full.
    pub fn is_full(&self) -> bool {
        let tail = self.tail.load(Ordering::SeqCst);
        let head = self.head.load(Ordering::SeqCst);

        // Is the head lagging one lap behind tail?
        //
        // Note: If the tail changes just before we load the head, that means there was a moment
        // when the queue was not full, so it is safe to just return `false`.
        head.wrapping_add(self.one_lap) == tail & !self.mark_bit
    }

    /// Returns the capacity of the queue.
    ///
    /// The capacity is fixed at construction time and equals the number of slots.
    pub fn capacity(&self) -> usize {
        self.buffer.len()
    }

    /// Closes the queue.
    ///
    /// Returns `true` if this call closed the queue.
    pub fn close(&self) -> bool {
        // Atomically set the mark bit in the tail; the previous value tells us whether the
        // queue was already closed.
        let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
        tail & self.mark_bit == 0
    }

    /// Returns `true` if the queue is closed.
    pub fn is_closed(&self) -> bool {
        self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
    }
}
295
impl<T> Drop for Bounded<T> {
    fn drop(&mut self) {
        // We hold `&mut self`, so no other thread can access the queue; `with_mut` gives us
        // plain (non-atomic) access to the head and tail counters.
        let Self {
            head,
            tail,
            buffer,
            mark_bit,
            ..
        } = self;

        let mark_bit = *mark_bit;

        head.with_mut(|&mut head| {
            tail.with_mut(|&mut tail| {
                // Extract the indices; the length computation mirrors `len()`.
                let hix = head & (mark_bit - 1);
                let tix = tail & (mark_bit - 1);

                let len = if hix < tix {
                    tix - hix
                } else if hix > tix {
                    // The tail index wrapped around past the end of the buffer.
                    buffer.len() - hix + tix
                } else if (tail & !mark_bit) == head {
                    // Equal indices and equal laps: empty.
                    0
                } else {
                    // Equal indices but different laps: full.
                    buffer.len()
                };

                // Loop over all slots that hold a value and drop them.
                for i in 0..len {
                    // Compute the index of the next slot holding a value, wrapping around the
                    // end of the buffer.
                    let index = if hix + i < buffer.len() {
                        hix + i
                    } else {
                        hix + i - buffer.len()
                    };

                    // Drop the value in the slot.
                    let slot = &buffer[index];
                    slot.value.with_mut(|slot| unsafe {
                        // SAFETY: the `len` slots starting at the head index were initialized
                        // by `push` and not yet consumed by `pop`, so each holds a live value.
                        let value = &mut *slot;
                        value.as_mut_ptr().drop_in_place();
                    });
                }
            });
        });
    }
}
344