1use std::cell::{Cell, UnsafeCell};
2use std::cmp;
3use std::fmt;
4use std::iter::FromIterator;
5use std::marker::PhantomData;
6use std::mem::{self, ManuallyDrop, MaybeUninit};
7use std::ptr;
8use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
9use std::sync::Arc;
10
11use crate::epoch::{self, Atomic, Owned};
12use crate::utils::{Backoff, CachePadded};
13
// Minimum buffer capacity. New worker queues are created with a buffer of this many slots, and
// `Worker::pop()` only shrinks a buffer whose capacity is strictly greater than this.
const MIN_CAP: usize = 64;
// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
const MAX_BATCH: usize = 32;
// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
// deallocated as soon as possible.
//
// Note: `Worker::resize()` compares the byte size of the *newly allocated* buffer
// (`size_of::<T>() * new_cap`) against this threshold (1 KiB).
const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
21
/// A buffer that holds tasks in a worker queue.
///
/// This is just a pointer to the buffer and its length - dropping an instance of this struct will
/// *not* deallocate the buffer.
///
/// The memory is only released by an explicit call to `Buffer::dealloc`. Because the struct is
/// `Copy`, several copies of the same pointer may exist at the same time.
struct Buffer<T> {
    /// Pointer to the allocated memory.
    ptr: *mut T,

    /// Capacity of the buffer. Always a power of two.
    cap: usize,
}
33
// The raw `ptr` field makes `Buffer<T>` `!Send` by default; this impl opts back in for every `T`.
// NOTE(review): presumably sound because tasks only cross threads through `Worker`/`Stealer`,
// which are `Send` only when `T: Send` - confirm.
unsafe impl<T> Send for Buffer<T> {}
35
36impl<T> Buffer<T> {
37 /// Allocates a new buffer with the specified capacity.
38 fn alloc(cap: usize) -> Buffer<T> {
39 debug_assert_eq!(cap, cap.next_power_of_two());
40
41 let mut v = ManuallyDrop::new(Vec::with_capacity(cap));
42 let ptr = v.as_mut_ptr();
43
44 Buffer { ptr, cap }
45 }
46
47 /// Deallocates the buffer.
48 unsafe fn dealloc(self) {
49 drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
50 }
51
52 /// Returns a pointer to the task at the specified `index`.
53 unsafe fn at(&self, index: isize) -> *mut T {
54 // `self.cap` is always a power of two.
55 // We do all the loads at `MaybeUninit` because we might realize, after loading, that we
56 // don't actually have the right to access this memory.
57 self.ptr.offset(index & (self.cap - 1) as isize)
58 }
59
60 /// Writes `task` into the specified `index`.
61 ///
62 /// This method might be concurrently called with another `read` at the same index, which is
63 /// technically speaking a data race and therefore UB. We should use an atomic store here, but
64 /// that would be more expensive and difficult to implement generically for all types `T`.
65 /// Hence, as a hack, we use a volatile write instead.
66 unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
67 ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task)
68 }
69
70 /// Reads a task from the specified `index`.
71 ///
72 /// This method might be concurrently called with another `write` at the same index, which is
73 /// technically speaking a data race and therefore UB. We should use an atomic load here, but
74 /// that would be more expensive and difficult to implement generically for all types `T`.
75 /// Hence, as a hack, we use a volatile load instead.
76 unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
77 ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>())
78 }
79}
80
81impl<T> Clone for Buffer<T> {
82 fn clone(&self) -> Buffer<T> {
83 Buffer {
84 ptr: self.ptr,
85 cap: self.cap,
86 }
87 }
88}
89
// A `Buffer` consists of a raw pointer and a `usize`, so it can be copied bitwise for any `T`.
// Copying never duplicates or frees the allocation it points to.
impl<T> Copy for Buffer<T> {}
91
/// Internal queue data shared between the worker and stealers.
///
/// The tasks currently in the queue occupy the logical indices `front..back`; both indices are
/// compared with wrapping arithmetic and mapped onto buffer slots by `Buffer::at`.
///
/// The implementation is based on the following work:
///
/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
///    PPoPP 2013.][weak-mem]
/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
///    atomics. OOPSLA 2013.][checker]
///
/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
struct Inner<T> {
    /// The front index.
    ///
    /// Advanced by stealers and by the worker when it pops from a FIFO queue or takes the last
    /// task of a LIFO queue.
    front: AtomicIsize,

    /// The back index.
    ///
    /// Written through the `Worker` handle: by `push`, by a LIFO `pop`, and when a batch is
    /// stolen into this queue.
    back: AtomicIsize,

    /// The underlying buffer.
    buffer: CachePadded<Atomic<Buffer<T>>>,
}
115
116impl<T> Drop for Inner<T> {
117 fn drop(&mut self) {
118 // Load the back index, front index, and buffer.
119 let b: isize = *self.back.get_mut();
120 let f: isize = *self.front.get_mut();
121
122 unsafe {
123 let buffer: Shared<'_, Buffer> = self.buffer.load(ord:Ordering::Relaxed, epoch::unprotected());
124
125 // Go through the buffer from front to back and drop all tasks in the queue.
126 let mut i: isize = f;
127 while i != b {
128 buffer.deref().at(index:i).drop_in_place();
129 i = i.wrapping_add(1);
130 }
131
132 // Free the memory allocated by the buffer.
133 buffer.into_owned().into_box().dealloc();
134 }
135 }
136}
137
/// Worker queue flavor: FIFO or LIFO.
///
/// The flavor only decides which end `Worker::pop` takes tasks from; stealers always take tasks
/// from the front.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Flavor {
    /// The first-in first-out flavor.
    ///
    /// The worker pops from the front of the queue.
    Fifo,

    /// The last-in first-out flavor.
    ///
    /// The worker pops from the back of the queue.
    Lifo,
}
147
/// A worker queue.
///
/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
/// tasks from it. Task schedulers typically create a single worker queue per thread.
///
/// Stealing always takes the task at the front of the queue (the oldest one), regardless of the
/// flavor; the flavor only affects which task [`pop`](Worker::pop) returns.
///
/// # Examples
///
/// A FIFO worker:
///
/// ```
/// use crossbeam_deque::{Steal, Worker};
///
/// let w = Worker::new_fifo();
/// let s = w.stealer();
///
/// w.push(1);
/// w.push(2);
/// w.push(3);
///
/// assert_eq!(s.steal(), Steal::Success(1));
/// assert_eq!(w.pop(), Some(2));
/// assert_eq!(w.pop(), Some(3));
/// ```
///
/// A LIFO worker:
///
/// ```
/// use crossbeam_deque::{Steal, Worker};
///
/// let w = Worker::new_lifo();
/// let s = w.stealer();
///
/// w.push(1);
/// w.push(2);
/// w.push(3);
///
/// assert_eq!(s.steal(), Steal::Success(1));
/// assert_eq!(w.pop(), Some(3));
/// assert_eq!(w.pop(), Some(2));
/// ```
pub struct Worker<T> {
    /// A reference to the inner representation of the queue.
    inner: Arc<CachePadded<Inner<T>>>,

    /// A copy of `inner.buffer` for quick access.
    ///
    /// Kept in a `Cell` so that `resize` can replace it through `&self`.
    buffer: Cell<Buffer<T>>,

    /// The flavor of the queue.
    flavor: Flavor,

    /// Indicates that the worker cannot be shared among threads.
    _marker: PhantomData<*mut ()>, // !Send + !Sync
}
201
// The `PhantomData<*mut ()>` marker removes both `Send` and `Sync`; this impl restores `Send`
// (but not `Sync`) whenever the tasks themselves are `Send`, so a worker can be moved to another
// thread but not shared between threads.
unsafe impl<T: Send> Send for Worker<T> {}
203
impl<T> Worker<T> {
    /// Creates a FIFO worker queue.
    ///
    /// Tasks are pushed and popped from opposite ends.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::<i32>::new_fifo();
    /// ```
    pub fn new_fifo() -> Worker<T> {
        // Every queue starts out with the minimum capacity.
        let buffer = Buffer::alloc(MIN_CAP);

        // `Buffer` is `Copy`: the same pointer is stored in the shared `Inner` and in the
        // worker-local cache below.
        let inner = Arc::new(CachePadded::new(Inner {
            front: AtomicIsize::new(0),
            back: AtomicIsize::new(0),
            buffer: CachePadded::new(Atomic::new(buffer)),
        }));

        Worker {
            inner,
            buffer: Cell::new(buffer),
            flavor: Flavor::Fifo,
            _marker: PhantomData,
        }
    }

    /// Creates a LIFO worker queue.
    ///
    /// Tasks are pushed and popped from the same end.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::<i32>::new_lifo();
    /// ```
    pub fn new_lifo() -> Worker<T> {
        // Every queue starts out with the minimum capacity.
        let buffer = Buffer::alloc(MIN_CAP);

        // `Buffer` is `Copy`: the same pointer is stored in the shared `Inner` and in the
        // worker-local cache below.
        let inner = Arc::new(CachePadded::new(Inner {
            front: AtomicIsize::new(0),
            back: AtomicIsize::new(0),
            buffer: CachePadded::new(Atomic::new(buffer)),
        }));

        Worker {
            inner,
            buffer: Cell::new(buffer),
            flavor: Flavor::Lifo,
            _marker: PhantomData,
        }
    }

    /// Creates a stealer for this queue.
    ///
    /// The returned stealer can be shared among threads and cloned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::<i32>::new_lifo();
    /// let s = w.stealer();
    /// ```
    pub fn stealer(&self) -> Stealer<T> {
        // The stealer shares the same `Inner` and remembers the flavor of this queue.
        Stealer {
            inner: self.inner.clone(),
            flavor: self.flavor,
        }
    }

    /// Resizes the internal buffer to the new capacity of `new_cap`.
    ///
    /// All tasks in `front..back` are copied into a freshly allocated buffer at the same logical
    /// indices, the new buffer is published to stealers, and the old buffer is handed to the
    /// epoch garbage collector to be freed later.
    ///
    /// `new_cap` must be a power of two (`Buffer::alloc` checks this in debug builds).
    /// NOTE(review): presumably `new_cap` must also be at least the current number of tasks,
    /// otherwise the copy loop would overwrite slots - callers in this file satisfy that; confirm
    /// before adding new call sites.
    #[cold]
    unsafe fn resize(&self, new_cap: usize) {
        // Load the back index, front index, and buffer.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Relaxed);
        let buffer = self.buffer.get();

        // Allocate a new buffer and copy data from the old buffer to the new one.
        let new = Buffer::alloc(new_cap);
        let mut i = f;
        while i != b {
            ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
            i = i.wrapping_add(1);
        }

        let guard = &epoch::pin();

        // Replace the old buffer with the new one.
        self.buffer.replace(new);
        let old =
            self.inner
                .buffer
                .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);

        // Destroy the old buffer later.
        guard.defer_unchecked(move || old.into_owned().into_box().dealloc());

        // If the buffer is very large, then flush the thread-local garbage in order to deallocate
        // it as soon as possible.
        if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
            guard.flush();
        }
    }

    /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
    /// buffer.
    ///
    /// Does nothing when `reserve_cap` is zero or when the free space already suffices.
    fn reserve(&self, reserve_cap: usize) {
        if reserve_cap > 0 {
            // Compute the current length.
            let b = self.inner.back.load(Ordering::Relaxed);
            let f = self.inner.front.load(Ordering::SeqCst);
            let len = b.wrapping_sub(f) as usize;

            // The current capacity.
            let cap = self.buffer.get().cap;

            // Is there enough capacity to push `reserve_cap` tasks?
            if cap - len < reserve_cap {
                // Keep doubling the capacity as much as is needed.
                let mut new_cap = cap * 2;
                while new_cap - len < reserve_cap {
                    new_cap *= 2;
                }

                // Resize the buffer.
                unsafe {
                    self.resize(new_cap);
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    ///
    /// assert!(w.is_empty());
    /// w.push(1);
    /// assert!(!w.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::SeqCst);
        // A non-positive difference between the two indices is treated as "empty".
        b.wrapping_sub(f) <= 0
    }

    /// Returns the number of tasks in the deque.
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    ///
    /// assert_eq!(w.len(), 0);
    /// w.push(1);
    /// assert_eq!(w.len(), 1);
    /// w.push(1);
    /// assert_eq!(w.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::SeqCst);
        // A negative difference between the two indices is reported as zero.
        b.wrapping_sub(f).max(0) as usize
    }

    /// Pushes a task into the queue.
    ///
    /// The task is written at the back of the queue. If the buffer is full, its capacity is
    /// doubled first.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    /// w.push(1);
    /// w.push(2);
    /// ```
    pub fn push(&self, task: T) {
        // Load the back index, front index, and buffer.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Acquire);
        let mut buffer = self.buffer.get();

        // Calculate the length of the queue.
        let len = b.wrapping_sub(f);

        // Is the queue full?
        if len >= buffer.cap as isize {
            // Yes. Grow the underlying buffer.
            unsafe {
                self.resize(2 * buffer.cap);
            }
            // `resize` replaced the cached buffer, so fetch the new one.
            buffer = self.buffer.get();
        }

        // Write `task` into the slot.
        unsafe {
            buffer.write(b, MaybeUninit::new(task));
        }

        // Make the write above visible before the new back index is published.
        atomic::fence(Ordering::Release);

        // Increment the back index.
        //
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
        // races because it doesn't understand fences.
        self.inner.back.store(b.wrapping_add(1), Ordering::Release);
    }

    /// Pops a task from the queue.
    ///
    /// A FIFO queue pops from the front, a LIFO queue pops from the back. Returns `None` if the
    /// queue is empty or if the remaining task was taken by a stealer in the meantime.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_fifo();
    /// w.push(1);
    /// w.push(2);
    ///
    /// assert_eq!(w.pop(), Some(1));
    /// assert_eq!(w.pop(), Some(2));
    /// assert_eq!(w.pop(), None);
    /// ```
    pub fn pop(&self) -> Option<T> {
        // Load the back and front index.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Relaxed);

        // Calculate the length of the queue.
        let len = b.wrapping_sub(f);

        // Is the queue empty?
        if len <= 0 {
            return None;
        }

        match self.flavor {
            // Pop from the front of the queue.
            Flavor::Fifo => {
                // Try incrementing the front index to pop the task.
                let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
                let new_f = f.wrapping_add(1);

                // The increment moved the front past the back: there was nothing left to pop, so
                // undo it.
                if b.wrapping_sub(new_f) < 0 {
                    self.inner.front.store(f, Ordering::Relaxed);
                    return None;
                }

                unsafe {
                    // Read the popped task.
                    let buffer = self.buffer.get();
                    let task = buffer.read(f).assume_init();

                    // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
                    if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
                        self.resize(buffer.cap / 2);
                    }

                    Some(task)
                }
            }

            // Pop from the back of the queue.
            Flavor::Lifo => {
                // Decrement the back index.
                let b = b.wrapping_sub(1);
                self.inner.back.store(b, Ordering::Relaxed);

                atomic::fence(Ordering::SeqCst);

                // Load the front index.
                let f = self.inner.front.load(Ordering::Relaxed);

                // Compute the length after the back index was decremented.
                let len = b.wrapping_sub(f);

                if len < 0 {
                    // The queue is empty. Restore the back index to the original task.
                    self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
                    None
                } else {
                    // Read the task to be popped.
                    let buffer = self.buffer.get();
                    let mut task = unsafe { Some(buffer.read(b)) };

                    // Are we popping the last task from the queue?
                    if len == 0 {
                        // Try incrementing the front index.
                        if self
                            .inner
                            .front
                            .compare_exchange(
                                f,
                                f.wrapping_add(1),
                                Ordering::SeqCst,
                                Ordering::Relaxed,
                            )
                            .is_err()
                        {
                            // Failed. We didn't pop anything. Reset to `None`.
                            task.take();
                        }

                        // Restore the back index to the original task.
                        self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
                    } else {
                        // Shrink the buffer if `len` is less than one fourth of the capacity.
                        if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
                            unsafe {
                                self.resize(buffer.cap / 2);
                            }
                        }
                    }

                    // The value is only treated as initialized if the task is still ours.
                    task.map(|t| unsafe { t.assume_init() })
                }
            }
        }
    }
}
534
535impl<T> fmt::Debug for Worker<T> {
536 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
537 f.pad("Worker { .. }")
538 }
539}
540
/// A stealer handle of a worker queue.
///
/// Stealers can be shared among threads.
///
/// Task schedulers typically have a single worker queue per worker thread.
///
/// A stealer is obtained from [`Worker::stealer`] and always takes tasks from the front of the
/// queue.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{Steal, Worker};
///
/// let w = Worker::new_lifo();
/// w.push(1);
/// w.push(2);
///
/// let s = w.stealer();
/// assert_eq!(s.steal(), Steal::Success(1));
/// assert_eq!(s.steal(), Steal::Success(2));
/// assert_eq!(s.steal(), Steal::Empty);
/// ```
pub struct Stealer<T> {
    /// A reference to the inner representation of the queue.
    inner: Arc<CachePadded<Inner<T>>>,

    /// The flavor of the queue.
    ///
    /// Copied from the worker this stealer was created from.
    flavor: Flavor,
}
568
// Unlike `Worker`, a `Stealer` is both `Send` and `Sync` (for `T: Send`): every method takes
// `&self` and touches the shared state only through atomics and the epoch-protected buffer.
unsafe impl<T: Send> Send for Stealer<T> {}
unsafe impl<T: Send> Sync for Stealer<T> {}
571
impl<T> Stealer<T> {
    /// Returns `true` if the queue is empty.
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    /// let s = w.stealer();
    ///
    /// assert!(s.is_empty());
    /// w.push(1);
    /// assert!(!s.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let f = self.inner.front.load(Ordering::Acquire);
        atomic::fence(Ordering::SeqCst);
        let b = self.inner.back.load(Ordering::Acquire);
        // A non-positive difference between the two indices is treated as "empty".
        b.wrapping_sub(f) <= 0
    }

    /// Returns the number of tasks in the deque.
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    /// let s = w.stealer();
    ///
    /// assert_eq!(s.len(), 0);
    /// w.push(1);
    /// assert_eq!(s.len(), 1);
    /// w.push(2);
    /// assert_eq!(s.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        let f = self.inner.front.load(Ordering::Acquire);
        atomic::fence(Ordering::SeqCst);
        let b = self.inner.back.load(Ordering::Acquire);
        // A negative difference between the two indices is reported as zero.
        b.wrapping_sub(f).max(0) as usize
    }

    /// Steals a task from the queue.
    ///
    /// Returns `Steal::Empty` if the queue was observed empty, `Steal::Retry` if the buffer was
    /// swapped or another thread moved the front index first, and `Steal::Success` with the task
    /// at the front otherwise.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Steal, Worker};
    ///
    /// let w = Worker::new_lifo();
    /// w.push(1);
    /// w.push(2);
    ///
    /// let s = w.stealer();
    /// assert_eq!(s.steal(), Steal::Success(1));
    /// assert_eq!(s.steal(), Steal::Success(2));
    /// ```
    pub fn steal(&self) -> Steal<T> {
        // Load the front index.
        let f = self.inner.front.load(Ordering::Acquire);

        // A SeqCst fence is needed here.
        //
        // If the current thread is already pinned (reentrantly), we must manually issue the
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
        // have to.
        if epoch::is_pinned() {
            atomic::fence(Ordering::SeqCst);
        }

        let guard = &epoch::pin();

        // Load the back index.
        let b = self.inner.back.load(Ordering::Acquire);

        // Is the queue empty?
        if b.wrapping_sub(f) <= 0 {
            return Steal::Empty;
        }

        // Load the buffer and read the task at the front.
        //
        // The value stays wrapped in `MaybeUninit` until the steal is confirmed below.
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
        let task = unsafe { buffer.deref().read(f) };

        // Try incrementing the front index to steal the task.
        // If the buffer has been swapped or the increment fails, we retry.
        if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
            || self
                .inner
                .front
                .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
                .is_err()
        {
            // We didn't steal this task, forget it.
            return Steal::Retry;
        }

        // Return the stolen task.
        Steal::Success(unsafe { task.assume_init() })
    }

    /// Steals a batch of tasks and pushes them into another worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w1 = Worker::new_fifo();
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// let _ = s.steal_batch(&w2);
    /// assert_eq!(w2.pop(), Some(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// ```
    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
        self.steal_batch_with_limit(dest, MAX_BATCH)
    }

    /// Steals no more than `limit` of tasks and pushes them into another worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than the given limit.
    ///
    /// If `dest` is the very queue this stealer belongs to, nothing is moved: the result is
    /// `Steal::Empty` when the queue is empty and `Steal::Success(())` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `limit` is zero.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w1 = Worker::new_fifo();
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    /// w1.push(5);
    /// w1.push(6);
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// let _ = s.steal_batch_with_limit(&w2, 2);
    /// assert_eq!(w2.pop(), Some(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// assert_eq!(w2.pop(), None);
    ///
    /// w1.push(7);
    /// w1.push(8);
    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
    /// // half of the elements are currently popped, but the number of popped elements is considered
    /// // an implementation detail that may be changed in the future.
    /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
    /// assert_eq!(w2.len(), 3);
    /// ```
    pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
        assert!(limit > 0);
        // Stealing from a queue into itself: there is nothing to move.
        if Arc::ptr_eq(&self.inner, &dest.inner) {
            if dest.is_empty() {
                return Steal::Empty;
            } else {
                return Steal::Success(());
            }
        }

        // Load the front index.
        let mut f = self.inner.front.load(Ordering::Acquire);

        // A SeqCst fence is needed here.
        //
        // If the current thread is already pinned (reentrantly), we must manually issue the
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
        // have to.
        if epoch::is_pinned() {
            atomic::fence(Ordering::SeqCst);
        }

        let guard = &epoch::pin();

        // Load the back index.
        let b = self.inner.back.load(Ordering::Acquire);

        // Is the queue empty?
        let len = b.wrapping_sub(f);
        if len <= 0 {
            return Steal::Empty;
        }

        // Reserve capacity for the stolen batch.
        //
        // The batch is half of the observed length (rounded up), capped at `limit`.
        let batch_size = cmp::min((len as usize + 1) / 2, limit);
        dest.reserve(batch_size);
        let mut batch_size = batch_size as isize;

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);

        // Load the buffer.
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);

        match self.flavor {
            // Steal a batch of tasks from the front at once.
            Flavor::Fifo => {
                // Copy the batch from the source to the destination buffer.
                match dest.flavor {
                    Flavor::Fifo => {
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i));
                                dest_buffer.write(dest_b.wrapping_add(i), task);
                            }
                        }
                    }
                    Flavor::Lifo => {
                        // Written in reverse, so the destination's back ends up holding the task
                        // that was at the source's front.
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i));
                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
                            }
                        }
                    }
                }

                // Try incrementing the front index to steal the batch.
                // If the buffer has been swapped or the increment fails, we retry.
                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
                    || self
                        .inner
                        .front
                        .compare_exchange(
                            f,
                            f.wrapping_add(batch_size),
                            Ordering::SeqCst,
                            Ordering::Relaxed,
                        )
                        .is_err()
                {
                    // The destination's back index was never updated, so the copies written
                    // above are simply not part of the destination queue.
                    return Steal::Retry;
                }

                dest_b = dest_b.wrapping_add(batch_size);
            }

            // Steal a batch of tasks from the front one by one.
            Flavor::Lifo => {
                // This loop may modify the batch_size, which triggers a clippy lint warning.
                // Use a new variable to avoid the warning, and to make it clear we aren't
                // modifying the loop exit condition during iteration.
                let original_batch_size = batch_size;

                for i in 0..original_batch_size {
                    // If this is not the first steal, check whether the queue is empty.
                    if i > 0 {
                        // We've already got the current front index. Now execute the fence to
                        // synchronize with other threads.
                        atomic::fence(Ordering::SeqCst);

                        // Load the back index.
                        let b = self.inner.back.load(Ordering::Acquire);

                        // Is the queue empty?
                        if b.wrapping_sub(f) <= 0 {
                            batch_size = i;
                            break;
                        }
                    }

                    // Read the task at the front.
                    let task = unsafe { buffer.deref().read(f) };

                    // Try incrementing the front index to steal the task.
                    // If the buffer has been swapped or the increment fails, we retry.
                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
                        || self
                            .inner
                            .front
                            .compare_exchange(
                                f,
                                f.wrapping_add(1),
                                Ordering::SeqCst,
                                Ordering::Relaxed,
                            )
                            .is_err()
                    {
                        // We didn't steal this task, forget it and break from the loop.
                        batch_size = i;
                        break;
                    }

                    // Write the stolen task into the destination buffer.
                    unsafe {
                        dest_buffer.write(dest_b, task);
                    }

                    // Move the source front index and the destination back index one step forward.
                    f = f.wrapping_add(1);
                    dest_b = dest_b.wrapping_add(1);
                }

                // If we didn't steal anything, the operation needs to be retried.
                if batch_size == 0 {
                    return Steal::Retry;
                }

                // If stealing into a FIFO queue, stolen tasks need to be reversed.
                if dest.flavor == Flavor::Fifo {
                    // Swap the stolen range `dest_b - batch_size .. dest_b` end for end.
                    for i in 0..batch_size / 2 {
                        unsafe {
                            let i1 = dest_b.wrapping_sub(batch_size - i);
                            let i2 = dest_b.wrapping_sub(i + 1);
                            let t1 = dest_buffer.read(i1);
                            let t2 = dest_buffer.read(i2);
                            dest_buffer.write(i1, t2);
                            dest_buffer.write(i2, t1);
                        }
                    }
                }
            }
        }

        atomic::fence(Ordering::Release);

        // Update the back index in the destination queue.
        //
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
        // races because it doesn't understand fences.
        dest.inner.back.store(dest_b, Ordering::Release);

        // Return with success.
        Steal::Success(())
    }

    /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Steal, Worker};
    ///
    /// let w1 = Worker::new_fifo();
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// ```
    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
        self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
    }

    /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from
    /// that worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than the given limit.
    ///
    /// If `dest` is the very queue this stealer belongs to, this simply pops a task from `dest`.
    ///
    /// # Panics
    ///
    /// Panics if `limit` is zero.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Steal, Worker};
    ///
    /// let w1 = Worker::new_fifo();
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    /// w1.push(5);
    /// w1.push(6);
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// assert_eq!(w2.pop(), None);
    ///
    /// w1.push(7);
    /// w1.push(8);
    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
    /// // half of the elements are currently popped, but the number of popped elements is considered
    /// // an implementation detail that may be changed in the future.
    /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
    /// assert_eq!(w2.pop(), Some(4));
    /// assert_eq!(w2.pop(), Some(5));
    /// assert_eq!(w2.pop(), None);
    /// ```
    pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
        assert!(limit > 0);
        // Stealing from a queue into itself degenerates into a plain pop.
        if Arc::ptr_eq(&self.inner, &dest.inner) {
            match dest.pop() {
                None => return Steal::Empty,
                Some(task) => return Steal::Success(task),
            }
        }

        // Load the front index.
        let mut f = self.inner.front.load(Ordering::Acquire);

        // A SeqCst fence is needed here.
        //
        // If the current thread is already pinned (reentrantly), we must manually issue the
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
        // have to.
        if epoch::is_pinned() {
            atomic::fence(Ordering::SeqCst);
        }

        let guard = &epoch::pin();

        // Load the back index.
        let b = self.inner.back.load(Ordering::Acquire);

        // Is the queue empty?
        let len = b.wrapping_sub(f);
        if len <= 0 {
            return Steal::Empty;
        }

        // Reserve capacity for the stolen batch.
        //
        // The task that is returned to the caller is not counted here: it never lands in
        // `dest`, hence the `- 1` on both the length and the limit.
        let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
        dest.reserve(batch_size);
        let mut batch_size = batch_size as isize;

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);

        // Load the buffer
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);

        // Read the task at the front.
        let mut task = unsafe { buffer.deref().read(f) };

        match self.flavor {
            // Steal a batch of tasks from the front at once.
            Flavor::Fifo => {
                // Copy the batch from the source to the destination buffer.
                //
                // The source index is offset by one because the task at `f` itself is the one
                // handed back to the caller.
                match dest.flavor {
                    Flavor::Fifo => {
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i + 1));
                                dest_buffer.write(dest_b.wrapping_add(i), task);
                            }
                        }
                    }
                    Flavor::Lifo => {
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i + 1));
                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
                            }
                        }
                    }
                }

                // Try incrementing the front index to steal the task.
                // If the buffer has been swapped or the increment fails, we retry.
                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
                    || self
                        .inner
                        .front
                        .compare_exchange(
                            f,
                            f.wrapping_add(batch_size + 1),
                            Ordering::SeqCst,
                            Ordering::Relaxed,
                        )
                        .is_err()
                {
                    // We didn't steal this task, forget it.
                    return Steal::Retry;
                }

                dest_b = dest_b.wrapping_add(batch_size);
            }

            // Steal a batch of tasks from the front one by one.
            Flavor::Lifo => {
                // Try incrementing the front index to steal the task.
                if self
                    .inner
                    .front
                    .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
                    .is_err()
                {
                    // We didn't steal this task, forget it.
                    return Steal::Retry;
                }

                // Move the front index one step forward.
                f = f.wrapping_add(1);

                // Repeat the same procedure for the batch steals.
                //
                // This loop may modify the batch_size, which triggers a clippy lint warning.
                // Use a new variable to avoid the warning, and to make it clear we aren't
                // modifying the loop exit condition during iteration.
                let original_batch_size = batch_size;
                for i in 0..original_batch_size {
                    // We've already got the current front index. Now execute the fence to
                    // synchronize with other threads.
                    atomic::fence(Ordering::SeqCst);

                    // Load the back index.
                    let b = self.inner.back.load(Ordering::Acquire);

                    // Is the queue empty?
                    if b.wrapping_sub(f) <= 0 {
                        batch_size = i;
                        break;
                    }

                    // Read the task at the front.
                    let tmp = unsafe { buffer.deref().read(f) };

                    // Try incrementing the front index to steal the task.
                    // If the buffer has been swapped or the increment fails, we retry.
                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
                        || self
                            .inner
                            .front
                            .compare_exchange(
                                f,
                                f.wrapping_add(1),
                                Ordering::SeqCst,
                                Ordering::Relaxed,
                            )
                            .is_err()
                    {
                        // We didn't steal this task, forget it and break from the loop.
                        batch_size = i;
                        break;
                    }

                    // Write the previously stolen task into the destination buffer.
                    //
                    // `task` always holds the most recently stolen task; the older one moves
                    // into `dest`, so the task returned at the end is the last one stolen.
                    unsafe {
                        dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
                    }

                    // Move the source front index and the destination back index one step forward.
                    f = f.wrapping_add(1);
                    dest_b = dest_b.wrapping_add(1);
                }

                // If stealing into a FIFO queue, stolen tasks need to be reversed.
                if dest.flavor == Flavor::Fifo {
                    // Swap the stolen range `dest_b - batch_size .. dest_b` end for end.
                    for i in 0..batch_size / 2 {
                        unsafe {
                            let i1 = dest_b.wrapping_sub(batch_size - i);
                            let i2 = dest_b.wrapping_sub(i + 1);
                            let t1 = dest_buffer.read(i1);
                            let t2 = dest_buffer.read(i2);
                            dest_buffer.write(i1, t2);
                            dest_buffer.write(i2, t1);
                        }
                    }
                }
            }
        }

        atomic::fence(Ordering::Release);

        // Update the back index in the destination queue.
        //
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
        // races because it doesn't understand fences.
        dest.inner.back.store(dest_b, Ordering::Release);

        // Return with success.
        Steal::Success(unsafe { task.assume_init() })
    }
}
1159
1160impl<T> Clone for Stealer<T> {
1161 fn clone(&self) -> Stealer<T> {
1162 Stealer {
1163 inner: self.inner.clone(),
1164 flavor: self.flavor,
1165 }
1166 }
1167}
1168
1169impl<T> fmt::Debug for Stealer<T> {
1170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1171 f.pad("Stealer { .. }")
1172 }
1173}
1174
// Bits indicating the state of a slot:
// * If a task has been written into the slot, `WRITE` is set.
// * If a task has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
//
// The flags are OR-ed together in `Slot::state`.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 64;
// The maximum number of values a block can hold.
//
// This is one less than `LAP`: the last offset of a lap has no slot. When an index has offset
// `BLOCK_CAP`, `push` and the steal methods back off and reload until the index has been moved
// into the next block.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
//
// Queue positions are stored shifted left by `SHIFT`; `index >> SHIFT` recovers the position.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
//
// Stored in the low (metadata) bit of the head index. When it is set, the steal methods skip
// the emptiness check against the tail index.
const HAS_NEXT: usize = 1;
1191
/// A slot in a block.
struct Slot<T> {
    /// The task.
    ///
    /// Starts out uninitialized. `Injector::push` writes the task and only then sets `WRITE`
    /// in `state`; readers call `wait_write` before reading it.
    task: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot: a combination of the `WRITE`, `READ` and `DESTROY` bits.
    state: AtomicUsize,
}
1200
1201impl<T> Slot<T> {
1202 const UNINIT: Self = Self {
1203 task: UnsafeCell::new(MaybeUninit::uninit()),
1204 state: AtomicUsize::new(0),
1205 };
1206
1207 /// Waits until a task is written into the slot.
1208 fn wait_write(&self) {
1209 let backoff: Backoff = Backoff::new();
1210 while self.state.load(order:Ordering::Acquire) & WRITE == 0 {
1211 backoff.snooze();
1212 }
1213 }
1214}
1215
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    ///
    /// Null until the pusher that claims the last slot of this block installs a successor.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}
1226
impl<T> Block<T> {
    /// Creates an empty block: `next` is null and every slot is `Slot::UNINIT`.
    fn new() -> Block<T> {
        Self {
            next: AtomicPtr::new(ptr::null_mut()),
            slots: [Slot::UNINIT; BLOCK_CAP],
        }
    }

    /// Waits until the next pointer is set.
    ///
    /// Snoozes between `Acquire` loads of `next` and returns the first non-null pointer seen.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in the first `count` slots (visited from `count - 1` down to `0`)
    /// and destroys the block, unless some slot is still waiting to be read.
    ///
    /// If a visited slot has not been marked `READ` yet, the function returns without freeing
    /// anything. The steal methods call `destroy` again when their `fetch_or(READ)` observes
    /// `DESTROY`, so the reader of that slot resumes the destruction.
    ///
    /// # Safety
    ///
    /// `this` must point to a live block allocated through `Box`, because it is released with
    /// `Box::from_raw`. `count` must not exceed `BLOCK_CAP`, because slots are accessed with
    /// `get_unchecked`.
    unsafe fn destroy(this: *mut Block<T>, count: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in (0..count).rev() {
            let slot = (*this).slots.get_unchecked(i);

            // If `READ` is not set yet, a thread may still be using the slot: mark it with
            // `DESTROY`. The value returned by `fetch_or` tells whether the reader finished in
            // the meantime; if it did not, destruction is handed over to that reader.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}
1268
/// A position in a queue.
///
/// `Injector` keeps one of these for its head and one for its tail.
struct Position<T> {
    /// The index in the queue.
    ///
    /// The position is stored shifted left by `SHIFT`; the low bit is metadata.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}
1277
/// An injector queue.
///
/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
/// a single injector queue, which is the entry point for new tasks.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{Injector, Steal};
///
/// let q = Injector::new();
/// q.push(1);
/// q.push(2);
///
/// assert_eq!(q.steal(), Steal::Success(1));
/// assert_eq!(q.steal(), Steal::Success(2));
/// assert_eq!(q.steal(), Steal::Empty);
/// ```
pub struct Injector<T> {
    /// The head of the queue, advanced by the steal methods.
    head: CachePadded<Position<T>>,

    /// The tail of the queue, advanced by `push`.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping an `Injector<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}
1306
// NOTE(review): `Sync` only requires `T: Send`, not `T: Sync`. Presumably this is sound because
// tasks are moved into the queue by `push` and moved out by the steal methods, and are never
// handed out by reference — confirm.
unsafe impl<T: Send> Send for Injector<T> {}
unsafe impl<T: Send> Sync for Injector<T> {}
1309
1310impl<T> Default for Injector<T> {
1311 fn default() -> Self {
1312 let block: *mut Block = Box::into_raw(Box::new(Block::<T>::new()));
1313 Self {
1314 head: CachePadded::new(Position {
1315 block: AtomicPtr::new(block),
1316 index: AtomicUsize::new(0),
1317 }),
1318 tail: CachePadded::new(Position {
1319 block: AtomicPtr::new(block),
1320 index: AtomicUsize::new(0),
1321 }),
1322 _marker: PhantomData,
1323 }
1324 }
1325}
1326
1327impl<T> Injector<T> {
1328 /// Creates a new injector queue.
1329 ///
1330 /// # Examples
1331 ///
1332 /// ```
1333 /// use crossbeam_deque::Injector;
1334 ///
1335 /// let q = Injector::<i32>::new();
1336 /// ```
1337 pub fn new() -> Injector<T> {
1338 Self::default()
1339 }
1340
    /// Pushes a task into the queue.
    ///
    /// The call claims the slot at the current tail by advancing the tail index with a
    /// compare-and-swap, retrying until it succeeds. The thread that claims the last slot of a
    /// block also installs the next block.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Injector;
    ///
    /// let w = Injector::new();
    /// w.push(1);
    /// w.push(2);
    /// ```
    pub fn push(&self, task: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        // Pre-allocated successor block; kept across retries so it is allocated at most once.
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        // Skip the slot-less last offset of the lap.
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the task into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.task.get().write(MaybeUninit::new(task));
                    // Publish the task: readers wait for `WRITE` before reading the slot.
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    // The exchange failed; retry from the tail value it reported.
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
1411
    /// Steals a task from the queue.
    ///
    /// Returns:
    ///
    /// * `Steal::Empty` if the head and tail positions were equal when checked.
    /// * `Steal::Retry` if the compare-and-swap that advances the head failed.
    /// * `Steal::Success(task)` with the task at the head otherwise.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Steal};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    ///
    /// assert_eq!(q.steal(), Steal::Success(1));
    /// assert_eq!(q.steal(), Steal::Success(2));
    /// assert_eq!(q.steal(), Steal::Empty);
    /// ```
    pub fn steal(&self) -> Steal<T> {
        let mut head;
        let mut block;
        let mut offset;

        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head + (1 << SHIFT);

        // Without `HAS_NEXT` the head may be in the same block as the tail, so the queue may be
        // empty and the tail has to be consulted.
        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
            }
        }

        // Try moving the head index forward.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if offset + 1 == BLOCK_CAP {
                let next = (*block).wait_next();
                // Skip the slot-less last offset of the lap and start without `HAS_NEXT`.
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Read the task.
            let slot = (*block).slots.get_unchecked(offset);
            // The pusher may have claimed the slot but not written the task yet.
            slot.wait_write();
            let task = slot.task.get().read().assume_init();

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if (offset + 1 == BLOCK_CAP)
                || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
            {
                Block::destroy(block, offset);
            }

            Steal::Success(task)
        }
    }
1504
1505 /// Steals a batch of tasks and pushes them into a worker.
1506 ///
1507 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1508 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1509 ///
1510 /// # Examples
1511 ///
1512 /// ```
1513 /// use crossbeam_deque::{Injector, Worker};
1514 ///
1515 /// let q = Injector::new();
1516 /// q.push(1);
1517 /// q.push(2);
1518 /// q.push(3);
1519 /// q.push(4);
1520 ///
1521 /// let w = Worker::new_fifo();
1522 /// let _ = q.steal_batch(&w);
1523 /// assert_eq!(w.pop(), Some(1));
1524 /// assert_eq!(w.pop(), Some(2));
1525 /// ```
1526 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1527 self.steal_batch_with_limit(dest, MAX_BATCH)
1528 }
1529
    /// Steals no more than `limit` of tasks and pushes them into a worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than the given limit.
    /// A single call never takes tasks from more than one block.
    ///
    /// Returns `Steal::Empty` if the head and tail positions were equal when checked,
    /// `Steal::Retry` if the compare-and-swap that advances the head failed, and
    /// `Steal::Success(())` once the batch has been moved into `dest`.
    ///
    /// # Panics
    ///
    /// Panics if `limit` is zero.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Worker};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    /// q.push(3);
    /// q.push(4);
    /// q.push(5);
    /// q.push(6);
    ///
    /// let w = Worker::new_fifo();
    /// let _ = q.steal_batch_with_limit(&w, 2);
    /// assert_eq!(w.pop(), Some(1));
    /// assert_eq!(w.pop(), Some(2));
    /// assert_eq!(w.pop(), None);
    ///
    /// q.push(7);
    /// q.push(8);
    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
    /// // half of the elements are currently popped, but the number of popped elements is considered
    /// // an implementation detail that may be changed in the future.
    /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
    /// assert_eq!(w.len(), 3);
    /// ```
    pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
        assert!(limit > 0);
        let mut head;
        let mut block;
        let mut offset;

        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head;
        // Number of tasks this call will take; at least one because `limit > 0`.
        let advance;

        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
            // the right batch size to steal.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
                // We can steal all tasks till the end of the block.
                advance = (BLOCK_CAP - offset).min(limit);
            } else {
                let len = (tail - head) >> SHIFT;
                // Steal half of the available tasks.
                advance = ((len + 1) / 2).min(limit);
            }
        } else {
            // We can steal all tasks till the end of the block.
            advance = (BLOCK_CAP - offset).min(limit);
        }

        new_head += advance << SHIFT;
        let new_offset = offset + advance;

        // Try moving the head index forward.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        // Reserve capacity for the stolen batch.
        let batch_size = new_offset - offset;
        dest.reserve(batch_size);

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let dest_b = dest.inner.back.load(Ordering::Relaxed);

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if new_offset == BLOCK_CAP {
                let next = (*block).wait_next();
                // Skip the slot-less last offset of the lap and start without `HAS_NEXT`.
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Copy values from the injector into the destination queue.
            match dest.flavor {
                // Tasks are written at increasing destination indices, in injector order.
                Flavor::Fifo => {
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i);
                        slot.wait_write();
                        let task = slot.task.get().read();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
                    }
                }

                // Tasks are written in reverse, so the oldest task ends up at the highest
                // destination index.
                Flavor::Lifo => {
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i);
                        slot.wait_write();
                        let task = slot.task.get().read();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
                    }
                }
            }

            atomic::fence(Ordering::Release);

            // Update the back index in the destination queue.
            //
            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
            // data races because it doesn't understand fences.
            dest.inner
                .back
                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if new_offset == BLOCK_CAP {
                Block::destroy(block, offset);
            } else {
                // Mark every stolen slot as read; stop at the first one that carries `DESTROY`
                // because the block may be freed by the `destroy` call.
                for i in offset..new_offset {
                    let slot = (*block).slots.get_unchecked(i);

                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset);
                        break;
                    }
                }
            }

            Steal::Success(())
        }
    }
1701
1702 /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1703 ///
1704 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1705 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1706 ///
1707 /// # Examples
1708 ///
1709 /// ```
1710 /// use crossbeam_deque::{Injector, Steal, Worker};
1711 ///
1712 /// let q = Injector::new();
1713 /// q.push(1);
1714 /// q.push(2);
1715 /// q.push(3);
1716 /// q.push(4);
1717 ///
1718 /// let w = Worker::new_fifo();
1719 /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1720 /// assert_eq!(w.pop(), Some(2));
1721 /// ```
1722 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1723 // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly
1724 // better, but we may change it in the future to be compatible with the same method in Stealer.
1725 self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
1726 }
1727
    /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than the given limit.
    /// The returned task counts towards `limit`; it is handed to the caller directly and only
    /// the remaining stolen tasks are written into `dest`.
    ///
    /// Returns `Steal::Empty` if the head and tail positions were equal when checked,
    /// `Steal::Retry` if the compare-and-swap that advances the head failed, and
    /// `Steal::Success(task)` with the first stolen task otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `limit` is zero.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Steal, Worker};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    /// q.push(3);
    /// q.push(4);
    /// q.push(5);
    /// q.push(6);
    ///
    /// let w = Worker::new_fifo();
    /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1));
    /// assert_eq!(w.pop(), Some(2));
    /// assert_eq!(w.pop(), None);
    ///
    /// q.push(7);
    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
    /// // half of the elements are currently popped, but the number of popped elements is considered
    /// // an implementation detail that may be changed in the future.
    /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3));
    /// assert_eq!(w.pop(), Some(4));
    /// assert_eq!(w.pop(), Some(5));
    /// assert_eq!(w.pop(), None);
    /// ```
    pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
        assert!(limit > 0);
        let mut head;
        let mut block;
        let mut offset;

        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head;
        // Number of tasks this call will take, including the one returned to the caller.
        let advance;

        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
                // We can steal all tasks till the end of the block.
                advance = (BLOCK_CAP - offset).min(limit);
            } else {
                let len = (tail - head) >> SHIFT;
                // Steal half of the available tasks.
                advance = ((len + 1) / 2).min(limit);
            }
        } else {
            // We can steal all tasks till the end of the block.
            advance = (BLOCK_CAP - offset).min(limit);
        }

        new_head += advance << SHIFT;
        let new_offset = offset + advance;

        // Try moving the head index forward.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        // Reserve capacity for the stolen batch.
        //
        // One of the stolen tasks is returned directly, hence the `- 1`.
        let batch_size = new_offset - offset - 1;
        dest.reserve(batch_size);

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let dest_b = dest.inner.back.load(Ordering::Relaxed);

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if new_offset == BLOCK_CAP {
                let next = (*block).wait_next();
                // Skip the slot-less last offset of the lap and start without `HAS_NEXT`.
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Read the task.
            //
            // This is the task returned to the caller; it stays wrapped in `MaybeUninit` until
            // the very end of the function.
            let slot = (*block).slots.get_unchecked(offset);
            slot.wait_write();
            let task = slot.task.get().read();

            match dest.flavor {
                Flavor::Fifo => {
                    // Copy values from the injector into the destination queue.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i + 1);
                        slot.wait_write();
                        let task = slot.task.get().read();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
                    }
                }

                Flavor::Lifo => {
                    // Copy values from the injector into the destination queue.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i + 1);
                        slot.wait_write();
                        let task = slot.task.get().read();

                        // Write it into the destination queue.
                        //
                        // Written in reverse, so the oldest task ends up at the highest index.
                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
                    }
                }
            }

            atomic::fence(Ordering::Release);

            // Update the back index in the destination queue.
            //
            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
            // data races because it doesn't understand fences.
            dest.inner
                .back
                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if new_offset == BLOCK_CAP {
                Block::destroy(block, offset);
            } else {
                // Mark every stolen slot (including the returned one) as read; stop at the first
                // one that carries `DESTROY` because the block may be freed by `destroy`.
                for i in offset..new_offset {
                    let slot = (*block).slots.get_unchecked(i);

                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset);
                        break;
                    }
                }
            }

            Steal::Success(task.assume_init())
        }
    }
1904
1905 /// Returns `true` if the queue is empty.
1906 ///
1907 /// # Examples
1908 ///
1909 /// ```
1910 /// use crossbeam_deque::Injector;
1911 ///
1912 /// let q = Injector::new();
1913 ///
1914 /// assert!(q.is_empty());
1915 /// q.push(1);
1916 /// assert!(!q.is_empty());
1917 /// ```
1918 pub fn is_empty(&self) -> bool {
1919 let head = self.head.index.load(Ordering::SeqCst);
1920 let tail = self.tail.index.load(Ordering::SeqCst);
1921 head >> SHIFT == tail >> SHIFT
1922 }
1923
    /// Returns the number of tasks in the queue.
    ///
    /// The tail index is read, then the head index, then the tail index again; the computation
    /// is retried until the two tail reads agree.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Injector;
    ///
    /// let q = Injector::new();
    ///
    /// assert_eq!(q.len(), 0);
    /// q.push(1);
    /// assert_eq!(q.len(), 1);
    /// q.push(1);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                //
                // The last offset of a lap has no slot, so such an index is moved forward to the
                // first offset of the following lap.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                //
                // Each block boundary crossed accounts for one slot-less index, and after the
                // rotation `tail / LAP` is the number of boundaries between head and tail.
                return tail - head - tail / LAP;
            }
        }
    }
1973}
1974
impl<T> Drop for Injector<T> {
    /// Drops every task still in the queue and frees all remaining blocks.
    ///
    /// `&mut self` guarantees exclusive access, so the atomics are read through `get_mut`.
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the task in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.task.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    //
                    // `offset == BLOCK_CAP` is the slot-less index at the end of a lap.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            drop(Box::from_raw(block));
        }
    }
}
2010
2011impl<T> fmt::Debug for Injector<T> {
2012 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2013 f.pad("Worker { .. }")
2014 }
2015}
2016
/// Possible outcomes of a steal operation.
///
/// # Examples
///
/// There are lots of ways to chain results of steal operations together:
///
/// ```
/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
///
/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
///
/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
///
/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
/// ```
#[must_use]
#[derive(PartialEq, Eq, Copy, Clone)]
pub enum Steal<T> {
    /// The queue was empty at the time of stealing.
    Empty,

    /// At least one task was successfully stolen.
    Success(T),

    /// The steal operation needs to be retried.
    Retry,
}

impl<T> Steal<T> {
    /// Returns `true` if the queue was empty at the time of stealing.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Success(7).is_empty());
    /// assert!(!Retry::<i32>.is_empty());
    ///
    /// assert!(Empty::<i32>.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        matches!(self, Steal::Empty)
    }

    /// Returns `true` if at least one task was stolen.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Empty::<i32>.is_success());
    /// assert!(!Retry::<i32>.is_success());
    ///
    /// assert!(Success(7).is_success());
    /// ```
    pub fn is_success(&self) -> bool {
        matches!(self, Steal::Success(_))
    }

    /// Returns `true` if the steal operation needs to be retried.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Empty::<i32>.is_retry());
    /// assert!(!Success(7).is_retry());
    ///
    /// assert!(Retry::<i32>.is_retry());
    /// ```
    pub fn is_retry(&self) -> bool {
        matches!(self, Steal::Retry)
    }

    /// Returns the result of the operation, if successful.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert_eq!(Empty::<i32>.success(), None);
    /// assert_eq!(Retry::<i32>.success(), None);
    ///
    /// assert_eq!(Success(7).success(), Some(7));
    /// ```
    pub fn success(self) -> Option<T> {
        match self {
            Steal::Success(res) => Some(res),
            _ => None,
        }
    }

    /// If no task was stolen, attempts another steal operation.
    ///
    /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
    ///
    /// * If the second steal resulted in `Success`, it is returned.
    /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
    /// * If both resulted in `Empty`, then `Empty` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
    /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
    ///
    /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
    /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
    ///
    /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
    /// ```
    pub fn or_else<F>(self, f: F) -> Steal<T>
    where
        F: FnOnce() -> Steal<T>,
    {
        match self {
            // Nothing to remember from the first attempt: the second result is the answer.
            Steal::Empty => f(),
            Steal::Success(_) => self,
            // A `Retry` is only overridden by a `Success` from the second attempt.
            Steal::Retry => {
                if let Steal::Success(res) = f() {
                    Steal::Success(res)
                } else {
                    Steal::Retry
                }
            }
        }
    }
}

impl<T> fmt::Debug for Steal<T> {
    /// Writes the variant name; the payload of `Success` is elided as `..`, so `T` does not
    /// need to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Steal::Empty => f.pad("Empty"),
            Steal::Success(_) => f.pad("Success(..)"),
            Steal::Retry => f.pad("Retry"),
        }
    }
}

impl<T> FromIterator<Steal<T>> for Steal<T> {
    /// Consumes items until a `Success` is found and returns it.
    ///
    /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
    /// Otherwise, `Empty` is returned.
    fn from_iter<I>(iter: I) -> Steal<T>
    where
        I: IntoIterator<Item = Steal<T>>,
    {
        // Remembers whether any `Retry` was seen before the iterator ran out.
        let mut retry = false;
        for s in iter {
            match &s {
                Steal::Empty => {}
                // Stop at the first success; the rest of the iterator is not consumed.
                Steal::Success(_) => return s,
                Steal::Retry => retry = true,
            }
        }

        if retry {
            Steal::Retry
        } else {
            Steal::Empty
        }
    }
}
2199