1 | use alloc::{boxed::Box, vec::Vec}; |
2 | use core::mem::MaybeUninit; |
3 | |
4 | use crossbeam_utils::CachePadded; |
5 | |
6 | use crate::sync::atomic::{AtomicUsize, Ordering}; |
7 | use crate::sync::cell::UnsafeCell; |
8 | #[allow (unused_imports)] |
9 | use crate::sync::prelude::*; |
10 | use crate::{busy_wait, ForcePushError, PopError, PushError}; |
11 | |
12 | /// A slot in a queue. |
13 | struct Slot<T> { |
14 | /// The current stamp. |
15 | stamp: AtomicUsize, |
16 | |
17 | /// The value in this slot. |
18 | value: UnsafeCell<MaybeUninit<T>>, |
19 | } |
20 | |
21 | /// A bounded queue. |
22 | pub struct Bounded<T> { |
23 | /// The head of the queue. |
24 | /// |
25 | /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but |
26 | /// packed into a single `usize`. The lower bits represent the index, while the upper bits |
27 | /// represent the lap. The mark bit in the head is always zero. |
28 | /// |
29 | /// Values are popped from the head of the queue. |
30 | head: CachePadded<AtomicUsize>, |
31 | |
32 | /// The tail of the queue. |
33 | /// |
34 | /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but |
35 | /// packed into a single `usize`. The lower bits represent the index, while the upper bits |
36 | /// represent the lap. The mark bit indicates that the queue is closed. |
37 | /// |
38 | /// Values are pushed into the tail of the queue. |
39 | tail: CachePadded<AtomicUsize>, |
40 | |
41 | /// The buffer holding slots. |
42 | buffer: Box<[Slot<T>]>, |
43 | |
44 | /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`. |
45 | one_lap: usize, |
46 | |
47 | /// If this bit is set in the tail, that means the queue is closed. |
48 | mark_bit: usize, |
49 | } |
50 | |
51 | impl<T> Bounded<T> { |
52 | /// Creates a new bounded queue. |
53 | pub fn new(cap: usize) -> Bounded<T> { |
54 | assert!(cap > 0, "capacity must be positive" ); |
55 | |
56 | // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`. |
57 | let head = 0; |
58 | // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`. |
59 | let tail = 0; |
60 | |
61 | // Allocate a buffer of `cap` slots initialized with stamps. |
62 | let mut buffer = Vec::with_capacity(cap); |
63 | for i in 0..cap { |
64 | // Set the stamp to `{ lap: 0, mark: 0, index: i }`. |
65 | buffer.push(Slot { |
66 | stamp: AtomicUsize::new(i), |
67 | value: UnsafeCell::new(MaybeUninit::uninit()), |
68 | }); |
69 | } |
70 | |
71 | // Compute constants `mark_bit` and `one_lap`. |
72 | let mark_bit = (cap + 1).next_power_of_two(); |
73 | let one_lap = mark_bit * 2; |
74 | |
75 | Bounded { |
76 | buffer: buffer.into(), |
77 | one_lap, |
78 | mark_bit, |
79 | head: CachePadded::new(AtomicUsize::new(head)), |
80 | tail: CachePadded::new(AtomicUsize::new(tail)), |
81 | } |
82 | } |
83 | |
84 | /// Attempts to push an item into the queue. |
85 | pub fn push(&self, value: T) -> Result<(), PushError<T>> { |
86 | self.push_or_else(value, |value, tail, _, _| { |
87 | let head = self.head.load(Ordering::Relaxed); |
88 | |
89 | // If the head lags one lap behind the tail as well... |
90 | if head.wrapping_add(self.one_lap) == tail { |
91 | // ...then the queue is full. |
92 | Err(PushError::Full(value)) |
93 | } else { |
94 | Ok(value) |
95 | } |
96 | }) |
97 | } |
98 | |
99 | /// Pushes an item into the queue, displacing another item if needed. |
100 | pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> { |
101 | let result = self.push_or_else(value, |value, tail, new_tail, slot| { |
102 | let head = tail.wrapping_sub(self.one_lap); |
103 | let new_head = new_tail.wrapping_sub(self.one_lap); |
104 | |
105 | // Try to move the head. |
106 | if self |
107 | .head |
108 | .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed) |
109 | .is_ok() |
110 | { |
111 | // Move the tail. |
112 | self.tail.store(new_tail, Ordering::SeqCst); |
113 | |
114 | // Swap out the old value. |
115 | // SAFETY: We know this is initialized, since it's covered by the current queue. |
116 | let old = unsafe { |
117 | slot.value |
118 | .with_mut(|slot| slot.replace(MaybeUninit::new(value)).assume_init()) |
119 | }; |
120 | |
121 | // Update the stamp. |
122 | slot.stamp.store(tail + 1, Ordering::Release); |
123 | |
124 | // Return a PushError. |
125 | Err(PushError::Full(old)) |
126 | } else { |
127 | Ok(value) |
128 | } |
129 | }); |
130 | |
131 | match result { |
132 | Ok(()) => Ok(None), |
133 | Err(PushError::Full(old_value)) => Ok(Some(old_value)), |
134 | Err(PushError::Closed(value)) => Err(ForcePushError(value)), |
135 | } |
136 | } |
137 | |
138 | /// Attempts to push an item into the queue, running a closure on failure. |
139 | /// |
140 | /// `fail` is run when there is no more room left in the tail of the queue. The parameters of |
141 | /// this function are as follows: |
142 | /// |
143 | /// - The item that failed to push. |
144 | /// - The value of `self.tail` before the new value would be inserted. |
145 | /// - The value of `self.tail` after the new value would be inserted. |
146 | /// - The slot that we attempted to push into. |
147 | /// |
148 | /// If `fail` returns `Ok(val)`, we will try pushing `val` to the head of the queue. Otherwise, |
149 | /// this function will return the error. |
150 | fn push_or_else<F>(&self, mut value: T, mut fail: F) -> Result<(), PushError<T>> |
151 | where |
152 | F: FnMut(T, usize, usize, &Slot<T>) -> Result<T, PushError<T>>, |
153 | { |
154 | let mut tail = self.tail.load(Ordering::Relaxed); |
155 | |
156 | loop { |
157 | // Check if the queue is closed. |
158 | if tail & self.mark_bit != 0 { |
159 | return Err(PushError::Closed(value)); |
160 | } |
161 | |
162 | // Deconstruct the tail. |
163 | let index = tail & (self.mark_bit - 1); |
164 | let lap = tail & !(self.one_lap - 1); |
165 | |
166 | // Calculate the new location of the tail. |
167 | let new_tail = if index + 1 < self.buffer.len() { |
168 | // Same lap, incremented index. |
169 | // Set to `{ lap: lap, mark: 0, index: index + 1 }`. |
170 | tail + 1 |
171 | } else { |
172 | // One lap forward, index wraps around to zero. |
173 | // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. |
174 | lap.wrapping_add(self.one_lap) |
175 | }; |
176 | |
177 | // Inspect the corresponding slot. |
178 | let slot = &self.buffer[index]; |
179 | let stamp = slot.stamp.load(Ordering::Acquire); |
180 | |
181 | // If the tail and the stamp match, we may attempt to push. |
182 | if tail == stamp { |
183 | // Try moving the tail. |
184 | match self.tail.compare_exchange_weak( |
185 | tail, |
186 | new_tail, |
187 | Ordering::SeqCst, |
188 | Ordering::Relaxed, |
189 | ) { |
190 | Ok(_) => { |
191 | // Write the value into the slot and update the stamp. |
192 | slot.value.with_mut(|slot| unsafe { |
193 | slot.write(MaybeUninit::new(value)); |
194 | }); |
195 | slot.stamp.store(tail + 1, Ordering::Release); |
196 | return Ok(()); |
197 | } |
198 | Err(t) => { |
199 | tail = t; |
200 | } |
201 | } |
202 | } else if stamp.wrapping_add(self.one_lap) == tail + 1 { |
203 | crate::full_fence(); |
204 | |
205 | // We've failed to push; run our failure closure. |
206 | value = fail(value, tail, new_tail, slot)?; |
207 | |
208 | // Loom complains if there isn't an explicit busy wait here. |
209 | #[cfg (loom)] |
210 | busy_wait(); |
211 | |
212 | tail = self.tail.load(Ordering::Relaxed); |
213 | } else { |
214 | // Yield because we need to wait for the stamp to get updated. |
215 | busy_wait(); |
216 | tail = self.tail.load(Ordering::Relaxed); |
217 | } |
218 | } |
219 | } |
220 | |
221 | /// Attempts to pop an item from the queue. |
222 | pub fn pop(&self) -> Result<T, PopError> { |
223 | let mut head = self.head.load(Ordering::Relaxed); |
224 | |
225 | loop { |
226 | // Deconstruct the head. |
227 | let index = head & (self.mark_bit - 1); |
228 | let lap = head & !(self.one_lap - 1); |
229 | |
230 | // Inspect the corresponding slot. |
231 | let slot = &self.buffer[index]; |
232 | let stamp = slot.stamp.load(Ordering::Acquire); |
233 | |
234 | // If the the stamp is ahead of the head by 1, we may attempt to pop. |
235 | if head + 1 == stamp { |
236 | let new = if index + 1 < self.buffer.len() { |
237 | // Same lap, incremented index. |
238 | // Set to `{ lap: lap, mark: 0, index: index + 1 }`. |
239 | head + 1 |
240 | } else { |
241 | // One lap forward, index wraps around to zero. |
242 | // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. |
243 | lap.wrapping_add(self.one_lap) |
244 | }; |
245 | |
246 | // Try moving the head. |
247 | match self.head.compare_exchange_weak( |
248 | head, |
249 | new, |
250 | Ordering::SeqCst, |
251 | Ordering::Relaxed, |
252 | ) { |
253 | Ok(_) => { |
254 | // Read the value from the slot and update the stamp. |
255 | let value = slot |
256 | .value |
257 | .with_mut(|slot| unsafe { slot.read().assume_init() }); |
258 | slot.stamp |
259 | .store(head.wrapping_add(self.one_lap), Ordering::Release); |
260 | return Ok(value); |
261 | } |
262 | Err(h) => { |
263 | head = h; |
264 | } |
265 | } |
266 | } else if stamp == head { |
267 | crate::full_fence(); |
268 | let tail = self.tail.load(Ordering::Relaxed); |
269 | |
270 | // If the tail equals the head, that means the queue is empty. |
271 | if (tail & !self.mark_bit) == head { |
272 | // Check if the queue is closed. |
273 | if tail & self.mark_bit != 0 { |
274 | return Err(PopError::Closed); |
275 | } else { |
276 | return Err(PopError::Empty); |
277 | } |
278 | } |
279 | |
280 | // Loom complains if there isn't a busy-wait here. |
281 | #[cfg (loom)] |
282 | busy_wait(); |
283 | |
284 | head = self.head.load(Ordering::Relaxed); |
285 | } else { |
286 | // Yield because we need to wait for the stamp to get updated. |
287 | busy_wait(); |
288 | head = self.head.load(Ordering::Relaxed); |
289 | } |
290 | } |
291 | } |
292 | |
293 | /// Returns the number of items in the queue. |
294 | pub fn len(&self) -> usize { |
295 | loop { |
296 | // Load the tail, then load the head. |
297 | let tail = self.tail.load(Ordering::SeqCst); |
298 | let head = self.head.load(Ordering::SeqCst); |
299 | |
300 | // If the tail didn't change, we've got consistent values to work with. |
301 | if self.tail.load(Ordering::SeqCst) == tail { |
302 | let hix = head & (self.mark_bit - 1); |
303 | let tix = tail & (self.mark_bit - 1); |
304 | |
305 | return if hix < tix { |
306 | tix - hix |
307 | } else if hix > tix { |
308 | self.buffer.len() - hix + tix |
309 | } else if (tail & !self.mark_bit) == head { |
310 | 0 |
311 | } else { |
312 | self.buffer.len() |
313 | }; |
314 | } |
315 | } |
316 | } |
317 | |
318 | /// Returns `true` if the queue is empty. |
319 | pub fn is_empty(&self) -> bool { |
320 | let head = self.head.load(Ordering::SeqCst); |
321 | let tail = self.tail.load(Ordering::SeqCst); |
322 | |
323 | // Is the tail equal to the head? |
324 | // |
325 | // Note: If the head changes just before we load the tail, that means there was a moment |
326 | // when the queue was not empty, so it is safe to just return `false`. |
327 | (tail & !self.mark_bit) == head |
328 | } |
329 | |
330 | /// Returns `true` if the queue is full. |
331 | pub fn is_full(&self) -> bool { |
332 | let tail = self.tail.load(Ordering::SeqCst); |
333 | let head = self.head.load(Ordering::SeqCst); |
334 | |
335 | // Is the head lagging one lap behind tail? |
336 | // |
337 | // Note: If the tail changes just before we load the head, that means there was a moment |
338 | // when the queue was not full, so it is safe to just return `false`. |
339 | head.wrapping_add(self.one_lap) == tail & !self.mark_bit |
340 | } |
341 | |
342 | /// Returns the capacity of the queue. |
343 | pub fn capacity(&self) -> usize { |
344 | self.buffer.len() |
345 | } |
346 | |
347 | /// Closes the queue. |
348 | /// |
349 | /// Returns `true` if this call closed the queue. |
350 | pub fn close(&self) -> bool { |
351 | let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); |
352 | tail & self.mark_bit == 0 |
353 | } |
354 | |
355 | /// Returns `true` if the queue is closed. |
356 | pub fn is_closed(&self) -> bool { |
357 | self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 |
358 | } |
359 | } |
360 | |
361 | impl<T> Drop for Bounded<T> { |
362 | fn drop(&mut self) { |
363 | // Get the index of the head. |
364 | let Self { |
365 | head, |
366 | tail, |
367 | buffer, |
368 | mark_bit, |
369 | .. |
370 | } = self; |
371 | |
372 | let mark_bit = *mark_bit; |
373 | |
374 | head.with_mut(|&mut head| { |
375 | tail.with_mut(|&mut tail| { |
376 | let hix = head & (mark_bit - 1); |
377 | let tix = tail & (mark_bit - 1); |
378 | |
379 | let len = if hix < tix { |
380 | tix - hix |
381 | } else if hix > tix { |
382 | buffer.len() - hix + tix |
383 | } else if (tail & !mark_bit) == head { |
384 | 0 |
385 | } else { |
386 | buffer.len() |
387 | }; |
388 | |
389 | // Loop over all slots that hold a value and drop them. |
390 | for i in 0..len { |
391 | // Compute the index of the next slot holding a value. |
392 | let index = if hix + i < buffer.len() { |
393 | hix + i |
394 | } else { |
395 | hix + i - buffer.len() |
396 | }; |
397 | |
398 | // Drop the value in the slot. |
399 | let slot = &buffer[index]; |
400 | slot.value.with_mut(|slot| unsafe { |
401 | let value = &mut *slot; |
402 | value.as_mut_ptr().drop_in_place(); |
403 | }); |
404 | } |
405 | }); |
406 | }); |
407 | } |
408 | } |
409 | |