1 | use alloc::{boxed::Box, vec::Vec}; |
2 | use core::mem::MaybeUninit; |
3 | |
4 | use crossbeam_utils::CachePadded; |
5 | |
6 | use crate::sync::atomic::{AtomicUsize, Ordering}; |
7 | use crate::sync::cell::UnsafeCell; |
8 | #[allow (unused_imports)] |
9 | use crate::sync::prelude::*; |
10 | use crate::{busy_wait, PopError, PushError}; |
11 | |
/// One cell of the ring buffer that backs the bounded queue.
///
/// A cell pairs its storage with a stamp; producers and consumers compare the stamp against
/// their own tail/head stamp to decide whether the cell is free, full, or busy.
struct Slot<T> {
    /// Sequence stamp for this cell.
    ///
    /// It starts as `{ lap: 0, mark: 0, index: i }`. After a push it becomes the pusher's
    /// tail stamp plus one, which signals "value present". After a pop it becomes the
    /// popper's head stamp plus one lap, which makes the cell writable again on the next lap.
    stamp: AtomicUsize,

    /// Storage for the item; only holds an initialized value while the stamp marks the
    /// cell as occupied.
    value: UnsafeCell<MaybeUninit<T>>,
}
20 | |
21 | /// A bounded queue. |
22 | pub struct Bounded<T> { |
23 | /// The head of the queue. |
24 | /// |
25 | /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but |
26 | /// packed into a single `usize`. The lower bits represent the index, while the upper bits |
27 | /// represent the lap. The mark bit in the head is always zero. |
28 | /// |
29 | /// Values are popped from the head of the queue. |
30 | head: CachePadded<AtomicUsize>, |
31 | |
32 | /// The tail of the queue. |
33 | /// |
34 | /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but |
35 | /// packed into a single `usize`. The lower bits represent the index, while the upper bits |
36 | /// represent the lap. The mark bit indicates that the queue is closed. |
37 | /// |
38 | /// Values are pushed into the tail of the queue. |
39 | tail: CachePadded<AtomicUsize>, |
40 | |
41 | /// The buffer holding slots. |
42 | buffer: Box<[Slot<T>]>, |
43 | |
44 | /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`. |
45 | one_lap: usize, |
46 | |
47 | /// If this bit is set in the tail, that means the queue is closed. |
48 | mark_bit: usize, |
49 | } |
50 | |
impl<T> Bounded<T> {
    /// Creates a new bounded queue that holds at most `cap` items.
    ///
    /// # Panics
    ///
    /// Panics if `cap` is zero.
    pub fn new(cap: usize) -> Bounded<T> {
        assert!(cap > 0, "capacity must be positive");

        // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
        let head = 0;
        // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
        let tail = 0;

        // Allocate a buffer of `cap` slots initialized with stamps.
        let mut buffer = Vec::with_capacity(cap);
        for i in 0..cap {
            // Set the stamp to `{ lap: 0, mark: 0, index: i }`. This equals the tail stamp a
            // producer holds when it reaches this slot during the first lap, so the slot
            // starts out as "empty and writable".
            buffer.push(Slot {
                stamp: AtomicUsize::new(i),
                value: UnsafeCell::new(MaybeUninit::uninit()),
            });
        }

        // Compute constants `mark_bit` and `one_lap`.
        //
        // `mark_bit` is the smallest power of two strictly greater than `cap`. That leaves room
        // below it for both an index (`0..cap`) and the `index + 1` value that `push` writes
        // into a slot stamp. `one_lap` is the next bit up: the lowest bit of the lap counter.
        let mark_bit = (cap + 1).next_power_of_two();
        let one_lap = mark_bit * 2;

        Bounded {
            buffer: buffer.into(),
            one_lap,
            mark_bit,
            head: CachePadded::new(AtomicUsize::new(head)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
        }
    }

    /// Attempts to push an item into the queue.
    ///
    /// # Errors
    ///
    /// Returns `PushError::Closed(value)` if the queue has been closed, or
    /// `PushError::Full(value)` if every slot is occupied. In both cases the item is handed
    /// back to the caller.
    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Check if the queue is closed.
            if tail & self.mark_bit != 0 {
                return Err(PushError::Closed(value));
            }

            // Deconstruct the tail.
            let index = tail & (self.mark_bit - 1);
            let lap = tail & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = &self.buffer[index];
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, the slot is empty and ready for this lap, so we
            // may attempt to push.
            if tail == stamp {
                let new_tail = if index + 1 < self.buffer.len() {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                    tail + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the tail. If the CAS succeeds, any other producer that observed
                // the same `tail` will fail its own CAS, so this slot is ours to fill.
                match self.tail.compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Write the value into the slot and update the stamp.
                        // `pop` waits for `head + 1 == stamp`, i.e. the stamp `tail + 1`
                        // stored here. The Release store pairs with the Acquire stamp load in
                        // `pop`, so the consumer sees the written value.
                        slot.value.with_mut(|slot| unsafe {
                            slot.write(MaybeUninit::new(value));
                        });
                        slot.stamp.store(tail + 1, Ordering::Release);
                        return Ok(());
                    }
                    Err(t) => {
                        // Another producer moved the tail, or the weak CAS failed spuriously.
                        // Retry with the freshly observed value.
                        tail = t;
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
                // The stamp is one lap behind: the slot still holds the value pushed in the
                // previous lap, so the queue may be full.
                // NOTE(review): `crate::full_fence` is defined elsewhere; presumably a
                // sequentially-consistent fence separating the stamp load from the head load.
                crate::full_fence();
                let head = self.head.load(Ordering::Relaxed);

                // If the head lags one lap behind the tail as well...
                if head.wrapping_add(self.one_lap) == tail {
                    // ...then the queue is full.
                    return Err(PushError::Full(value));
                }

                // Loom complains if there isn't an explicit busy wait here.
                #[cfg(loom)]
                busy_wait();

                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Another thread is mid-operation on this slot.
                // Yield because we need to wait for the stamp to get updated.
                busy_wait();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }

    /// Attempts to pop an item from the queue.
    ///
    /// # Errors
    ///
    /// Returns `PopError::Empty` if the queue holds no items. Returns `PopError::Closed` if it
    /// holds no items and has been closed. A closed queue still yields its remaining items.
    pub fn pop(&self) -> Result<T, PopError> {
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head.
            let index = head & (self.mark_bit - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = &self.buffer[index];
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, a producer has finished writing this
            // slot, so we may attempt to pop.
            if head + 1 == stamp {
                let new = if index + 1 < self.buffer.len() {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head.
                match self.head.compare_exchange_weak(
                    head,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Read the value from the slot and update the stamp.
                        // The new stamp `head + one_lap` equals the tail stamp a producer will
                        // hold when it reaches this slot on the next lap, so the slot becomes
                        // writable again. The Release store pairs with the Acquire stamp load
                        // in `push`.
                        let value = slot
                            .value
                            .with_mut(|slot| unsafe { slot.read().assume_init() });
                        slot.stamp
                            .store(head.wrapping_add(self.one_lap), Ordering::Release);
                        return Ok(value);
                    }
                    Err(h) => {
                        // Another consumer moved the head, or the weak CAS failed spuriously.
                        head = h;
                    }
                }
            } else if stamp == head {
                // The slot has not been written for this lap, so the queue may be empty.
                // NOTE(review): `crate::full_fence` is defined elsewhere; presumably a
                // sequentially-consistent fence separating the stamp load from the tail load.
                crate::full_fence();
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                // The mark bit is stripped first because a closed tail carries it.
                if (tail & !self.mark_bit) == head {
                    // Check if the queue is closed.
                    if tail & self.mark_bit != 0 {
                        return Err(PopError::Closed);
                    } else {
                        return Err(PopError::Empty);
                    }
                }

                // Loom complains if there isn't a busy-wait here.
                #[cfg(loom)]
                busy_wait();

                head = self.head.load(Ordering::Relaxed);
            } else {
                // Another thread is mid-operation on this slot.
                // Yield because we need to wait for the stamp to get updated.
                busy_wait();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }

    /// Returns the number of items in the queue.
    ///
    /// When other threads push or pop concurrently, the result is only a snapshot and may be
    /// stale by the time it is returned.
    pub fn len(&self) -> usize {
        loop {
            // Load the tail, then load the head.
            let tail = self.tail.load(Ordering::SeqCst);
            let head = self.head.load(Ordering::SeqCst);

            // If the tail didn't change, we've got consistent values to work with.
            if self.tail.load(Ordering::SeqCst) == tail {
                let hix = head & (self.mark_bit - 1);
                let tix = tail & (self.mark_bit - 1);

                return if hix < tix {
                    // No wrap-around: items occupy `hix..tix`.
                    tix - hix
                } else if hix > tix {
                    // Items wrap past the end of the buffer.
                    self.buffer.len() - hix + tix
                } else if (tail & !self.mark_bit) == head {
                    // Same index and same lap: the queue is empty.
                    0
                } else {
                    // Same index but different laps: the queue is full.
                    self.buffer.len()
                };
            }
        }
    }

    /// Returns `true` if the queue is empty.
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);

        // Is the tail equal to the head? (The close mark is stripped from the tail first.)
        //
        // Note: If the head changes just before we load the tail, that means there was a moment
        // when the queue was not empty, so it is safe to just return `false`.
        (tail & !self.mark_bit) == head
    }

    /// Returns `true` if the queue is full.
    pub fn is_full(&self) -> bool {
        let tail = self.tail.load(Ordering::SeqCst);
        let head = self.head.load(Ordering::SeqCst);

        // Is the head lagging one lap behind tail?
        //
        // Note: If the tail changes just before we load the head, that means there was a moment
        // when the queue was not full, so it is safe to just return `false`.
        head.wrapping_add(self.one_lap) == tail & !self.mark_bit
    }

    /// Returns the capacity of the queue (the number of slots in the buffer).
    pub fn capacity(&self) -> usize {
        self.buffer.len()
    }

    /// Closes the queue by setting the mark bit in the tail.
    ///
    /// After closing, `push` fails with `PushError::Closed`. `pop` keeps returning the
    /// remaining items and then fails with `PopError::Closed`.
    ///
    /// Returns `true` if this call closed the queue, and `false` if it was already closed.
    pub fn close(&self) -> bool {
        let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
        tail & self.mark_bit == 0
    }

    /// Returns `true` if the queue is closed.
    pub fn is_closed(&self) -> bool {
        self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
    }
}
295 | |
296 | impl<T> Drop for Bounded<T> { |
297 | fn drop(&mut self) { |
298 | // Get the index of the head. |
299 | let Self { |
300 | head, |
301 | tail, |
302 | buffer, |
303 | mark_bit, |
304 | .. |
305 | } = self; |
306 | |
307 | let mark_bit = *mark_bit; |
308 | |
309 | head.with_mut(|&mut head| { |
310 | tail.with_mut(|&mut tail| { |
311 | let hix = head & (mark_bit - 1); |
312 | let tix = tail & (mark_bit - 1); |
313 | |
314 | let len = if hix < tix { |
315 | tix - hix |
316 | } else if hix > tix { |
317 | buffer.len() - hix + tix |
318 | } else if (tail & !mark_bit) == head { |
319 | 0 |
320 | } else { |
321 | buffer.len() |
322 | }; |
323 | |
324 | // Loop over all slots that hold a value and drop them. |
325 | for i in 0..len { |
326 | // Compute the index of the next slot holding a value. |
327 | let index = if hix + i < buffer.len() { |
328 | hix + i |
329 | } else { |
330 | hix + i - buffer.len() |
331 | }; |
332 | |
333 | // Drop the value in the slot. |
334 | let slot = &buffer[index]; |
335 | slot.value.with_mut(|slot| unsafe { |
336 | let value = &mut *slot; |
337 | value.as_mut_ptr().drop_in_place(); |
338 | }); |
339 | } |
340 | }); |
341 | }); |
342 | } |
343 | } |
344 | |