1 | use super::noop::NoopConsumer; |
2 | use super::plumbing::{Consumer, Folder, Reducer, UnindexedConsumer}; |
3 | use super::{IntoParallelIterator, ParallelExtend, ParallelIterator}; |
4 | |
5 | use either::Either; |
6 | use std::borrow::Cow; |
7 | use std::collections::LinkedList; |
8 | use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; |
9 | use std::collections::{BinaryHeap, VecDeque}; |
10 | use std::ffi::{OsStr, OsString}; |
11 | use std::hash::{BuildHasher, Hash}; |
12 | |
13 | /// Performs a generic `par_extend` by collecting to a `LinkedList<Vec<_>>` in |
14 | /// parallel, then extending the collection sequentially. |
15 | macro_rules! extend { |
16 | ($self:ident, $par_iter:ident) => { |
17 | extend!($self <- fast_collect($par_iter)) |
18 | }; |
19 | ($self:ident <- $vecs:expr) => { |
20 | match $vecs { |
21 | Either::Left(vec) => $self.extend(vec), |
22 | Either::Right(list) => { |
23 | for vec in list { |
24 | $self.extend(vec); |
25 | } |
26 | } |
27 | } |
28 | }; |
29 | } |
30 | macro_rules! extend_reserved { |
31 | ($self:ident, $par_iter:ident, $len:ident) => { |
32 | let vecs = fast_collect($par_iter); |
33 | $self.reserve($len(&vecs)); |
34 | extend!($self <- vecs) |
35 | }; |
36 | ($self:ident, $par_iter:ident) => { |
37 | extend_reserved!($self, $par_iter, len) |
38 | }; |
39 | } |
40 | |
41 | /// Computes the total length of a `fast_collect` result. |
42 | fn len<T>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize { |
43 | match vecs { |
44 | Either::Left(vec: &Vec) => vec.len(), |
45 | Either::Right(list: &LinkedList>) => list.iter().map(Vec::len).sum(), |
46 | } |
47 | } |
48 | |
49 | /// Computes the total string length of a `fast_collect` result. |
50 | fn string_len<T: AsRef<str>>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize { |
51 | let strs: Either, Flatten<…>> = match vecs { |
52 | Either::Left(vec: &Vec) => Either::Left(vec.iter()), |
53 | Either::Right(list: &LinkedList>) => Either::Right(list.iter().flatten()), |
54 | }; |
55 | strs.map(AsRef::as_ref).map(str::len).sum() |
56 | } |
57 | |
58 | /// Computes the total OS-string length of a `fast_collect` result. |
59 | fn osstring_len<T: AsRef<OsStr>>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize { |
60 | let osstrs: Either, Flatten<…>> = match vecs { |
61 | Either::Left(vec: &Vec) => Either::Left(vec.iter()), |
62 | Either::Right(list: &LinkedList>) => Either::Right(list.iter().flatten()), |
63 | }; |
64 | osstrs.map(AsRef::as_ref).map(OsStr::len).sum() |
65 | } |
66 | |
67 | pub(super) fn fast_collect<I, T>(pi: I) -> Either<Vec<T>, LinkedList<Vec<T>>> |
68 | where |
69 | I: IntoParallelIterator<Item = T>, |
70 | T: Send, |
71 | { |
72 | let par_iter: ::Iter = pi.into_par_iter(); |
73 | match par_iter.opt_len() { |
74 | Some(len: usize) => { |
75 | // Pseudo-specialization. See impl of ParallelExtend for Vec for more details. |
76 | let mut vec: Vec = Vec::new(); |
77 | super::collect::special_extend(pi:par_iter, len, &mut vec); |
78 | Either::Left(vec) |
79 | } |
80 | None => Either::Right(par_iter.drive_unindexed(consumer:ListVecConsumer)), |
81 | } |
82 | } |
83 | |
84 | struct ListVecConsumer; |
85 | |
86 | struct ListVecFolder<T> { |
87 | vec: Vec<T>, |
88 | } |
89 | |
90 | impl<T: Send> Consumer<T> for ListVecConsumer { |
91 | type Folder = ListVecFolder<T>; |
92 | type Reducer = ListReducer; |
93 | type Result = LinkedList<Vec<T>>; |
94 | |
95 | fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { |
96 | (Self, Self, ListReducer) |
97 | } |
98 | |
99 | fn into_folder(self) -> Self::Folder { |
100 | ListVecFolder { vec: Vec::new() } |
101 | } |
102 | |
103 | fn full(&self) -> bool { |
104 | false |
105 | } |
106 | } |
107 | |
108 | impl<T: Send> UnindexedConsumer<T> for ListVecConsumer { |
109 | fn split_off_left(&self) -> Self { |
110 | Self |
111 | } |
112 | |
113 | fn to_reducer(&self) -> Self::Reducer { |
114 | ListReducer |
115 | } |
116 | } |
117 | |
118 | impl<T> Folder<T> for ListVecFolder<T> { |
119 | type Result = LinkedList<Vec<T>>; |
120 | |
121 | fn consume(mut self, item: T) -> Self { |
122 | self.vec.push(item); |
123 | self |
124 | } |
125 | |
126 | fn consume_iter<I>(mut self, iter: I) -> Self |
127 | where |
128 | I: IntoIterator<Item = T>, |
129 | { |
130 | self.vec.extend(iter); |
131 | self |
132 | } |
133 | |
134 | fn complete(self) -> Self::Result { |
135 | let mut list = LinkedList::new(); |
136 | if !self.vec.is_empty() { |
137 | list.push_back(self.vec); |
138 | } |
139 | list |
140 | } |
141 | |
142 | fn full(&self) -> bool { |
143 | false |
144 | } |
145 | } |
146 | |
147 | /// Extends a binary heap with items from a parallel iterator. |
148 | impl<T> ParallelExtend<T> for BinaryHeap<T> |
149 | where |
150 | T: Ord + Send, |
151 | { |
152 | fn par_extend<I>(&mut self, par_iter: I) |
153 | where |
154 | I: IntoParallelIterator<Item = T>, |
155 | { |
156 | extend_reserved!(self, par_iter); |
157 | } |
158 | } |
159 | |
160 | /// Extends a binary heap with copied items from a parallel iterator. |
161 | impl<'a, T> ParallelExtend<&'a T> for BinaryHeap<T> |
162 | where |
163 | T: 'a + Copy + Ord + Send + Sync, |
164 | { |
165 | fn par_extend<I>(&mut self, par_iter: I) |
166 | where |
167 | I: IntoParallelIterator<Item = &'a T>, |
168 | { |
169 | extend_reserved!(self, par_iter); |
170 | } |
171 | } |
172 | |
173 | /// Extends a B-tree map with items from a parallel iterator. |
174 | impl<K, V> ParallelExtend<(K, V)> for BTreeMap<K, V> |
175 | where |
176 | K: Ord + Send, |
177 | V: Send, |
178 | { |
179 | fn par_extend<I>(&mut self, par_iter: I) |
180 | where |
181 | I: IntoParallelIterator<Item = (K, V)>, |
182 | { |
183 | extend!(self, par_iter); |
184 | } |
185 | } |
186 | |
187 | /// Extends a B-tree map with copied items from a parallel iterator. |
188 | impl<'a, K: 'a, V: 'a> ParallelExtend<(&'a K, &'a V)> for BTreeMap<K, V> |
189 | where |
190 | K: Copy + Ord + Send + Sync, |
191 | V: Copy + Send + Sync, |
192 | { |
193 | fn par_extend<I>(&mut self, par_iter: I) |
194 | where |
195 | I: IntoParallelIterator<Item = (&'a K, &'a V)>, |
196 | { |
197 | extend!(self, par_iter); |
198 | } |
199 | } |
200 | |
201 | /// Extends a B-tree set with items from a parallel iterator. |
202 | impl<T> ParallelExtend<T> for BTreeSet<T> |
203 | where |
204 | T: Ord + Send, |
205 | { |
206 | fn par_extend<I>(&mut self, par_iter: I) |
207 | where |
208 | I: IntoParallelIterator<Item = T>, |
209 | { |
210 | extend!(self, par_iter); |
211 | } |
212 | } |
213 | |
214 | /// Extends a B-tree set with copied items from a parallel iterator. |
215 | impl<'a, T> ParallelExtend<&'a T> for BTreeSet<T> |
216 | where |
217 | T: 'a + Copy + Ord + Send + Sync, |
218 | { |
219 | fn par_extend<I>(&mut self, par_iter: I) |
220 | where |
221 | I: IntoParallelIterator<Item = &'a T>, |
222 | { |
223 | extend!(self, par_iter); |
224 | } |
225 | } |
226 | |
227 | /// Extends a hash map with items from a parallel iterator. |
228 | impl<K, V, S> ParallelExtend<(K, V)> for HashMap<K, V, S> |
229 | where |
230 | K: Eq + Hash + Send, |
231 | V: Send, |
232 | S: BuildHasher + Send, |
233 | { |
234 | fn par_extend<I>(&mut self, par_iter: I) |
235 | where |
236 | I: IntoParallelIterator<Item = (K, V)>, |
237 | { |
238 | // See the map_collect benchmarks in rayon-demo for different strategies. |
239 | extend_reserved!(self, par_iter); |
240 | } |
241 | } |
242 | |
243 | /// Extends a hash map with copied items from a parallel iterator. |
244 | impl<'a, K: 'a, V: 'a, S> ParallelExtend<(&'a K, &'a V)> for HashMap<K, V, S> |
245 | where |
246 | K: Copy + Eq + Hash + Send + Sync, |
247 | V: Copy + Send + Sync, |
248 | S: BuildHasher + Send, |
249 | { |
250 | fn par_extend<I>(&mut self, par_iter: I) |
251 | where |
252 | I: IntoParallelIterator<Item = (&'a K, &'a V)>, |
253 | { |
254 | extend_reserved!(self, par_iter); |
255 | } |
256 | } |
257 | |
258 | /// Extends a hash set with items from a parallel iterator. |
259 | impl<T, S> ParallelExtend<T> for HashSet<T, S> |
260 | where |
261 | T: Eq + Hash + Send, |
262 | S: BuildHasher + Send, |
263 | { |
264 | fn par_extend<I>(&mut self, par_iter: I) |
265 | where |
266 | I: IntoParallelIterator<Item = T>, |
267 | { |
268 | extend_reserved!(self, par_iter); |
269 | } |
270 | } |
271 | |
272 | /// Extends a hash set with copied items from a parallel iterator. |
273 | impl<'a, T, S> ParallelExtend<&'a T> for HashSet<T, S> |
274 | where |
275 | T: 'a + Copy + Eq + Hash + Send + Sync, |
276 | S: BuildHasher + Send, |
277 | { |
278 | fn par_extend<I>(&mut self, par_iter: I) |
279 | where |
280 | I: IntoParallelIterator<Item = &'a T>, |
281 | { |
282 | extend_reserved!(self, par_iter); |
283 | } |
284 | } |
285 | |
286 | /// Extends a linked list with items from a parallel iterator. |
287 | impl<T> ParallelExtend<T> for LinkedList<T> |
288 | where |
289 | T: Send, |
290 | { |
291 | fn par_extend<I>(&mut self, par_iter: I) |
292 | where |
293 | I: IntoParallelIterator<Item = T>, |
294 | { |
295 | let mut list: LinkedList = par_iter.into_par_iter().drive_unindexed(consumer:ListConsumer); |
296 | self.append(&mut list); |
297 | } |
298 | } |
299 | |
300 | /// Extends a linked list with copied items from a parallel iterator. |
301 | impl<'a, T> ParallelExtend<&'a T> for LinkedList<T> |
302 | where |
303 | T: 'a + Copy + Send + Sync, |
304 | { |
305 | fn par_extend<I>(&mut self, par_iter: I) |
306 | where |
307 | I: IntoParallelIterator<Item = &'a T>, |
308 | { |
309 | self.par_extend(par_iter:par_iter.into_par_iter().copied()) |
310 | } |
311 | } |
312 | |
313 | struct ListConsumer; |
314 | |
315 | struct ListFolder<T> { |
316 | list: LinkedList<T>, |
317 | } |
318 | |
319 | struct ListReducer; |
320 | |
321 | impl<T: Send> Consumer<T> for ListConsumer { |
322 | type Folder = ListFolder<T>; |
323 | type Reducer = ListReducer; |
324 | type Result = LinkedList<T>; |
325 | |
326 | fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { |
327 | (Self, Self, ListReducer) |
328 | } |
329 | |
330 | fn into_folder(self) -> Self::Folder { |
331 | ListFolder { |
332 | list: LinkedList::new(), |
333 | } |
334 | } |
335 | |
336 | fn full(&self) -> bool { |
337 | false |
338 | } |
339 | } |
340 | |
341 | impl<T: Send> UnindexedConsumer<T> for ListConsumer { |
342 | fn split_off_left(&self) -> Self { |
343 | Self |
344 | } |
345 | |
346 | fn to_reducer(&self) -> Self::Reducer { |
347 | ListReducer |
348 | } |
349 | } |
350 | |
351 | impl<T> Folder<T> for ListFolder<T> { |
352 | type Result = LinkedList<T>; |
353 | |
354 | fn consume(mut self, item: T) -> Self { |
355 | self.list.push_back(elt:item); |
356 | self |
357 | } |
358 | |
359 | fn consume_iter<I>(mut self, iter: I) -> Self |
360 | where |
361 | I: IntoIterator<Item = T>, |
362 | { |
363 | self.list.extend(iter); |
364 | self |
365 | } |
366 | |
367 | fn complete(self) -> Self::Result { |
368 | self.list |
369 | } |
370 | |
371 | fn full(&self) -> bool { |
372 | false |
373 | } |
374 | } |
375 | |
376 | impl<T> Reducer<LinkedList<T>> for ListReducer { |
377 | fn reduce(self, mut left: LinkedList<T>, mut right: LinkedList<T>) -> LinkedList<T> { |
378 | left.append(&mut right); |
379 | left |
380 | } |
381 | } |
382 | |
383 | /// Extends an OS-string with string slices from a parallel iterator. |
384 | impl<'a> ParallelExtend<&'a OsStr> for OsString { |
385 | fn par_extend<I>(&mut self, par_iter: I) |
386 | where |
387 | I: IntoParallelIterator<Item = &'a OsStr>, |
388 | { |
389 | extend_reserved!(self, par_iter, osstring_len); |
390 | } |
391 | } |
392 | |
393 | /// Extends an OS-string with strings from a parallel iterator. |
394 | impl ParallelExtend<OsString> for OsString { |
395 | fn par_extend<I>(&mut self, par_iter: I) |
396 | where |
397 | I: IntoParallelIterator<Item = OsString>, |
398 | { |
399 | extend_reserved!(self, par_iter, osstring_len); |
400 | } |
401 | } |
402 | |
403 | /// Extends an OS-string with string slices from a parallel iterator. |
404 | impl<'a> ParallelExtend<Cow<'a, OsStr>> for OsString { |
405 | fn par_extend<I>(&mut self, par_iter: I) |
406 | where |
407 | I: IntoParallelIterator<Item = Cow<'a, OsStr>>, |
408 | { |
409 | extend_reserved!(self, par_iter, osstring_len); |
410 | } |
411 | } |
412 | |
413 | /// Extends a string with characters from a parallel iterator. |
414 | impl ParallelExtend<char> for String { |
415 | fn par_extend<I>(&mut self, par_iter: I) |
416 | where |
417 | I: IntoParallelIterator<Item = char>, |
418 | { |
419 | // This is like `extend`, but `Vec<char>` is less efficient to deal |
420 | // with than `String`, so instead collect to `LinkedList<String>`. |
421 | let list: LinkedList = par_iter.into_par_iter().drive_unindexed(consumer:ListStringConsumer); |
422 | self.reserve(additional:list.iter().map(String::len).sum()); |
423 | self.extend(iter:list); |
424 | } |
425 | } |
426 | |
427 | /// Extends a string with copied characters from a parallel iterator. |
428 | impl<'a> ParallelExtend<&'a char> for String { |
429 | fn par_extend<I>(&mut self, par_iter: I) |
430 | where |
431 | I: IntoParallelIterator<Item = &'a char>, |
432 | { |
433 | self.par_extend(par_iter:par_iter.into_par_iter().copied()) |
434 | } |
435 | } |
436 | |
437 | struct ListStringConsumer; |
438 | |
439 | struct ListStringFolder { |
440 | string: String, |
441 | } |
442 | |
443 | impl Consumer<char> for ListStringConsumer { |
444 | type Folder = ListStringFolder; |
445 | type Reducer = ListReducer; |
446 | type Result = LinkedList<String>; |
447 | |
448 | fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { |
449 | (Self, Self, ListReducer) |
450 | } |
451 | |
452 | fn into_folder(self) -> Self::Folder { |
453 | ListStringFolder { |
454 | string: String::new(), |
455 | } |
456 | } |
457 | |
458 | fn full(&self) -> bool { |
459 | false |
460 | } |
461 | } |
462 | |
463 | impl UnindexedConsumer<char> for ListStringConsumer { |
464 | fn split_off_left(&self) -> Self { |
465 | Self |
466 | } |
467 | |
468 | fn to_reducer(&self) -> Self::Reducer { |
469 | ListReducer |
470 | } |
471 | } |
472 | |
473 | impl Folder<char> for ListStringFolder { |
474 | type Result = LinkedList<String>; |
475 | |
476 | fn consume(mut self, item: char) -> Self { |
477 | self.string.push(item); |
478 | self |
479 | } |
480 | |
481 | fn consume_iter<I>(mut self, iter: I) -> Self |
482 | where |
483 | I: IntoIterator<Item = char>, |
484 | { |
485 | self.string.extend(iter); |
486 | self |
487 | } |
488 | |
489 | fn complete(self) -> Self::Result { |
490 | let mut list = LinkedList::new(); |
491 | if !self.string.is_empty() { |
492 | list.push_back(self.string); |
493 | } |
494 | list |
495 | } |
496 | |
497 | fn full(&self) -> bool { |
498 | false |
499 | } |
500 | } |
501 | |
502 | /// Extends a string with string slices from a parallel iterator. |
503 | impl<'a> ParallelExtend<&'a str> for String { |
504 | fn par_extend<I>(&mut self, par_iter: I) |
505 | where |
506 | I: IntoParallelIterator<Item = &'a str>, |
507 | { |
508 | extend_reserved!(self, par_iter, string_len); |
509 | } |
510 | } |
511 | |
512 | /// Extends a string with strings from a parallel iterator. |
513 | impl ParallelExtend<String> for String { |
514 | fn par_extend<I>(&mut self, par_iter: I) |
515 | where |
516 | I: IntoParallelIterator<Item = String>, |
517 | { |
518 | extend_reserved!(self, par_iter, string_len); |
519 | } |
520 | } |
521 | |
522 | /// Extends a string with boxed strings from a parallel iterator. |
523 | impl ParallelExtend<Box<str>> for String { |
524 | fn par_extend<I>(&mut self, par_iter: I) |
525 | where |
526 | I: IntoParallelIterator<Item = Box<str>>, |
527 | { |
528 | extend_reserved!(self, par_iter, string_len); |
529 | } |
530 | } |
531 | |
532 | /// Extends a string with string slices from a parallel iterator. |
533 | impl<'a> ParallelExtend<Cow<'a, str>> for String { |
534 | fn par_extend<I>(&mut self, par_iter: I) |
535 | where |
536 | I: IntoParallelIterator<Item = Cow<'a, str>>, |
537 | { |
538 | extend_reserved!(self, par_iter, string_len); |
539 | } |
540 | } |
541 | |
542 | /// Extends a deque with items from a parallel iterator. |
543 | impl<T> ParallelExtend<T> for VecDeque<T> |
544 | where |
545 | T: Send, |
546 | { |
547 | fn par_extend<I>(&mut self, par_iter: I) |
548 | where |
549 | I: IntoParallelIterator<Item = T>, |
550 | { |
551 | extend_reserved!(self, par_iter); |
552 | } |
553 | } |
554 | |
555 | /// Extends a deque with copied items from a parallel iterator. |
556 | impl<'a, T> ParallelExtend<&'a T> for VecDeque<T> |
557 | where |
558 | T: 'a + Copy + Send + Sync, |
559 | { |
560 | fn par_extend<I>(&mut self, par_iter: I) |
561 | where |
562 | I: IntoParallelIterator<Item = &'a T>, |
563 | { |
564 | extend_reserved!(self, par_iter); |
565 | } |
566 | } |
567 | |
568 | /// Extends a vector with items from a parallel iterator. |
569 | impl<T> ParallelExtend<T> for Vec<T> |
570 | where |
571 | T: Send, |
572 | { |
573 | fn par_extend<I>(&mut self, par_iter: I) |
574 | where |
575 | I: IntoParallelIterator<Item = T>, |
576 | { |
577 | // See the vec_collect benchmarks in rayon-demo for different strategies. |
578 | let par_iter = par_iter.into_par_iter(); |
579 | match par_iter.opt_len() { |
580 | Some(len) => { |
581 | // When Rust gets specialization, we can get here for indexed iterators |
582 | // without relying on `opt_len`. Until then, `special_extend()` fakes |
583 | // an unindexed mode on the promise that `opt_len()` is accurate. |
584 | super::collect::special_extend(par_iter, len, self); |
585 | } |
586 | None => { |
587 | // This works like `extend`, but `Vec::append` is more efficient. |
588 | let list = par_iter.drive_unindexed(ListVecConsumer); |
589 | self.reserve(list.iter().map(Vec::len).sum()); |
590 | for mut other in list { |
591 | self.append(&mut other); |
592 | } |
593 | } |
594 | } |
595 | } |
596 | } |
597 | |
598 | /// Extends a vector with copied items from a parallel iterator. |
599 | impl<'a, T> ParallelExtend<&'a T> for Vec<T> |
600 | where |
601 | T: 'a + Copy + Send + Sync, |
602 | { |
603 | fn par_extend<I>(&mut self, par_iter: I) |
604 | where |
605 | I: IntoParallelIterator<Item = &'a T>, |
606 | { |
607 | self.par_extend(par_iter:par_iter.into_par_iter().copied()) |
608 | } |
609 | } |
610 | |
611 | /// Collapses all unit items from a parallel iterator into one. |
612 | impl ParallelExtend<()> for () { |
613 | fn par_extend<I>(&mut self, par_iter: I) |
614 | where |
615 | I: IntoParallelIterator<Item = ()>, |
616 | { |
617 | par_iter.into_par_iter().drive_unindexed(consumer:NoopConsumer) |
618 | } |
619 | } |
620 | |