1//! A concurrent multi-producer multi-consumer queue.
2//!
3//! There are two kinds of queues:
4//!
5//! 1. [Bounded] queue with limited capacity.
6//! 2. [Unbounded] queue with unlimited capacity.
7//!
8//! Queues also have the capability to get [closed] at any point. When closed, no more items can be
9//! pushed into the queue, although the remaining items can still be popped.
10//!
11//! These features make it easy to build channels similar to [`std::sync::mpsc`] on top of this
12//! crate.
13//!
14//! # Examples
15//!
16//! ```
17//! use concurrent_queue::ConcurrentQueue;
18//!
19//! let q = ConcurrentQueue::unbounded();
20//! q.push(1).unwrap();
21//! q.push(2).unwrap();
22//!
23//! assert_eq!(q.pop(), Ok(1));
24//! assert_eq!(q.pop(), Ok(2));
25//! ```
26//!
27//! # Features
28//!
29//! `concurrent-queue` uses an `std` default feature. With this feature enabled, this crate will
30//! use [`std::thread::yield_now`] to avoid busy waiting in tight loops. However, with this
31//! feature disabled, [`core::hint::spin_loop`] will be used instead. Disabling `std` will allow
32//! this crate to be used on `no_std` platforms at the potential expense of more busy waiting.
33//!
34//! There is also a `portable-atomic` feature, which uses a polyfill from the
35//! [`portable-atomic`] crate to provide atomic operations on platforms that do not support them.
36//! See the [`README`] for the [`portable-atomic`] crate for more information on how to use it.
37//! Note that even with this feature enabled, `concurrent-queue` still requires a global allocator
38//! to be available. See the documentation for the [`std::alloc::GlobalAlloc`] trait for more
39//! information.
40//!
41//! [Bounded]: `ConcurrentQueue::bounded()`
42//! [Unbounded]: `ConcurrentQueue::unbounded()`
43//! [closed]: `ConcurrentQueue::close()`
44//! [`portable-atomic`]: https://crates.io/crates/portable-atomic
45//! [`README`]: https://github.com/taiki-e/portable-atomic/blob/main/README.md#optional-cfg
46
47#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
48#![no_std]
49#![doc(
50 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
51)]
52#![doc(
53 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
54)]
55
56extern crate alloc;
57#[cfg(feature = "std")]
58extern crate std;
59
60use core::fmt;
61use core::panic::{RefUnwindSafe, UnwindSafe};
62use sync::atomic::{self, Ordering};
63
64#[cfg(feature = "std")]
65use std::error;
66
67use crate::bounded::Bounded;
68use crate::single::Single;
69use crate::sync::busy_wait;
70use crate::unbounded::Unbounded;
71
72mod bounded;
73mod single;
74mod unbounded;
75
76mod sync;
77
78/// A concurrent queue.
79///
80/// # Examples
81///
82/// ```
83/// use concurrent_queue::{ConcurrentQueue, PopError, PushError};
84///
85/// let q = ConcurrentQueue::bounded(2);
86///
87/// assert_eq!(q.push('a'), Ok(()));
88/// assert_eq!(q.push('b'), Ok(()));
89/// assert_eq!(q.push('c'), Err(PushError::Full('c')));
90///
91/// assert_eq!(q.pop(), Ok('a'));
92/// assert_eq!(q.pop(), Ok('b'));
93/// assert_eq!(q.pop(), Err(PopError::Empty));
94/// ```
95pub struct ConcurrentQueue<T>(Inner<T>);
96
97unsafe impl<T: Send> Send for ConcurrentQueue<T> {}
98unsafe impl<T: Send> Sync for ConcurrentQueue<T> {}
99
100impl<T> UnwindSafe for ConcurrentQueue<T> {}
101impl<T> RefUnwindSafe for ConcurrentQueue<T> {}
102
103#[allow(clippy::large_enum_variant)]
104enum Inner<T> {
105 Single(Single<T>),
106 Bounded(Bounded<T>),
107 Unbounded(Unbounded<T>),
108}
109
110impl<T> ConcurrentQueue<T> {
111 /// Creates a new bounded queue.
112 ///
113 /// The queue allocates enough space for `cap` items.
114 ///
115 /// # Panics
116 ///
117 /// If the capacity is zero, this constructor will panic.
118 ///
119 /// # Examples
120 ///
121 /// ```
122 /// use concurrent_queue::ConcurrentQueue;
123 ///
124 /// let q = ConcurrentQueue::<i32>::bounded(100);
125 /// ```
126 pub fn bounded(cap: usize) -> ConcurrentQueue<T> {
127 if cap == 1 {
128 ConcurrentQueue(Inner::Single(Single::new()))
129 } else {
130 ConcurrentQueue(Inner::Bounded(Bounded::new(cap)))
131 }
132 }
133
134 /// Creates a new unbounded queue.
135 ///
136 /// # Examples
137 ///
138 /// ```
139 /// use concurrent_queue::ConcurrentQueue;
140 ///
141 /// let q = ConcurrentQueue::<i32>::unbounded();
142 /// ```
143 pub fn unbounded() -> ConcurrentQueue<T> {
144 ConcurrentQueue(Inner::Unbounded(Unbounded::new()))
145 }
146
147 /// Attempts to push an item into the queue.
148 ///
149 /// If the queue is full or closed, the item is returned back as an error.
150 ///
151 /// # Examples
152 ///
153 /// ```
154 /// use concurrent_queue::{ConcurrentQueue, PushError};
155 ///
156 /// let q = ConcurrentQueue::bounded(1);
157 ///
158 /// // Push succeeds because there is space in the queue.
159 /// assert_eq!(q.push(10), Ok(()));
160 ///
161 /// // Push errors because the queue is now full.
162 /// assert_eq!(q.push(20), Err(PushError::Full(20)));
163 ///
164 /// // Close the queue, which will prevent further pushes.
165 /// q.close();
166 ///
167 /// // Pushing now errors indicating the queue is closed.
168 /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
169 ///
170 /// // Pop the single item in the queue.
171 /// assert_eq!(q.pop(), Ok(10));
172 ///
173 /// // Even though there is space, no more items can be pushed.
174 /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
175 /// ```
176 pub fn push(&self, value: T) -> Result<(), PushError<T>> {
177 match &self.0 {
178 Inner::Single(q) => q.push(value),
179 Inner::Bounded(q) => q.push(value),
180 Inner::Unbounded(q) => q.push(value),
181 }
182 }
183
184 /// Attempts to pop an item from the queue.
185 ///
186 /// If the queue is empty, an error is returned.
187 ///
188 /// # Examples
189 ///
190 /// ```
191 /// use concurrent_queue::{ConcurrentQueue, PopError};
192 ///
193 /// let q = ConcurrentQueue::bounded(1);
194 ///
195 /// // Pop errors when the queue is empty.
196 /// assert_eq!(q.pop(), Err(PopError::Empty));
197 ///
198 /// // Push one item and close the queue.
199 /// assert_eq!(q.push(10), Ok(()));
200 /// q.close();
201 ///
202 /// // Remaining items can be popped.
203 /// assert_eq!(q.pop(), Ok(10));
204 ///
205 /// // Again, pop errors when the queue is empty,
206 /// // but now also indicates that the queue is closed.
207 /// assert_eq!(q.pop(), Err(PopError::Closed));
208 /// ```
209 pub fn pop(&self) -> Result<T, PopError> {
210 match &self.0 {
211 Inner::Single(q) => q.pop(),
212 Inner::Bounded(q) => q.pop(),
213 Inner::Unbounded(q) => q.pop(),
214 }
215 }
216
217 /// Get an iterator over the items in the queue.
218 ///
219 /// The iterator will continue until the queue is empty or closed. It will never block;
220 /// if the queue is empty, the iterator will return `None`. If new items are pushed into
221 /// the queue, the iterator may return `Some` in the future after returning `None`.
222 ///
223 /// # Examples
224 ///
225 /// ```
226 /// use concurrent_queue::ConcurrentQueue;
227 ///
228 /// let q = ConcurrentQueue::bounded(5);
229 /// q.push(1).unwrap();
230 /// q.push(2).unwrap();
231 /// q.push(3).unwrap();
232 ///
233 /// let mut iter = q.try_iter();
234 /// assert_eq!(iter.by_ref().sum::<i32>(), 6);
235 /// assert_eq!(iter.next(), None);
236 ///
237 /// // Pushing more items will make them available to the iterator.
238 /// q.push(4).unwrap();
239 /// assert_eq!(iter.next(), Some(4));
240 /// assert_eq!(iter.next(), None);
241 /// ```
242 pub fn try_iter(&self) -> TryIter<'_, T> {
243 TryIter { queue: self }
244 }
245
246 /// Returns `true` if the queue is empty.
247 ///
248 /// # Examples
249 ///
250 /// ```
251 /// use concurrent_queue::ConcurrentQueue;
252 ///
253 /// let q = ConcurrentQueue::<i32>::unbounded();
254 ///
255 /// assert!(q.is_empty());
256 /// q.push(1).unwrap();
257 /// assert!(!q.is_empty());
258 /// ```
259 pub fn is_empty(&self) -> bool {
260 match &self.0 {
261 Inner::Single(q) => q.is_empty(),
262 Inner::Bounded(q) => q.is_empty(),
263 Inner::Unbounded(q) => q.is_empty(),
264 }
265 }
266
267 /// Returns `true` if the queue is full.
268 ///
269 /// An unbounded queue is never full.
270 ///
271 /// # Examples
272 ///
273 /// ```
274 /// use concurrent_queue::ConcurrentQueue;
275 ///
276 /// let q = ConcurrentQueue::bounded(1);
277 ///
278 /// assert!(!q.is_full());
279 /// q.push(1).unwrap();
280 /// assert!(q.is_full());
281 /// ```
282 pub fn is_full(&self) -> bool {
283 match &self.0 {
284 Inner::Single(q) => q.is_full(),
285 Inner::Bounded(q) => q.is_full(),
286 Inner::Unbounded(q) => q.is_full(),
287 }
288 }
289
290 /// Returns the number of items in the queue.
291 ///
292 /// # Examples
293 ///
294 /// ```
295 /// use concurrent_queue::ConcurrentQueue;
296 ///
297 /// let q = ConcurrentQueue::unbounded();
298 /// assert_eq!(q.len(), 0);
299 ///
300 /// assert_eq!(q.push(10), Ok(()));
301 /// assert_eq!(q.len(), 1);
302 ///
303 /// assert_eq!(q.push(20), Ok(()));
304 /// assert_eq!(q.len(), 2);
305 /// ```
306 pub fn len(&self) -> usize {
307 match &self.0 {
308 Inner::Single(q) => q.len(),
309 Inner::Bounded(q) => q.len(),
310 Inner::Unbounded(q) => q.len(),
311 }
312 }
313
314 /// Returns the capacity of the queue.
315 ///
316 /// Unbounded queues have infinite capacity, represented as [`None`].
317 ///
318 /// # Examples
319 ///
320 /// ```
321 /// use concurrent_queue::ConcurrentQueue;
322 ///
323 /// let q = ConcurrentQueue::<i32>::bounded(7);
324 /// assert_eq!(q.capacity(), Some(7));
325 ///
326 /// let q = ConcurrentQueue::<i32>::unbounded();
327 /// assert_eq!(q.capacity(), None);
328 /// ```
329 pub fn capacity(&self) -> Option<usize> {
330 match &self.0 {
331 Inner::Single(_) => Some(1),
332 Inner::Bounded(q) => Some(q.capacity()),
333 Inner::Unbounded(_) => None,
334 }
335 }
336
337 /// Closes the queue.
338 ///
339 /// Returns `true` if this call closed the queue, or `false` if it was already closed.
340 ///
341 /// When a queue is closed, no more items can be pushed but the remaining items can still be
342 /// popped.
343 ///
344 /// # Examples
345 ///
346 /// ```
347 /// use concurrent_queue::{ConcurrentQueue, PopError, PushError};
348 ///
349 /// let q = ConcurrentQueue::unbounded();
350 /// assert_eq!(q.push(10), Ok(()));
351 ///
352 /// assert!(q.close()); // `true` because this call closes the queue.
353 /// assert!(!q.close()); // `false` because the queue is already closed.
354 ///
355 /// // Cannot push any more items when closed.
356 /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
357 ///
358 /// // Remaining items can still be popped.
359 /// assert_eq!(q.pop(), Ok(10));
360 ///
361 /// // When no more items are present, the error is `Closed`.
362 /// assert_eq!(q.pop(), Err(PopError::Closed));
363 /// ```
364 pub fn close(&self) -> bool {
365 match &self.0 {
366 Inner::Single(q) => q.close(),
367 Inner::Bounded(q) => q.close(),
368 Inner::Unbounded(q) => q.close(),
369 }
370 }
371
372 /// Returns `true` if the queue is closed.
373 ///
374 /// # Examples
375 ///
376 /// ```
377 /// use concurrent_queue::ConcurrentQueue;
378 ///
379 /// let q = ConcurrentQueue::<i32>::unbounded();
380 ///
381 /// assert!(!q.is_closed());
382 /// q.close();
383 /// assert!(q.is_closed());
384 /// ```
385 pub fn is_closed(&self) -> bool {
386 match &self.0 {
387 Inner::Single(q) => q.is_closed(),
388 Inner::Bounded(q) => q.is_closed(),
389 Inner::Unbounded(q) => q.is_closed(),
390 }
391 }
392}
393
394impl<T> fmt::Debug for ConcurrentQueue<T> {
395 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396 f&mut DebugStruct<'_, '_>.debug_struct("ConcurrentQueue")
397 .field("len", &self.len())
398 .field("capacity", &self.capacity())
399 .field(name:"is_closed", &self.is_closed())
400 .finish()
401 }
402}
403
404/// An iterator that pops items from a [`ConcurrentQueue`].
405///
406/// This iterator will never block; it will return `None` once the queue has
407/// been exhausted. Calling `next` after `None` may yield `Some(item)` if more items
408/// are pushed to the queue.
409#[must_use = "iterators are lazy and do nothing unless consumed"]
410#[derive(Clone)]
411pub struct TryIter<'a, T> {
412 queue: &'a ConcurrentQueue<T>,
413}
414
415impl<T> fmt::Debug for TryIter<'_, T> {
416 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
417 f.debug_tuple(name:"Iter").field(&self.queue).finish()
418 }
419}
420
421impl<T> Iterator for TryIter<'_, T> {
422 type Item = T;
423
424 fn next(&mut self) -> Option<Self::Item> {
425 self.queue.pop().ok()
426 }
427}
428
429/// Error which occurs when popping from an empty queue.
430#[derive(Clone, Copy, Eq, PartialEq)]
431pub enum PopError {
432 /// The queue is empty but not closed.
433 Empty,
434
435 /// The queue is empty and closed.
436 Closed,
437}
438
439impl PopError {
440 /// Returns `true` if the queue is empty but not closed.
441 pub fn is_empty(&self) -> bool {
442 match self {
443 PopError::Empty => true,
444 PopError::Closed => false,
445 }
446 }
447
448 /// Returns `true` if the queue is empty and closed.
449 pub fn is_closed(&self) -> bool {
450 match self {
451 PopError::Empty => false,
452 PopError::Closed => true,
453 }
454 }
455}
456
457#[cfg(feature = "std")]
458impl error::Error for PopError {}
459
460impl fmt::Debug for PopError {
461 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462 match self {
463 PopError::Empty => write!(f, "Empty"),
464 PopError::Closed => write!(f, "Closed"),
465 }
466 }
467}
468
469impl fmt::Display for PopError {
470 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
471 match self {
472 PopError::Empty => write!(f, "Empty"),
473 PopError::Closed => write!(f, "Closed"),
474 }
475 }
476}
477
478/// Error which occurs when pushing into a full or closed queue.
479#[derive(Clone, Copy, Eq, PartialEq)]
480pub enum PushError<T> {
481 /// The queue is full but not closed.
482 Full(T),
483
484 /// The queue is closed.
485 Closed(T),
486}
487
488impl<T> PushError<T> {
489 /// Unwraps the item that couldn't be pushed.
490 pub fn into_inner(self) -> T {
491 match self {
492 PushError::Full(t) => t,
493 PushError::Closed(t) => t,
494 }
495 }
496
497 /// Returns `true` if the queue is full but not closed.
498 pub fn is_full(&self) -> bool {
499 match self {
500 PushError::Full(_) => true,
501 PushError::Closed(_) => false,
502 }
503 }
504
505 /// Returns `true` if the queue is closed.
506 pub fn is_closed(&self) -> bool {
507 match self {
508 PushError::Full(_) => false,
509 PushError::Closed(_) => true,
510 }
511 }
512}
513
514#[cfg(feature = "std")]
515impl<T: fmt::Debug> error::Error for PushError<T> {}
516
517impl<T: fmt::Debug> fmt::Debug for PushError<T> {
518 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
519 match self {
520 PushError::Full(t: &T) => f.debug_tuple(name:"Full").field(t).finish(),
521 PushError::Closed(t: &T) => f.debug_tuple(name:"Closed").field(t).finish(),
522 }
523 }
524}
525
526impl<T> fmt::Display for PushError<T> {
527 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
528 match self {
529 PushError::Full(_) => write!(f, "Full"),
530 PushError::Closed(_) => write!(f, "Closed"),
531 }
532 }
533}
534
535/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
536#[inline]
537fn full_fence() {
538 #[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), not(miri), not(loom)))]
539 {
540 use core::{arch::asm, cell::UnsafeCell};
541 // HACK(stjepang): On x86 architectures there are two different ways of executing
542 // a `SeqCst` fence.
543 //
544 // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
545 // 2. A `lock <op>` instruction.
546 //
547 // Both instructions have the effect of a full barrier, but empirical benchmarks have shown
548 // that the second one is sometimes a bit faster.
549 let a = UnsafeCell::new(0_usize);
550 // It is common to use `lock or` here, but when using a local variable, `lock not`, which
551 // does not change the flag, should be slightly more efficient.
552 // Refs: https://www.felixcloutier.com/x86/not
553 unsafe {
554 #[cfg(target_pointer_width = "64")]
555 asm!("lock not qword ptr [{0}]", in(reg) a.get(), options(nostack, preserves_flags));
556 #[cfg(target_pointer_width = "32")]
557 asm!("lock not dword ptr [{0:e}]", in(reg) a.get(), options(nostack, preserves_flags));
558 }
559 return;
560 }
561 #[allow(unreachable_code)]
562 {
563 atomic::fence(Ordering::SeqCst);
564 }
565}
566