1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2use std::sync::Mutex;
3
4use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer};
5use crate::iter::ParallelIterator;
6use crate::{current_num_threads, current_thread_index};
7
8/// Conversion trait to convert an `Iterator` to a `ParallelIterator`.
9///
10/// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items
11/// across the Rayon thread pool. This has the advantage of being able to parallelize just about
12/// anything, but the resulting `ParallelIterator` can be less efficient than if you started with
13/// `par_iter` instead. However, it can still be useful for iterators that are difficult to
14/// parallelize by other means, like channels or file or network I/O.
15///
16/// Iterator items are pulled by `next()` one at a time, synchronized from each thread that is
17/// ready for work, so this may become a bottleneck if the serial iterator can't keep up with the
18/// parallel demand. The items are not buffered by `IterBridge`, so it's fine to use this with
19/// large or even unbounded iterators.
20///
21/// The resulting iterator is not guaranteed to keep the order of the original iterator.
22///
23/// # Examples
24///
25/// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can
26/// use any of the `ParallelIterator` methods:
27///
28/// ```
29/// use rayon::iter::ParallelBridge;
30/// use rayon::prelude::ParallelIterator;
31/// use std::sync::mpsc::channel;
32///
33/// let rx = {
34/// let (tx, rx) = channel();
35///
36/// tx.send("one!");
37/// tx.send("two!");
38/// tx.send("three!");
39///
40/// rx
41/// };
42///
43/// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
44/// output.sort_unstable();
45///
46/// assert_eq!(&*output, &["one!", "three!", "two!"]);
47/// ```
48pub trait ParallelBridge: Sized {
49 /// Creates a bridge from this type to a `ParallelIterator`.
50 fn par_bridge(self) -> IterBridge<Self>;
51}
52
53impl<T: Iterator + Send> ParallelBridge for T
54where
55 T::Item: Send,
56{
57 fn par_bridge(self) -> IterBridge<Self> {
58 IterBridge { iter: self }
59 }
60}
61
62/// `IterBridge` is a parallel iterator that wraps a sequential iterator.
63///
64/// This type is created when using the `par_bridge` method on `ParallelBridge`. See the
65/// [`ParallelBridge`] documentation for details.
66///
67/// [`ParallelBridge`]: trait.ParallelBridge.html
68#[derive(Debug, Clone)]
69pub struct IterBridge<Iter> {
70 iter: Iter,
71}
72
73impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
74where
75 Iter::Item: Send,
76{
77 type Item = Iter::Item;
78
79 fn drive_unindexed<C>(self, consumer: C) -> C::Result
80 where
81 C: UnindexedConsumer<Self::Item>,
82 {
83 let num_threads = current_num_threads();
84 let threads_started: Vec<_> = (0..num_threads).map(|_| AtomicBool::new(false)).collect();
85
86 bridge_unindexed(
87 &IterParallelProducer {
88 split_count: AtomicUsize::new(num_threads),
89 iter: Mutex::new(self.iter.fuse()),
90 threads_started: &threads_started,
91 },
92 consumer,
93 )
94 }
95}
96
97struct IterParallelProducer<'a, Iter> {
98 split_count: AtomicUsize,
99 iter: Mutex<std::iter::Fuse<Iter>>,
100 threads_started: &'a [AtomicBool],
101}
102
103impl<Iter: Iterator + Send> UnindexedProducer for &IterParallelProducer<'_, Iter> {
104 type Item = Iter::Item;
105
106 fn split(self) -> (Self, Option<Self>) {
107 let mut count = self.split_count.load(Ordering::SeqCst);
108
109 loop {
110 // Check if the iterator is exhausted
111 if let Some(new_count) = count.checked_sub(1) {
112 match self.split_count.compare_exchange_weak(
113 count,
114 new_count,
115 Ordering::SeqCst,
116 Ordering::SeqCst,
117 ) {
118 Ok(_) => return (self, Some(self)),
119 Err(last_count) => count = last_count,
120 }
121 } else {
122 return (self, None);
123 }
124 }
125 }
126
127 fn fold_with<F>(self, mut folder: F) -> F
128 where
129 F: Folder<Self::Item>,
130 {
131 // Guard against work-stealing-induced recursion, in case `Iter::next()`
132 // calls rayon internally, so we don't deadlock our mutex. We might also
133 // be recursing via `folder` methods, which doesn't present a mutex hazard,
134 // but it's lower overhead for us to just check this once, rather than
135 // updating additional shared state on every mutex lock/unlock.
136 // (If this isn't a rayon thread, then there's no work-stealing anyway...)
137 if let Some(i) = current_thread_index() {
138 // Note: If the number of threads in the pool ever grows dynamically, then
139 // we'll end up sharing flags and may falsely detect recursion -- that's
140 // still fine for overall correctness, just not optimal for parallelism.
141 let thread_started = &self.threads_started[i % self.threads_started.len()];
142 if thread_started.swap(true, Ordering::Relaxed) {
143 // We can't make progress with a nested mutex, so just return and let
144 // the outermost loop continue with the rest of the iterator items.
145 return folder;
146 }
147 }
148
149 loop {
150 if let Ok(mut iter) = self.iter.lock() {
151 if let Some(it) = iter.next() {
152 drop(iter);
153 folder = folder.consume(it);
154 if folder.full() {
155 return folder;
156 }
157 } else {
158 return folder;
159 }
160 } else {
161 // any panics from other threads will have been caught by the pool,
162 // and will be re-thrown when joined - just exit
163 return folder;
164 }
165 }
166 }
167}
168