1//! Michael-Scott lock-free queue.
2//!
3//! Usable with any number of producers and consumers.
4//!
5//! Michael and Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
6//! Algorithms. PODC 1996. <http://dl.acm.org/citation.cfm?id=248106>
7//!
8//! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a
9//! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7>
10
11use core::mem::MaybeUninit;
12use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
13
14use crossbeam_utils::CachePadded;
15
16use crate::{unprotected, Atomic, Guard, Owned, Shared};
17
18// The representation here is a singly-linked list, with a sentinel node at the front. In general
19// the `tail` pointer may lag behind the actual tail. Non-sentinel nodes are either all `Data` or
20// all `Blocked` (requests for data from blocked threads).
21#[derive(Debug)]
22pub(crate) struct Queue<T> {
23 head: CachePadded<Atomic<Node<T>>>,
24 tail: CachePadded<Atomic<Node<T>>>,
25}
26
27struct Node<T> {
28 /// The slot in which a value of type `T` can be stored.
29 ///
30 /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`.
31 /// For example, the sentinel node in a queue never contains a value: its slot is always empty.
32 /// Other nodes start their life with a push operation and contain a value until it gets popped
33 /// out. After that such empty nodes get added to the collector for destruction.
34 data: MaybeUninit<T>,
35
36 next: Atomic<Node<T>>,
37}
38
39// Any particular `T` should never be accessed concurrently, so no need for `Sync`.
40unsafe impl<T: Send> Sync for Queue<T> {}
41unsafe impl<T: Send> Send for Queue<T> {}
42
43impl<T> Queue<T> {
44 /// Create a new, empty queue.
45 pub(crate) fn new() -> Queue<T> {
46 let q = Queue {
47 head: CachePadded::new(Atomic::null()),
48 tail: CachePadded::new(Atomic::null()),
49 };
50 let sentinel = Owned::new(Node {
51 data: MaybeUninit::uninit(),
52 next: Atomic::null(),
53 });
54 unsafe {
55 let guard = unprotected();
56 let sentinel = sentinel.into_shared(guard);
57 q.head.store(sentinel, Relaxed);
58 q.tail.store(sentinel, Relaxed);
59 q
60 }
61 }
62
63 /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on
64 /// success. The queue's `tail` pointer may be updated.
65 #[inline(always)]
66 fn push_internal(
67 &self,
68 onto: Shared<'_, Node<T>>,
69 new: Shared<'_, Node<T>>,
70 guard: &Guard,
71 ) -> bool {
72 // is `onto` the actual tail?
73 let o = unsafe { onto.deref() };
74 let next = o.next.load(Acquire, guard);
75 if unsafe { next.as_ref().is_some() } {
76 // if not, try to "help" by moving the tail pointer forward
77 let _ = self
78 .tail
79 .compare_exchange(onto, next, Release, Relaxed, guard);
80 false
81 } else {
82 // looks like the actual tail; attempt to link in `n`
83 let result = o
84 .next
85 .compare_exchange(Shared::null(), new, Release, Relaxed, guard)
86 .is_ok();
87 if result {
88 // try to move the tail pointer forward
89 let _ = self
90 .tail
91 .compare_exchange(onto, new, Release, Relaxed, guard);
92 }
93 result
94 }
95 }
96
97 /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`.
98 pub(crate) fn push(&self, t: T, guard: &Guard) {
99 let new = Owned::new(Node {
100 data: MaybeUninit::new(t),
101 next: Atomic::null(),
102 });
103 let new = Owned::into_shared(new, guard);
104
105 loop {
106 // We push onto the tail, so we'll start optimistically by looking there first.
107 let tail = self.tail.load(Acquire, guard);
108
109 // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
110 if self.push_internal(tail, new, guard) {
111 break;
112 }
113 }
114 }
115
116 /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop.
117 #[inline(always)]
118 fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
119 let head = self.head.load(Acquire, guard);
120 let h = unsafe { head.deref() };
121 let next = h.next.load(Acquire, guard);
122 match unsafe { next.as_ref() } {
123 Some(n) => unsafe {
124 self.head
125 .compare_exchange(head, next, Release, Relaxed, guard)
126 .map(|_| {
127 let tail = self.tail.load(Relaxed, guard);
128 // Advance the tail so that we don't retire a pointer to a reachable node.
129 if head == tail {
130 let _ = self
131 .tail
132 .compare_exchange(tail, next, Release, Relaxed, guard);
133 }
134 guard.defer_destroy(head);
135 // TODO: Replace with MaybeUninit::read when api is stable
136 Some(n.data.as_ptr().read())
137 })
138 .map_err(|_| ())
139 },
140 None => Ok(None),
141 }
142 }
143
144 /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue
145 /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop.
146 #[inline(always)]
147 fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
148 where
149 T: Sync,
150 F: Fn(&T) -> bool,
151 {
152 let head = self.head.load(Acquire, guard);
153 let h = unsafe { head.deref() };
154 let next = h.next.load(Acquire, guard);
155 match unsafe { next.as_ref() } {
156 Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
157 self.head
158 .compare_exchange(head, next, Release, Relaxed, guard)
159 .map(|_| {
160 let tail = self.tail.load(Relaxed, guard);
161 // Advance the tail so that we don't retire a pointer to a reachable node.
162 if head == tail {
163 let _ = self
164 .tail
165 .compare_exchange(tail, next, Release, Relaxed, guard);
166 }
167 guard.defer_destroy(head);
168 Some(n.data.as_ptr().read())
169 })
170 .map_err(|_| ())
171 },
172 None | Some(_) => Ok(None),
173 }
174 }
175
176 /// Attempts to dequeue from the front.
177 ///
178 /// Returns `None` if the queue is observed to be empty.
179 pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
180 loop {
181 if let Ok(head) = self.pop_internal(guard) {
182 return head;
183 }
184 }
185 }
186
187 /// Attempts to dequeue from the front, if the item satisfies the given condition.
188 ///
189 /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given
190 /// condition.
191 pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
192 where
193 T: Sync,
194 F: Fn(&T) -> bool,
195 {
196 loop {
197 if let Ok(head) = self.pop_if_internal(&condition, guard) {
198 return head;
199 }
200 }
201 }
202}
203
204impl<T> Drop for Queue<T> {
205 fn drop(&mut self) {
206 unsafe {
207 let guard: &Guard = unprotected();
208
209 while self.try_pop(guard).is_some() {}
210
211 // Destroy the remaining sentinel node.
212 let sentinel: Shared<'_, Node> = self.head.load(ord:Relaxed, guard);
213 drop(sentinel.into_owned());
214 }
215 }
216}
217
218#[cfg(all(test, not(crossbeam_loom)))]
219mod test {
220 use super::*;
221 use crate::pin;
222 use crossbeam_utils::thread;
223
224 struct Queue<T> {
225 queue: super::Queue<T>,
226 }
227
228 impl<T> Queue<T> {
229 pub(crate) fn new() -> Queue<T> {
230 Queue {
231 queue: super::Queue::new(),
232 }
233 }
234
235 pub(crate) fn push(&self, t: T) {
236 let guard = &pin();
237 self.queue.push(t, guard);
238 }
239
240 pub(crate) fn is_empty(&self) -> bool {
241 let guard = &pin();
242 let head = self.queue.head.load(Acquire, guard);
243 let h = unsafe { head.deref() };
244 h.next.load(Acquire, guard).is_null()
245 }
246
247 pub(crate) fn try_pop(&self) -> Option<T> {
248 let guard = &pin();
249 self.queue.try_pop(guard)
250 }
251
252 pub(crate) fn pop(&self) -> T {
253 loop {
254 match self.try_pop() {
255 None => continue,
256 Some(t) => return t,
257 }
258 }
259 }
260 }
261
262 #[cfg(miri)]
263 const CONC_COUNT: i64 = 1000;
264 #[cfg(not(miri))]
265 const CONC_COUNT: i64 = 1000000;
266
267 #[test]
268 fn push_try_pop_1() {
269 let q: Queue<i64> = Queue::new();
270 assert!(q.is_empty());
271 q.push(37);
272 assert!(!q.is_empty());
273 assert_eq!(q.try_pop(), Some(37));
274 assert!(q.is_empty());
275 }
276
277 #[test]
278 fn push_try_pop_2() {
279 let q: Queue<i64> = Queue::new();
280 assert!(q.is_empty());
281 q.push(37);
282 q.push(48);
283 assert_eq!(q.try_pop(), Some(37));
284 assert!(!q.is_empty());
285 assert_eq!(q.try_pop(), Some(48));
286 assert!(q.is_empty());
287 }
288
289 #[test]
290 fn push_try_pop_many_seq() {
291 let q: Queue<i64> = Queue::new();
292 assert!(q.is_empty());
293 for i in 0..200 {
294 q.push(i)
295 }
296 assert!(!q.is_empty());
297 for i in 0..200 {
298 assert_eq!(q.try_pop(), Some(i));
299 }
300 assert!(q.is_empty());
301 }
302
303 #[test]
304 fn push_pop_1() {
305 let q: Queue<i64> = Queue::new();
306 assert!(q.is_empty());
307 q.push(37);
308 assert!(!q.is_empty());
309 assert_eq!(q.pop(), 37);
310 assert!(q.is_empty());
311 }
312
313 #[test]
314 fn push_pop_2() {
315 let q: Queue<i64> = Queue::new();
316 q.push(37);
317 q.push(48);
318 assert_eq!(q.pop(), 37);
319 assert_eq!(q.pop(), 48);
320 }
321
322 #[test]
323 fn push_pop_many_seq() {
324 let q: Queue<i64> = Queue::new();
325 assert!(q.is_empty());
326 for i in 0..200 {
327 q.push(i)
328 }
329 assert!(!q.is_empty());
330 for i in 0..200 {
331 assert_eq!(q.pop(), i);
332 }
333 assert!(q.is_empty());
334 }
335
336 #[test]
337 fn push_try_pop_many_spsc() {
338 let q: Queue<i64> = Queue::new();
339 assert!(q.is_empty());
340
341 thread::scope(|scope| {
342 scope.spawn(|_| {
343 let mut next = 0;
344
345 while next < CONC_COUNT {
346 if let Some(elem) = q.try_pop() {
347 assert_eq!(elem, next);
348 next += 1;
349 }
350 }
351 });
352
353 for i in 0..CONC_COUNT {
354 q.push(i)
355 }
356 })
357 .unwrap();
358 }
359
360 #[test]
361 fn push_try_pop_many_spmc() {
362 fn recv(_t: i32, q: &Queue<i64>) {
363 let mut cur = -1;
364 for _i in 0..CONC_COUNT {
365 if let Some(elem) = q.try_pop() {
366 assert!(elem > cur);
367 cur = elem;
368
369 if cur == CONC_COUNT - 1 {
370 break;
371 }
372 }
373 }
374 }
375
376 let q: Queue<i64> = Queue::new();
377 assert!(q.is_empty());
378 thread::scope(|scope| {
379 for i in 0..3 {
380 let q = &q;
381 scope.spawn(move |_| recv(i, q));
382 }
383
384 scope.spawn(|_| {
385 for i in 0..CONC_COUNT {
386 q.push(i);
387 }
388 });
389 })
390 .unwrap();
391 }
392
393 #[test]
394 fn push_try_pop_many_mpmc() {
395 enum LR {
396 Left(i64),
397 Right(i64),
398 }
399
400 let q: Queue<LR> = Queue::new();
401 assert!(q.is_empty());
402
403 thread::scope(|scope| {
404 for _t in 0..2 {
405 scope.spawn(|_| {
406 for i in CONC_COUNT - 1..CONC_COUNT {
407 q.push(LR::Left(i))
408 }
409 });
410 scope.spawn(|_| {
411 for i in CONC_COUNT - 1..CONC_COUNT {
412 q.push(LR::Right(i))
413 }
414 });
415 scope.spawn(|_| {
416 let mut vl = vec![];
417 let mut vr = vec![];
418 for _i in 0..CONC_COUNT {
419 match q.try_pop() {
420 Some(LR::Left(x)) => vl.push(x),
421 Some(LR::Right(x)) => vr.push(x),
422 _ => {}
423 }
424 }
425
426 let mut vl2 = vl.clone();
427 let mut vr2 = vr.clone();
428 vl2.sort_unstable();
429 vr2.sort_unstable();
430
431 assert_eq!(vl, vl2);
432 assert_eq!(vr, vr2);
433 });
434 }
435 })
436 .unwrap();
437 }
438
439 #[test]
440 fn push_pop_many_spsc() {
441 let q: Queue<i64> = Queue::new();
442
443 thread::scope(|scope| {
444 scope.spawn(|_| {
445 let mut next = 0;
446 while next < CONC_COUNT {
447 assert_eq!(q.pop(), next);
448 next += 1;
449 }
450 });
451
452 for i in 0..CONC_COUNT {
453 q.push(i)
454 }
455 })
456 .unwrap();
457 assert!(q.is_empty());
458 }
459
460 #[test]
461 fn is_empty_dont_pop() {
462 let q: Queue<i64> = Queue::new();
463 q.push(20);
464 q.push(20);
465 assert!(!q.is_empty());
466 assert!(!q.is_empty());
467 assert!(q.try_pop().is_some());
468 }
469}
470