1use std::cell::{Cell, UnsafeCell};
2use std::cmp;
3use std::fmt;
4use std::iter::FromIterator;
5use std::marker::PhantomData;
6use std::mem::{self, ManuallyDrop, MaybeUninit};
7use std::ptr;
8use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
9use std::sync::Arc;
10
11use crate::epoch::{self, Atomic, Owned};
12use crate::utils::{Backoff, CachePadded};
13
14// Minimum buffer capacity.
15const MIN_CAP: usize = 64;
16// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
17const MAX_BATCH: usize = 32;
18// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
19// deallocated as soon as possible.
20const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
21
22/// A buffer that holds tasks in a worker queue.
23///
24/// This is just a pointer to the buffer and its length - dropping an instance of this struct will
25/// *not* deallocate the buffer.
26struct Buffer<T> {
27 /// Pointer to the allocated memory.
28 ptr: *mut T,
29
30 /// Capacity of the buffer. Always a power of two.
31 cap: usize,
32}
33
34unsafe impl<T> Send for Buffer<T> {}
35
36impl<T> Buffer<T> {
37 /// Allocates a new buffer with the specified capacity.
38 fn alloc(cap: usize) -> Buffer<T> {
39 debug_assert_eq!(cap, cap.next_power_of_two());
40
41 let mut v = ManuallyDrop::new(Vec::with_capacity(cap));
42 let ptr = v.as_mut_ptr();
43
44 Buffer { ptr, cap }
45 }
46
47 /// Deallocates the buffer.
48 unsafe fn dealloc(self) {
49 drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
50 }
51
52 /// Returns a pointer to the task at the specified `index`.
53 unsafe fn at(&self, index: isize) -> *mut T {
54 // `self.cap` is always a power of two.
55 // We do all the loads at `MaybeUninit` because we might realize, after loading, that we
56 // don't actually have the right to access this memory.
57 self.ptr.offset(index & (self.cap - 1) as isize)
58 }
59
60 /// Writes `task` into the specified `index`.
61 ///
62 /// This method might be concurrently called with another `read` at the same index, which is
63 /// technically speaking a data race and therefore UB. We should use an atomic store here, but
64 /// that would be more expensive and difficult to implement generically for all types `T`.
65 /// Hence, as a hack, we use a volatile write instead.
66 unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
67 ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task)
68 }
69
70 /// Reads a task from the specified `index`.
71 ///
72 /// This method might be concurrently called with another `write` at the same index, which is
73 /// technically speaking a data race and therefore UB. We should use an atomic load here, but
74 /// that would be more expensive and difficult to implement generically for all types `T`.
75 /// Hence, as a hack, we use a volatile load instead.
76 unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
77 ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>())
78 }
79}
80
81impl<T> Clone for Buffer<T> {
82 fn clone(&self) -> Buffer<T> {
83 *self
84 }
85}
86
87impl<T> Copy for Buffer<T> {}
88
89/// Internal queue data shared between the worker and stealers.
90///
91/// The implementation is based on the following work:
92///
93/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
94/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
95/// PPoPP 2013.][weak-mem]
96/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
97/// atomics. OOPSLA 2013.][checker]
98///
99/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
100/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
101/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
102struct Inner<T> {
103 /// The front index.
104 front: AtomicIsize,
105
106 /// The back index.
107 back: AtomicIsize,
108
109 /// The underlying buffer.
110 buffer: CachePadded<Atomic<Buffer<T>>>,
111}
112
113impl<T> Drop for Inner<T> {
114 fn drop(&mut self) {
115 // Load the back index, front index, and buffer.
116 let b = *self.back.get_mut();
117 let f = *self.front.get_mut();
118
119 unsafe {
120 let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
121
122 // Go through the buffer from front to back and drop all tasks in the queue.
123 let mut i = f;
124 while i != b {
125 buffer.deref().at(i).drop_in_place();
126 i = i.wrapping_add(1);
127 }
128
129 // Free the memory allocated by the buffer.
130 buffer.into_owned().into_box().dealloc();
131 }
132 }
133}
134
135/// Worker queue flavor: FIFO or LIFO.
136#[derive(Clone, Copy, Debug, Eq, PartialEq)]
137enum Flavor {
138 /// The first-in first-out flavor.
139 Fifo,
140
141 /// The last-in first-out flavor.
142 Lifo,
143}
144
145/// A worker queue.
146///
147/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
148/// tasks from it. Task schedulers typically create a single worker queue per thread.
149///
150/// # Examples
151///
152/// A FIFO worker:
153///
154/// ```
155/// use crossbeam_deque::{Steal, Worker};
156///
157/// let w = Worker::new_fifo();
158/// let s = w.stealer();
159///
160/// w.push(1);
161/// w.push(2);
162/// w.push(3);
163///
164/// assert_eq!(s.steal(), Steal::Success(1));
165/// assert_eq!(w.pop(), Some(2));
166/// assert_eq!(w.pop(), Some(3));
167/// ```
168///
169/// A LIFO worker:
170///
171/// ```
172/// use crossbeam_deque::{Steal, Worker};
173///
174/// let w = Worker::new_lifo();
175/// let s = w.stealer();
176///
177/// w.push(1);
178/// w.push(2);
179/// w.push(3);
180///
181/// assert_eq!(s.steal(), Steal::Success(1));
182/// assert_eq!(w.pop(), Some(3));
183/// assert_eq!(w.pop(), Some(2));
184/// ```
185pub struct Worker<T> {
186 /// A reference to the inner representation of the queue.
187 inner: Arc<CachePadded<Inner<T>>>,
188
189 /// A copy of `inner.buffer` for quick access.
190 buffer: Cell<Buffer<T>>,
191
192 /// The flavor of the queue.
193 flavor: Flavor,
194
195 /// Indicates that the worker cannot be shared among threads.
196 _marker: PhantomData<*mut ()>, // !Send + !Sync
197}
198
199unsafe impl<T: Send> Send for Worker<T> {}
200
201impl<T> Worker<T> {
202 /// Creates a FIFO worker queue.
203 ///
204 /// Tasks are pushed and popped from opposite ends.
205 ///
206 /// # Examples
207 ///
208 /// ```
209 /// use crossbeam_deque::Worker;
210 ///
211 /// let w = Worker::<i32>::new_fifo();
212 /// ```
213 pub fn new_fifo() -> Worker<T> {
214 let buffer = Buffer::alloc(MIN_CAP);
215
216 let inner = Arc::new(CachePadded::new(Inner {
217 front: AtomicIsize::new(0),
218 back: AtomicIsize::new(0),
219 buffer: CachePadded::new(Atomic::new(buffer)),
220 }));
221
222 Worker {
223 inner,
224 buffer: Cell::new(buffer),
225 flavor: Flavor::Fifo,
226 _marker: PhantomData,
227 }
228 }
229
230 /// Creates a LIFO worker queue.
231 ///
232 /// Tasks are pushed and popped from the same end.
233 ///
234 /// # Examples
235 ///
236 /// ```
237 /// use crossbeam_deque::Worker;
238 ///
239 /// let w = Worker::<i32>::new_lifo();
240 /// ```
241 pub fn new_lifo() -> Worker<T> {
242 let buffer = Buffer::alloc(MIN_CAP);
243
244 let inner = Arc::new(CachePadded::new(Inner {
245 front: AtomicIsize::new(0),
246 back: AtomicIsize::new(0),
247 buffer: CachePadded::new(Atomic::new(buffer)),
248 }));
249
250 Worker {
251 inner,
252 buffer: Cell::new(buffer),
253 flavor: Flavor::Lifo,
254 _marker: PhantomData,
255 }
256 }
257
258 /// Creates a stealer for this queue.
259 ///
260 /// The returned stealer can be shared among threads and cloned.
261 ///
262 /// # Examples
263 ///
264 /// ```
265 /// use crossbeam_deque::Worker;
266 ///
267 /// let w = Worker::<i32>::new_lifo();
268 /// let s = w.stealer();
269 /// ```
270 pub fn stealer(&self) -> Stealer<T> {
271 Stealer {
272 inner: self.inner.clone(),
273 flavor: self.flavor,
274 }
275 }
276
277 /// Resizes the internal buffer to the new capacity of `new_cap`.
278 #[cold]
279 unsafe fn resize(&self, new_cap: usize) {
280 // Load the back index, front index, and buffer.
281 let b = self.inner.back.load(Ordering::Relaxed);
282 let f = self.inner.front.load(Ordering::Relaxed);
283 let buffer = self.buffer.get();
284
285 // Allocate a new buffer and copy data from the old buffer to the new one.
286 let new = Buffer::alloc(new_cap);
287 let mut i = f;
288 while i != b {
289 ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
290 i = i.wrapping_add(1);
291 }
292
293 let guard = &epoch::pin();
294
295 // Replace the old buffer with the new one.
296 self.buffer.replace(new);
297 let old =
298 self.inner
299 .buffer
300 .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
301
302 // Destroy the old buffer later.
303 guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
304
305 // If the buffer is very large, then flush the thread-local garbage in order to deallocate
306 // it as soon as possible.
307 if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
308 guard.flush();
309 }
310 }
311
312 /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
313 /// buffer.
314 fn reserve(&self, reserve_cap: usize) {
315 if reserve_cap > 0 {
316 // Compute the current length.
317 let b = self.inner.back.load(Ordering::Relaxed);
318 let f = self.inner.front.load(Ordering::SeqCst);
319 let len = b.wrapping_sub(f) as usize;
320
321 // The current capacity.
322 let cap = self.buffer.get().cap;
323
324 // Is there enough capacity to push `reserve_cap` tasks?
325 if cap - len < reserve_cap {
326 // Keep doubling the capacity as much as is needed.
327 let mut new_cap = cap * 2;
328 while new_cap - len < reserve_cap {
329 new_cap *= 2;
330 }
331
332 // Resize the buffer.
333 unsafe {
334 self.resize(new_cap);
335 }
336 }
337 }
338 }
339
340 /// Returns `true` if the queue is empty.
341 ///
342 /// ```
343 /// use crossbeam_deque::Worker;
344 ///
345 /// let w = Worker::new_lifo();
346 ///
347 /// assert!(w.is_empty());
348 /// w.push(1);
349 /// assert!(!w.is_empty());
350 /// ```
351 pub fn is_empty(&self) -> bool {
352 let b = self.inner.back.load(Ordering::Relaxed);
353 let f = self.inner.front.load(Ordering::SeqCst);
354 b.wrapping_sub(f) <= 0
355 }
356
357 /// Returns the number of tasks in the deque.
358 ///
359 /// ```
360 /// use crossbeam_deque::Worker;
361 ///
362 /// let w = Worker::new_lifo();
363 ///
364 /// assert_eq!(w.len(), 0);
365 /// w.push(1);
366 /// assert_eq!(w.len(), 1);
367 /// w.push(1);
368 /// assert_eq!(w.len(), 2);
369 /// ```
370 pub fn len(&self) -> usize {
371 let b = self.inner.back.load(Ordering::Relaxed);
372 let f = self.inner.front.load(Ordering::SeqCst);
373 b.wrapping_sub(f).max(0) as usize
374 }
375
376 /// Pushes a task into the queue.
377 ///
378 /// # Examples
379 ///
380 /// ```
381 /// use crossbeam_deque::Worker;
382 ///
383 /// let w = Worker::new_lifo();
384 /// w.push(1);
385 /// w.push(2);
386 /// ```
387 pub fn push(&self, task: T) {
388 // Load the back index, front index, and buffer.
389 let b = self.inner.back.load(Ordering::Relaxed);
390 let f = self.inner.front.load(Ordering::Acquire);
391 let mut buffer = self.buffer.get();
392
393 // Calculate the length of the queue.
394 let len = b.wrapping_sub(f);
395
396 // Is the queue full?
397 if len >= buffer.cap as isize {
398 // Yes. Grow the underlying buffer.
399 unsafe {
400 self.resize(2 * buffer.cap);
401 }
402 buffer = self.buffer.get();
403 }
404
405 // Write `task` into the slot.
406 unsafe {
407 buffer.write(b, MaybeUninit::new(task));
408 }
409
410 atomic::fence(Ordering::Release);
411
412 // Increment the back index.
413 //
414 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
415 // races because it doesn't understand fences.
416 self.inner.back.store(b.wrapping_add(1), Ordering::Release);
417 }
418
419 /// Pops a task from the queue.
420 ///
421 /// # Examples
422 ///
423 /// ```
424 /// use crossbeam_deque::Worker;
425 ///
426 /// let w = Worker::new_fifo();
427 /// w.push(1);
428 /// w.push(2);
429 ///
430 /// assert_eq!(w.pop(), Some(1));
431 /// assert_eq!(w.pop(), Some(2));
432 /// assert_eq!(w.pop(), None);
433 /// ```
434 pub fn pop(&self) -> Option<T> {
435 // Load the back and front index.
436 let b = self.inner.back.load(Ordering::Relaxed);
437 let f = self.inner.front.load(Ordering::Relaxed);
438
439 // Calculate the length of the queue.
440 let len = b.wrapping_sub(f);
441
442 // Is the queue empty?
443 if len <= 0 {
444 return None;
445 }
446
447 match self.flavor {
448 // Pop from the front of the queue.
449 Flavor::Fifo => {
450 // Try incrementing the front index to pop the task.
451 let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
452 let new_f = f.wrapping_add(1);
453
454 if b.wrapping_sub(new_f) < 0 {
455 self.inner.front.store(f, Ordering::Relaxed);
456 return None;
457 }
458
459 unsafe {
460 // Read the popped task.
461 let buffer = self.buffer.get();
462 let task = buffer.read(f).assume_init();
463
464 // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
465 if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
466 self.resize(buffer.cap / 2);
467 }
468
469 Some(task)
470 }
471 }
472
473 // Pop from the back of the queue.
474 Flavor::Lifo => {
475 // Decrement the back index.
476 let b = b.wrapping_sub(1);
477 self.inner.back.store(b, Ordering::Relaxed);
478
479 atomic::fence(Ordering::SeqCst);
480
481 // Load the front index.
482 let f = self.inner.front.load(Ordering::Relaxed);
483
484 // Compute the length after the back index was decremented.
485 let len = b.wrapping_sub(f);
486
487 if len < 0 {
488 // The queue is empty. Restore the back index to the original task.
489 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
490 None
491 } else {
492 // Read the task to be popped.
493 let buffer = self.buffer.get();
494 let mut task = unsafe { Some(buffer.read(b)) };
495
496 // Are we popping the last task from the queue?
497 if len == 0 {
498 // Try incrementing the front index.
499 if self
500 .inner
501 .front
502 .compare_exchange(
503 f,
504 f.wrapping_add(1),
505 Ordering::SeqCst,
506 Ordering::Relaxed,
507 )
508 .is_err()
509 {
510 // Failed. We didn't pop anything. Reset to `None`.
511 task.take();
512 }
513
514 // Restore the back index to the original task.
515 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
516 } else {
517 // Shrink the buffer if `len` is less than one fourth of the capacity.
518 if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
519 unsafe {
520 self.resize(buffer.cap / 2);
521 }
522 }
523 }
524
525 task.map(|t| unsafe { t.assume_init() })
526 }
527 }
528 }
529 }
530}
531
532impl<T> fmt::Debug for Worker<T> {
533 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
534 f.pad("Worker { .. }")
535 }
536}
537
538/// A stealer handle of a worker queue.
539///
540/// Stealers can be shared among threads.
541///
542/// Task schedulers typically have a single worker queue per worker thread.
543///
544/// # Examples
545///
546/// ```
547/// use crossbeam_deque::{Steal, Worker};
548///
549/// let w = Worker::new_lifo();
550/// w.push(1);
551/// w.push(2);
552///
553/// let s = w.stealer();
554/// assert_eq!(s.steal(), Steal::Success(1));
555/// assert_eq!(s.steal(), Steal::Success(2));
556/// assert_eq!(s.steal(), Steal::Empty);
557/// ```
558pub struct Stealer<T> {
559 /// A reference to the inner representation of the queue.
560 inner: Arc<CachePadded<Inner<T>>>,
561
562 /// The flavor of the queue.
563 flavor: Flavor,
564}
565
566unsafe impl<T: Send> Send for Stealer<T> {}
567unsafe impl<T: Send> Sync for Stealer<T> {}
568
569impl<T> Stealer<T> {
570 /// Returns `true` if the queue is empty.
571 ///
572 /// ```
573 /// use crossbeam_deque::Worker;
574 ///
575 /// let w = Worker::new_lifo();
576 /// let s = w.stealer();
577 ///
578 /// assert!(s.is_empty());
579 /// w.push(1);
580 /// assert!(!s.is_empty());
581 /// ```
582 pub fn is_empty(&self) -> bool {
583 let f = self.inner.front.load(Ordering::Acquire);
584 atomic::fence(Ordering::SeqCst);
585 let b = self.inner.back.load(Ordering::Acquire);
586 b.wrapping_sub(f) <= 0
587 }
588
589 /// Returns the number of tasks in the deque.
590 ///
591 /// ```
592 /// use crossbeam_deque::Worker;
593 ///
594 /// let w = Worker::new_lifo();
595 /// let s = w.stealer();
596 ///
597 /// assert_eq!(s.len(), 0);
598 /// w.push(1);
599 /// assert_eq!(s.len(), 1);
600 /// w.push(2);
601 /// assert_eq!(s.len(), 2);
602 /// ```
603 pub fn len(&self) -> usize {
604 let f = self.inner.front.load(Ordering::Acquire);
605 atomic::fence(Ordering::SeqCst);
606 let b = self.inner.back.load(Ordering::Acquire);
607 b.wrapping_sub(f).max(0) as usize
608 }
609
610 /// Steals a task from the queue.
611 ///
612 /// # Examples
613 ///
614 /// ```
615 /// use crossbeam_deque::{Steal, Worker};
616 ///
617 /// let w = Worker::new_lifo();
618 /// w.push(1);
619 /// w.push(2);
620 ///
621 /// let s = w.stealer();
622 /// assert_eq!(s.steal(), Steal::Success(1));
623 /// assert_eq!(s.steal(), Steal::Success(2));
624 /// ```
625 pub fn steal(&self) -> Steal<T> {
626 // Load the front index.
627 let f = self.inner.front.load(Ordering::Acquire);
628
629 // A SeqCst fence is needed here.
630 //
631 // If the current thread is already pinned (reentrantly), we must manually issue the
632 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
633 // have to.
634 if epoch::is_pinned() {
635 atomic::fence(Ordering::SeqCst);
636 }
637
638 let guard = &epoch::pin();
639
640 // Load the back index.
641 let b = self.inner.back.load(Ordering::Acquire);
642
643 // Is the queue empty?
644 if b.wrapping_sub(f) <= 0 {
645 return Steal::Empty;
646 }
647
648 // Load the buffer and read the task at the front.
649 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
650 let task = unsafe { buffer.deref().read(f) };
651
652 // Try incrementing the front index to steal the task.
653 // If the buffer has been swapped or the increment fails, we retry.
654 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
655 || self
656 .inner
657 .front
658 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
659 .is_err()
660 {
661 // We didn't steal this task, forget it.
662 return Steal::Retry;
663 }
664
665 // Return the stolen task.
666 Steal::Success(unsafe { task.assume_init() })
667 }
668
669 /// Steals a batch of tasks and pushes them into another worker.
670 ///
671 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
672 /// steal around half of the tasks in the queue, but also not more than some constant limit.
673 ///
674 /// # Examples
675 ///
676 /// ```
677 /// use crossbeam_deque::Worker;
678 ///
679 /// let w1 = Worker::new_fifo();
680 /// w1.push(1);
681 /// w1.push(2);
682 /// w1.push(3);
683 /// w1.push(4);
684 ///
685 /// let s = w1.stealer();
686 /// let w2 = Worker::new_fifo();
687 ///
688 /// let _ = s.steal_batch(&w2);
689 /// assert_eq!(w2.pop(), Some(1));
690 /// assert_eq!(w2.pop(), Some(2));
691 /// ```
692 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
693 self.steal_batch_with_limit(dest, MAX_BATCH)
694 }
695
696 /// Steals no more than `limit` of tasks and pushes them into another worker.
697 ///
698 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
699 /// steal around half of the tasks in the queue, but also not more than the given limit.
700 ///
701 /// # Examples
702 ///
703 /// ```
704 /// use crossbeam_deque::Worker;
705 ///
706 /// let w1 = Worker::new_fifo();
707 /// w1.push(1);
708 /// w1.push(2);
709 /// w1.push(3);
710 /// w1.push(4);
711 /// w1.push(5);
712 /// w1.push(6);
713 ///
714 /// let s = w1.stealer();
715 /// let w2 = Worker::new_fifo();
716 ///
717 /// let _ = s.steal_batch_with_limit(&w2, 2);
718 /// assert_eq!(w2.pop(), Some(1));
719 /// assert_eq!(w2.pop(), Some(2));
720 /// assert_eq!(w2.pop(), None);
721 ///
722 /// w1.push(7);
723 /// w1.push(8);
724 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
725 /// // half of the elements are currently popped, but the number of popped elements is considered
726 /// // an implementation detail that may be changed in the future.
727 /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
728 /// assert_eq!(w2.len(), 3);
729 /// ```
730 pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
731 assert!(limit > 0);
732 if Arc::ptr_eq(&self.inner, &dest.inner) {
733 if dest.is_empty() {
734 return Steal::Empty;
735 } else {
736 return Steal::Success(());
737 }
738 }
739
740 // Load the front index.
741 let mut f = self.inner.front.load(Ordering::Acquire);
742
743 // A SeqCst fence is needed here.
744 //
745 // If the current thread is already pinned (reentrantly), we must manually issue the
746 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
747 // have to.
748 if epoch::is_pinned() {
749 atomic::fence(Ordering::SeqCst);
750 }
751
752 let guard = &epoch::pin();
753
754 // Load the back index.
755 let b = self.inner.back.load(Ordering::Acquire);
756
757 // Is the queue empty?
758 let len = b.wrapping_sub(f);
759 if len <= 0 {
760 return Steal::Empty;
761 }
762
763 // Reserve capacity for the stolen batch.
764 let batch_size = cmp::min((len as usize + 1) / 2, limit);
765 dest.reserve(batch_size);
766 let mut batch_size = batch_size as isize;
767
768 // Get the destination buffer and back index.
769 let dest_buffer = dest.buffer.get();
770 let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
771
772 // Load the buffer.
773 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
774
775 match self.flavor {
776 // Steal a batch of tasks from the front at once.
777 Flavor::Fifo => {
778 // Copy the batch from the source to the destination buffer.
779 match dest.flavor {
780 Flavor::Fifo => {
781 for i in 0..batch_size {
782 unsafe {
783 let task = buffer.deref().read(f.wrapping_add(i));
784 dest_buffer.write(dest_b.wrapping_add(i), task);
785 }
786 }
787 }
788 Flavor::Lifo => {
789 for i in 0..batch_size {
790 unsafe {
791 let task = buffer.deref().read(f.wrapping_add(i));
792 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
793 }
794 }
795 }
796 }
797
798 // Try incrementing the front index to steal the batch.
799 // If the buffer has been swapped or the increment fails, we retry.
800 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
801 || self
802 .inner
803 .front
804 .compare_exchange(
805 f,
806 f.wrapping_add(batch_size),
807 Ordering::SeqCst,
808 Ordering::Relaxed,
809 )
810 .is_err()
811 {
812 return Steal::Retry;
813 }
814
815 dest_b = dest_b.wrapping_add(batch_size);
816 }
817
818 // Steal a batch of tasks from the front one by one.
819 Flavor::Lifo => {
820 // This loop may modify the batch_size, which triggers a clippy lint warning.
821 // Use a new variable to avoid the warning, and to make it clear we aren't
822 // modifying the loop exit condition during iteration.
823 let original_batch_size = batch_size;
824
825 for i in 0..original_batch_size {
826 // If this is not the first steal, check whether the queue is empty.
827 if i > 0 {
828 // We've already got the current front index. Now execute the fence to
829 // synchronize with other threads.
830 atomic::fence(Ordering::SeqCst);
831
832 // Load the back index.
833 let b = self.inner.back.load(Ordering::Acquire);
834
835 // Is the queue empty?
836 if b.wrapping_sub(f) <= 0 {
837 batch_size = i;
838 break;
839 }
840 }
841
842 // Read the task at the front.
843 let task = unsafe { buffer.deref().read(f) };
844
845 // Try incrementing the front index to steal the task.
846 // If the buffer has been swapped or the increment fails, we retry.
847 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
848 || self
849 .inner
850 .front
851 .compare_exchange(
852 f,
853 f.wrapping_add(1),
854 Ordering::SeqCst,
855 Ordering::Relaxed,
856 )
857 .is_err()
858 {
859 // We didn't steal this task, forget it and break from the loop.
860 batch_size = i;
861 break;
862 }
863
864 // Write the stolen task into the destination buffer.
865 unsafe {
866 dest_buffer.write(dest_b, task);
867 }
868
869 // Move the source front index and the destination back index one step forward.
870 f = f.wrapping_add(1);
871 dest_b = dest_b.wrapping_add(1);
872 }
873
874 // If we didn't steal anything, the operation needs to be retried.
875 if batch_size == 0 {
876 return Steal::Retry;
877 }
878
879 // If stealing into a FIFO queue, stolen tasks need to be reversed.
880 if dest.flavor == Flavor::Fifo {
881 for i in 0..batch_size / 2 {
882 unsafe {
883 let i1 = dest_b.wrapping_sub(batch_size - i);
884 let i2 = dest_b.wrapping_sub(i + 1);
885 let t1 = dest_buffer.read(i1);
886 let t2 = dest_buffer.read(i2);
887 dest_buffer.write(i1, t2);
888 dest_buffer.write(i2, t1);
889 }
890 }
891 }
892 }
893 }
894
895 atomic::fence(Ordering::Release);
896
897 // Update the back index in the destination queue.
898 //
899 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
900 // races because it doesn't understand fences.
901 dest.inner.back.store(dest_b, Ordering::Release);
902
903 // Return with success.
904 Steal::Success(())
905 }
906
907 /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
908 ///
909 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
910 /// steal around half of the tasks in the queue, but also not more than some constant limit.
911 ///
912 /// # Examples
913 ///
914 /// ```
915 /// use crossbeam_deque::{Steal, Worker};
916 ///
917 /// let w1 = Worker::new_fifo();
918 /// w1.push(1);
919 /// w1.push(2);
920 /// w1.push(3);
921 /// w1.push(4);
922 ///
923 /// let s = w1.stealer();
924 /// let w2 = Worker::new_fifo();
925 ///
926 /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
927 /// assert_eq!(w2.pop(), Some(2));
928 /// ```
929 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
930 self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
931 }
932
933 /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from
934 /// that worker.
935 ///
936 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
937 /// steal around half of the tasks in the queue, but also not more than the given limit.
938 ///
939 /// # Examples
940 ///
941 /// ```
942 /// use crossbeam_deque::{Steal, Worker};
943 ///
944 /// let w1 = Worker::new_fifo();
945 /// w1.push(1);
946 /// w1.push(2);
947 /// w1.push(3);
948 /// w1.push(4);
949 /// w1.push(5);
950 /// w1.push(6);
951 ///
952 /// let s = w1.stealer();
953 /// let w2 = Worker::new_fifo();
954 ///
955 /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
956 /// assert_eq!(w2.pop(), Some(2));
957 /// assert_eq!(w2.pop(), None);
958 ///
959 /// w1.push(7);
960 /// w1.push(8);
961 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
962 /// // half of the elements are currently popped, but the number of popped elements is considered
963 /// // an implementation detail that may be changed in the future.
964 /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
965 /// assert_eq!(w2.pop(), Some(4));
966 /// assert_eq!(w2.pop(), Some(5));
967 /// assert_eq!(w2.pop(), None);
968 /// ```
969 pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
970 assert!(limit > 0);
971 if Arc::ptr_eq(&self.inner, &dest.inner) {
972 match dest.pop() {
973 None => return Steal::Empty,
974 Some(task) => return Steal::Success(task),
975 }
976 }
977
978 // Load the front index.
979 let mut f = self.inner.front.load(Ordering::Acquire);
980
981 // A SeqCst fence is needed here.
982 //
983 // If the current thread is already pinned (reentrantly), we must manually issue the
984 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
985 // have to.
986 if epoch::is_pinned() {
987 atomic::fence(Ordering::SeqCst);
988 }
989
990 let guard = &epoch::pin();
991
992 // Load the back index.
993 let b = self.inner.back.load(Ordering::Acquire);
994
995 // Is the queue empty?
996 let len = b.wrapping_sub(f);
997 if len <= 0 {
998 return Steal::Empty;
999 }
1000
1001 // Reserve capacity for the stolen batch.
1002 let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
1003 dest.reserve(batch_size);
1004 let mut batch_size = batch_size as isize;
1005
1006 // Get the destination buffer and back index.
1007 let dest_buffer = dest.buffer.get();
1008 let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
1009
1010 // Load the buffer
1011 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
1012
1013 // Read the task at the front.
1014 let mut task = unsafe { buffer.deref().read(f) };
1015
1016 match self.flavor {
1017 // Steal a batch of tasks from the front at once.
1018 Flavor::Fifo => {
1019 // Copy the batch from the source to the destination buffer.
1020 match dest.flavor {
1021 Flavor::Fifo => {
1022 for i in 0..batch_size {
1023 unsafe {
1024 let task = buffer.deref().read(f.wrapping_add(i + 1));
1025 dest_buffer.write(dest_b.wrapping_add(i), task);
1026 }
1027 }
1028 }
1029 Flavor::Lifo => {
1030 for i in 0..batch_size {
1031 unsafe {
1032 let task = buffer.deref().read(f.wrapping_add(i + 1));
1033 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
1034 }
1035 }
1036 }
1037 }
1038
1039 // Try incrementing the front index to steal the task.
1040 // If the buffer has been swapped or the increment fails, we retry.
1041 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1042 || self
1043 .inner
1044 .front
1045 .compare_exchange(
1046 f,
1047 f.wrapping_add(batch_size + 1),
1048 Ordering::SeqCst,
1049 Ordering::Relaxed,
1050 )
1051 .is_err()
1052 {
1053 // We didn't steal this task, forget it.
1054 return Steal::Retry;
1055 }
1056
1057 dest_b = dest_b.wrapping_add(batch_size);
1058 }
1059
1060 // Steal a batch of tasks from the front one by one.
1061 Flavor::Lifo => {
1062 // Try incrementing the front index to steal the task.
1063 if self
1064 .inner
1065 .front
1066 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
1067 .is_err()
1068 {
1069 // We didn't steal this task, forget it.
1070 return Steal::Retry;
1071 }
1072
1073 // Move the front index one step forward.
1074 f = f.wrapping_add(1);
1075
1076 // Repeat the same procedure for the batch steals.
1077 //
1078 // This loop may modify the batch_size, which triggers a clippy lint warning.
1079 // Use a new variable to avoid the warning, and to make it clear we aren't
1080 // modifying the loop exit condition during iteration.
1081 let original_batch_size = batch_size;
1082 for i in 0..original_batch_size {
1083 // We've already got the current front index. Now execute the fence to
1084 // synchronize with other threads.
1085 atomic::fence(Ordering::SeqCst);
1086
1087 // Load the back index.
1088 let b = self.inner.back.load(Ordering::Acquire);
1089
1090 // Is the queue empty?
1091 if b.wrapping_sub(f) <= 0 {
1092 batch_size = i;
1093 break;
1094 }
1095
1096 // Read the task at the front.
1097 let tmp = unsafe { buffer.deref().read(f) };
1098
1099 // Try incrementing the front index to steal the task.
1100 // If the buffer has been swapped or the increment fails, we retry.
1101 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1102 || self
1103 .inner
1104 .front
1105 .compare_exchange(
1106 f,
1107 f.wrapping_add(1),
1108 Ordering::SeqCst,
1109 Ordering::Relaxed,
1110 )
1111 .is_err()
1112 {
1113 // We didn't steal this task, forget it and break from the loop.
1114 batch_size = i;
1115 break;
1116 }
1117
1118 // Write the previously stolen task into the destination buffer.
1119 unsafe {
1120 dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1121 }
1122
1123 // Move the source front index and the destination back index one step forward.
1124 f = f.wrapping_add(1);
1125 dest_b = dest_b.wrapping_add(1);
1126 }
1127
1128 // If stealing into a FIFO queue, stolen tasks need to be reversed.
1129 if dest.flavor == Flavor::Fifo {
1130 for i in 0..batch_size / 2 {
1131 unsafe {
1132 let i1 = dest_b.wrapping_sub(batch_size - i);
1133 let i2 = dest_b.wrapping_sub(i + 1);
1134 let t1 = dest_buffer.read(i1);
1135 let t2 = dest_buffer.read(i2);
1136 dest_buffer.write(i1, t2);
1137 dest_buffer.write(i2, t1);
1138 }
1139 }
1140 }
1141 }
1142 }
1143
1144 atomic::fence(Ordering::Release);
1145
1146 // Update the back index in the destination queue.
1147 //
1148 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1149 // races because it doesn't understand fences.
1150 dest.inner.back.store(dest_b, Ordering::Release);
1151
1152 // Return with success.
1153 Steal::Success(unsafe { task.assume_init() })
1154 }
1155}
1156
1157impl<T> Clone for Stealer<T> {
1158 fn clone(&self) -> Stealer<T> {
1159 Stealer {
1160 inner: self.inner.clone(),
1161 flavor: self.flavor,
1162 }
1163 }
1164}
1165
1166impl<T> fmt::Debug for Stealer<T> {
1167 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1168 f.pad("Stealer { .. }")
1169 }
1170}
1171
1172// Bits indicating the state of a slot:
1173// * If a task has been written into the slot, `WRITE` is set.
1174// * If a task has been read from the slot, `READ` is set.
1175// * If the block is being destroyed, `DESTROY` is set.
1176const WRITE: usize = 1;
1177const READ: usize = 2;
1178const DESTROY: usize = 4;
1179
1180// Each block covers one "lap" of indices.
1181const LAP: usize = 64;
1182// The maximum number of values a block can hold.
1183const BLOCK_CAP: usize = LAP - 1;
1184// How many lower bits are reserved for metadata.
1185const SHIFT: usize = 1;
1186// Indicates that the block is not the last one.
1187const HAS_NEXT: usize = 1;
1188
1189/// A slot in a block.
1190struct Slot<T> {
1191 /// The task.
1192 task: UnsafeCell<MaybeUninit<T>>,
1193
1194 /// The state of the slot.
1195 state: AtomicUsize,
1196}
1197
1198impl<T> Slot<T> {
1199 const UNINIT: Self = Self {
1200 task: UnsafeCell::new(MaybeUninit::uninit()),
1201 state: AtomicUsize::new(0),
1202 };
1203
1204 /// Waits until a task is written into the slot.
1205 fn wait_write(&self) {
1206 let backoff = Backoff::new();
1207 while self.state.load(Ordering::Acquire) & WRITE == 0 {
1208 backoff.snooze();
1209 }
1210 }
1211}
1212
1213/// A block in a linked list.
1214///
1215/// Each block in the list can hold up to `BLOCK_CAP` values.
1216struct Block<T> {
1217 /// The next block in the linked list.
1218 next: AtomicPtr<Block<T>>,
1219
1220 /// Slots for values.
1221 slots: [Slot<T>; BLOCK_CAP],
1222}
1223
1224impl<T> Block<T> {
1225 /// Creates an empty block that starts at `start_index`.
1226 fn new() -> Block<T> {
1227 Self {
1228 next: AtomicPtr::new(ptr::null_mut()),
1229 slots: [Slot::UNINIT; BLOCK_CAP],
1230 }
1231 }
1232
1233 /// Waits until the next pointer is set.
1234 fn wait_next(&self) -> *mut Block<T> {
1235 let backoff = Backoff::new();
1236 loop {
1237 let next = self.next.load(Ordering::Acquire);
1238 if !next.is_null() {
1239 return next;
1240 }
1241 backoff.snooze();
1242 }
1243 }
1244
1245 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
1246 unsafe fn destroy(this: *mut Block<T>, count: usize) {
1247 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1248 // begun destruction of the block.
1249 for i in (0..count).rev() {
1250 let slot = (*this).slots.get_unchecked(i);
1251
1252 // Mark the `DESTROY` bit if a thread is still using the slot.
1253 if slot.state.load(Ordering::Acquire) & READ == 0
1254 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1255 {
1256 // If a thread is still using the slot, it will continue destruction of the block.
1257 return;
1258 }
1259 }
1260
1261 // No thread is using the block, now it is safe to destroy it.
1262 drop(Box::from_raw(this));
1263 }
1264}
1265
1266/// A position in a queue.
1267struct Position<T> {
1268 /// The index in the queue.
1269 index: AtomicUsize,
1270
1271 /// The block in the linked list.
1272 block: AtomicPtr<Block<T>>,
1273}
1274
1275/// An injector queue.
1276///
1277/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1278/// a single injector queue, which is the entry point for new tasks.
1279///
1280/// # Examples
1281///
1282/// ```
1283/// use crossbeam_deque::{Injector, Steal};
1284///
1285/// let q = Injector::new();
1286/// q.push(1);
1287/// q.push(2);
1288///
1289/// assert_eq!(q.steal(), Steal::Success(1));
1290/// assert_eq!(q.steal(), Steal::Success(2));
1291/// assert_eq!(q.steal(), Steal::Empty);
1292/// ```
1293pub struct Injector<T> {
1294 /// The head of the queue.
1295 head: CachePadded<Position<T>>,
1296
1297 /// The tail of the queue.
1298 tail: CachePadded<Position<T>>,
1299
1300 /// Indicates that dropping a `Injector<T>` may drop values of type `T`.
1301 _marker: PhantomData<T>,
1302}
1303
1304unsafe impl<T: Send> Send for Injector<T> {}
1305unsafe impl<T: Send> Sync for Injector<T> {}
1306
1307impl<T> Default for Injector<T> {
1308 fn default() -> Self {
1309 let block = Box::into_raw(Box::new(Block::<T>::new()));
1310 Self {
1311 head: CachePadded::new(Position {
1312 block: AtomicPtr::new(block),
1313 index: AtomicUsize::new(0),
1314 }),
1315 tail: CachePadded::new(Position {
1316 block: AtomicPtr::new(block),
1317 index: AtomicUsize::new(0),
1318 }),
1319 _marker: PhantomData,
1320 }
1321 }
1322}
1323
1324impl<T> Injector<T> {
1325 /// Creates a new injector queue.
1326 ///
1327 /// # Examples
1328 ///
1329 /// ```
1330 /// use crossbeam_deque::Injector;
1331 ///
1332 /// let q = Injector::<i32>::new();
1333 /// ```
1334 pub fn new() -> Injector<T> {
1335 Self::default()
1336 }
1337
1338 /// Pushes a task into the queue.
1339 ///
1340 /// # Examples
1341 ///
1342 /// ```
1343 /// use crossbeam_deque::Injector;
1344 ///
1345 /// let w = Injector::new();
1346 /// w.push(1);
1347 /// w.push(2);
1348 /// ```
1349 pub fn push(&self, task: T) {
1350 let backoff = Backoff::new();
1351 let mut tail = self.tail.index.load(Ordering::Acquire);
1352 let mut block = self.tail.block.load(Ordering::Acquire);
1353 let mut next_block = None;
1354
1355 loop {
1356 // Calculate the offset of the index into the block.
1357 let offset = (tail >> SHIFT) % LAP;
1358
1359 // If we reached the end of the block, wait until the next one is installed.
1360 if offset == BLOCK_CAP {
1361 backoff.snooze();
1362 tail = self.tail.index.load(Ordering::Acquire);
1363 block = self.tail.block.load(Ordering::Acquire);
1364 continue;
1365 }
1366
1367 // If we're going to have to install the next block, allocate it in advance in order to
1368 // make the wait for other threads as short as possible.
1369 if offset + 1 == BLOCK_CAP && next_block.is_none() {
1370 next_block = Some(Box::new(Block::<T>::new()));
1371 }
1372
1373 let new_tail = tail + (1 << SHIFT);
1374
1375 // Try advancing the tail forward.
1376 match self.tail.index.compare_exchange_weak(
1377 tail,
1378 new_tail,
1379 Ordering::SeqCst,
1380 Ordering::Acquire,
1381 ) {
1382 Ok(_) => unsafe {
1383 // If we've reached the end of the block, install the next one.
1384 if offset + 1 == BLOCK_CAP {
1385 let next_block = Box::into_raw(next_block.unwrap());
1386 let next_index = new_tail.wrapping_add(1 << SHIFT);
1387
1388 self.tail.block.store(next_block, Ordering::Release);
1389 self.tail.index.store(next_index, Ordering::Release);
1390 (*block).next.store(next_block, Ordering::Release);
1391 }
1392
1393 // Write the task into the slot.
1394 let slot = (*block).slots.get_unchecked(offset);
1395 slot.task.get().write(MaybeUninit::new(task));
1396 slot.state.fetch_or(WRITE, Ordering::Release);
1397
1398 return;
1399 },
1400 Err(t) => {
1401 tail = t;
1402 block = self.tail.block.load(Ordering::Acquire);
1403 backoff.spin();
1404 }
1405 }
1406 }
1407 }
1408
1409 /// Steals a task from the queue.
1410 ///
1411 /// # Examples
1412 ///
1413 /// ```
1414 /// use crossbeam_deque::{Injector, Steal};
1415 ///
1416 /// let q = Injector::new();
1417 /// q.push(1);
1418 /// q.push(2);
1419 ///
1420 /// assert_eq!(q.steal(), Steal::Success(1));
1421 /// assert_eq!(q.steal(), Steal::Success(2));
1422 /// assert_eq!(q.steal(), Steal::Empty);
1423 /// ```
1424 pub fn steal(&self) -> Steal<T> {
1425 let mut head;
1426 let mut block;
1427 let mut offset;
1428
1429 let backoff = Backoff::new();
1430 loop {
1431 head = self.head.index.load(Ordering::Acquire);
1432 block = self.head.block.load(Ordering::Acquire);
1433
1434 // Calculate the offset of the index into the block.
1435 offset = (head >> SHIFT) % LAP;
1436
1437 // If we reached the end of the block, wait until the next one is installed.
1438 if offset == BLOCK_CAP {
1439 backoff.snooze();
1440 } else {
1441 break;
1442 }
1443 }
1444
1445 let mut new_head = head + (1 << SHIFT);
1446
1447 if new_head & HAS_NEXT == 0 {
1448 atomic::fence(Ordering::SeqCst);
1449 let tail = self.tail.index.load(Ordering::Relaxed);
1450
1451 // If the tail equals the head, that means the queue is empty.
1452 if head >> SHIFT == tail >> SHIFT {
1453 return Steal::Empty;
1454 }
1455
1456 // If head and tail are not in the same block, set `HAS_NEXT` in head.
1457 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1458 new_head |= HAS_NEXT;
1459 }
1460 }
1461
1462 // Try moving the head index forward.
1463 if self
1464 .head
1465 .index
1466 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1467 .is_err()
1468 {
1469 return Steal::Retry;
1470 }
1471
1472 unsafe {
1473 // If we've reached the end of the block, move to the next one.
1474 if offset + 1 == BLOCK_CAP {
1475 let next = (*block).wait_next();
1476 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1477 if !(*next).next.load(Ordering::Relaxed).is_null() {
1478 next_index |= HAS_NEXT;
1479 }
1480
1481 self.head.block.store(next, Ordering::Release);
1482 self.head.index.store(next_index, Ordering::Release);
1483 }
1484
1485 // Read the task.
1486 let slot = (*block).slots.get_unchecked(offset);
1487 slot.wait_write();
1488 let task = slot.task.get().read().assume_init();
1489
1490 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1491 // but couldn't because we were busy reading from the slot.
1492 if (offset + 1 == BLOCK_CAP)
1493 || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
1494 {
1495 Block::destroy(block, offset);
1496 }
1497
1498 Steal::Success(task)
1499 }
1500 }
1501
1502 /// Steals a batch of tasks and pushes them into a worker.
1503 ///
1504 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1505 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1506 ///
1507 /// # Examples
1508 ///
1509 /// ```
1510 /// use crossbeam_deque::{Injector, Worker};
1511 ///
1512 /// let q = Injector::new();
1513 /// q.push(1);
1514 /// q.push(2);
1515 /// q.push(3);
1516 /// q.push(4);
1517 ///
1518 /// let w = Worker::new_fifo();
1519 /// let _ = q.steal_batch(&w);
1520 /// assert_eq!(w.pop(), Some(1));
1521 /// assert_eq!(w.pop(), Some(2));
1522 /// ```
1523 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1524 self.steal_batch_with_limit(dest, MAX_BATCH)
1525 }
1526
1527 /// Steals no more than of tasks and pushes them into a worker.
1528 ///
1529 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1530 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1531 ///
1532 /// # Examples
1533 ///
1534 /// ```
1535 /// use crossbeam_deque::{Injector, Worker};
1536 ///
1537 /// let q = Injector::new();
1538 /// q.push(1);
1539 /// q.push(2);
1540 /// q.push(3);
1541 /// q.push(4);
1542 /// q.push(5);
1543 /// q.push(6);
1544 ///
1545 /// let w = Worker::new_fifo();
1546 /// let _ = q.steal_batch_with_limit(&w, 2);
1547 /// assert_eq!(w.pop(), Some(1));
1548 /// assert_eq!(w.pop(), Some(2));
1549 /// assert_eq!(w.pop(), None);
1550 ///
1551 /// q.push(7);
1552 /// q.push(8);
1553 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1554 /// // half of the elements are currently popped, but the number of popped elements is considered
1555 /// // an implementation detail that may be changed in the future.
1556 /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
1557 /// assert_eq!(w.len(), 3);
1558 /// ```
1559 pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
1560 assert!(limit > 0);
1561 let mut head;
1562 let mut block;
1563 let mut offset;
1564
1565 let backoff = Backoff::new();
1566 loop {
1567 head = self.head.index.load(Ordering::Acquire);
1568 block = self.head.block.load(Ordering::Acquire);
1569
1570 // Calculate the offset of the index into the block.
1571 offset = (head >> SHIFT) % LAP;
1572
1573 // If we reached the end of the block, wait until the next one is installed.
1574 if offset == BLOCK_CAP {
1575 backoff.snooze();
1576 } else {
1577 break;
1578 }
1579 }
1580
1581 let mut new_head = head;
1582 let advance;
1583
1584 if new_head & HAS_NEXT == 0 {
1585 atomic::fence(Ordering::SeqCst);
1586 let tail = self.tail.index.load(Ordering::Relaxed);
1587
1588 // If the tail equals the head, that means the queue is empty.
1589 if head >> SHIFT == tail >> SHIFT {
1590 return Steal::Empty;
1591 }
1592
1593 // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1594 // the right batch size to steal.
1595 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1596 new_head |= HAS_NEXT;
1597 // We can steal all tasks till the end of the block.
1598 advance = (BLOCK_CAP - offset).min(limit);
1599 } else {
1600 let len = (tail - head) >> SHIFT;
1601 // Steal half of the available tasks.
1602 advance = ((len + 1) / 2).min(limit);
1603 }
1604 } else {
1605 // We can steal all tasks till the end of the block.
1606 advance = (BLOCK_CAP - offset).min(limit);
1607 }
1608
1609 new_head += advance << SHIFT;
1610 let new_offset = offset + advance;
1611
1612 // Try moving the head index forward.
1613 if self
1614 .head
1615 .index
1616 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1617 .is_err()
1618 {
1619 return Steal::Retry;
1620 }
1621
1622 // Reserve capacity for the stolen batch.
1623 let batch_size = new_offset - offset;
1624 dest.reserve(batch_size);
1625
1626 // Get the destination buffer and back index.
1627 let dest_buffer = dest.buffer.get();
1628 let dest_b = dest.inner.back.load(Ordering::Relaxed);
1629
1630 unsafe {
1631 // If we've reached the end of the block, move to the next one.
1632 if new_offset == BLOCK_CAP {
1633 let next = (*block).wait_next();
1634 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1635 if !(*next).next.load(Ordering::Relaxed).is_null() {
1636 next_index |= HAS_NEXT;
1637 }
1638
1639 self.head.block.store(next, Ordering::Release);
1640 self.head.index.store(next_index, Ordering::Release);
1641 }
1642
1643 // Copy values from the injector into the destination queue.
1644 match dest.flavor {
1645 Flavor::Fifo => {
1646 for i in 0..batch_size {
1647 // Read the task.
1648 let slot = (*block).slots.get_unchecked(offset + i);
1649 slot.wait_write();
1650 let task = slot.task.get().read();
1651
1652 // Write it into the destination queue.
1653 dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1654 }
1655 }
1656
1657 Flavor::Lifo => {
1658 for i in 0..batch_size {
1659 // Read the task.
1660 let slot = (*block).slots.get_unchecked(offset + i);
1661 slot.wait_write();
1662 let task = slot.task.get().read();
1663
1664 // Write it into the destination queue.
1665 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1666 }
1667 }
1668 }
1669
1670 atomic::fence(Ordering::Release);
1671
1672 // Update the back index in the destination queue.
1673 //
1674 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1675 // data races because it doesn't understand fences.
1676 dest.inner
1677 .back
1678 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1679
1680 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1681 // but couldn't because we were busy reading from the slot.
1682 if new_offset == BLOCK_CAP {
1683 Block::destroy(block, offset);
1684 } else {
1685 for i in offset..new_offset {
1686 let slot = (*block).slots.get_unchecked(i);
1687
1688 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1689 Block::destroy(block, offset);
1690 break;
1691 }
1692 }
1693 }
1694
1695 Steal::Success(())
1696 }
1697 }
1698
1699 /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1700 ///
1701 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1702 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1703 ///
1704 /// # Examples
1705 ///
1706 /// ```
1707 /// use crossbeam_deque::{Injector, Steal, Worker};
1708 ///
1709 /// let q = Injector::new();
1710 /// q.push(1);
1711 /// q.push(2);
1712 /// q.push(3);
1713 /// q.push(4);
1714 ///
1715 /// let w = Worker::new_fifo();
1716 /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1717 /// assert_eq!(w.pop(), Some(2));
1718 /// ```
1719 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1720 // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly
1721 // better, but we may change it in the future to be compatible with the same method in Stealer.
1722 self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
1723 }
1724
1725 /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker.
1726 ///
1727 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1728 /// steal around half of the tasks in the queue, but also not more than the given limit.
1729 ///
1730 /// # Examples
1731 ///
1732 /// ```
1733 /// use crossbeam_deque::{Injector, Steal, Worker};
1734 ///
1735 /// let q = Injector::new();
1736 /// q.push(1);
1737 /// q.push(2);
1738 /// q.push(3);
1739 /// q.push(4);
1740 /// q.push(5);
1741 /// q.push(6);
1742 ///
1743 /// let w = Worker::new_fifo();
1744 /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1));
1745 /// assert_eq!(w.pop(), Some(2));
1746 /// assert_eq!(w.pop(), None);
1747 ///
1748 /// q.push(7);
1749 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1750 /// // half of the elements are currently popped, but the number of popped elements is considered
1751 /// // an implementation detail that may be changed in the future.
1752 /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3));
1753 /// assert_eq!(w.pop(), Some(4));
1754 /// assert_eq!(w.pop(), Some(5));
1755 /// assert_eq!(w.pop(), None);
1756 /// ```
1757 pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
1758 assert!(limit > 0);
1759 let mut head;
1760 let mut block;
1761 let mut offset;
1762
1763 let backoff = Backoff::new();
1764 loop {
1765 head = self.head.index.load(Ordering::Acquire);
1766 block = self.head.block.load(Ordering::Acquire);
1767
1768 // Calculate the offset of the index into the block.
1769 offset = (head >> SHIFT) % LAP;
1770
1771 // If we reached the end of the block, wait until the next one is installed.
1772 if offset == BLOCK_CAP {
1773 backoff.snooze();
1774 } else {
1775 break;
1776 }
1777 }
1778
1779 let mut new_head = head;
1780 let advance;
1781
1782 if new_head & HAS_NEXT == 0 {
1783 atomic::fence(Ordering::SeqCst);
1784 let tail = self.tail.index.load(Ordering::Relaxed);
1785
1786 // If the tail equals the head, that means the queue is empty.
1787 if head >> SHIFT == tail >> SHIFT {
1788 return Steal::Empty;
1789 }
1790
1791 // If head and tail are not in the same block, set `HAS_NEXT` in head.
1792 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1793 new_head |= HAS_NEXT;
1794 // We can steal all tasks till the end of the block.
1795 advance = (BLOCK_CAP - offset).min(limit);
1796 } else {
1797 let len = (tail - head) >> SHIFT;
1798 // Steal half of the available tasks.
1799 advance = ((len + 1) / 2).min(limit);
1800 }
1801 } else {
1802 // We can steal all tasks till the end of the block.
1803 advance = (BLOCK_CAP - offset).min(limit);
1804 }
1805
1806 new_head += advance << SHIFT;
1807 let new_offset = offset + advance;
1808
1809 // Try moving the head index forward.
1810 if self
1811 .head
1812 .index
1813 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1814 .is_err()
1815 {
1816 return Steal::Retry;
1817 }
1818
1819 // Reserve capacity for the stolen batch.
1820 let batch_size = new_offset - offset - 1;
1821 dest.reserve(batch_size);
1822
1823 // Get the destination buffer and back index.
1824 let dest_buffer = dest.buffer.get();
1825 let dest_b = dest.inner.back.load(Ordering::Relaxed);
1826
1827 unsafe {
1828 // If we've reached the end of the block, move to the next one.
1829 if new_offset == BLOCK_CAP {
1830 let next = (*block).wait_next();
1831 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1832 if !(*next).next.load(Ordering::Relaxed).is_null() {
1833 next_index |= HAS_NEXT;
1834 }
1835
1836 self.head.block.store(next, Ordering::Release);
1837 self.head.index.store(next_index, Ordering::Release);
1838 }
1839
1840 // Read the task.
1841 let slot = (*block).slots.get_unchecked(offset);
1842 slot.wait_write();
1843 let task = slot.task.get().read();
1844
1845 match dest.flavor {
1846 Flavor::Fifo => {
1847 // Copy values from the injector into the destination queue.
1848 for i in 0..batch_size {
1849 // Read the task.
1850 let slot = (*block).slots.get_unchecked(offset + i + 1);
1851 slot.wait_write();
1852 let task = slot.task.get().read();
1853
1854 // Write it into the destination queue.
1855 dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1856 }
1857 }
1858
1859 Flavor::Lifo => {
1860 // Copy values from the injector into the destination queue.
1861 for i in 0..batch_size {
1862 // Read the task.
1863 let slot = (*block).slots.get_unchecked(offset + i + 1);
1864 slot.wait_write();
1865 let task = slot.task.get().read();
1866
1867 // Write it into the destination queue.
1868 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1869 }
1870 }
1871 }
1872
1873 atomic::fence(Ordering::Release);
1874
1875 // Update the back index in the destination queue.
1876 //
1877 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1878 // data races because it doesn't understand fences.
1879 dest.inner
1880 .back
1881 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1882
1883 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1884 // but couldn't because we were busy reading from the slot.
1885 if new_offset == BLOCK_CAP {
1886 Block::destroy(block, offset);
1887 } else {
1888 for i in offset..new_offset {
1889 let slot = (*block).slots.get_unchecked(i);
1890
1891 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1892 Block::destroy(block, offset);
1893 break;
1894 }
1895 }
1896 }
1897
1898 Steal::Success(task.assume_init())
1899 }
1900 }
1901
1902 /// Returns `true` if the queue is empty.
1903 ///
1904 /// # Examples
1905 ///
1906 /// ```
1907 /// use crossbeam_deque::Injector;
1908 ///
1909 /// let q = Injector::new();
1910 ///
1911 /// assert!(q.is_empty());
1912 /// q.push(1);
1913 /// assert!(!q.is_empty());
1914 /// ```
1915 pub fn is_empty(&self) -> bool {
1916 let head = self.head.index.load(Ordering::SeqCst);
1917 let tail = self.tail.index.load(Ordering::SeqCst);
1918 head >> SHIFT == tail >> SHIFT
1919 }
1920
1921 /// Returns the number of tasks in the queue.
1922 ///
1923 /// # Examples
1924 ///
1925 /// ```
1926 /// use crossbeam_deque::Injector;
1927 ///
1928 /// let q = Injector::new();
1929 ///
1930 /// assert_eq!(q.len(), 0);
1931 /// q.push(1);
1932 /// assert_eq!(q.len(), 1);
1933 /// q.push(1);
1934 /// assert_eq!(q.len(), 2);
1935 /// ```
1936 pub fn len(&self) -> usize {
1937 loop {
1938 // Load the tail index, then load the head index.
1939 let mut tail = self.tail.index.load(Ordering::SeqCst);
1940 let mut head = self.head.index.load(Ordering::SeqCst);
1941
1942 // If the tail index didn't change, we've got consistent indices to work with.
1943 if self.tail.index.load(Ordering::SeqCst) == tail {
1944 // Erase the lower bits.
1945 tail &= !((1 << SHIFT) - 1);
1946 head &= !((1 << SHIFT) - 1);
1947
1948 // Fix up indices if they fall onto block ends.
1949 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
1950 tail = tail.wrapping_add(1 << SHIFT);
1951 }
1952 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
1953 head = head.wrapping_add(1 << SHIFT);
1954 }
1955
1956 // Rotate indices so that head falls into the first block.
1957 let lap = (head >> SHIFT) / LAP;
1958 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
1959 head = head.wrapping_sub((lap * LAP) << SHIFT);
1960
1961 // Remove the lower bits.
1962 tail >>= SHIFT;
1963 head >>= SHIFT;
1964
1965 // Return the difference minus the number of blocks between tail and head.
1966 return tail - head - tail / LAP;
1967 }
1968 }
1969 }
1970}
1971
1972impl<T> Drop for Injector<T> {
1973 fn drop(&mut self) {
1974 let mut head = *self.head.index.get_mut();
1975 let mut tail = *self.tail.index.get_mut();
1976 let mut block = *self.head.block.get_mut();
1977
1978 // Erase the lower bits.
1979 head &= !((1 << SHIFT) - 1);
1980 tail &= !((1 << SHIFT) - 1);
1981
1982 unsafe {
1983 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
1984 while head != tail {
1985 let offset = (head >> SHIFT) % LAP;
1986
1987 if offset < BLOCK_CAP {
1988 // Drop the task in the slot.
1989 let slot = (*block).slots.get_unchecked(offset);
1990 let p = &mut *slot.task.get();
1991 p.as_mut_ptr().drop_in_place();
1992 } else {
1993 // Deallocate the block and move to the next one.
1994 let next = *(*block).next.get_mut();
1995 drop(Box::from_raw(block));
1996 block = next;
1997 }
1998
1999 head = head.wrapping_add(1 << SHIFT);
2000 }
2001
2002 // Deallocate the last remaining block.
2003 drop(Box::from_raw(block));
2004 }
2005 }
2006}
2007
2008impl<T> fmt::Debug for Injector<T> {
2009 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2010 f.pad("Worker { .. }")
2011 }
2012}
2013
2014/// Possible outcomes of a steal operation.
2015///
2016/// # Examples
2017///
2018/// There are lots of ways to chain results of steal operations together:
2019///
2020/// ```
2021/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
2022///
2023/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
2024///
2025/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
2026/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
2027/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
2028///
2029/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
2030/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
2031/// ```
2032#[must_use]
2033#[derive(PartialEq, Eq, Copy, Clone)]
2034pub enum Steal<T> {
2035 /// The queue was empty at the time of stealing.
2036 Empty,
2037
2038 /// At least one task was successfully stolen.
2039 Success(T),
2040
2041 /// The steal operation needs to be retried.
2042 Retry,
2043}
2044
2045impl<T> Steal<T> {
2046 /// Returns `true` if the queue was empty at the time of stealing.
2047 ///
2048 /// # Examples
2049 ///
2050 /// ```
2051 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2052 ///
2053 /// assert!(!Success(7).is_empty());
2054 /// assert!(!Retry::<i32>.is_empty());
2055 ///
2056 /// assert!(Empty::<i32>.is_empty());
2057 /// ```
2058 pub fn is_empty(&self) -> bool {
2059 match self {
2060 Steal::Empty => true,
2061 _ => false,
2062 }
2063 }
2064
2065 /// Returns `true` if at least one task was stolen.
2066 ///
2067 /// # Examples
2068 ///
2069 /// ```
2070 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2071 ///
2072 /// assert!(!Empty::<i32>.is_success());
2073 /// assert!(!Retry::<i32>.is_success());
2074 ///
2075 /// assert!(Success(7).is_success());
2076 /// ```
2077 pub fn is_success(&self) -> bool {
2078 match self {
2079 Steal::Success(_) => true,
2080 _ => false,
2081 }
2082 }
2083
2084 /// Returns `true` if the steal operation needs to be retried.
2085 ///
2086 /// # Examples
2087 ///
2088 /// ```
2089 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2090 ///
2091 /// assert!(!Empty::<i32>.is_retry());
2092 /// assert!(!Success(7).is_retry());
2093 ///
2094 /// assert!(Retry::<i32>.is_retry());
2095 /// ```
2096 pub fn is_retry(&self) -> bool {
2097 match self {
2098 Steal::Retry => true,
2099 _ => false,
2100 }
2101 }
2102
2103 /// Returns the result of the operation, if successful.
2104 ///
2105 /// # Examples
2106 ///
2107 /// ```
2108 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2109 ///
2110 /// assert_eq!(Empty::<i32>.success(), None);
2111 /// assert_eq!(Retry::<i32>.success(), None);
2112 ///
2113 /// assert_eq!(Success(7).success(), Some(7));
2114 /// ```
2115 pub fn success(self) -> Option<T> {
2116 match self {
2117 Steal::Success(res) => Some(res),
2118 _ => None,
2119 }
2120 }
2121
2122 /// If no task was stolen, attempts another steal operation.
2123 ///
2124 /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
2125 ///
2126 /// * If the second steal resulted in `Success`, it is returned.
2127 /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
2128 /// * If both resulted in `None`, then `None` is returned.
2129 ///
2130 /// # Examples
2131 ///
2132 /// ```
2133 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2134 ///
2135 /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
2136 /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
2137 ///
2138 /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
2139 /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
2140 ///
2141 /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
2142 /// ```
2143 pub fn or_else<F>(self, f: F) -> Steal<T>
2144 where
2145 F: FnOnce() -> Steal<T>,
2146 {
2147 match self {
2148 Steal::Empty => f(),
2149 Steal::Success(_) => self,
2150 Steal::Retry => {
2151 if let Steal::Success(res) = f() {
2152 Steal::Success(res)
2153 } else {
2154 Steal::Retry
2155 }
2156 }
2157 }
2158 }
2159}
2160
2161impl<T> fmt::Debug for Steal<T> {
2162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2163 match self {
2164 Steal::Empty => f.pad("Empty"),
2165 Steal::Success(_) => f.pad("Success(..)"),
2166 Steal::Retry => f.pad("Retry"),
2167 }
2168 }
2169}
2170
2171impl<T> FromIterator<Steal<T>> for Steal<T> {
2172 /// Consumes items until a `Success` is found and returns it.
2173 ///
2174 /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
2175 /// Otherwise, `Empty` is returned.
2176 fn from_iter<I>(iter: I) -> Steal<T>
2177 where
2178 I: IntoIterator<Item = Steal<T>>,
2179 {
2180 let mut retry = false;
2181 for s in iter {
2182 match &s {
2183 Steal::Empty => {}
2184 Steal::Success(_) => return s,
2185 Steal::Retry => retry = true,
2186 }
2187 }
2188
2189 if retry {
2190 Steal::Retry
2191 } else {
2192 Steal::Empty
2193 }
2194 }
2195}
2196