| 1 | //! Fixed capacity Single Producer Single Consumer (SPSC) queue |
| 2 | //! |
| 3 | //! Implementation based on <https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular> |
| 4 | //! |
| 5 | //! # Portability |
| 6 | //! |
| 7 | //! This module requires CAS atomic instructions which are not available on all architectures |
| 8 | //! (e.g. ARMv6-M (`thumbv6m-none-eabi`) and MSP430 (`msp430-none-elf`)). These atomics can be |
| 9 | //! emulated however with [`portable-atomic`](https://crates.io/crates/portable-atomic), which is |
| 10 | //! enabled with the `cas` feature and is enabled by default for `thumbv6m-none-eabi` and `riscv32` |
| 11 | //! targets. |
| 12 | //! |
| 13 | //! # Examples |
| 14 | //! |
| 15 | //! - `Queue` can be used as a plain queue |
| 16 | //! |
| 17 | //! ``` |
| 18 | //! use heapless::spsc::Queue; |
| 19 | //! |
| 20 | //! let mut rb: Queue<u8, 4> = Queue::new(); |
| 21 | //! |
| 22 | //! assert!(rb.enqueue(0).is_ok()); |
| 23 | //! assert!(rb.enqueue(1).is_ok()); |
| 24 | //! assert!(rb.enqueue(2).is_ok()); |
| 25 | //! assert!(rb.enqueue(3).is_err()); // full |
| 26 | //! |
| 27 | //! assert_eq!(rb.dequeue(), Some(0)); |
| 28 | //! ``` |
| 29 | //! |
| 30 | //! - `Queue` can be `split` and then be used in Single Producer Single Consumer mode. |
| 31 | //! |
| 32 | //! "no alloc" applications can create a `&'static mut` reference to a `Queue` -- using a static |
| 33 | //! variable -- and then `split` it: this consumes the static reference. The resulting `Consumer` |
| 34 | //! and `Producer` can then be moved into different execution contexts (threads, interrupt handlers, |
| 35 | //! etc.) |
| 36 | //! |
| 37 | //! ``` |
| 38 | //! use heapless::spsc::{Producer, Queue}; |
| 39 | //! |
| 40 | //! enum Event { A, B } |
| 41 | //! |
| 42 | //! fn main() { |
| 43 | //! let queue: &'static mut Queue<Event, 4> = { |
| 44 | //! static mut Q: Queue<Event, 4> = Queue::new(); |
| 45 | //! unsafe { &mut Q } |
| 46 | //! }; |
| 47 | //! |
| 48 | //! let (producer, mut consumer) = queue.split(); |
| 49 | //! |
| 50 | //! // `producer` can be moved into `interrupt_handler` using a static mutex or the mechanism |
| 51 | //! // provided by the concurrency framework you are using (e.g. a resource in RTIC) |
| 52 | //! |
| 53 | //! loop { |
| 54 | //! match consumer.dequeue() { |
| 55 | //! Some(Event::A) => { /* .. */ }, |
| 56 | //! Some(Event::B) => { /* .. */ }, |
| 57 | //! None => { /* sleep */ }, |
| 58 | //! } |
| 59 | //! # break |
| 60 | //! } |
| 61 | //! } |
| 62 | //! |
| 63 | //! // this is a different execution context that can preempt `main` |
| 64 | //! fn interrupt_handler(producer: &mut Producer<'static, Event, 4>) { |
| 65 | //! # let condition = true; |
| 66 | //! |
| 67 | //! // .. |
| 68 | //! |
| 69 | //! if condition { |
| 70 | //! producer.enqueue(Event::A).ok().unwrap(); |
| 71 | //! } else { |
| 72 | //! producer.enqueue(Event::B).ok().unwrap(); |
| 73 | //! } |
| 74 | //! |
| 75 | //! // .. |
| 76 | //! } |
| 77 | //! ``` |
| 78 | //! |
| 79 | //! # Benchmarks |
| 80 | //! |
| 81 | //! Measured on a ARM Cortex-M3 core running at 8 MHz and with zero Flash wait cycles |
| 82 | //! |
| 83 | //! `-C opt-level` |`3`| |
| 84 | //! -----------------------|---| |
| 85 | //! `Consumer<u8>::dequeue`| 15| |
| 86 | //! `Queue<u8>::dequeue` | 12| |
| 87 | //! `Producer<u8>::enqueue`| 16| |
| 88 | //! `Queue<u8>::enqueue` | 14| |
| 89 | //! |
| 90 | //! - All execution times are in clock cycles. 1 clock cycle = 125 ns. |
//! - Execution time is *dependent* on `mem::size_of::<T>()`. Both operations include one
| 92 | //! `memcpy(T)` in their successful path. |
| 93 | //! - The optimization level is indicated in the first row. |
| 94 | //! - The numbers reported correspond to the successful path (i.e. `Some` is returned by `dequeue` |
| 95 | //! and `Ok` is returned by `enqueue`). |
| 96 | |
| 97 | use core::{cell::UnsafeCell, fmt, hash, mem::MaybeUninit, ptr}; |
| 98 | |
| 99 | #[cfg (not(feature = "portable-atomic" ))] |
| 100 | use core::sync::atomic; |
| 101 | #[cfg (feature = "portable-atomic" )] |
| 102 | use portable_atomic as atomic; |
| 103 | |
| 104 | use atomic::{AtomicUsize, Ordering}; |
| 105 | |
/// A statically allocated single producer single consumer queue with a capacity of `N - 1` elements
///
/// *IMPORTANT*: To get better performance use a value for `N` that is a power of 2 (e.g. `16`, `32`,
/// etc.).
pub struct Queue<T, const N: usize> {
    // this is from where we dequeue items
    // (only advanced by `dequeue` / the `Consumer` side; see `inner_dequeue`)
    pub(crate) head: AtomicUsize,

    // this is where we enqueue new items
    // (only advanced by `enqueue` / the `Producer` side; see `inner_enqueue`)
    pub(crate) tail: AtomicUsize,

    // Backing storage. Slots are uninitialized until written by an enqueue,
    // hence `MaybeUninit`; `UnsafeCell` permits the producer/consumer endpoints
    // to write through a shared `&Queue` reference.
    pub(crate) buffer: [UnsafeCell<MaybeUninit<T>>; N],
}
| 119 | |
impl<T, const N: usize> Queue<T, N> {
    // `UnsafeCell` is not `Copy`, so the `[expr; N]` array initializer in `new`
    // needs a `const` item to repeat.
    const INIT: UnsafeCell<MaybeUninit<T>> = UnsafeCell::new(MaybeUninit::uninit());

    // Advances an index by one slot, wrapping around the `N`-slot buffer
    #[inline ]
    fn increment(val: usize) -> usize {
        (val + 1) % N
    }

    /// Creates an empty queue with a fixed capacity of `N - 1`
    pub const fn new() -> Self {
        // Const assert N > 1
        crate::sealed::greater_than_1::<N>();

        Queue {
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
            buffer: [Self::INIT; N],
        }
    }

    /// Returns the maximum number of elements the queue can hold
    #[inline ]
    pub const fn capacity(&self) -> usize {
        // One slot is kept permanently free so that `head == tail` means
        // "empty" and `tail + 1 == head` means "full" without extra state.
        N - 1
    }

    /// Returns the number of elements in the queue
    #[inline ]
    pub fn len(&self) -> usize {
        let current_head = self.head.load(Ordering::Relaxed);
        let current_tail = self.tail.load(Ordering::Relaxed);

        // Wrapping arithmetic handles the case where `tail` has wrapped around
        // behind `head`; the final `% N` folds the result back into range.
        current_tail.wrapping_sub(current_head).wrapping_add(N) % N
    }

    /// Returns `true` if the queue is empty
    #[inline ]
    pub fn is_empty(&self) -> bool {
        self.head.load(Ordering::Relaxed) == self.tail.load(Ordering::Relaxed)
    }

    /// Returns `true` if the queue is full
    #[inline ]
    pub fn is_full(&self) -> bool {
        Self::increment(self.tail.load(Ordering::Relaxed)) == self.head.load(Ordering::Relaxed)
    }

    /// Iterates from the front of the queue to the back
    pub fn iter(&self) -> Iter<'_, T, N> {
        Iter {
            rb: self,
            index: 0,
            // snapshot the length once; the iterator walks `len` logical slots
            len: self.len(),
        }
    }

    /// Returns an iterator that allows modifying each value
    pub fn iter_mut(&mut self) -> IterMut<'_, T, N> {
        // compute `len` before `self` is mutably borrowed by the struct literal
        let len = self.len();
        IterMut {
            rb: self,
            index: 0,
            len,
        }
    }

    /// Adds an `item` to the end of the queue
    ///
    /// Returns back the `item` if the queue is full
    #[inline ]
    pub fn enqueue(&mut self, val: T) -> Result<(), T> {
        // sound: `&mut self` guarantees exclusive access
        unsafe { self.inner_enqueue(val) }
    }

    /// Returns the item in the front of the queue, or `None` if the queue is empty
    #[inline ]
    pub fn dequeue(&mut self) -> Option<T> {
        // sound: `&mut self` guarantees exclusive access
        unsafe { self.inner_dequeue() }
    }

    /// Returns a reference to the item in the front of the queue without dequeuing, or
    /// `None` if the queue is empty.
    ///
    /// # Examples
    /// ```
    /// use heapless::spsc::Queue;
    ///
    /// let mut queue: Queue<u8, 235> = Queue::new();
    /// let (mut producer, mut consumer) = queue.split();
    /// assert_eq!(None, consumer.peek());
    /// producer.enqueue(1);
    /// assert_eq!(Some(&1), consumer.peek());
    /// assert_eq!(Some(1), consumer.dequeue());
    /// assert_eq!(None, consumer.peek());
    /// ```
    pub fn peek(&self) -> Option<&T> {
        if !self.is_empty() {
            let head = self.head.load(Ordering::Relaxed);
            // SAFETY: non-empty, so the slot at `head` was initialized by an
            // earlier enqueue; `head < N` is an invariant of `increment`
            Some(unsafe { &*(self.buffer.get_unchecked(head).get() as *const T) })
        } else {
            None
        }
    }

    // The memory for enqueueing is "owned" by the tail pointer.
    // NOTE: This internal function uses internal mutability to allow the [`Producer`] to enqueue
    // items without doing pointer arithmetic and accessing internal fields of this type.
    unsafe fn inner_enqueue(&self, val: T) -> Result<(), T> {
        let current_tail = self.tail.load(Ordering::Relaxed);
        let next_tail = Self::increment(current_tail);

        // Acquire pairs with the consumer's Release store to `head` so the
        // slot freed by a dequeue is visible before we reuse it.
        if next_tail != self.head.load(Ordering::Acquire) {
            (self.buffer.get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
            // Release publishes the write above to the consumer side
            self.tail.store(next_tail, Ordering::Release);

            Ok(())
        } else {
            Err(val)
        }
    }

    // The memory for enqueueing is "owned" by the tail pointer.
    // NOTE: This internal function uses internal mutability to allow the [`Producer`] to enqueue
    // items without doing pointer arithmetic and accessing internal fields of this type.
    unsafe fn inner_enqueue_unchecked(&self, val: T) {
        let current_tail = self.tail.load(Ordering::Relaxed);

        // no fullness check here — see `enqueue_unchecked` for the caller contract
        (self.buffer.get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
        self.tail
            .store(Self::increment(current_tail), Ordering::Release);
    }

    /// Adds an `item` to the end of the queue, without checking if it's full
    ///
    /// # Safety
    ///
    /// If the queue is full this operation will leak a value (T's destructor won't run on
    /// the value that got overwritten by `item`), *and* will allow the `dequeue` operation
    /// to create a copy of `item`, which could result in `T`'s destructor running on `item`
    /// twice.
    pub unsafe fn enqueue_unchecked(&mut self, val: T) {
        self.inner_enqueue_unchecked(val)
    }

    // The memory for dequeuing is "owned" by the head pointer.
    // NOTE: This internal function uses internal mutability to allow the [`Consumer`] to dequeue
    // items without doing pointer arithmetic and accessing internal fields of this type.
    unsafe fn inner_dequeue(&self) -> Option<T> {
        let current_head = self.head.load(Ordering::Relaxed);

        // Acquire pairs with the producer's Release store to `tail` so the
        // element written by the enqueue is visible before we read it.
        if current_head == self.tail.load(Ordering::Acquire) {
            None
        } else {
            // move the value out of the buffer; the slot is now logically free
            let v = (self.buffer.get_unchecked(current_head).get() as *const T).read();

            // Release hands the freed slot back to the producer side
            self.head
                .store(Self::increment(current_head), Ordering::Release);

            Some(v)
        }
    }

    // The memory for dequeuing is "owned" by the head pointer.
    // NOTE: This internal function uses internal mutability to allow the [`Consumer`] to dequeue
    // items without doing pointer arithmetic and accessing internal fields of this type.
    unsafe fn inner_dequeue_unchecked(&self) -> T {
        let current_head = self.head.load(Ordering::Relaxed);
        // no emptiness check here — see `dequeue_unchecked` for the caller contract
        let v = (self.buffer.get_unchecked(current_head).get() as *const T).read();

        self.head
            .store(Self::increment(current_head), Ordering::Release);

        v
    }

    /// Returns the item in the front of the queue, without checking if there is something in the
    /// queue
    ///
    /// # Safety
    ///
    /// If the queue is empty this operation will return uninitialized memory.
    pub unsafe fn dequeue_unchecked(&mut self) -> T {
        self.inner_dequeue_unchecked()
    }

    /// Splits a queue into producer and consumer endpoints
    pub fn split(&mut self) -> (Producer<'_, T, N>, Consumer<'_, T, N>) {
        // Both endpoints share `&self`; exclusivity of enqueue vs dequeue is
        // enforced by the `Producer`/`Consumer` types each owning one index.
        (Producer { rb: self }, Consumer { rb: self })
    }
}
| 310 | |
| 311 | impl<T, const N: usize> Default for Queue<T, N> { |
| 312 | fn default() -> Self { |
| 313 | Self::new() |
| 314 | } |
| 315 | } |
| 316 | |
| 317 | impl<T, const N: usize> Clone for Queue<T, N> |
| 318 | where |
| 319 | T: Clone, |
| 320 | { |
| 321 | fn clone(&self) -> Self { |
| 322 | let mut new: Queue<T, N> = Queue::new(); |
| 323 | |
| 324 | for s: &T in self.iter() { |
| 325 | unsafe { |
| 326 | // NOTE(unsafe) new.capacity() == self.capacity() >= self.len() |
| 327 | // no overflow possible |
| 328 | new.enqueue_unchecked(val:s.clone()); |
| 329 | } |
| 330 | } |
| 331 | |
| 332 | new |
| 333 | } |
| 334 | } |
| 335 | |
| 336 | impl<T, const N: usize, const N2: usize> PartialEq<Queue<T, N2>> for Queue<T, N> |
| 337 | where |
| 338 | T: PartialEq, |
| 339 | { |
| 340 | fn eq(&self, other: &Queue<T, N2>) -> bool { |
| 341 | self.len() == other.len() && self.iter().zip(other.iter()).all(|(v1: &T, v2: &T)| v1 == v2) |
| 342 | } |
| 343 | } |
| 344 | |
// `PartialEq` above is reflexive/symmetric/transitive whenever `T: Eq`, so `Queue` is `Eq` too
impl<T, const N: usize> Eq for Queue<T, N> where T: Eq {}
| 346 | |
/// An iterator over the items of a queue
pub struct Iter<'a, T, const N: usize> {
    // the queue being iterated (shared borrow — read-only iteration)
    rb: &'a Queue<T, N>,
    // logical offset of the next front element to yield
    index: usize,
    // logical offset one past the last element to yield (snapshot of queue length)
    len: usize,
}
| 353 | |
| 354 | impl<'a, T, const N: usize> Clone for Iter<'a, T, N> { |
| 355 | fn clone(&self) -> Self { |
| 356 | Self { |
| 357 | rb: self.rb, |
| 358 | index: self.index, |
| 359 | len: self.len, |
| 360 | } |
| 361 | } |
| 362 | } |
| 363 | |
/// A mutable iterator over the items of a queue
pub struct IterMut<'a, T, const N: usize> {
    // the queue being iterated (exclusive borrow — elements may be mutated)
    rb: &'a mut Queue<T, N>,
    // logical offset of the next front element to yield
    index: usize,
    // logical offset one past the last element to yield (snapshot of queue length)
    len: usize,
}
| 370 | |
| 371 | impl<'a, T, const N: usize> Iterator for Iter<'a, T, N> { |
| 372 | type Item = &'a T; |
| 373 | |
| 374 | fn next(&mut self) -> Option<Self::Item> { |
| 375 | if self.index < self.len { |
| 376 | let head: usize = self.rb.head.load(order:Ordering::Relaxed); |
| 377 | |
| 378 | let i: usize = (head + self.index) % N; |
| 379 | self.index += 1; |
| 380 | |
| 381 | Some(unsafe { &*(self.rb.buffer.get_unchecked(index:i).get() as *const T) }) |
| 382 | } else { |
| 383 | None |
| 384 | } |
| 385 | } |
| 386 | } |
| 387 | |
| 388 | impl<'a, T, const N: usize> Iterator for IterMut<'a, T, N> { |
| 389 | type Item = &'a mut T; |
| 390 | |
| 391 | fn next(&mut self) -> Option<Self::Item> { |
| 392 | if self.index < self.len { |
| 393 | let head: usize = self.rb.head.load(order:Ordering::Relaxed); |
| 394 | |
| 395 | let i: usize = (head + self.index) % N; |
| 396 | self.index += 1; |
| 397 | |
| 398 | Some(unsafe { &mut *(self.rb.buffer.get_unchecked(index:i).get() as *mut T) }) |
| 399 | } else { |
| 400 | None |
| 401 | } |
| 402 | } |
| 403 | } |
| 404 | |
| 405 | impl<'a, T, const N: usize> DoubleEndedIterator for Iter<'a, T, N> { |
| 406 | fn next_back(&mut self) -> Option<Self::Item> { |
| 407 | if self.index < self.len { |
| 408 | let head: usize = self.rb.head.load(order:Ordering::Relaxed); |
| 409 | |
| 410 | // self.len > 0, since it's larger than self.index > 0 |
| 411 | let i: usize = (head + self.len - 1) % N; |
| 412 | self.len -= 1; |
| 413 | Some(unsafe { &*(self.rb.buffer.get_unchecked(index:i).get() as *const T) }) |
| 414 | } else { |
| 415 | None |
| 416 | } |
| 417 | } |
| 418 | } |
| 419 | |
| 420 | impl<'a, T, const N: usize> DoubleEndedIterator for IterMut<'a, T, N> { |
| 421 | fn next_back(&mut self) -> Option<Self::Item> { |
| 422 | if self.index < self.len { |
| 423 | let head: usize = self.rb.head.load(order:Ordering::Relaxed); |
| 424 | |
| 425 | // self.len > 0, since it's larger than self.index > 0 |
| 426 | let i: usize = (head + self.len - 1) % N; |
| 427 | self.len -= 1; |
| 428 | Some(unsafe { &mut *(self.rb.buffer.get_unchecked(index:i).get() as *mut T) }) |
| 429 | } else { |
| 430 | None |
| 431 | } |
| 432 | } |
| 433 | } |
| 434 | |
| 435 | impl<T, const N: usize> Drop for Queue<T, N> { |
| 436 | fn drop(&mut self) { |
| 437 | for item: &mut T in self { |
| 438 | unsafe { |
| 439 | ptr::drop_in_place(to_drop:item); |
| 440 | } |
| 441 | } |
| 442 | } |
| 443 | } |
| 444 | |
| 445 | impl<T, const N: usize> fmt::Debug for Queue<T, N> |
| 446 | where |
| 447 | T: fmt::Debug, |
| 448 | { |
| 449 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 450 | f.debug_list().entries(self.iter()).finish() |
| 451 | } |
| 452 | } |
| 453 | |
| 454 | impl<T, const N: usize> hash::Hash for Queue<T, N> |
| 455 | where |
| 456 | T: hash::Hash, |
| 457 | { |
| 458 | fn hash<H: hash::Hasher>(&self, state: &mut H) { |
| 459 | // iterate over self in order |
| 460 | for t: &T in self.iter() { |
| 461 | hash::Hash::hash(self:t, state); |
| 462 | } |
| 463 | } |
| 464 | } |
| 465 | |
// Allows `for x in &queue` — read-only iteration, front to back
impl<'a, T, const N: usize> IntoIterator for &'a Queue<T, N> {
    type Item = &'a T;
    type IntoIter = Iter<'a, T, N>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
| 474 | |
// Allows `for x in &mut queue` — mutable iteration, front to back (used by `Drop`)
impl<'a, T, const N: usize> IntoIterator for &'a mut Queue<T, N> {
    type Item = &'a mut T;
    type IntoIter = IterMut<'a, T, N>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter_mut()
    }
}
| 483 | |
/// A queue "consumer"; it can dequeue items from the queue
/// NOTE the consumer semantically owns the `head` pointer of the queue
pub struct Consumer<'a, T, const N: usize> {
    // shared borrow of the queue; only `head` is ever advanced through it
    rb: &'a Queue<T, N>,
}
| 489 | |
// SAFETY: a `Consumer` moves `T` values out of the queue into its own context,
// so it may cross execution contexts whenever `T: Send`; coordination with the
// producer happens through the queue's atomic head/tail indices
unsafe impl<'a, T, const N: usize> Send for Consumer<'a, T, N> where T: Send {}
| 491 | |
/// A queue "producer"; it can enqueue items into the queue
/// NOTE the producer semantically owns the `tail` pointer of the queue
pub struct Producer<'a, T, const N: usize> {
    // shared borrow of the queue; only `tail` is ever advanced through it
    rb: &'a Queue<T, N>,
}
| 497 | |
// SAFETY: a `Producer` moves `T` values from its own context into the queue,
// so it may cross execution contexts whenever `T: Send`; coordination with the
// consumer happens through the queue's atomic head/tail indices
unsafe impl<'a, T, const N: usize> Send for Producer<'a, T, N> where T: Send {}
| 499 | |
// All methods delegate to the shared `Queue`; the `Consumer` only ever advances `head`.
impl<'a, T, const N: usize> Consumer<'a, T, N> {
    /// Returns the item in the front of the queue, or `None` if the queue is empty
    #[inline ]
    pub fn dequeue(&mut self) -> Option<T> {
        // sound: single-consumer is guaranteed by `&mut self` on the unique `Consumer`
        unsafe { self.rb.inner_dequeue() }
    }

    /// Returns the item in the front of the queue, without checking if there are elements in the
    /// queue
    ///
    /// See [`Queue::dequeue_unchecked`] for safety
    #[inline ]
    pub unsafe fn dequeue_unchecked(&mut self) -> T {
        self.rb.inner_dequeue_unchecked()
    }

    /// Returns if there are any items to dequeue. When this returns `true`, at least the
    /// first subsequent dequeue will succeed
    #[inline ]
    pub fn ready(&self) -> bool {
        !self.rb.is_empty()
    }

    /// Returns the number of elements in the queue
    #[inline ]
    pub fn len(&self) -> usize {
        self.rb.len()
    }

    /// Returns the maximum number of elements the queue can hold
    #[inline ]
    pub fn capacity(&self) -> usize {
        self.rb.capacity()
    }

    /// Returns the item in the front of the queue without dequeuing, or `None` if the queue is
    /// empty
    ///
    /// # Examples
    /// ```
    /// use heapless::spsc::Queue;
    ///
    /// let mut queue: Queue<u8, 235> = Queue::new();
    /// let (mut producer, mut consumer) = queue.split();
    /// assert_eq!(None, consumer.peek());
    /// producer.enqueue(1);
    /// assert_eq!(Some(&1), consumer.peek());
    /// assert_eq!(Some(1), consumer.dequeue());
    /// assert_eq!(None, consumer.peek());
    /// ```
    #[inline ]
    pub fn peek(&self) -> Option<&T> {
        self.rb.peek()
    }
}
| 555 | |
// All methods delegate to the shared `Queue`; the `Producer` only ever advances `tail`.
impl<'a, T, const N: usize> Producer<'a, T, N> {
    /// Adds an `item` to the end of the queue, returns back the `item` if the queue is full
    #[inline ]
    pub fn enqueue(&mut self, val: T) -> Result<(), T> {
        // sound: single-producer is guaranteed by `&mut self` on the unique `Producer`
        unsafe { self.rb.inner_enqueue(val) }
    }

    /// Adds an `item` to the end of the queue, without checking if the queue is full
    ///
    /// See [`Queue::enqueue_unchecked`] for safety
    #[inline ]
    pub unsafe fn enqueue_unchecked(&mut self, val: T) {
        self.rb.inner_enqueue_unchecked(val)
    }

    /// Returns if there is any space to enqueue a new item. When this returns true, at
    /// least the first subsequent enqueue will succeed.
    #[inline ]
    pub fn ready(&self) -> bool {
        !self.rb.is_full()
    }

    /// Returns the number of elements in the queue
    #[inline ]
    pub fn len(&self) -> usize {
        self.rb.len()
    }

    /// Returns the maximum number of elements the queue can hold
    #[inline ]
    pub fn capacity(&self) -> usize {
        self.rb.capacity()
    }
}
| 590 | |
#[cfg (test)]
mod tests {
    use std::hash::{Hash, Hasher};

    use crate::spsc::Queue;

    // NOTE: a `Queue<_, N>` holds at most `N - 1` elements (one slot is kept
    // free to distinguish full from empty) — several tests below rely on this.
    #[test ]
    fn full() {
        let mut rb: Queue<i32, 3> = Queue::new();

        assert_eq!(rb.is_full(), false);

        rb.enqueue(1).unwrap();
        assert_eq!(rb.is_full(), false);

        rb.enqueue(2).unwrap();
        assert_eq!(rb.is_full(), true);
    }

    #[test ]
    fn empty() {
        let mut rb: Queue<i32, 3> = Queue::new();

        assert_eq!(rb.is_empty(), true);

        rb.enqueue(1).unwrap();
        assert_eq!(rb.is_empty(), false);

        rb.enqueue(2).unwrap();
        assert_eq!(rb.is_empty(), false);
    }

    #[test ]
    #[cfg_attr (miri, ignore)] // too slow
    fn len() {
        let mut rb: Queue<i32, 3> = Queue::new();

        assert_eq!(rb.len(), 0);

        rb.enqueue(1).unwrap();
        assert_eq!(rb.len(), 1);

        rb.enqueue(2).unwrap();
        assert_eq!(rb.len(), 2);

        // cycle the indices through many wrap-arounds; `len` must stay stable
        for _ in 0..1_000_000 {
            let v = rb.dequeue().unwrap();
            println!("{}" , v);
            rb.enqueue(v).unwrap();
            assert_eq!(rb.len(), 2);
        }
    }

    #[test ]
    #[cfg_attr (miri, ignore)] // too slow
    fn try_overflow() {
        const N: usize = 23;
        let mut rb: Queue<i32, N> = Queue::new();

        for i in 0..N as i32 - 1 {
            rb.enqueue(i).unwrap();
        }

        // repeatedly drain and refill a full queue; FIFO order must hold
        for _ in 0..1_000_000 {
            for i in 0..N as i32 - 1 {
                let d = rb.dequeue().unwrap();
                assert_eq!(d, i);
                rb.enqueue(i).unwrap();
            }
        }
    }

    #[test ]
    fn sanity() {
        let mut rb: Queue<i32, 10> = Queue::new();

        let (mut p, mut c) = rb.split();

        assert_eq!(p.ready(), true);

        assert_eq!(c.ready(), false);

        assert_eq!(c.dequeue(), None);

        p.enqueue(0).unwrap();

        assert_eq!(c.dequeue(), Some(0));
    }

    #[test ]
    fn static_new() {
        // `Queue::new` must be usable in a `static` (const context)
        static mut _Q: Queue<i32, 4> = Queue::new();
    }

    #[test ]
    fn drop() {
        // Tracks live `Droppable` instances via COUNT to verify that dropping
        // the queue drops exactly the elements still stored in it.
        struct Droppable;
        impl Droppable {
            fn new() -> Self {
                unsafe {
                    COUNT += 1;
                }
                Droppable
            }
        }

        impl Drop for Droppable {
            fn drop(&mut self) {
                unsafe {
                    COUNT -= 1;
                }
            }
        }

        static mut COUNT: i32 = 0;

        {
            let mut v: Queue<Droppable, 4> = Queue::new();
            v.enqueue(Droppable::new()).ok().unwrap();
            v.enqueue(Droppable::new()).ok().unwrap();
            v.dequeue().unwrap();
        }

        assert_eq!(unsafe { COUNT }, 0);

        {
            let mut v: Queue<Droppable, 4> = Queue::new();
            v.enqueue(Droppable::new()).ok().unwrap();
            v.enqueue(Droppable::new()).ok().unwrap();
        }

        assert_eq!(unsafe { COUNT }, 0);
    }

    #[test ]
    fn iter() {
        let mut rb: Queue<i32, 4> = Queue::new();

        // offset head/tail by one so iteration starts mid-buffer
        rb.enqueue(0).unwrap();
        rb.dequeue().unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();
        rb.enqueue(3).unwrap();

        let mut items = rb.iter();

        // assert_eq!(items.next(), Some(&0));
        assert_eq!(items.next(), Some(&1));
        assert_eq!(items.next(), Some(&2));
        assert_eq!(items.next(), Some(&3));
        assert_eq!(items.next(), None);
    }

    #[test ]
    fn iter_double_ended() {
        let mut rb: Queue<i32, 4> = Queue::new();

        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();

        let mut items = rb.iter();

        assert_eq!(items.next(), Some(&0));
        assert_eq!(items.next_back(), Some(&2));
        assert_eq!(items.next(), Some(&1));
        assert_eq!(items.next(), None);
        assert_eq!(items.next_back(), None);
    }

    #[test ]
    fn iter_mut() {
        let mut rb: Queue<i32, 4> = Queue::new();

        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();

        let mut items = rb.iter_mut();

        assert_eq!(items.next(), Some(&mut 0));
        assert_eq!(items.next(), Some(&mut 1));
        assert_eq!(items.next(), Some(&mut 2));
        assert_eq!(items.next(), None);
    }

    #[test ]
    fn iter_mut_double_ended() {
        let mut rb: Queue<i32, 4> = Queue::new();

        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();

        let mut items = rb.iter_mut();

        assert_eq!(items.next(), Some(&mut 0));
        assert_eq!(items.next_back(), Some(&mut 2));
        assert_eq!(items.next(), Some(&mut 1));
        assert_eq!(items.next(), None);
        assert_eq!(items.next_back(), None);
    }

    #[test ]
    fn wrap_around() {
        let mut rb: Queue<i32, 4> = Queue::new();

        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();
        rb.dequeue().unwrap();
        rb.dequeue().unwrap();
        rb.dequeue().unwrap();
        rb.enqueue(3).unwrap();
        rb.enqueue(4).unwrap();

        assert_eq!(rb.len(), 2);
    }

    #[test ]
    fn ready_flag() {
        let mut rb: Queue<i32, 3> = Queue::new();
        let (mut p, mut c) = rb.split();
        assert_eq!(c.ready(), false);
        assert_eq!(p.ready(), true);

        p.enqueue(0).unwrap();

        assert_eq!(c.ready(), true);
        assert_eq!(p.ready(), true);

        p.enqueue(1).unwrap();

        assert_eq!(c.ready(), true);
        assert_eq!(p.ready(), false);

        c.dequeue().unwrap();

        assert_eq!(c.ready(), true);
        assert_eq!(p.ready(), true);

        c.dequeue().unwrap();

        assert_eq!(c.ready(), false);
        assert_eq!(p.ready(), true);
    }

    #[test ]
    fn clone() {
        let mut rb1: Queue<i32, 4> = Queue::new();
        rb1.enqueue(0).unwrap();
        rb1.enqueue(0).unwrap();
        rb1.dequeue().unwrap();
        rb1.enqueue(0).unwrap();
        let rb2 = rb1.clone();
        assert_eq!(rb1.capacity(), rb2.capacity());
        assert_eq!(rb1.len(), rb2.len());
        assert!(rb1.iter().zip(rb2.iter()).all(|(v1, v2)| v1 == v2));
    }

    #[test ]
    fn eq() {
        // generate two queues with same content
        // but different buffer alignment
        let mut rb1: Queue<i32, 4> = Queue::new();
        rb1.enqueue(0).unwrap();
        rb1.enqueue(0).unwrap();
        rb1.dequeue().unwrap();
        rb1.enqueue(0).unwrap();
        let mut rb2: Queue<i32, 4> = Queue::new();
        rb2.enqueue(0).unwrap();
        rb2.enqueue(0).unwrap();
        assert!(rb1 == rb2);
        // test for symmetry
        assert!(rb2 == rb1);
        // test for changes in content
        rb1.enqueue(0).unwrap();
        assert!(rb1 != rb2);
        rb2.enqueue(1).unwrap();
        assert!(rb1 != rb2);
        // test for refexive relation
        assert!(rb1 == rb1);
        assert!(rb2 == rb2);
    }

    #[test ]
    fn hash_equality() {
        // generate two queues with same content
        // but different buffer alignment
        let rb1 = {
            let mut rb1: Queue<i32, 4> = Queue::new();
            rb1.enqueue(0).unwrap();
            rb1.enqueue(0).unwrap();
            rb1.dequeue().unwrap();
            rb1.enqueue(0).unwrap();
            rb1
        };
        let rb2 = {
            let mut rb2: Queue<i32, 4> = Queue::new();
            rb2.enqueue(0).unwrap();
            rb2.enqueue(0).unwrap();
            rb2
        };
        let hash1 = {
            let mut hasher1 = hash32::FnvHasher::default();
            rb1.hash(&mut hasher1);
            let hash1 = hasher1.finish();
            hash1
        };
        let hash2 = {
            let mut hasher2 = hash32::FnvHasher::default();
            rb2.hash(&mut hasher2);
            let hash2 = hasher2.finish();
            hash2
        };
        assert_eq!(hash1, hash2);
    }
}
| 909 | |