1use std::cell::{Cell, UnsafeCell};
2use std::cmp;
3use std::fmt;
4use std::marker::PhantomData;
5use std::mem::{self, MaybeUninit};
6use std::ptr;
7use std::slice;
8use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
9use std::sync::Arc;
10
11use crossbeam_epoch::{self as epoch, Atomic, Owned};
12use crossbeam_utils::{Backoff, CachePadded};
13
14// Minimum buffer capacity.
15const MIN_CAP: usize = 64;
16// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
17const MAX_BATCH: usize = 32;
18// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
19// deallocated as soon as possible.
20const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
21
22/// A buffer that holds tasks in a worker queue.
23///
24/// This is just a pointer to the buffer and its length - dropping an instance of this struct will
25/// *not* deallocate the buffer.
26struct Buffer<T> {
27 /// Pointer to the allocated memory.
28 ptr: *mut T,
29
30 /// Capacity of the buffer. Always a power of two.
31 cap: usize,
32}
33
34unsafe impl<T> Send for Buffer<T> {}
35
36impl<T> Buffer<T> {
37 /// Allocates a new buffer with the specified capacity.
38 fn alloc(cap: usize) -> Buffer<T> {
39 debug_assert_eq!(cap, cap.next_power_of_two());
40
41 let ptr = Box::into_raw(
42 (0..cap)
43 .map(|_| MaybeUninit::<T>::uninit())
44 .collect::<Box<[_]>>(),
45 )
46 .cast::<T>();
47
48 Buffer { ptr, cap }
49 }
50
51 /// Deallocates the buffer.
52 unsafe fn dealloc(self) {
53 drop(Box::from_raw(slice::from_raw_parts_mut(
54 self.ptr.cast::<MaybeUninit<T>>(),
55 self.cap,
56 )));
57 }
58
59 /// Returns a pointer to the task at the specified `index`.
60 unsafe fn at(&self, index: isize) -> *mut T {
61 // `self.cap` is always a power of two.
62 // We do all the loads at `MaybeUninit` because we might realize, after loading, that we
63 // don't actually have the right to access this memory.
64 self.ptr.offset(index & (self.cap - 1) as isize)
65 }
66
67 /// Writes `task` into the specified `index`.
68 ///
69 /// This method might be concurrently called with another `read` at the same index, which is
70 /// technically speaking a data race and therefore UB. We should use an atomic store here, but
71 /// that would be more expensive and difficult to implement generically for all types `T`.
72 /// Hence, as a hack, we use a volatile write instead.
73 unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
74 ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task)
75 }
76
77 /// Reads a task from the specified `index`.
78 ///
79 /// This method might be concurrently called with another `write` at the same index, which is
80 /// technically speaking a data race and therefore UB. We should use an atomic load here, but
81 /// that would be more expensive and difficult to implement generically for all types `T`.
82 /// Hence, as a hack, we use a volatile load instead.
83 unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
84 ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>())
85 }
86}
87
88impl<T> Clone for Buffer<T> {
89 fn clone(&self) -> Buffer<T> {
90 *self
91 }
92}
93
94impl<T> Copy for Buffer<T> {}
95
96/// Internal queue data shared between the worker and stealers.
97///
98/// The implementation is based on the following work:
99///
100/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
101/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
102/// PPoPP 2013.][weak-mem]
103/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
104/// atomics. OOPSLA 2013.][checker]
105///
106/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
107/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
108/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
109struct Inner<T> {
110 /// The front index.
111 front: AtomicIsize,
112
113 /// The back index.
114 back: AtomicIsize,
115
116 /// The underlying buffer.
117 buffer: CachePadded<Atomic<Buffer<T>>>,
118}
119
120impl<T> Drop for Inner<T> {
121 fn drop(&mut self) {
122 // Load the back index, front index, and buffer.
123 let b: isize = *self.back.get_mut();
124 let f: isize = *self.front.get_mut();
125
126 unsafe {
127 let buffer: Shared<'_, Buffer> = self.buffer.load(ord:Ordering::Relaxed, epoch::unprotected());
128
129 // Go through the buffer from front to back and drop all tasks in the queue.
130 let mut i: isize = f;
131 while i != b {
132 buffer.deref().at(index:i).drop_in_place();
133 i = i.wrapping_add(1);
134 }
135
136 // Free the memory allocated by the buffer.
137 buffer.into_owned().into_box().dealloc();
138 }
139 }
140}
141
142/// Worker queue flavor: FIFO or LIFO.
143#[derive(Clone, Copy, Debug, Eq, PartialEq)]
144enum Flavor {
145 /// The first-in first-out flavor.
146 Fifo,
147
148 /// The last-in first-out flavor.
149 Lifo,
150}
151
152/// A worker queue.
153///
154/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
155/// tasks from it. Task schedulers typically create a single worker queue per thread.
156///
157/// # Examples
158///
159/// A FIFO worker:
160///
161/// ```
162/// use crossbeam_deque::{Steal, Worker};
163///
164/// let w = Worker::new_fifo();
165/// let s = w.stealer();
166///
167/// w.push(1);
168/// w.push(2);
169/// w.push(3);
170///
171/// assert_eq!(s.steal(), Steal::Success(1));
172/// assert_eq!(w.pop(), Some(2));
173/// assert_eq!(w.pop(), Some(3));
174/// ```
175///
176/// A LIFO worker:
177///
178/// ```
179/// use crossbeam_deque::{Steal, Worker};
180///
181/// let w = Worker::new_lifo();
182/// let s = w.stealer();
183///
184/// w.push(1);
185/// w.push(2);
186/// w.push(3);
187///
188/// assert_eq!(s.steal(), Steal::Success(1));
189/// assert_eq!(w.pop(), Some(3));
190/// assert_eq!(w.pop(), Some(2));
191/// ```
192pub struct Worker<T> {
193 /// A reference to the inner representation of the queue.
194 inner: Arc<CachePadded<Inner<T>>>,
195
196 /// A copy of `inner.buffer` for quick access.
197 buffer: Cell<Buffer<T>>,
198
199 /// The flavor of the queue.
200 flavor: Flavor,
201
202 /// Indicates that the worker cannot be shared among threads.
203 _marker: PhantomData<*mut ()>, // !Send + !Sync
204}
205
206unsafe impl<T: Send> Send for Worker<T> {}
207
208impl<T> Worker<T> {
209 /// Creates a FIFO worker queue.
210 ///
211 /// Tasks are pushed and popped from opposite ends.
212 ///
213 /// # Examples
214 ///
215 /// ```
216 /// use crossbeam_deque::Worker;
217 ///
218 /// let w = Worker::<i32>::new_fifo();
219 /// ```
220 pub fn new_fifo() -> Worker<T> {
221 let buffer = Buffer::alloc(MIN_CAP);
222
223 let inner = Arc::new(CachePadded::new(Inner {
224 front: AtomicIsize::new(0),
225 back: AtomicIsize::new(0),
226 buffer: CachePadded::new(Atomic::new(buffer)),
227 }));
228
229 Worker {
230 inner,
231 buffer: Cell::new(buffer),
232 flavor: Flavor::Fifo,
233 _marker: PhantomData,
234 }
235 }
236
237 /// Creates a LIFO worker queue.
238 ///
239 /// Tasks are pushed and popped from the same end.
240 ///
241 /// # Examples
242 ///
243 /// ```
244 /// use crossbeam_deque::Worker;
245 ///
246 /// let w = Worker::<i32>::new_lifo();
247 /// ```
248 pub fn new_lifo() -> Worker<T> {
249 let buffer = Buffer::alloc(MIN_CAP);
250
251 let inner = Arc::new(CachePadded::new(Inner {
252 front: AtomicIsize::new(0),
253 back: AtomicIsize::new(0),
254 buffer: CachePadded::new(Atomic::new(buffer)),
255 }));
256
257 Worker {
258 inner,
259 buffer: Cell::new(buffer),
260 flavor: Flavor::Lifo,
261 _marker: PhantomData,
262 }
263 }
264
265 /// Creates a stealer for this queue.
266 ///
267 /// The returned stealer can be shared among threads and cloned.
268 ///
269 /// # Examples
270 ///
271 /// ```
272 /// use crossbeam_deque::Worker;
273 ///
274 /// let w = Worker::<i32>::new_lifo();
275 /// let s = w.stealer();
276 /// ```
277 pub fn stealer(&self) -> Stealer<T> {
278 Stealer {
279 inner: self.inner.clone(),
280 flavor: self.flavor,
281 }
282 }
283
284 /// Resizes the internal buffer to the new capacity of `new_cap`.
285 #[cold]
286 unsafe fn resize(&self, new_cap: usize) {
287 // Load the back index, front index, and buffer.
288 let b = self.inner.back.load(Ordering::Relaxed);
289 let f = self.inner.front.load(Ordering::Relaxed);
290 let buffer = self.buffer.get();
291
292 // Allocate a new buffer and copy data from the old buffer to the new one.
293 let new = Buffer::alloc(new_cap);
294 let mut i = f;
295 while i != b {
296 ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
297 i = i.wrapping_add(1);
298 }
299
300 let guard = &epoch::pin();
301
302 // Replace the old buffer with the new one.
303 self.buffer.replace(new);
304 let old =
305 self.inner
306 .buffer
307 .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
308
309 // Destroy the old buffer later.
310 guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
311
312 // If the buffer is very large, then flush the thread-local garbage in order to deallocate
313 // it as soon as possible.
314 if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
315 guard.flush();
316 }
317 }
318
319 /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
320 /// buffer.
321 fn reserve(&self, reserve_cap: usize) {
322 if reserve_cap > 0 {
323 // Compute the current length.
324 let b = self.inner.back.load(Ordering::Relaxed);
325 let f = self.inner.front.load(Ordering::SeqCst);
326 let len = b.wrapping_sub(f) as usize;
327
328 // The current capacity.
329 let cap = self.buffer.get().cap;
330
331 // Is there enough capacity to push `reserve_cap` tasks?
332 if cap - len < reserve_cap {
333 // Keep doubling the capacity as much as is needed.
334 let mut new_cap = cap * 2;
335 while new_cap - len < reserve_cap {
336 new_cap *= 2;
337 }
338
339 // Resize the buffer.
340 unsafe {
341 self.resize(new_cap);
342 }
343 }
344 }
345 }
346
347 /// Returns `true` if the queue is empty.
348 ///
349 /// ```
350 /// use crossbeam_deque::Worker;
351 ///
352 /// let w = Worker::new_lifo();
353 ///
354 /// assert!(w.is_empty());
355 /// w.push(1);
356 /// assert!(!w.is_empty());
357 /// ```
358 pub fn is_empty(&self) -> bool {
359 let b = self.inner.back.load(Ordering::Relaxed);
360 let f = self.inner.front.load(Ordering::SeqCst);
361 b.wrapping_sub(f) <= 0
362 }
363
364 /// Returns the number of tasks in the deque.
365 ///
366 /// ```
367 /// use crossbeam_deque::Worker;
368 ///
369 /// let w = Worker::new_lifo();
370 ///
371 /// assert_eq!(w.len(), 0);
372 /// w.push(1);
373 /// assert_eq!(w.len(), 1);
374 /// w.push(1);
375 /// assert_eq!(w.len(), 2);
376 /// ```
377 pub fn len(&self) -> usize {
378 let b = self.inner.back.load(Ordering::Relaxed);
379 let f = self.inner.front.load(Ordering::SeqCst);
380 b.wrapping_sub(f).max(0) as usize
381 }
382
383 /// Pushes a task into the queue.
384 ///
385 /// # Examples
386 ///
387 /// ```
388 /// use crossbeam_deque::Worker;
389 ///
390 /// let w = Worker::new_lifo();
391 /// w.push(1);
392 /// w.push(2);
393 /// ```
394 pub fn push(&self, task: T) {
395 // Load the back index, front index, and buffer.
396 let b = self.inner.back.load(Ordering::Relaxed);
397 let f = self.inner.front.load(Ordering::Acquire);
398 let mut buffer = self.buffer.get();
399
400 // Calculate the length of the queue.
401 let len = b.wrapping_sub(f);
402
403 // Is the queue full?
404 if len >= buffer.cap as isize {
405 // Yes. Grow the underlying buffer.
406 unsafe {
407 self.resize(2 * buffer.cap);
408 }
409 buffer = self.buffer.get();
410 }
411
412 // Write `task` into the slot.
413 unsafe {
414 buffer.write(b, MaybeUninit::new(task));
415 }
416
417 atomic::fence(Ordering::Release);
418
419 // Increment the back index.
420 //
421 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
422 // races because it doesn't understand fences.
423 self.inner.back.store(b.wrapping_add(1), Ordering::Release);
424 }
425
426 /// Pops a task from the queue.
427 ///
428 /// # Examples
429 ///
430 /// ```
431 /// use crossbeam_deque::Worker;
432 ///
433 /// let w = Worker::new_fifo();
434 /// w.push(1);
435 /// w.push(2);
436 ///
437 /// assert_eq!(w.pop(), Some(1));
438 /// assert_eq!(w.pop(), Some(2));
439 /// assert_eq!(w.pop(), None);
440 /// ```
441 pub fn pop(&self) -> Option<T> {
442 // Load the back and front index.
443 let b = self.inner.back.load(Ordering::Relaxed);
444 let f = self.inner.front.load(Ordering::Relaxed);
445
446 // Calculate the length of the queue.
447 let len = b.wrapping_sub(f);
448
449 // Is the queue empty?
450 if len <= 0 {
451 return None;
452 }
453
454 match self.flavor {
455 // Pop from the front of the queue.
456 Flavor::Fifo => {
457 // Try incrementing the front index to pop the task.
458 let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
459 let new_f = f.wrapping_add(1);
460
461 if b.wrapping_sub(new_f) < 0 {
462 self.inner.front.store(f, Ordering::Relaxed);
463 return None;
464 }
465
466 unsafe {
467 // Read the popped task.
468 let buffer = self.buffer.get();
469 let task = buffer.read(f).assume_init();
470
471 // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
472 if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
473 self.resize(buffer.cap / 2);
474 }
475
476 Some(task)
477 }
478 }
479
480 // Pop from the back of the queue.
481 Flavor::Lifo => {
482 // Decrement the back index.
483 let b = b.wrapping_sub(1);
484 self.inner.back.store(b, Ordering::Relaxed);
485
486 atomic::fence(Ordering::SeqCst);
487
488 // Load the front index.
489 let f = self.inner.front.load(Ordering::Relaxed);
490
491 // Compute the length after the back index was decremented.
492 let len = b.wrapping_sub(f);
493
494 if len < 0 {
495 // The queue is empty. Restore the back index to the original task.
496 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
497 None
498 } else {
499 // Read the task to be popped.
500 let buffer = self.buffer.get();
501 let mut task = unsafe { Some(buffer.read(b)) };
502
503 // Are we popping the last task from the queue?
504 if len == 0 {
505 // Try incrementing the front index.
506 if self
507 .inner
508 .front
509 .compare_exchange(
510 f,
511 f.wrapping_add(1),
512 Ordering::SeqCst,
513 Ordering::Relaxed,
514 )
515 .is_err()
516 {
517 // Failed. We didn't pop anything. Reset to `None`.
518 task.take();
519 }
520
521 // Restore the back index to the original task.
522 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
523 } else {
524 // Shrink the buffer if `len` is less than one fourth of the capacity.
525 if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
526 unsafe {
527 self.resize(buffer.cap / 2);
528 }
529 }
530 }
531
532 task.map(|t| unsafe { t.assume_init() })
533 }
534 }
535 }
536 }
537}
538
539impl<T> fmt::Debug for Worker<T> {
540 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
541 f.pad("Worker { .. }")
542 }
543}
544
545/// A stealer handle of a worker queue.
546///
547/// Stealers can be shared among threads.
548///
549/// Task schedulers typically have a single worker queue per worker thread.
550///
551/// # Examples
552///
553/// ```
554/// use crossbeam_deque::{Steal, Worker};
555///
556/// let w = Worker::new_lifo();
557/// w.push(1);
558/// w.push(2);
559///
560/// let s = w.stealer();
561/// assert_eq!(s.steal(), Steal::Success(1));
562/// assert_eq!(s.steal(), Steal::Success(2));
563/// assert_eq!(s.steal(), Steal::Empty);
564/// ```
565pub struct Stealer<T> {
566 /// A reference to the inner representation of the queue.
567 inner: Arc<CachePadded<Inner<T>>>,
568
569 /// The flavor of the queue.
570 flavor: Flavor,
571}
572
573unsafe impl<T: Send> Send for Stealer<T> {}
574unsafe impl<T: Send> Sync for Stealer<T> {}
575
576impl<T> Stealer<T> {
577 /// Returns `true` if the queue is empty.
578 ///
579 /// ```
580 /// use crossbeam_deque::Worker;
581 ///
582 /// let w = Worker::new_lifo();
583 /// let s = w.stealer();
584 ///
585 /// assert!(s.is_empty());
586 /// w.push(1);
587 /// assert!(!s.is_empty());
588 /// ```
589 pub fn is_empty(&self) -> bool {
590 let f = self.inner.front.load(Ordering::Acquire);
591 atomic::fence(Ordering::SeqCst);
592 let b = self.inner.back.load(Ordering::Acquire);
593 b.wrapping_sub(f) <= 0
594 }
595
596 /// Returns the number of tasks in the deque.
597 ///
598 /// ```
599 /// use crossbeam_deque::Worker;
600 ///
601 /// let w = Worker::new_lifo();
602 /// let s = w.stealer();
603 ///
604 /// assert_eq!(s.len(), 0);
605 /// w.push(1);
606 /// assert_eq!(s.len(), 1);
607 /// w.push(2);
608 /// assert_eq!(s.len(), 2);
609 /// ```
610 pub fn len(&self) -> usize {
611 let f = self.inner.front.load(Ordering::Acquire);
612 atomic::fence(Ordering::SeqCst);
613 let b = self.inner.back.load(Ordering::Acquire);
614 b.wrapping_sub(f).max(0) as usize
615 }
616
617 /// Steals a task from the queue.
618 ///
619 /// # Examples
620 ///
621 /// ```
622 /// use crossbeam_deque::{Steal, Worker};
623 ///
624 /// let w = Worker::new_lifo();
625 /// w.push(1);
626 /// w.push(2);
627 ///
628 /// let s = w.stealer();
629 /// assert_eq!(s.steal(), Steal::Success(1));
630 /// assert_eq!(s.steal(), Steal::Success(2));
631 /// ```
632 pub fn steal(&self) -> Steal<T> {
633 // Load the front index.
634 let f = self.inner.front.load(Ordering::Acquire);
635
636 // A SeqCst fence is needed here.
637 //
638 // If the current thread is already pinned (reentrantly), we must manually issue the
639 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
640 // have to.
641 if epoch::is_pinned() {
642 atomic::fence(Ordering::SeqCst);
643 }
644
645 let guard = &epoch::pin();
646
647 // Load the back index.
648 let b = self.inner.back.load(Ordering::Acquire);
649
650 // Is the queue empty?
651 if b.wrapping_sub(f) <= 0 {
652 return Steal::Empty;
653 }
654
655 // Load the buffer and read the task at the front.
656 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
657 let task = unsafe { buffer.deref().read(f) };
658
659 // Try incrementing the front index to steal the task.
660 // If the buffer has been swapped or the increment fails, we retry.
661 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
662 || self
663 .inner
664 .front
665 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
666 .is_err()
667 {
668 // We didn't steal this task, forget it.
669 return Steal::Retry;
670 }
671
672 // Return the stolen task.
673 Steal::Success(unsafe { task.assume_init() })
674 }
675
676 /// Steals a batch of tasks and pushes them into another worker.
677 ///
678 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
679 /// steal around half of the tasks in the queue, but also not more than some constant limit.
680 ///
681 /// # Examples
682 ///
683 /// ```
684 /// use crossbeam_deque::Worker;
685 ///
686 /// let w1 = Worker::new_fifo();
687 /// w1.push(1);
688 /// w1.push(2);
689 /// w1.push(3);
690 /// w1.push(4);
691 ///
692 /// let s = w1.stealer();
693 /// let w2 = Worker::new_fifo();
694 ///
695 /// let _ = s.steal_batch(&w2);
696 /// assert_eq!(w2.pop(), Some(1));
697 /// assert_eq!(w2.pop(), Some(2));
698 /// ```
699 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
700 self.steal_batch_with_limit(dest, MAX_BATCH)
701 }
702
703 /// Steals no more than `limit` of tasks and pushes them into another worker.
704 ///
705 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
706 /// steal around half of the tasks in the queue, but also not more than the given limit.
707 ///
708 /// # Examples
709 ///
710 /// ```
711 /// use crossbeam_deque::Worker;
712 ///
713 /// let w1 = Worker::new_fifo();
714 /// w1.push(1);
715 /// w1.push(2);
716 /// w1.push(3);
717 /// w1.push(4);
718 /// w1.push(5);
719 /// w1.push(6);
720 ///
721 /// let s = w1.stealer();
722 /// let w2 = Worker::new_fifo();
723 ///
724 /// let _ = s.steal_batch_with_limit(&w2, 2);
725 /// assert_eq!(w2.pop(), Some(1));
726 /// assert_eq!(w2.pop(), Some(2));
727 /// assert_eq!(w2.pop(), None);
728 ///
729 /// w1.push(7);
730 /// w1.push(8);
731 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
732 /// // half of the elements are currently popped, but the number of popped elements is considered
733 /// // an implementation detail that may be changed in the future.
734 /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
735 /// assert_eq!(w2.len(), 3);
736 /// ```
737 pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
738 assert!(limit > 0);
739 if Arc::ptr_eq(&self.inner, &dest.inner) {
740 if dest.is_empty() {
741 return Steal::Empty;
742 } else {
743 return Steal::Success(());
744 }
745 }
746
747 // Load the front index.
748 let mut f = self.inner.front.load(Ordering::Acquire);
749
750 // A SeqCst fence is needed here.
751 //
752 // If the current thread is already pinned (reentrantly), we must manually issue the
753 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
754 // have to.
755 if epoch::is_pinned() {
756 atomic::fence(Ordering::SeqCst);
757 }
758
759 let guard = &epoch::pin();
760
761 // Load the back index.
762 let b = self.inner.back.load(Ordering::Acquire);
763
764 // Is the queue empty?
765 let len = b.wrapping_sub(f);
766 if len <= 0 {
767 return Steal::Empty;
768 }
769
770 // Reserve capacity for the stolen batch.
771 let batch_size = cmp::min((len as usize + 1) / 2, limit);
772 dest.reserve(batch_size);
773 let mut batch_size = batch_size as isize;
774
775 // Get the destination buffer and back index.
776 let dest_buffer = dest.buffer.get();
777 let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
778
779 // Load the buffer.
780 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
781
782 match self.flavor {
783 // Steal a batch of tasks from the front at once.
784 Flavor::Fifo => {
785 // Copy the batch from the source to the destination buffer.
786 match dest.flavor {
787 Flavor::Fifo => {
788 for i in 0..batch_size {
789 unsafe {
790 let task = buffer.deref().read(f.wrapping_add(i));
791 dest_buffer.write(dest_b.wrapping_add(i), task);
792 }
793 }
794 }
795 Flavor::Lifo => {
796 for i in 0..batch_size {
797 unsafe {
798 let task = buffer.deref().read(f.wrapping_add(i));
799 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
800 }
801 }
802 }
803 }
804
805 // Try incrementing the front index to steal the batch.
806 // If the buffer has been swapped or the increment fails, we retry.
807 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
808 || self
809 .inner
810 .front
811 .compare_exchange(
812 f,
813 f.wrapping_add(batch_size),
814 Ordering::SeqCst,
815 Ordering::Relaxed,
816 )
817 .is_err()
818 {
819 return Steal::Retry;
820 }
821
822 dest_b = dest_b.wrapping_add(batch_size);
823 }
824
825 // Steal a batch of tasks from the front one by one.
826 Flavor::Lifo => {
827 // This loop may modify the batch_size, which triggers a clippy lint warning.
828 // Use a new variable to avoid the warning, and to make it clear we aren't
829 // modifying the loop exit condition during iteration.
830 let original_batch_size = batch_size;
831
832 for i in 0..original_batch_size {
833 // If this is not the first steal, check whether the queue is empty.
834 if i > 0 {
835 // We've already got the current front index. Now execute the fence to
836 // synchronize with other threads.
837 atomic::fence(Ordering::SeqCst);
838
839 // Load the back index.
840 let b = self.inner.back.load(Ordering::Acquire);
841
842 // Is the queue empty?
843 if b.wrapping_sub(f) <= 0 {
844 batch_size = i;
845 break;
846 }
847 }
848
849 // Read the task at the front.
850 let task = unsafe { buffer.deref().read(f) };
851
852 // Try incrementing the front index to steal the task.
853 // If the buffer has been swapped or the increment fails, we retry.
854 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
855 || self
856 .inner
857 .front
858 .compare_exchange(
859 f,
860 f.wrapping_add(1),
861 Ordering::SeqCst,
862 Ordering::Relaxed,
863 )
864 .is_err()
865 {
866 // We didn't steal this task, forget it and break from the loop.
867 batch_size = i;
868 break;
869 }
870
871 // Write the stolen task into the destination buffer.
872 unsafe {
873 dest_buffer.write(dest_b, task);
874 }
875
876 // Move the source front index and the destination back index one step forward.
877 f = f.wrapping_add(1);
878 dest_b = dest_b.wrapping_add(1);
879 }
880
881 // If we didn't steal anything, the operation needs to be retried.
882 if batch_size == 0 {
883 return Steal::Retry;
884 }
885
886 // If stealing into a FIFO queue, stolen tasks need to be reversed.
887 if dest.flavor == Flavor::Fifo {
888 for i in 0..batch_size / 2 {
889 unsafe {
890 let i1 = dest_b.wrapping_sub(batch_size - i);
891 let i2 = dest_b.wrapping_sub(i + 1);
892 let t1 = dest_buffer.read(i1);
893 let t2 = dest_buffer.read(i2);
894 dest_buffer.write(i1, t2);
895 dest_buffer.write(i2, t1);
896 }
897 }
898 }
899 }
900 }
901
902 atomic::fence(Ordering::Release);
903
904 // Update the back index in the destination queue.
905 //
906 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
907 // races because it doesn't understand fences.
908 dest.inner.back.store(dest_b, Ordering::Release);
909
910 // Return with success.
911 Steal::Success(())
912 }
913
914 /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
915 ///
916 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
917 /// steal around half of the tasks in the queue, but also not more than some constant limit.
918 ///
919 /// # Examples
920 ///
921 /// ```
922 /// use crossbeam_deque::{Steal, Worker};
923 ///
924 /// let w1 = Worker::new_fifo();
925 /// w1.push(1);
926 /// w1.push(2);
927 /// w1.push(3);
928 /// w1.push(4);
929 ///
930 /// let s = w1.stealer();
931 /// let w2 = Worker::new_fifo();
932 ///
933 /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
934 /// assert_eq!(w2.pop(), Some(2));
935 /// ```
936 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
937 self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
938 }
939
940 /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from
941 /// that worker.
942 ///
943 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
944 /// steal around half of the tasks in the queue, but also not more than the given limit.
945 ///
946 /// # Examples
947 ///
948 /// ```
949 /// use crossbeam_deque::{Steal, Worker};
950 ///
951 /// let w1 = Worker::new_fifo();
952 /// w1.push(1);
953 /// w1.push(2);
954 /// w1.push(3);
955 /// w1.push(4);
956 /// w1.push(5);
957 /// w1.push(6);
958 ///
959 /// let s = w1.stealer();
960 /// let w2 = Worker::new_fifo();
961 ///
962 /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
963 /// assert_eq!(w2.pop(), Some(2));
964 /// assert_eq!(w2.pop(), None);
965 ///
966 /// w1.push(7);
967 /// w1.push(8);
968 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
969 /// // half of the elements are currently popped, but the number of popped elements is considered
970 /// // an implementation detail that may be changed in the future.
971 /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
972 /// assert_eq!(w2.pop(), Some(4));
973 /// assert_eq!(w2.pop(), Some(5));
974 /// assert_eq!(w2.pop(), None);
975 /// ```
976 pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
977 assert!(limit > 0);
978 if Arc::ptr_eq(&self.inner, &dest.inner) {
979 match dest.pop() {
980 None => return Steal::Empty,
981 Some(task) => return Steal::Success(task),
982 }
983 }
984
985 // Load the front index.
986 let mut f = self.inner.front.load(Ordering::Acquire);
987
988 // A SeqCst fence is needed here.
989 //
990 // If the current thread is already pinned (reentrantly), we must manually issue the
991 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
992 // have to.
993 if epoch::is_pinned() {
994 atomic::fence(Ordering::SeqCst);
995 }
996
997 let guard = &epoch::pin();
998
999 // Load the back index.
1000 let b = self.inner.back.load(Ordering::Acquire);
1001
1002 // Is the queue empty?
1003 let len = b.wrapping_sub(f);
1004 if len <= 0 {
1005 return Steal::Empty;
1006 }
1007
1008 // Reserve capacity for the stolen batch.
1009 let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
1010 dest.reserve(batch_size);
1011 let mut batch_size = batch_size as isize;
1012
1013 // Get the destination buffer and back index.
1014 let dest_buffer = dest.buffer.get();
1015 let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
1016
1017 // Load the buffer
1018 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
1019
1020 // Read the task at the front.
1021 let mut task = unsafe { buffer.deref().read(f) };
1022
1023 match self.flavor {
1024 // Steal a batch of tasks from the front at once.
1025 Flavor::Fifo => {
1026 // Copy the batch from the source to the destination buffer.
1027 match dest.flavor {
1028 Flavor::Fifo => {
1029 for i in 0..batch_size {
1030 unsafe {
1031 let task = buffer.deref().read(f.wrapping_add(i + 1));
1032 dest_buffer.write(dest_b.wrapping_add(i), task);
1033 }
1034 }
1035 }
1036 Flavor::Lifo => {
1037 for i in 0..batch_size {
1038 unsafe {
1039 let task = buffer.deref().read(f.wrapping_add(i + 1));
1040 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
1041 }
1042 }
1043 }
1044 }
1045
1046 // Try incrementing the front index to steal the task.
1047 // If the buffer has been swapped or the increment fails, we retry.
1048 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1049 || self
1050 .inner
1051 .front
1052 .compare_exchange(
1053 f,
1054 f.wrapping_add(batch_size + 1),
1055 Ordering::SeqCst,
1056 Ordering::Relaxed,
1057 )
1058 .is_err()
1059 {
1060 // We didn't steal this task, forget it.
1061 return Steal::Retry;
1062 }
1063
1064 dest_b = dest_b.wrapping_add(batch_size);
1065 }
1066
1067 // Steal a batch of tasks from the front one by one.
1068 Flavor::Lifo => {
1069 // Try incrementing the front index to steal the task.
1070 if self
1071 .inner
1072 .front
1073 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
1074 .is_err()
1075 {
1076 // We didn't steal this task, forget it.
1077 return Steal::Retry;
1078 }
1079
1080 // Move the front index one step forward.
1081 f = f.wrapping_add(1);
1082
1083 // Repeat the same procedure for the batch steals.
1084 //
1085 // This loop may modify the batch_size, which triggers a clippy lint warning.
1086 // Use a new variable to avoid the warning, and to make it clear we aren't
1087 // modifying the loop exit condition during iteration.
1088 let original_batch_size = batch_size;
1089 for i in 0..original_batch_size {
1090 // We've already got the current front index. Now execute the fence to
1091 // synchronize with other threads.
1092 atomic::fence(Ordering::SeqCst);
1093
1094 // Load the back index.
1095 let b = self.inner.back.load(Ordering::Acquire);
1096
1097 // Is the queue empty?
1098 if b.wrapping_sub(f) <= 0 {
1099 batch_size = i;
1100 break;
1101 }
1102
1103 // Read the task at the front.
1104 let tmp = unsafe { buffer.deref().read(f) };
1105
1106 // Try incrementing the front index to steal the task.
1107 // If the buffer has been swapped or the increment fails, we retry.
1108 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1109 || self
1110 .inner
1111 .front
1112 .compare_exchange(
1113 f,
1114 f.wrapping_add(1),
1115 Ordering::SeqCst,
1116 Ordering::Relaxed,
1117 )
1118 .is_err()
1119 {
1120 // We didn't steal this task, forget it and break from the loop.
1121 batch_size = i;
1122 break;
1123 }
1124
1125 // Write the previously stolen task into the destination buffer.
1126 unsafe {
1127 dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1128 }
1129
1130 // Move the source front index and the destination back index one step forward.
1131 f = f.wrapping_add(1);
1132 dest_b = dest_b.wrapping_add(1);
1133 }
1134
1135 // If stealing into a FIFO queue, stolen tasks need to be reversed.
1136 if dest.flavor == Flavor::Fifo {
1137 for i in 0..batch_size / 2 {
1138 unsafe {
1139 let i1 = dest_b.wrapping_sub(batch_size - i);
1140 let i2 = dest_b.wrapping_sub(i + 1);
1141 let t1 = dest_buffer.read(i1);
1142 let t2 = dest_buffer.read(i2);
1143 dest_buffer.write(i1, t2);
1144 dest_buffer.write(i2, t1);
1145 }
1146 }
1147 }
1148 }
1149 }
1150
1151 atomic::fence(Ordering::Release);
1152
1153 // Update the back index in the destination queue.
1154 //
1155 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1156 // races because it doesn't understand fences.
1157 dest.inner.back.store(dest_b, Ordering::Release);
1158
1159 // Return with success.
1160 Steal::Success(unsafe { task.assume_init() })
1161 }
1162}
1163
1164impl<T> Clone for Stealer<T> {
1165 fn clone(&self) -> Stealer<T> {
1166 Stealer {
1167 inner: self.inner.clone(),
1168 flavor: self.flavor,
1169 }
1170 }
1171}
1172
1173impl<T> fmt::Debug for Stealer<T> {
1174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1175 f.pad("Stealer { .. }")
1176 }
1177}
1178
1179// Bits indicating the state of a slot:
1180// * If a task has been written into the slot, `WRITE` is set.
1181// * If a task has been read from the slot, `READ` is set.
1182// * If the block is being destroyed, `DESTROY` is set.
1183const WRITE: usize = 1;
1184const READ: usize = 2;
1185const DESTROY: usize = 4;
1186
1187// Each block covers one "lap" of indices.
1188const LAP: usize = 64;
1189// The maximum number of values a block can hold.
1190const BLOCK_CAP: usize = LAP - 1;
1191// How many lower bits are reserved for metadata.
1192const SHIFT: usize = 1;
1193// Indicates that the block is not the last one.
1194const HAS_NEXT: usize = 1;
1195
1196/// A slot in a block.
1197struct Slot<T> {
1198 /// The task.
1199 task: UnsafeCell<MaybeUninit<T>>,
1200
1201 /// The state of the slot.
1202 state: AtomicUsize,
1203}
1204
1205impl<T> Slot<T> {
1206 const UNINIT: Self = Self {
1207 task: UnsafeCell::new(MaybeUninit::uninit()),
1208 state: AtomicUsize::new(0),
1209 };
1210
1211 /// Waits until a task is written into the slot.
1212 fn wait_write(&self) {
1213 let backoff: Backoff = Backoff::new();
1214 while self.state.load(order:Ordering::Acquire) & WRITE == 0 {
1215 backoff.snooze();
1216 }
1217 }
1218}
1219
1220/// A block in a linked list.
1221///
1222/// Each block in the list can hold up to `BLOCK_CAP` values.
1223struct Block<T> {
1224 /// The next block in the linked list.
1225 next: AtomicPtr<Block<T>>,
1226
1227 /// Slots for values.
1228 slots: [Slot<T>; BLOCK_CAP],
1229}
1230
1231impl<T> Block<T> {
1232 /// Creates an empty block that starts at `start_index`.
1233 fn new() -> Block<T> {
1234 Self {
1235 next: AtomicPtr::new(ptr::null_mut()),
1236 slots: [Slot::UNINIT; BLOCK_CAP],
1237 }
1238 }
1239
1240 /// Waits until the next pointer is set.
1241 fn wait_next(&self) -> *mut Block<T> {
1242 let backoff = Backoff::new();
1243 loop {
1244 let next = self.next.load(Ordering::Acquire);
1245 if !next.is_null() {
1246 return next;
1247 }
1248 backoff.snooze();
1249 }
1250 }
1251
1252 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
1253 unsafe fn destroy(this: *mut Block<T>, count: usize) {
1254 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1255 // begun destruction of the block.
1256 for i in (0..count).rev() {
1257 let slot = (*this).slots.get_unchecked(i);
1258
1259 // Mark the `DESTROY` bit if a thread is still using the slot.
1260 if slot.state.load(Ordering::Acquire) & READ == 0
1261 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1262 {
1263 // If a thread is still using the slot, it will continue destruction of the block.
1264 return;
1265 }
1266 }
1267
1268 // No thread is using the block, now it is safe to destroy it.
1269 drop(Box::from_raw(this));
1270 }
1271}
1272
1273/// A position in a queue.
1274struct Position<T> {
1275 /// The index in the queue.
1276 index: AtomicUsize,
1277
1278 /// The block in the linked list.
1279 block: AtomicPtr<Block<T>>,
1280}
1281
1282/// An injector queue.
1283///
1284/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1285/// a single injector queue, which is the entry point for new tasks.
1286///
1287/// # Examples
1288///
1289/// ```
1290/// use crossbeam_deque::{Injector, Steal};
1291///
1292/// let q = Injector::new();
1293/// q.push(1);
1294/// q.push(2);
1295///
1296/// assert_eq!(q.steal(), Steal::Success(1));
1297/// assert_eq!(q.steal(), Steal::Success(2));
1298/// assert_eq!(q.steal(), Steal::Empty);
1299/// ```
1300pub struct Injector<T> {
1301 /// The head of the queue.
1302 head: CachePadded<Position<T>>,
1303
1304 /// The tail of the queue.
1305 tail: CachePadded<Position<T>>,
1306
1307 /// Indicates that dropping a `Injector<T>` may drop values of type `T`.
1308 _marker: PhantomData<T>,
1309}
1310
1311unsafe impl<T: Send> Send for Injector<T> {}
1312unsafe impl<T: Send> Sync for Injector<T> {}
1313
1314impl<T> Default for Injector<T> {
1315 fn default() -> Self {
1316 let block: *mut Block = Box::into_raw(Box::new(Block::<T>::new()));
1317 Self {
1318 head: CachePadded::new(Position {
1319 block: AtomicPtr::new(block),
1320 index: AtomicUsize::new(0),
1321 }),
1322 tail: CachePadded::new(Position {
1323 block: AtomicPtr::new(block),
1324 index: AtomicUsize::new(0),
1325 }),
1326 _marker: PhantomData,
1327 }
1328 }
1329}
1330
1331impl<T> Injector<T> {
1332 /// Creates a new injector queue.
1333 ///
1334 /// # Examples
1335 ///
1336 /// ```
1337 /// use crossbeam_deque::Injector;
1338 ///
1339 /// let q = Injector::<i32>::new();
1340 /// ```
1341 pub fn new() -> Injector<T> {
1342 Self::default()
1343 }
1344
1345 /// Pushes a task into the queue.
1346 ///
1347 /// # Examples
1348 ///
1349 /// ```
1350 /// use crossbeam_deque::Injector;
1351 ///
1352 /// let w = Injector::new();
1353 /// w.push(1);
1354 /// w.push(2);
1355 /// ```
1356 pub fn push(&self, task: T) {
1357 let backoff = Backoff::new();
1358 let mut tail = self.tail.index.load(Ordering::Acquire);
1359 let mut block = self.tail.block.load(Ordering::Acquire);
1360 let mut next_block = None;
1361
1362 loop {
1363 // Calculate the offset of the index into the block.
1364 let offset = (tail >> SHIFT) % LAP;
1365
1366 // If we reached the end of the block, wait until the next one is installed.
1367 if offset == BLOCK_CAP {
1368 backoff.snooze();
1369 tail = self.tail.index.load(Ordering::Acquire);
1370 block = self.tail.block.load(Ordering::Acquire);
1371 continue;
1372 }
1373
1374 // If we're going to have to install the next block, allocate it in advance in order to
1375 // make the wait for other threads as short as possible.
1376 if offset + 1 == BLOCK_CAP && next_block.is_none() {
1377 next_block = Some(Box::new(Block::<T>::new()));
1378 }
1379
1380 let new_tail = tail + (1 << SHIFT);
1381
1382 // Try advancing the tail forward.
1383 match self.tail.index.compare_exchange_weak(
1384 tail,
1385 new_tail,
1386 Ordering::SeqCst,
1387 Ordering::Acquire,
1388 ) {
1389 Ok(_) => unsafe {
1390 // If we've reached the end of the block, install the next one.
1391 if offset + 1 == BLOCK_CAP {
1392 let next_block = Box::into_raw(next_block.unwrap());
1393 let next_index = new_tail.wrapping_add(1 << SHIFT);
1394
1395 self.tail.block.store(next_block, Ordering::Release);
1396 self.tail.index.store(next_index, Ordering::Release);
1397 (*block).next.store(next_block, Ordering::Release);
1398 }
1399
1400 // Write the task into the slot.
1401 let slot = (*block).slots.get_unchecked(offset);
1402 slot.task.get().write(MaybeUninit::new(task));
1403 slot.state.fetch_or(WRITE, Ordering::Release);
1404
1405 return;
1406 },
1407 Err(t) => {
1408 tail = t;
1409 block = self.tail.block.load(Ordering::Acquire);
1410 backoff.spin();
1411 }
1412 }
1413 }
1414 }
1415
1416 /// Steals a task from the queue.
1417 ///
1418 /// # Examples
1419 ///
1420 /// ```
1421 /// use crossbeam_deque::{Injector, Steal};
1422 ///
1423 /// let q = Injector::new();
1424 /// q.push(1);
1425 /// q.push(2);
1426 ///
1427 /// assert_eq!(q.steal(), Steal::Success(1));
1428 /// assert_eq!(q.steal(), Steal::Success(2));
1429 /// assert_eq!(q.steal(), Steal::Empty);
1430 /// ```
1431 pub fn steal(&self) -> Steal<T> {
1432 let mut head;
1433 let mut block;
1434 let mut offset;
1435
1436 let backoff = Backoff::new();
1437 loop {
1438 head = self.head.index.load(Ordering::Acquire);
1439 block = self.head.block.load(Ordering::Acquire);
1440
1441 // Calculate the offset of the index into the block.
1442 offset = (head >> SHIFT) % LAP;
1443
1444 // If we reached the end of the block, wait until the next one is installed.
1445 if offset == BLOCK_CAP {
1446 backoff.snooze();
1447 } else {
1448 break;
1449 }
1450 }
1451
1452 let mut new_head = head + (1 << SHIFT);
1453
1454 if new_head & HAS_NEXT == 0 {
1455 atomic::fence(Ordering::SeqCst);
1456 let tail = self.tail.index.load(Ordering::Relaxed);
1457
1458 // If the tail equals the head, that means the queue is empty.
1459 if head >> SHIFT == tail >> SHIFT {
1460 return Steal::Empty;
1461 }
1462
1463 // If head and tail are not in the same block, set `HAS_NEXT` in head.
1464 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1465 new_head |= HAS_NEXT;
1466 }
1467 }
1468
1469 // Try moving the head index forward.
1470 if self
1471 .head
1472 .index
1473 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1474 .is_err()
1475 {
1476 return Steal::Retry;
1477 }
1478
1479 unsafe {
1480 // If we've reached the end of the block, move to the next one.
1481 if offset + 1 == BLOCK_CAP {
1482 let next = (*block).wait_next();
1483 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1484 if !(*next).next.load(Ordering::Relaxed).is_null() {
1485 next_index |= HAS_NEXT;
1486 }
1487
1488 self.head.block.store(next, Ordering::Release);
1489 self.head.index.store(next_index, Ordering::Release);
1490 }
1491
1492 // Read the task.
1493 let slot = (*block).slots.get_unchecked(offset);
1494 slot.wait_write();
1495 let task = slot.task.get().read().assume_init();
1496
1497 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1498 // but couldn't because we were busy reading from the slot.
1499 if (offset + 1 == BLOCK_CAP)
1500 || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
1501 {
1502 Block::destroy(block, offset);
1503 }
1504
1505 Steal::Success(task)
1506 }
1507 }
1508
1509 /// Steals a batch of tasks and pushes them into a worker.
1510 ///
1511 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1512 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1513 ///
1514 /// # Examples
1515 ///
1516 /// ```
1517 /// use crossbeam_deque::{Injector, Worker};
1518 ///
1519 /// let q = Injector::new();
1520 /// q.push(1);
1521 /// q.push(2);
1522 /// q.push(3);
1523 /// q.push(4);
1524 ///
1525 /// let w = Worker::new_fifo();
1526 /// let _ = q.steal_batch(&w);
1527 /// assert_eq!(w.pop(), Some(1));
1528 /// assert_eq!(w.pop(), Some(2));
1529 /// ```
1530 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1531 self.steal_batch_with_limit(dest, MAX_BATCH)
1532 }
1533
1534 /// Steals no more than of tasks and pushes them into a worker.
1535 ///
1536 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1537 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1538 ///
1539 /// # Examples
1540 ///
1541 /// ```
1542 /// use crossbeam_deque::{Injector, Worker};
1543 ///
1544 /// let q = Injector::new();
1545 /// q.push(1);
1546 /// q.push(2);
1547 /// q.push(3);
1548 /// q.push(4);
1549 /// q.push(5);
1550 /// q.push(6);
1551 ///
1552 /// let w = Worker::new_fifo();
1553 /// let _ = q.steal_batch_with_limit(&w, 2);
1554 /// assert_eq!(w.pop(), Some(1));
1555 /// assert_eq!(w.pop(), Some(2));
1556 /// assert_eq!(w.pop(), None);
1557 ///
1558 /// q.push(7);
1559 /// q.push(8);
1560 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1561 /// // half of the elements are currently popped, but the number of popped elements is considered
1562 /// // an implementation detail that may be changed in the future.
1563 /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
1564 /// assert_eq!(w.len(), 3);
1565 /// ```
1566 pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
1567 assert!(limit > 0);
1568 let mut head;
1569 let mut block;
1570 let mut offset;
1571
1572 let backoff = Backoff::new();
1573 loop {
1574 head = self.head.index.load(Ordering::Acquire);
1575 block = self.head.block.load(Ordering::Acquire);
1576
1577 // Calculate the offset of the index into the block.
1578 offset = (head >> SHIFT) % LAP;
1579
1580 // If we reached the end of the block, wait until the next one is installed.
1581 if offset == BLOCK_CAP {
1582 backoff.snooze();
1583 } else {
1584 break;
1585 }
1586 }
1587
1588 let mut new_head = head;
1589 let advance;
1590
1591 if new_head & HAS_NEXT == 0 {
1592 atomic::fence(Ordering::SeqCst);
1593 let tail = self.tail.index.load(Ordering::Relaxed);
1594
1595 // If the tail equals the head, that means the queue is empty.
1596 if head >> SHIFT == tail >> SHIFT {
1597 return Steal::Empty;
1598 }
1599
1600 // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1601 // the right batch size to steal.
1602 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1603 new_head |= HAS_NEXT;
1604 // We can steal all tasks till the end of the block.
1605 advance = (BLOCK_CAP - offset).min(limit);
1606 } else {
1607 let len = (tail - head) >> SHIFT;
1608 // Steal half of the available tasks.
1609 advance = ((len + 1) / 2).min(limit);
1610 }
1611 } else {
1612 // We can steal all tasks till the end of the block.
1613 advance = (BLOCK_CAP - offset).min(limit);
1614 }
1615
1616 new_head += advance << SHIFT;
1617 let new_offset = offset + advance;
1618
1619 // Try moving the head index forward.
1620 if self
1621 .head
1622 .index
1623 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1624 .is_err()
1625 {
1626 return Steal::Retry;
1627 }
1628
1629 // Reserve capacity for the stolen batch.
1630 let batch_size = new_offset - offset;
1631 dest.reserve(batch_size);
1632
1633 // Get the destination buffer and back index.
1634 let dest_buffer = dest.buffer.get();
1635 let dest_b = dest.inner.back.load(Ordering::Relaxed);
1636
1637 unsafe {
1638 // If we've reached the end of the block, move to the next one.
1639 if new_offset == BLOCK_CAP {
1640 let next = (*block).wait_next();
1641 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1642 if !(*next).next.load(Ordering::Relaxed).is_null() {
1643 next_index |= HAS_NEXT;
1644 }
1645
1646 self.head.block.store(next, Ordering::Release);
1647 self.head.index.store(next_index, Ordering::Release);
1648 }
1649
1650 // Copy values from the injector into the destination queue.
1651 match dest.flavor {
1652 Flavor::Fifo => {
1653 for i in 0..batch_size {
1654 // Read the task.
1655 let slot = (*block).slots.get_unchecked(offset + i);
1656 slot.wait_write();
1657 let task = slot.task.get().read();
1658
1659 // Write it into the destination queue.
1660 dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1661 }
1662 }
1663
1664 Flavor::Lifo => {
1665 for i in 0..batch_size {
1666 // Read the task.
1667 let slot = (*block).slots.get_unchecked(offset + i);
1668 slot.wait_write();
1669 let task = slot.task.get().read();
1670
1671 // Write it into the destination queue.
1672 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1673 }
1674 }
1675 }
1676
1677 atomic::fence(Ordering::Release);
1678
1679 // Update the back index in the destination queue.
1680 //
1681 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1682 // data races because it doesn't understand fences.
1683 dest.inner
1684 .back
1685 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1686
1687 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1688 // but couldn't because we were busy reading from the slot.
1689 if new_offset == BLOCK_CAP {
1690 Block::destroy(block, offset);
1691 } else {
1692 for i in offset..new_offset {
1693 let slot = (*block).slots.get_unchecked(i);
1694
1695 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1696 Block::destroy(block, offset);
1697 break;
1698 }
1699 }
1700 }
1701
1702 Steal::Success(())
1703 }
1704 }
1705
1706 /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1707 ///
1708 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1709 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1710 ///
1711 /// # Examples
1712 ///
1713 /// ```
1714 /// use crossbeam_deque::{Injector, Steal, Worker};
1715 ///
1716 /// let q = Injector::new();
1717 /// q.push(1);
1718 /// q.push(2);
1719 /// q.push(3);
1720 /// q.push(4);
1721 ///
1722 /// let w = Worker::new_fifo();
1723 /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1724 /// assert_eq!(w.pop(), Some(2));
1725 /// ```
1726 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1727 // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly
1728 // better, but we may change it in the future to be compatible with the same method in Stealer.
1729 self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
1730 }
1731
1732 /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker.
1733 ///
1734 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1735 /// steal around half of the tasks in the queue, but also not more than the given limit.
1736 ///
1737 /// # Examples
1738 ///
1739 /// ```
1740 /// use crossbeam_deque::{Injector, Steal, Worker};
1741 ///
1742 /// let q = Injector::new();
1743 /// q.push(1);
1744 /// q.push(2);
1745 /// q.push(3);
1746 /// q.push(4);
1747 /// q.push(5);
1748 /// q.push(6);
1749 ///
1750 /// let w = Worker::new_fifo();
1751 /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1));
1752 /// assert_eq!(w.pop(), Some(2));
1753 /// assert_eq!(w.pop(), None);
1754 ///
1755 /// q.push(7);
1756 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1757 /// // half of the elements are currently popped, but the number of popped elements is considered
1758 /// // an implementation detail that may be changed in the future.
1759 /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3));
1760 /// assert_eq!(w.pop(), Some(4));
1761 /// assert_eq!(w.pop(), Some(5));
1762 /// assert_eq!(w.pop(), None);
1763 /// ```
1764 pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
1765 assert!(limit > 0);
1766 let mut head;
1767 let mut block;
1768 let mut offset;
1769
1770 let backoff = Backoff::new();
1771 loop {
1772 head = self.head.index.load(Ordering::Acquire);
1773 block = self.head.block.load(Ordering::Acquire);
1774
1775 // Calculate the offset of the index into the block.
1776 offset = (head >> SHIFT) % LAP;
1777
1778 // If we reached the end of the block, wait until the next one is installed.
1779 if offset == BLOCK_CAP {
1780 backoff.snooze();
1781 } else {
1782 break;
1783 }
1784 }
1785
1786 let mut new_head = head;
1787 let advance;
1788
1789 if new_head & HAS_NEXT == 0 {
1790 atomic::fence(Ordering::SeqCst);
1791 let tail = self.tail.index.load(Ordering::Relaxed);
1792
1793 // If the tail equals the head, that means the queue is empty.
1794 if head >> SHIFT == tail >> SHIFT {
1795 return Steal::Empty;
1796 }
1797
1798 // If head and tail are not in the same block, set `HAS_NEXT` in head.
1799 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1800 new_head |= HAS_NEXT;
1801 // We can steal all tasks till the end of the block.
1802 advance = (BLOCK_CAP - offset).min(limit);
1803 } else {
1804 let len = (tail - head) >> SHIFT;
1805 // Steal half of the available tasks.
1806 advance = ((len + 1) / 2).min(limit);
1807 }
1808 } else {
1809 // We can steal all tasks till the end of the block.
1810 advance = (BLOCK_CAP - offset).min(limit);
1811 }
1812
1813 new_head += advance << SHIFT;
1814 let new_offset = offset + advance;
1815
1816 // Try moving the head index forward.
1817 if self
1818 .head
1819 .index
1820 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1821 .is_err()
1822 {
1823 return Steal::Retry;
1824 }
1825
1826 // Reserve capacity for the stolen batch.
1827 let batch_size = new_offset - offset - 1;
1828 dest.reserve(batch_size);
1829
1830 // Get the destination buffer and back index.
1831 let dest_buffer = dest.buffer.get();
1832 let dest_b = dest.inner.back.load(Ordering::Relaxed);
1833
1834 unsafe {
1835 // If we've reached the end of the block, move to the next one.
1836 if new_offset == BLOCK_CAP {
1837 let next = (*block).wait_next();
1838 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1839 if !(*next).next.load(Ordering::Relaxed).is_null() {
1840 next_index |= HAS_NEXT;
1841 }
1842
1843 self.head.block.store(next, Ordering::Release);
1844 self.head.index.store(next_index, Ordering::Release);
1845 }
1846
1847 // Read the task.
1848 let slot = (*block).slots.get_unchecked(offset);
1849 slot.wait_write();
1850 let task = slot.task.get().read();
1851
1852 match dest.flavor {
1853 Flavor::Fifo => {
1854 // Copy values from the injector into the destination queue.
1855 for i in 0..batch_size {
1856 // Read the task.
1857 let slot = (*block).slots.get_unchecked(offset + i + 1);
1858 slot.wait_write();
1859 let task = slot.task.get().read();
1860
1861 // Write it into the destination queue.
1862 dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1863 }
1864 }
1865
1866 Flavor::Lifo => {
1867 // Copy values from the injector into the destination queue.
1868 for i in 0..batch_size {
1869 // Read the task.
1870 let slot = (*block).slots.get_unchecked(offset + i + 1);
1871 slot.wait_write();
1872 let task = slot.task.get().read();
1873
1874 // Write it into the destination queue.
1875 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1876 }
1877 }
1878 }
1879
1880 atomic::fence(Ordering::Release);
1881
1882 // Update the back index in the destination queue.
1883 //
1884 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1885 // data races because it doesn't understand fences.
1886 dest.inner
1887 .back
1888 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1889
1890 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1891 // but couldn't because we were busy reading from the slot.
1892 if new_offset == BLOCK_CAP {
1893 Block::destroy(block, offset);
1894 } else {
1895 for i in offset..new_offset {
1896 let slot = (*block).slots.get_unchecked(i);
1897
1898 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1899 Block::destroy(block, offset);
1900 break;
1901 }
1902 }
1903 }
1904
1905 Steal::Success(task.assume_init())
1906 }
1907 }
1908
1909 /// Returns `true` if the queue is empty.
1910 ///
1911 /// # Examples
1912 ///
1913 /// ```
1914 /// use crossbeam_deque::Injector;
1915 ///
1916 /// let q = Injector::new();
1917 ///
1918 /// assert!(q.is_empty());
1919 /// q.push(1);
1920 /// assert!(!q.is_empty());
1921 /// ```
1922 pub fn is_empty(&self) -> bool {
1923 let head = self.head.index.load(Ordering::SeqCst);
1924 let tail = self.tail.index.load(Ordering::SeqCst);
1925 head >> SHIFT == tail >> SHIFT
1926 }
1927
1928 /// Returns the number of tasks in the queue.
1929 ///
1930 /// # Examples
1931 ///
1932 /// ```
1933 /// use crossbeam_deque::Injector;
1934 ///
1935 /// let q = Injector::new();
1936 ///
1937 /// assert_eq!(q.len(), 0);
1938 /// q.push(1);
1939 /// assert_eq!(q.len(), 1);
1940 /// q.push(1);
1941 /// assert_eq!(q.len(), 2);
1942 /// ```
1943 pub fn len(&self) -> usize {
1944 loop {
1945 // Load the tail index, then load the head index.
1946 let mut tail = self.tail.index.load(Ordering::SeqCst);
1947 let mut head = self.head.index.load(Ordering::SeqCst);
1948
1949 // If the tail index didn't change, we've got consistent indices to work with.
1950 if self.tail.index.load(Ordering::SeqCst) == tail {
1951 // Erase the lower bits.
1952 tail &= !((1 << SHIFT) - 1);
1953 head &= !((1 << SHIFT) - 1);
1954
1955 // Fix up indices if they fall onto block ends.
1956 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
1957 tail = tail.wrapping_add(1 << SHIFT);
1958 }
1959 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
1960 head = head.wrapping_add(1 << SHIFT);
1961 }
1962
1963 // Rotate indices so that head falls into the first block.
1964 let lap = (head >> SHIFT) / LAP;
1965 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
1966 head = head.wrapping_sub((lap * LAP) << SHIFT);
1967
1968 // Remove the lower bits.
1969 tail >>= SHIFT;
1970 head >>= SHIFT;
1971
1972 // Return the difference minus the number of blocks between tail and head.
1973 return tail - head - tail / LAP;
1974 }
1975 }
1976 }
1977}
1978
1979impl<T> Drop for Injector<T> {
1980 fn drop(&mut self) {
1981 let mut head = *self.head.index.get_mut();
1982 let mut tail = *self.tail.index.get_mut();
1983 let mut block = *self.head.block.get_mut();
1984
1985 // Erase the lower bits.
1986 head &= !((1 << SHIFT) - 1);
1987 tail &= !((1 << SHIFT) - 1);
1988
1989 unsafe {
1990 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
1991 while head != tail {
1992 let offset = (head >> SHIFT) % LAP;
1993
1994 if offset < BLOCK_CAP {
1995 // Drop the task in the slot.
1996 let slot = (*block).slots.get_unchecked(offset);
1997 (*slot.task.get()).assume_init_drop();
1998 } else {
1999 // Deallocate the block and move to the next one.
2000 let next = *(*block).next.get_mut();
2001 drop(Box::from_raw(block));
2002 block = next;
2003 }
2004
2005 head = head.wrapping_add(1 << SHIFT);
2006 }
2007
2008 // Deallocate the last remaining block.
2009 drop(Box::from_raw(block));
2010 }
2011 }
2012}
2013
2014impl<T> fmt::Debug for Injector<T> {
2015 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2016 f.pad("Worker { .. }")
2017 }
2018}
2019
2020/// Possible outcomes of a steal operation.
2021///
2022/// # Examples
2023///
2024/// There are lots of ways to chain results of steal operations together:
2025///
2026/// ```
2027/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
2028///
2029/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
2030///
2031/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
2032/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
2033/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
2034///
2035/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
2036/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
2037/// ```
2038#[must_use]
2039#[derive(PartialEq, Eq, Copy, Clone)]
2040pub enum Steal<T> {
2041 /// The queue was empty at the time of stealing.
2042 Empty,
2043
2044 /// At least one task was successfully stolen.
2045 Success(T),
2046
2047 /// The steal operation needs to be retried.
2048 Retry,
2049}
2050
2051impl<T> Steal<T> {
2052 /// Returns `true` if the queue was empty at the time of stealing.
2053 ///
2054 /// # Examples
2055 ///
2056 /// ```
2057 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2058 ///
2059 /// assert!(!Success(7).is_empty());
2060 /// assert!(!Retry::<i32>.is_empty());
2061 ///
2062 /// assert!(Empty::<i32>.is_empty());
2063 /// ```
2064 pub fn is_empty(&self) -> bool {
2065 match self {
2066 Steal::Empty => true,
2067 _ => false,
2068 }
2069 }
2070
2071 /// Returns `true` if at least one task was stolen.
2072 ///
2073 /// # Examples
2074 ///
2075 /// ```
2076 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2077 ///
2078 /// assert!(!Empty::<i32>.is_success());
2079 /// assert!(!Retry::<i32>.is_success());
2080 ///
2081 /// assert!(Success(7).is_success());
2082 /// ```
2083 pub fn is_success(&self) -> bool {
2084 match self {
2085 Steal::Success(_) => true,
2086 _ => false,
2087 }
2088 }
2089
2090 /// Returns `true` if the steal operation needs to be retried.
2091 ///
2092 /// # Examples
2093 ///
2094 /// ```
2095 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2096 ///
2097 /// assert!(!Empty::<i32>.is_retry());
2098 /// assert!(!Success(7).is_retry());
2099 ///
2100 /// assert!(Retry::<i32>.is_retry());
2101 /// ```
2102 pub fn is_retry(&self) -> bool {
2103 match self {
2104 Steal::Retry => true,
2105 _ => false,
2106 }
2107 }
2108
2109 /// Returns the result of the operation, if successful.
2110 ///
2111 /// # Examples
2112 ///
2113 /// ```
2114 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2115 ///
2116 /// assert_eq!(Empty::<i32>.success(), None);
2117 /// assert_eq!(Retry::<i32>.success(), None);
2118 ///
2119 /// assert_eq!(Success(7).success(), Some(7));
2120 /// ```
2121 pub fn success(self) -> Option<T> {
2122 match self {
2123 Steal::Success(res) => Some(res),
2124 _ => None,
2125 }
2126 }
2127
2128 /// If no task was stolen, attempts another steal operation.
2129 ///
2130 /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
2131 ///
2132 /// * If the second steal resulted in `Success`, it is returned.
2133 /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
2134 /// * If both resulted in `None`, then `None` is returned.
2135 ///
2136 /// # Examples
2137 ///
2138 /// ```
2139 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2140 ///
2141 /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
2142 /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
2143 ///
2144 /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
2145 /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
2146 ///
2147 /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
2148 /// ```
2149 pub fn or_else<F>(self, f: F) -> Steal<T>
2150 where
2151 F: FnOnce() -> Steal<T>,
2152 {
2153 match self {
2154 Steal::Empty => f(),
2155 Steal::Success(_) => self,
2156 Steal::Retry => {
2157 if let Steal::Success(res) = f() {
2158 Steal::Success(res)
2159 } else {
2160 Steal::Retry
2161 }
2162 }
2163 }
2164 }
2165}
2166
2167impl<T> fmt::Debug for Steal<T> {
2168 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2169 match self {
2170 Steal::Empty => f.pad("Empty"),
2171 Steal::Success(_) => f.pad("Success(..)"),
2172 Steal::Retry => f.pad("Retry"),
2173 }
2174 }
2175}
2176
2177impl<T> FromIterator<Steal<T>> for Steal<T> {
2178 /// Consumes items until a `Success` is found and returns it.
2179 ///
2180 /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
2181 /// Otherwise, `Empty` is returned.
2182 fn from_iter<I>(iter: I) -> Steal<T>
2183 where
2184 I: IntoIterator<Item = Steal<T>>,
2185 {
2186 let mut retry = false;
2187 for s in iter {
2188 match &s {
2189 Steal::Empty => {}
2190 Steal::Success(_) => return s,
2191 Steal::Retry => retry = true,
2192 }
2193 }
2194
2195 if retry {
2196 Steal::Retry
2197 } else {
2198 Steal::Empty
2199 }
2200 }
2201}
2202