1//! Michael-Scott lock-free queue.
2//!
3//! Usable with any number of producers and consumers.
4//!
5//! Michael and Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
6//! Algorithms. PODC 1996. <http://dl.acm.org/citation.cfm?id=248106>
7//!
8//! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a
9//! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7>
10
11use core::mem::MaybeUninit;
12use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
13
14use crossbeam_utils::CachePadded;
15
16use crate::{unprotected, Atomic, Guard, Owned, Shared};
17
18// The representation here is a singly-linked list, with a sentinel node at the front. In general
19// the `tail` pointer may lag behind the actual tail. Non-sentinel nodes are either all `Data` or
20// all `Blocked` (requests for data from blocked threads).
21#[derive(Debug)]
22pub(crate) struct Queue<T> {
23 head: CachePadded<Atomic<Node<T>>>,
24 tail: CachePadded<Atomic<Node<T>>>,
25}
26
27struct Node<T> {
28 /// The slot in which a value of type `T` can be stored.
29 ///
30 /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`.
31 /// For example, the sentinel node in a queue never contains a value: its slot is always empty.
32 /// Other nodes start their life with a push operation and contain a value until it gets popped
33 /// out. After that such empty nodes get added to the collector for destruction.
34 data: MaybeUninit<T>,
35
36 next: Atomic<Node<T>>,
37}
38
39// Any particular `T` should never be accessed concurrently, so no need for `Sync`.
40unsafe impl<T: Send> Sync for Queue<T> {}
41unsafe impl<T: Send> Send for Queue<T> {}
42
43impl<T> Queue<T> {
44 /// Create a new, empty queue.
45 pub(crate) fn new() -> Queue<T> {
46 let q = Queue {
47 head: CachePadded::new(Atomic::null()),
48 tail: CachePadded::new(Atomic::null()),
49 };
50 let sentinel = Owned::new(Node {
51 data: MaybeUninit::uninit(),
52 next: Atomic::null(),
53 });
54 unsafe {
55 let guard = unprotected();
56 let sentinel = sentinel.into_shared(guard);
57 q.head.store(sentinel, Relaxed);
58 q.tail.store(sentinel, Relaxed);
59 q
60 }
61 }
62
63 /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on
64 /// success. The queue's `tail` pointer may be updated.
65 #[inline(always)]
66 fn push_internal(
67 &self,
68 onto: Shared<'_, Node<T>>,
69 new: Shared<'_, Node<T>>,
70 guard: &Guard,
71 ) -> bool {
72 // is `onto` the actual tail?
73 let o = unsafe { onto.deref() };
74 let next = o.next.load(Acquire, guard);
75 if unsafe { next.as_ref().is_some() } {
76 // if not, try to "help" by moving the tail pointer forward
77 let _ = self
78 .tail
79 .compare_exchange(onto, next, Release, Relaxed, guard);
80 false
81 } else {
82 // looks like the actual tail; attempt to link in `n`
83 let result = o
84 .next
85 .compare_exchange(Shared::null(), new, Release, Relaxed, guard)
86 .is_ok();
87 if result {
88 // try to move the tail pointer forward
89 let _ = self
90 .tail
91 .compare_exchange(onto, new, Release, Relaxed, guard);
92 }
93 result
94 }
95 }
96
97 /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`.
98 pub(crate) fn push(&self, t: T, guard: &Guard) {
99 let new = Owned::new(Node {
100 data: MaybeUninit::new(t),
101 next: Atomic::null(),
102 });
103 let new = Owned::into_shared(new, guard);
104
105 loop {
106 // We push onto the tail, so we'll start optimistically by looking there first.
107 let tail = self.tail.load(Acquire, guard);
108
109 // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
110 if self.push_internal(tail, new, guard) {
111 break;
112 }
113 }
114 }
115
116 /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop.
117 #[inline(always)]
118 fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
119 let head = self.head.load(Acquire, guard);
120 let h = unsafe { head.deref() };
121 let next = h.next.load(Acquire, guard);
122 match unsafe { next.as_ref() } {
123 Some(n) => unsafe {
124 self.head
125 .compare_exchange(head, next, Release, Relaxed, guard)
126 .map(|_| {
127 let tail = self.tail.load(Relaxed, guard);
128 // Advance the tail so that we don't retire a pointer to a reachable node.
129 if head == tail {
130 let _ = self
131 .tail
132 .compare_exchange(tail, next, Release, Relaxed, guard);
133 }
134 guard.defer_destroy(head);
135 Some(n.data.assume_init_read())
136 })
137 .map_err(|_| ())
138 },
139 None => Ok(None),
140 }
141 }
142
143 /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue
144 /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop.
145 #[inline(always)]
146 fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
147 where
148 T: Sync,
149 F: Fn(&T) -> bool,
150 {
151 let head = self.head.load(Acquire, guard);
152 let h = unsafe { head.deref() };
153 let next = h.next.load(Acquire, guard);
154 match unsafe { next.as_ref() } {
155 Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
156 self.head
157 .compare_exchange(head, next, Release, Relaxed, guard)
158 .map(|_| {
159 let tail = self.tail.load(Relaxed, guard);
160 // Advance the tail so that we don't retire a pointer to a reachable node.
161 if head == tail {
162 let _ = self
163 .tail
164 .compare_exchange(tail, next, Release, Relaxed, guard);
165 }
166 guard.defer_destroy(head);
167 Some(n.data.assume_init_read())
168 })
169 .map_err(|_| ())
170 },
171 None | Some(_) => Ok(None),
172 }
173 }
174
175 /// Attempts to dequeue from the front.
176 ///
177 /// Returns `None` if the queue is observed to be empty.
178 pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
179 loop {
180 if let Ok(head) = self.pop_internal(guard) {
181 return head;
182 }
183 }
184 }
185
186 /// Attempts to dequeue from the front, if the item satisfies the given condition.
187 ///
188 /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given
189 /// condition.
190 pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
191 where
192 T: Sync,
193 F: Fn(&T) -> bool,
194 {
195 loop {
196 if let Ok(head) = self.pop_if_internal(&condition, guard) {
197 return head;
198 }
199 }
200 }
201}
202
203impl<T> Drop for Queue<T> {
204 fn drop(&mut self) {
205 unsafe {
206 let guard: &Guard = unprotected();
207
208 while self.try_pop(guard).is_some() {}
209
210 // Destroy the remaining sentinel node.
211 let sentinel: Shared<'_, Node> = self.head.load(ord:Relaxed, guard);
212 drop(sentinel.into_owned());
213 }
214 }
215}
216
217#[cfg(all(test, not(crossbeam_loom)))]
218mod test {
219 use super::*;
220 use crate::pin;
221 use crossbeam_utils::thread;
222
223 struct Queue<T> {
224 queue: super::Queue<T>,
225 }
226
227 impl<T> Queue<T> {
228 pub(crate) fn new() -> Queue<T> {
229 Queue {
230 queue: super::Queue::new(),
231 }
232 }
233
234 pub(crate) fn push(&self, t: T) {
235 let guard = &pin();
236 self.queue.push(t, guard);
237 }
238
239 pub(crate) fn is_empty(&self) -> bool {
240 let guard = &pin();
241 let head = self.queue.head.load(Acquire, guard);
242 let h = unsafe { head.deref() };
243 h.next.load(Acquire, guard).is_null()
244 }
245
246 pub(crate) fn try_pop(&self) -> Option<T> {
247 let guard = &pin();
248 self.queue.try_pop(guard)
249 }
250
251 pub(crate) fn pop(&self) -> T {
252 loop {
253 match self.try_pop() {
254 None => continue,
255 Some(t) => return t,
256 }
257 }
258 }
259 }
260
261 #[cfg(miri)]
262 const CONC_COUNT: i64 = 1000;
263 #[cfg(not(miri))]
264 const CONC_COUNT: i64 = 1000000;
265
266 #[test]
267 fn push_try_pop_1() {
268 let q: Queue<i64> = Queue::new();
269 assert!(q.is_empty());
270 q.push(37);
271 assert!(!q.is_empty());
272 assert_eq!(q.try_pop(), Some(37));
273 assert!(q.is_empty());
274 }
275
276 #[test]
277 fn push_try_pop_2() {
278 let q: Queue<i64> = Queue::new();
279 assert!(q.is_empty());
280 q.push(37);
281 q.push(48);
282 assert_eq!(q.try_pop(), Some(37));
283 assert!(!q.is_empty());
284 assert_eq!(q.try_pop(), Some(48));
285 assert!(q.is_empty());
286 }
287
288 #[test]
289 fn push_try_pop_many_seq() {
290 let q: Queue<i64> = Queue::new();
291 assert!(q.is_empty());
292 for i in 0..200 {
293 q.push(i)
294 }
295 assert!(!q.is_empty());
296 for i in 0..200 {
297 assert_eq!(q.try_pop(), Some(i));
298 }
299 assert!(q.is_empty());
300 }
301
302 #[test]
303 fn push_pop_1() {
304 let q: Queue<i64> = Queue::new();
305 assert!(q.is_empty());
306 q.push(37);
307 assert!(!q.is_empty());
308 assert_eq!(q.pop(), 37);
309 assert!(q.is_empty());
310 }
311
312 #[test]
313 fn push_pop_2() {
314 let q: Queue<i64> = Queue::new();
315 q.push(37);
316 q.push(48);
317 assert_eq!(q.pop(), 37);
318 assert_eq!(q.pop(), 48);
319 }
320
321 #[test]
322 fn push_pop_many_seq() {
323 let q: Queue<i64> = Queue::new();
324 assert!(q.is_empty());
325 for i in 0..200 {
326 q.push(i)
327 }
328 assert!(!q.is_empty());
329 for i in 0..200 {
330 assert_eq!(q.pop(), i);
331 }
332 assert!(q.is_empty());
333 }
334
335 #[test]
336 fn push_try_pop_many_spsc() {
337 let q: Queue<i64> = Queue::new();
338 assert!(q.is_empty());
339
340 thread::scope(|scope| {
341 scope.spawn(|_| {
342 let mut next = 0;
343
344 while next < CONC_COUNT {
345 if let Some(elem) = q.try_pop() {
346 assert_eq!(elem, next);
347 next += 1;
348 }
349 }
350 });
351
352 for i in 0..CONC_COUNT {
353 q.push(i)
354 }
355 })
356 .unwrap();
357 }
358
359 #[test]
360 fn push_try_pop_many_spmc() {
361 fn recv(_t: i32, q: &Queue<i64>) {
362 let mut cur = -1;
363 for _i in 0..CONC_COUNT {
364 if let Some(elem) = q.try_pop() {
365 assert!(elem > cur);
366 cur = elem;
367
368 if cur == CONC_COUNT - 1 {
369 break;
370 }
371 }
372 }
373 }
374
375 let q: Queue<i64> = Queue::new();
376 assert!(q.is_empty());
377 thread::scope(|scope| {
378 for i in 0..3 {
379 let q = &q;
380 scope.spawn(move |_| recv(i, q));
381 }
382
383 scope.spawn(|_| {
384 for i in 0..CONC_COUNT {
385 q.push(i);
386 }
387 });
388 })
389 .unwrap();
390 }
391
392 #[test]
393 fn push_try_pop_many_mpmc() {
394 enum LR {
395 Left(i64),
396 Right(i64),
397 }
398
399 let q: Queue<LR> = Queue::new();
400 assert!(q.is_empty());
401
402 thread::scope(|scope| {
403 for _t in 0..2 {
404 scope.spawn(|_| {
405 for i in CONC_COUNT - 1..CONC_COUNT {
406 q.push(LR::Left(i))
407 }
408 });
409 scope.spawn(|_| {
410 for i in CONC_COUNT - 1..CONC_COUNT {
411 q.push(LR::Right(i))
412 }
413 });
414 scope.spawn(|_| {
415 let mut vl = vec![];
416 let mut vr = vec![];
417 for _i in 0..CONC_COUNT {
418 match q.try_pop() {
419 Some(LR::Left(x)) => vl.push(x),
420 Some(LR::Right(x)) => vr.push(x),
421 _ => {}
422 }
423 }
424
425 let mut vl2 = vl.clone();
426 let mut vr2 = vr.clone();
427 vl2.sort_unstable();
428 vr2.sort_unstable();
429
430 assert_eq!(vl, vl2);
431 assert_eq!(vr, vr2);
432 });
433 }
434 })
435 .unwrap();
436 }
437
438 #[test]
439 fn push_pop_many_spsc() {
440 let q: Queue<i64> = Queue::new();
441
442 thread::scope(|scope| {
443 scope.spawn(|_| {
444 let mut next = 0;
445 while next < CONC_COUNT {
446 assert_eq!(q.pop(), next);
447 next += 1;
448 }
449 });
450
451 for i in 0..CONC_COUNT {
452 q.push(i)
453 }
454 })
455 .unwrap();
456 assert!(q.is_empty());
457 }
458
459 #[test]
460 fn is_empty_dont_pop() {
461 let q: Queue<i64> = Queue::new();
462 q.push(20);
463 q.push(20);
464 assert!(!q.is_empty());
465 assert!(!q.is_empty());
466 assert!(q.try_pop().is_some());
467 }
468}
469