1 | use super::noop::NoopConsumer; |
2 | use super::plumbing::{Consumer, Folder, Reducer, UnindexedConsumer}; |
3 | use super::{IntoParallelIterator, ParallelExtend, ParallelIterator}; |
4 | |
5 | use std::borrow::Cow; |
6 | use std::collections::LinkedList; |
7 | use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; |
8 | use std::collections::{BinaryHeap, VecDeque}; |
9 | use std::hash::{BuildHasher, Hash}; |
10 | |
/// Shared body for the generic `par_extend` implementations.
///
/// The parallel iterator is driven into a `LinkedList<Vec<_>>` through
/// `ListVecConsumer`; the resulting list and the target collection are then
/// passed to the named sequential extension function.
macro_rules! extend {
    ($collection:ident, $source:ident, $finish:ident) => {{
        let chunks = $source.into_par_iter().drive_unindexed(ListVecConsumer);
        $finish($collection, chunks);
    }};
}
21 | |
/// Returns the number of items stored across every vector of a
/// `LinkedList<Vec<_>>`.
fn len<T>(list: &LinkedList<Vec<T>>) -> usize {
    list.iter().fold(0, |total, chunk| total + chunk.len())
}
26 | |
/// Stateless consumer whose result is a `LinkedList<Vec<T>>`: each folder
/// fills one `Vec`, and the per-folder lists are joined by `ListReducer`.
struct ListVecConsumer;
28 | |
/// Folder used by `ListVecConsumer`; accumulates consumed items in a `Vec`.
struct ListVecFolder<T> {
    /// Items consumed so far, in the order they were received.
    vec: Vec<T>,
}
32 | |
33 | impl<T: Send> Consumer<T> for ListVecConsumer { |
34 | type Folder = ListVecFolder<T>; |
35 | type Reducer = ListReducer; |
36 | type Result = LinkedList<Vec<T>>; |
37 | |
38 | fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { |
39 | (Self, Self, ListReducer) |
40 | } |
41 | |
42 | fn into_folder(self) -> Self::Folder { |
43 | ListVecFolder { vec: Vec::new() } |
44 | } |
45 | |
46 | fn full(&self) -> bool { |
47 | false |
48 | } |
49 | } |
50 | |
51 | impl<T: Send> UnindexedConsumer<T> for ListVecConsumer { |
52 | fn split_off_left(&self) -> Self { |
53 | Self |
54 | } |
55 | |
56 | fn to_reducer(&self) -> Self::Reducer { |
57 | ListReducer |
58 | } |
59 | } |
60 | |
61 | impl<T> Folder<T> for ListVecFolder<T> { |
62 | type Result = LinkedList<Vec<T>>; |
63 | |
64 | fn consume(mut self, item: T) -> Self { |
65 | self.vec.push(item); |
66 | self |
67 | } |
68 | |
69 | fn consume_iter<I>(mut self, iter: I) -> Self |
70 | where |
71 | I: IntoIterator<Item = T>, |
72 | { |
73 | self.vec.extend(iter); |
74 | self |
75 | } |
76 | |
77 | fn complete(self) -> Self::Result { |
78 | let mut list = LinkedList::new(); |
79 | if !self.vec.is_empty() { |
80 | list.push_back(self.vec); |
81 | } |
82 | list |
83 | } |
84 | |
85 | fn full(&self) -> bool { |
86 | false |
87 | } |
88 | } |
89 | |
90 | fn heap_extend<T, Item>(heap: &mut BinaryHeap<T>, list: LinkedList<Vec<Item>>) |
91 | where |
92 | BinaryHeap<T>: Extend<Item>, |
93 | { |
94 | heap.reserve(len(&list)); |
95 | for vec in list { |
96 | heap.extend(vec); |
97 | } |
98 | } |
99 | |
100 | /// Extends a binary heap with items from a parallel iterator. |
101 | impl<T> ParallelExtend<T> for BinaryHeap<T> |
102 | where |
103 | T: Ord + Send, |
104 | { |
105 | fn par_extend<I>(&mut self, par_iter: I) |
106 | where |
107 | I: IntoParallelIterator<Item = T>, |
108 | { |
109 | extend!(self, par_iter, heap_extend); |
110 | } |
111 | } |
112 | |
113 | /// Extends a binary heap with copied items from a parallel iterator. |
114 | impl<'a, T> ParallelExtend<&'a T> for BinaryHeap<T> |
115 | where |
116 | T: 'a + Copy + Ord + Send + Sync, |
117 | { |
118 | fn par_extend<I>(&mut self, par_iter: I) |
119 | where |
120 | I: IntoParallelIterator<Item = &'a T>, |
121 | { |
122 | extend!(self, par_iter, heap_extend); |
123 | } |
124 | } |
125 | |
/// Sequentially inserts every chunk of `list` into `map`, chunk by chunk in
/// list order.
fn btree_map_extend<K, V, Item>(map: &mut BTreeMap<K, V>, list: LinkedList<Vec<Item>>)
where
    BTreeMap<K, V>: Extend<Item>,
{
    list.into_iter().for_each(|chunk| map.extend(chunk));
}
134 | |
135 | /// Extends a B-tree map with items from a parallel iterator. |
136 | impl<K, V> ParallelExtend<(K, V)> for BTreeMap<K, V> |
137 | where |
138 | K: Ord + Send, |
139 | V: Send, |
140 | { |
141 | fn par_extend<I>(&mut self, par_iter: I) |
142 | where |
143 | I: IntoParallelIterator<Item = (K, V)>, |
144 | { |
145 | extend!(self, par_iter, btree_map_extend); |
146 | } |
147 | } |
148 | |
149 | /// Extends a B-tree map with copied items from a parallel iterator. |
150 | impl<'a, K: 'a, V: 'a> ParallelExtend<(&'a K, &'a V)> for BTreeMap<K, V> |
151 | where |
152 | K: Copy + Ord + Send + Sync, |
153 | V: Copy + Send + Sync, |
154 | { |
155 | fn par_extend<I>(&mut self, par_iter: I) |
156 | where |
157 | I: IntoParallelIterator<Item = (&'a K, &'a V)>, |
158 | { |
159 | extend!(self, par_iter, btree_map_extend); |
160 | } |
161 | } |
162 | |
/// Sequentially inserts every chunk of `list` into `set`, chunk by chunk in
/// list order.
fn btree_set_extend<T, Item>(set: &mut BTreeSet<T>, list: LinkedList<Vec<Item>>)
where
    BTreeSet<T>: Extend<Item>,
{
    list.into_iter().for_each(|chunk| set.extend(chunk));
}
171 | |
172 | /// Extends a B-tree set with items from a parallel iterator. |
173 | impl<T> ParallelExtend<T> for BTreeSet<T> |
174 | where |
175 | T: Ord + Send, |
176 | { |
177 | fn par_extend<I>(&mut self, par_iter: I) |
178 | where |
179 | I: IntoParallelIterator<Item = T>, |
180 | { |
181 | extend!(self, par_iter, btree_set_extend); |
182 | } |
183 | } |
184 | |
185 | /// Extends a B-tree set with copied items from a parallel iterator. |
186 | impl<'a, T> ParallelExtend<&'a T> for BTreeSet<T> |
187 | where |
188 | T: 'a + Copy + Ord + Send + Sync, |
189 | { |
190 | fn par_extend<I>(&mut self, par_iter: I) |
191 | where |
192 | I: IntoParallelIterator<Item = &'a T>, |
193 | { |
194 | extend!(self, par_iter, btree_set_extend); |
195 | } |
196 | } |
197 | |
198 | fn hash_map_extend<K, V, S, Item>(map: &mut HashMap<K, V, S>, list: LinkedList<Vec<Item>>) |
199 | where |
200 | HashMap<K, V, S>: Extend<Item>, |
201 | K: Eq + Hash, |
202 | S: BuildHasher, |
203 | { |
204 | map.reserve(len(&list)); |
205 | for vec in list { |
206 | map.extend(vec); |
207 | } |
208 | } |
209 | |
210 | /// Extends a hash map with items from a parallel iterator. |
211 | impl<K, V, S> ParallelExtend<(K, V)> for HashMap<K, V, S> |
212 | where |
213 | K: Eq + Hash + Send, |
214 | V: Send, |
215 | S: BuildHasher + Send, |
216 | { |
217 | fn par_extend<I>(&mut self, par_iter: I) |
218 | where |
219 | I: IntoParallelIterator<Item = (K, V)>, |
220 | { |
221 | // See the map_collect benchmarks in rayon-demo for different strategies. |
222 | extend!(self, par_iter, hash_map_extend); |
223 | } |
224 | } |
225 | |
226 | /// Extends a hash map with copied items from a parallel iterator. |
227 | impl<'a, K: 'a, V: 'a, S> ParallelExtend<(&'a K, &'a V)> for HashMap<K, V, S> |
228 | where |
229 | K: Copy + Eq + Hash + Send + Sync, |
230 | V: Copy + Send + Sync, |
231 | S: BuildHasher + Send, |
232 | { |
233 | fn par_extend<I>(&mut self, par_iter: I) |
234 | where |
235 | I: IntoParallelIterator<Item = (&'a K, &'a V)>, |
236 | { |
237 | extend!(self, par_iter, hash_map_extend); |
238 | } |
239 | } |
240 | |
241 | fn hash_set_extend<T, S, Item>(set: &mut HashSet<T, S>, list: LinkedList<Vec<Item>>) |
242 | where |
243 | HashSet<T, S>: Extend<Item>, |
244 | T: Eq + Hash, |
245 | S: BuildHasher, |
246 | { |
247 | set.reserve(len(&list)); |
248 | for vec in list { |
249 | set.extend(vec); |
250 | } |
251 | } |
252 | |
253 | /// Extends a hash set with items from a parallel iterator. |
254 | impl<T, S> ParallelExtend<T> for HashSet<T, S> |
255 | where |
256 | T: Eq + Hash + Send, |
257 | S: BuildHasher + Send, |
258 | { |
259 | fn par_extend<I>(&mut self, par_iter: I) |
260 | where |
261 | I: IntoParallelIterator<Item = T>, |
262 | { |
263 | extend!(self, par_iter, hash_set_extend); |
264 | } |
265 | } |
266 | |
267 | /// Extends a hash set with copied items from a parallel iterator. |
268 | impl<'a, T, S> ParallelExtend<&'a T> for HashSet<T, S> |
269 | where |
270 | T: 'a + Copy + Eq + Hash + Send + Sync, |
271 | S: BuildHasher + Send, |
272 | { |
273 | fn par_extend<I>(&mut self, par_iter: I) |
274 | where |
275 | I: IntoParallelIterator<Item = &'a T>, |
276 | { |
277 | extend!(self, par_iter, hash_set_extend); |
278 | } |
279 | } |
280 | |
281 | /// Extends a linked list with items from a parallel iterator. |
282 | impl<T> ParallelExtend<T> for LinkedList<T> |
283 | where |
284 | T: Send, |
285 | { |
286 | fn par_extend<I>(&mut self, par_iter: I) |
287 | where |
288 | I: IntoParallelIterator<Item = T>, |
289 | { |
290 | let mut list = par_iter.into_par_iter().drive_unindexed(ListConsumer); |
291 | self.append(&mut list); |
292 | } |
293 | } |
294 | |
295 | /// Extends a linked list with copied items from a parallel iterator. |
296 | impl<'a, T> ParallelExtend<&'a T> for LinkedList<T> |
297 | where |
298 | T: 'a + Copy + Send + Sync, |
299 | { |
300 | fn par_extend<I>(&mut self, par_iter: I) |
301 | where |
302 | I: IntoParallelIterator<Item = &'a T>, |
303 | { |
304 | self.par_extend(par_iter.into_par_iter().copied()) |
305 | } |
306 | } |
307 | |
/// Stateless consumer whose result is a `LinkedList<T>`: each folder pushes
/// items onto its own list, and the lists are joined by `ListReducer`.
struct ListConsumer;
309 | |
/// Folder used by `ListConsumer`; accumulates consumed items in a list.
struct ListFolder<T> {
    /// Items consumed so far, in the order they were received.
    list: LinkedList<T>,
}
313 | |
/// Reducer that joins two `LinkedList` results by appending the right list
/// to the left one.
struct ListReducer;
315 | |
316 | impl<T: Send> Consumer<T> for ListConsumer { |
317 | type Folder = ListFolder<T>; |
318 | type Reducer = ListReducer; |
319 | type Result = LinkedList<T>; |
320 | |
321 | fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { |
322 | (Self, Self, ListReducer) |
323 | } |
324 | |
325 | fn into_folder(self) -> Self::Folder { |
326 | ListFolder { |
327 | list: LinkedList::new(), |
328 | } |
329 | } |
330 | |
331 | fn full(&self) -> bool { |
332 | false |
333 | } |
334 | } |
335 | |
336 | impl<T: Send> UnindexedConsumer<T> for ListConsumer { |
337 | fn split_off_left(&self) -> Self { |
338 | Self |
339 | } |
340 | |
341 | fn to_reducer(&self) -> Self::Reducer { |
342 | ListReducer |
343 | } |
344 | } |
345 | |
346 | impl<T> Folder<T> for ListFolder<T> { |
347 | type Result = LinkedList<T>; |
348 | |
349 | fn consume(mut self, item: T) -> Self { |
350 | self.list.push_back(item); |
351 | self |
352 | } |
353 | |
354 | fn consume_iter<I>(mut self, iter: I) -> Self |
355 | where |
356 | I: IntoIterator<Item = T>, |
357 | { |
358 | self.list.extend(iter); |
359 | self |
360 | } |
361 | |
362 | fn complete(self) -> Self::Result { |
363 | self.list |
364 | } |
365 | |
366 | fn full(&self) -> bool { |
367 | false |
368 | } |
369 | } |
370 | |
371 | impl<T> Reducer<LinkedList<T>> for ListReducer { |
372 | fn reduce(self, mut left: LinkedList<T>, mut right: LinkedList<T>) -> LinkedList<T> { |
373 | left.append(&mut right); |
374 | left |
375 | } |
376 | } |
377 | |
/// Appends every string of `list` to `string` in list order, after reserving
/// room for their combined byte length.
fn flat_string_extend(string: &mut String, list: LinkedList<String>) {
    let additional: usize = list.iter().map(|piece| piece.len()).sum();
    string.reserve(additional);
    for piece in list {
        string.push_str(&piece);
    }
}
382 | |
383 | /// Extends a string with characters from a parallel iterator. |
384 | impl ParallelExtend<char> for String { |
385 | fn par_extend<I>(&mut self, par_iter: I) |
386 | where |
387 | I: IntoParallelIterator<Item = char>, |
388 | { |
389 | // This is like `extend`, but `Vec<char>` is less efficient to deal |
390 | // with than `String`, so instead collect to `LinkedList<String>`. |
391 | let list = par_iter.into_par_iter().drive_unindexed(ListStringConsumer); |
392 | flat_string_extend(self, list); |
393 | } |
394 | } |
395 | |
396 | /// Extends a string with copied characters from a parallel iterator. |
397 | impl<'a> ParallelExtend<&'a char> for String { |
398 | fn par_extend<I>(&mut self, par_iter: I) |
399 | where |
400 | I: IntoParallelIterator<Item = &'a char>, |
401 | { |
402 | self.par_extend(par_iter.into_par_iter().copied()) |
403 | } |
404 | } |
405 | |
/// Stateless consumer of `char` items whose result is a `LinkedList<String>`:
/// each folder fills one `String`, and the lists are joined by `ListReducer`.
struct ListStringConsumer;
407 | |
/// Folder used by `ListStringConsumer`; accumulates consumed characters in a
/// `String`.
struct ListStringFolder {
    /// Characters consumed so far, in the order they were received.
    string: String,
}
411 | |
412 | impl Consumer<char> for ListStringConsumer { |
413 | type Folder = ListStringFolder; |
414 | type Reducer = ListReducer; |
415 | type Result = LinkedList<String>; |
416 | |
417 | fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { |
418 | (Self, Self, ListReducer) |
419 | } |
420 | |
421 | fn into_folder(self) -> Self::Folder { |
422 | ListStringFolder { |
423 | string: String::new(), |
424 | } |
425 | } |
426 | |
427 | fn full(&self) -> bool { |
428 | false |
429 | } |
430 | } |
431 | |
432 | impl UnindexedConsumer<char> for ListStringConsumer { |
433 | fn split_off_left(&self) -> Self { |
434 | Self |
435 | } |
436 | |
437 | fn to_reducer(&self) -> Self::Reducer { |
438 | ListReducer |
439 | } |
440 | } |
441 | |
442 | impl Folder<char> for ListStringFolder { |
443 | type Result = LinkedList<String>; |
444 | |
445 | fn consume(mut self, item: char) -> Self { |
446 | self.string.push(item); |
447 | self |
448 | } |
449 | |
450 | fn consume_iter<I>(mut self, iter: I) -> Self |
451 | where |
452 | I: IntoIterator<Item = char>, |
453 | { |
454 | self.string.extend(iter); |
455 | self |
456 | } |
457 | |
458 | fn complete(self) -> Self::Result { |
459 | let mut list = LinkedList::new(); |
460 | if !self.string.is_empty() { |
461 | list.push_back(self.string); |
462 | } |
463 | list |
464 | } |
465 | |
466 | fn full(&self) -> bool { |
467 | false |
468 | } |
469 | } |
470 | |
/// Appends every string-like item of `list` to `string` in list order, after
/// reserving room for their combined byte length.
fn string_extend<Item>(string: &mut String, list: LinkedList<Vec<Item>>)
where
    String: Extend<Item>,
    Item: AsRef<str>,
{
    let additional = list
        .iter()
        .flatten()
        .fold(0, |total, item| total + Item::as_ref(item).len());
    string.reserve(additional);
    list.into_iter().for_each(|chunk| string.extend(chunk));
}
482 | |
483 | /// Extends a string with string slices from a parallel iterator. |
484 | impl<'a> ParallelExtend<&'a str> for String { |
485 | fn par_extend<I>(&mut self, par_iter: I) |
486 | where |
487 | I: IntoParallelIterator<Item = &'a str>, |
488 | { |
489 | extend!(self, par_iter, string_extend); |
490 | } |
491 | } |
492 | |
493 | /// Extends a string with strings from a parallel iterator. |
494 | impl ParallelExtend<String> for String { |
495 | fn par_extend<I>(&mut self, par_iter: I) |
496 | where |
497 | I: IntoParallelIterator<Item = String>, |
498 | { |
499 | extend!(self, par_iter, string_extend); |
500 | } |
501 | } |
502 | |
503 | /// Extends a string with boxed strings from a parallel iterator. |
504 | impl ParallelExtend<Box<str>> for String { |
505 | fn par_extend<I>(&mut self, par_iter: I) |
506 | where |
507 | I: IntoParallelIterator<Item = Box<str>>, |
508 | { |
509 | extend!(self, par_iter, string_extend); |
510 | } |
511 | } |
512 | |
513 | /// Extends a string with string slices from a parallel iterator. |
514 | impl<'a> ParallelExtend<Cow<'a, str>> for String { |
515 | fn par_extend<I>(&mut self, par_iter: I) |
516 | where |
517 | I: IntoParallelIterator<Item = Cow<'a, str>>, |
518 | { |
519 | extend!(self, par_iter, string_extend); |
520 | } |
521 | } |
522 | |
523 | fn deque_extend<T, Item>(deque: &mut VecDeque<T>, list: LinkedList<Vec<Item>>) |
524 | where |
525 | VecDeque<T>: Extend<Item>, |
526 | { |
527 | deque.reserve(len(&list)); |
528 | for vec in list { |
529 | deque.extend(vec); |
530 | } |
531 | } |
532 | |
533 | /// Extends a deque with items from a parallel iterator. |
534 | impl<T> ParallelExtend<T> for VecDeque<T> |
535 | where |
536 | T: Send, |
537 | { |
538 | fn par_extend<I>(&mut self, par_iter: I) |
539 | where |
540 | I: IntoParallelIterator<Item = T>, |
541 | { |
542 | extend!(self, par_iter, deque_extend); |
543 | } |
544 | } |
545 | |
546 | /// Extends a deque with copied items from a parallel iterator. |
547 | impl<'a, T> ParallelExtend<&'a T> for VecDeque<T> |
548 | where |
549 | T: 'a + Copy + Send + Sync, |
550 | { |
551 | fn par_extend<I>(&mut self, par_iter: I) |
552 | where |
553 | I: IntoParallelIterator<Item = &'a T>, |
554 | { |
555 | extend!(self, par_iter, deque_extend); |
556 | } |
557 | } |
558 | |
559 | fn vec_append<T>(vec: &mut Vec<T>, list: LinkedList<Vec<T>>) { |
560 | vec.reserve(len(&list)); |
561 | for mut other in list { |
562 | vec.append(&mut other); |
563 | } |
564 | } |
565 | |
566 | /// Extends a vector with items from a parallel iterator. |
567 | impl<T> ParallelExtend<T> for Vec<T> |
568 | where |
569 | T: Send, |
570 | { |
571 | fn par_extend<I>(&mut self, par_iter: I) |
572 | where |
573 | I: IntoParallelIterator<Item = T>, |
574 | { |
575 | // See the vec_collect benchmarks in rayon-demo for different strategies. |
576 | let par_iter = par_iter.into_par_iter(); |
577 | match par_iter.opt_len() { |
578 | Some(len) => { |
579 | // When Rust gets specialization, we can get here for indexed iterators |
580 | // without relying on `opt_len`. Until then, `special_extend()` fakes |
581 | // an unindexed mode on the promise that `opt_len()` is accurate. |
582 | super::collect::special_extend(par_iter, len, self); |
583 | } |
584 | None => { |
585 | // This works like `extend`, but `Vec::append` is more efficient. |
586 | let list = par_iter.drive_unindexed(ListVecConsumer); |
587 | vec_append(self, list); |
588 | } |
589 | } |
590 | } |
591 | } |
592 | |
593 | /// Extends a vector with copied items from a parallel iterator. |
594 | impl<'a, T> ParallelExtend<&'a T> for Vec<T> |
595 | where |
596 | T: 'a + Copy + Send + Sync, |
597 | { |
598 | fn par_extend<I>(&mut self, par_iter: I) |
599 | where |
600 | I: IntoParallelIterator<Item = &'a T>, |
601 | { |
602 | self.par_extend(par_iter.into_par_iter().copied()) |
603 | } |
604 | } |
605 | |
606 | /// Collapses all unit items from a parallel iterator into one. |
607 | impl ParallelExtend<()> for () { |
608 | fn par_extend<I>(&mut self, par_iter: I) |
609 | where |
610 | I: IntoParallelIterator<Item = ()>, |
611 | { |
612 | par_iter.into_par_iter().drive_unindexed(NoopConsumer) |
613 | } |
614 | } |
615 | |