• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use super::noop::NoopConsumer;
2 use super::plumbing::{Consumer, Folder, Reducer, UnindexedConsumer};
3 use super::{IntoParallelIterator, ParallelExtend, ParallelIterator};
4 
5 use std::borrow::Cow;
6 use std::collections::LinkedList;
7 use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
8 use std::collections::{BinaryHeap, VecDeque};
9 use std::hash::{BuildHasher, Hash};
10 
11 /// Performs a generic `par_extend` by collecting to a `LinkedList<Vec<_>>` in
12 /// parallel, then extending the collection sequentially.
13 macro_rules! extend {
14     ($self:ident, $par_iter:ident, $extend:ident) => {
15         $extend(
16             $self,
17             $par_iter.into_par_iter().drive_unindexed(ListVecConsumer),
18         );
19     };
20 }
21 
22 /// Computes the total length of a `LinkedList<Vec<_>>`.
len<T>(list: &LinkedList<Vec<T>>) -> usize23 fn len<T>(list: &LinkedList<Vec<T>>) -> usize {
24     list.iter().map(Vec::len).sum()
25 }
26 
27 struct ListVecConsumer;
28 
29 struct ListVecFolder<T> {
30     vec: Vec<T>,
31 }
32 
33 impl<T: Send> Consumer<T> for ListVecConsumer {
34     type Folder = ListVecFolder<T>;
35     type Reducer = ListReducer;
36     type Result = LinkedList<Vec<T>>;
37 
split_at(self, _index: usize) -> (Self, Self, Self::Reducer)38     fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
39         (Self, Self, ListReducer)
40     }
41 
into_folder(self) -> Self::Folder42     fn into_folder(self) -> Self::Folder {
43         ListVecFolder { vec: Vec::new() }
44     }
45 
full(&self) -> bool46     fn full(&self) -> bool {
47         false
48     }
49 }
50 
51 impl<T: Send> UnindexedConsumer<T> for ListVecConsumer {
split_off_left(&self) -> Self52     fn split_off_left(&self) -> Self {
53         Self
54     }
55 
to_reducer(&self) -> Self::Reducer56     fn to_reducer(&self) -> Self::Reducer {
57         ListReducer
58     }
59 }
60 
61 impl<T> Folder<T> for ListVecFolder<T> {
62     type Result = LinkedList<Vec<T>>;
63 
consume(mut self, item: T) -> Self64     fn consume(mut self, item: T) -> Self {
65         self.vec.push(item);
66         self
67     }
68 
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = T>,69     fn consume_iter<I>(mut self, iter: I) -> Self
70     where
71         I: IntoIterator<Item = T>,
72     {
73         self.vec.extend(iter);
74         self
75     }
76 
complete(self) -> Self::Result77     fn complete(self) -> Self::Result {
78         let mut list = LinkedList::new();
79         if !self.vec.is_empty() {
80             list.push_back(self.vec);
81         }
82         list
83     }
84 
full(&self) -> bool85     fn full(&self) -> bool {
86         false
87     }
88 }
89 
heap_extend<T, Item>(heap: &mut BinaryHeap<T>, list: LinkedList<Vec<Item>>) where BinaryHeap<T>: Extend<Item>,90 fn heap_extend<T, Item>(heap: &mut BinaryHeap<T>, list: LinkedList<Vec<Item>>)
91 where
92     BinaryHeap<T>: Extend<Item>,
93 {
94     heap.reserve(len(&list));
95     for vec in list {
96         heap.extend(vec);
97     }
98 }
99 
100 /// Extends a binary heap with items from a parallel iterator.
101 impl<T> ParallelExtend<T> for BinaryHeap<T>
102 where
103     T: Ord + Send,
104 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>,105     fn par_extend<I>(&mut self, par_iter: I)
106     where
107         I: IntoParallelIterator<Item = T>,
108     {
109         extend!(self, par_iter, heap_extend);
110     }
111 }
112 
113 /// Extends a binary heap with copied items from a parallel iterator.
114 impl<'a, T> ParallelExtend<&'a T> for BinaryHeap<T>
115 where
116     T: 'a + Copy + Ord + Send + Sync,
117 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a T>,118     fn par_extend<I>(&mut self, par_iter: I)
119     where
120         I: IntoParallelIterator<Item = &'a T>,
121     {
122         extend!(self, par_iter, heap_extend);
123     }
124 }
125 
btree_map_extend<K, V, Item>(map: &mut BTreeMap<K, V>, list: LinkedList<Vec<Item>>) where BTreeMap<K, V>: Extend<Item>,126 fn btree_map_extend<K, V, Item>(map: &mut BTreeMap<K, V>, list: LinkedList<Vec<Item>>)
127 where
128     BTreeMap<K, V>: Extend<Item>,
129 {
130     for vec in list {
131         map.extend(vec);
132     }
133 }
134 
135 /// Extends a B-tree map with items from a parallel iterator.
136 impl<K, V> ParallelExtend<(K, V)> for BTreeMap<K, V>
137 where
138     K: Ord + Send,
139     V: Send,
140 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = (K, V)>,141     fn par_extend<I>(&mut self, par_iter: I)
142     where
143         I: IntoParallelIterator<Item = (K, V)>,
144     {
145         extend!(self, par_iter, btree_map_extend);
146     }
147 }
148 
149 /// Extends a B-tree map with copied items from a parallel iterator.
150 impl<'a, K: 'a, V: 'a> ParallelExtend<(&'a K, &'a V)> for BTreeMap<K, V>
151 where
152     K: Copy + Ord + Send + Sync,
153     V: Copy + Send + Sync,
154 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = (&'a K, &'a V)>,155     fn par_extend<I>(&mut self, par_iter: I)
156     where
157         I: IntoParallelIterator<Item = (&'a K, &'a V)>,
158     {
159         extend!(self, par_iter, btree_map_extend);
160     }
161 }
162 
btree_set_extend<T, Item>(set: &mut BTreeSet<T>, list: LinkedList<Vec<Item>>) where BTreeSet<T>: Extend<Item>,163 fn btree_set_extend<T, Item>(set: &mut BTreeSet<T>, list: LinkedList<Vec<Item>>)
164 where
165     BTreeSet<T>: Extend<Item>,
166 {
167     for vec in list {
168         set.extend(vec);
169     }
170 }
171 
172 /// Extends a B-tree set with items from a parallel iterator.
173 impl<T> ParallelExtend<T> for BTreeSet<T>
174 where
175     T: Ord + Send,
176 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>,177     fn par_extend<I>(&mut self, par_iter: I)
178     where
179         I: IntoParallelIterator<Item = T>,
180     {
181         extend!(self, par_iter, btree_set_extend);
182     }
183 }
184 
185 /// Extends a B-tree set with copied items from a parallel iterator.
186 impl<'a, T> ParallelExtend<&'a T> for BTreeSet<T>
187 where
188     T: 'a + Copy + Ord + Send + Sync,
189 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a T>,190     fn par_extend<I>(&mut self, par_iter: I)
191     where
192         I: IntoParallelIterator<Item = &'a T>,
193     {
194         extend!(self, par_iter, btree_set_extend);
195     }
196 }
197 
hash_map_extend<K, V, S, Item>(map: &mut HashMap<K, V, S>, list: LinkedList<Vec<Item>>) where HashMap<K, V, S>: Extend<Item>, K: Eq + Hash, S: BuildHasher,198 fn hash_map_extend<K, V, S, Item>(map: &mut HashMap<K, V, S>, list: LinkedList<Vec<Item>>)
199 where
200     HashMap<K, V, S>: Extend<Item>,
201     K: Eq + Hash,
202     S: BuildHasher,
203 {
204     map.reserve(len(&list));
205     for vec in list {
206         map.extend(vec);
207     }
208 }
209 
210 /// Extends a hash map with items from a parallel iterator.
211 impl<K, V, S> ParallelExtend<(K, V)> for HashMap<K, V, S>
212 where
213     K: Eq + Hash + Send,
214     V: Send,
215     S: BuildHasher + Send,
216 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = (K, V)>,217     fn par_extend<I>(&mut self, par_iter: I)
218     where
219         I: IntoParallelIterator<Item = (K, V)>,
220     {
221         // See the map_collect benchmarks in rayon-demo for different strategies.
222         extend!(self, par_iter, hash_map_extend);
223     }
224 }
225 
226 /// Extends a hash map with copied items from a parallel iterator.
227 impl<'a, K: 'a, V: 'a, S> ParallelExtend<(&'a K, &'a V)> for HashMap<K, V, S>
228 where
229     K: Copy + Eq + Hash + Send + Sync,
230     V: Copy + Send + Sync,
231     S: BuildHasher + Send,
232 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = (&'a K, &'a V)>,233     fn par_extend<I>(&mut self, par_iter: I)
234     where
235         I: IntoParallelIterator<Item = (&'a K, &'a V)>,
236     {
237         extend!(self, par_iter, hash_map_extend);
238     }
239 }
240 
hash_set_extend<T, S, Item>(set: &mut HashSet<T, S>, list: LinkedList<Vec<Item>>) where HashSet<T, S>: Extend<Item>, T: Eq + Hash, S: BuildHasher,241 fn hash_set_extend<T, S, Item>(set: &mut HashSet<T, S>, list: LinkedList<Vec<Item>>)
242 where
243     HashSet<T, S>: Extend<Item>,
244     T: Eq + Hash,
245     S: BuildHasher,
246 {
247     set.reserve(len(&list));
248     for vec in list {
249         set.extend(vec);
250     }
251 }
252 
253 /// Extends a hash set with items from a parallel iterator.
254 impl<T, S> ParallelExtend<T> for HashSet<T, S>
255 where
256     T: Eq + Hash + Send,
257     S: BuildHasher + Send,
258 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>,259     fn par_extend<I>(&mut self, par_iter: I)
260     where
261         I: IntoParallelIterator<Item = T>,
262     {
263         extend!(self, par_iter, hash_set_extend);
264     }
265 }
266 
267 /// Extends a hash set with copied items from a parallel iterator.
268 impl<'a, T, S> ParallelExtend<&'a T> for HashSet<T, S>
269 where
270     T: 'a + Copy + Eq + Hash + Send + Sync,
271     S: BuildHasher + Send,
272 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a T>,273     fn par_extend<I>(&mut self, par_iter: I)
274     where
275         I: IntoParallelIterator<Item = &'a T>,
276     {
277         extend!(self, par_iter, hash_set_extend);
278     }
279 }
280 
281 /// Extends a linked list with items from a parallel iterator.
282 impl<T> ParallelExtend<T> for LinkedList<T>
283 where
284     T: Send,
285 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>,286     fn par_extend<I>(&mut self, par_iter: I)
287     where
288         I: IntoParallelIterator<Item = T>,
289     {
290         let mut list = par_iter.into_par_iter().drive_unindexed(ListConsumer);
291         self.append(&mut list);
292     }
293 }
294 
295 /// Extends a linked list with copied items from a parallel iterator.
296 impl<'a, T> ParallelExtend<&'a T> for LinkedList<T>
297 where
298     T: 'a + Copy + Send + Sync,
299 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a T>,300     fn par_extend<I>(&mut self, par_iter: I)
301     where
302         I: IntoParallelIterator<Item = &'a T>,
303     {
304         self.par_extend(par_iter.into_par_iter().copied())
305     }
306 }
307 
308 struct ListConsumer;
309 
310 struct ListFolder<T> {
311     list: LinkedList<T>,
312 }
313 
314 struct ListReducer;
315 
316 impl<T: Send> Consumer<T> for ListConsumer {
317     type Folder = ListFolder<T>;
318     type Reducer = ListReducer;
319     type Result = LinkedList<T>;
320 
split_at(self, _index: usize) -> (Self, Self, Self::Reducer)321     fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
322         (Self, Self, ListReducer)
323     }
324 
into_folder(self) -> Self::Folder325     fn into_folder(self) -> Self::Folder {
326         ListFolder {
327             list: LinkedList::new(),
328         }
329     }
330 
full(&self) -> bool331     fn full(&self) -> bool {
332         false
333     }
334 }
335 
336 impl<T: Send> UnindexedConsumer<T> for ListConsumer {
split_off_left(&self) -> Self337     fn split_off_left(&self) -> Self {
338         Self
339     }
340 
to_reducer(&self) -> Self::Reducer341     fn to_reducer(&self) -> Self::Reducer {
342         ListReducer
343     }
344 }
345 
346 impl<T> Folder<T> for ListFolder<T> {
347     type Result = LinkedList<T>;
348 
consume(mut self, item: T) -> Self349     fn consume(mut self, item: T) -> Self {
350         self.list.push_back(item);
351         self
352     }
353 
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = T>,354     fn consume_iter<I>(mut self, iter: I) -> Self
355     where
356         I: IntoIterator<Item = T>,
357     {
358         self.list.extend(iter);
359         self
360     }
361 
complete(self) -> Self::Result362     fn complete(self) -> Self::Result {
363         self.list
364     }
365 
full(&self) -> bool366     fn full(&self) -> bool {
367         false
368     }
369 }
370 
371 impl<T> Reducer<LinkedList<T>> for ListReducer {
reduce(self, mut left: LinkedList<T>, mut right: LinkedList<T>) -> LinkedList<T>372     fn reduce(self, mut left: LinkedList<T>, mut right: LinkedList<T>) -> LinkedList<T> {
373         left.append(&mut right);
374         left
375     }
376 }
377 
flat_string_extend(string: &mut String, list: LinkedList<String>)378 fn flat_string_extend(string: &mut String, list: LinkedList<String>) {
379     string.reserve(list.iter().map(String::len).sum());
380     string.extend(list);
381 }
382 
383 /// Extends a string with characters from a parallel iterator.
384 impl ParallelExtend<char> for String {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = char>,385     fn par_extend<I>(&mut self, par_iter: I)
386     where
387         I: IntoParallelIterator<Item = char>,
388     {
389         // This is like `extend`, but `Vec<char>` is less efficient to deal
390         // with than `String`, so instead collect to `LinkedList<String>`.
391         let list = par_iter.into_par_iter().drive_unindexed(ListStringConsumer);
392         flat_string_extend(self, list);
393     }
394 }
395 
396 /// Extends a string with copied characters from a parallel iterator.
397 impl<'a> ParallelExtend<&'a char> for String {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a char>,398     fn par_extend<I>(&mut self, par_iter: I)
399     where
400         I: IntoParallelIterator<Item = &'a char>,
401     {
402         self.par_extend(par_iter.into_par_iter().copied())
403     }
404 }
405 
406 struct ListStringConsumer;
407 
408 struct ListStringFolder {
409     string: String,
410 }
411 
412 impl Consumer<char> for ListStringConsumer {
413     type Folder = ListStringFolder;
414     type Reducer = ListReducer;
415     type Result = LinkedList<String>;
416 
split_at(self, _index: usize) -> (Self, Self, Self::Reducer)417     fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
418         (Self, Self, ListReducer)
419     }
420 
into_folder(self) -> Self::Folder421     fn into_folder(self) -> Self::Folder {
422         ListStringFolder {
423             string: String::new(),
424         }
425     }
426 
full(&self) -> bool427     fn full(&self) -> bool {
428         false
429     }
430 }
431 
432 impl UnindexedConsumer<char> for ListStringConsumer {
split_off_left(&self) -> Self433     fn split_off_left(&self) -> Self {
434         Self
435     }
436 
to_reducer(&self) -> Self::Reducer437     fn to_reducer(&self) -> Self::Reducer {
438         ListReducer
439     }
440 }
441 
442 impl Folder<char> for ListStringFolder {
443     type Result = LinkedList<String>;
444 
consume(mut self, item: char) -> Self445     fn consume(mut self, item: char) -> Self {
446         self.string.push(item);
447         self
448     }
449 
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = char>,450     fn consume_iter<I>(mut self, iter: I) -> Self
451     where
452         I: IntoIterator<Item = char>,
453     {
454         self.string.extend(iter);
455         self
456     }
457 
complete(self) -> Self::Result458     fn complete(self) -> Self::Result {
459         let mut list = LinkedList::new();
460         if !self.string.is_empty() {
461             list.push_back(self.string);
462         }
463         list
464     }
465 
full(&self) -> bool466     fn full(&self) -> bool {
467         false
468     }
469 }
470 
string_extend<Item>(string: &mut String, list: LinkedList<Vec<Item>>) where String: Extend<Item>, Item: AsRef<str>,471 fn string_extend<Item>(string: &mut String, list: LinkedList<Vec<Item>>)
472 where
473     String: Extend<Item>,
474     Item: AsRef<str>,
475 {
476     let len = list.iter().flatten().map(Item::as_ref).map(str::len).sum();
477     string.reserve(len);
478     for vec in list {
479         string.extend(vec);
480     }
481 }
482 
483 /// Extends a string with string slices from a parallel iterator.
484 impl<'a> ParallelExtend<&'a str> for String {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a str>,485     fn par_extend<I>(&mut self, par_iter: I)
486     where
487         I: IntoParallelIterator<Item = &'a str>,
488     {
489         extend!(self, par_iter, string_extend);
490     }
491 }
492 
493 /// Extends a string with strings from a parallel iterator.
494 impl ParallelExtend<String> for String {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = String>,495     fn par_extend<I>(&mut self, par_iter: I)
496     where
497         I: IntoParallelIterator<Item = String>,
498     {
499         extend!(self, par_iter, string_extend);
500     }
501 }
502 
503 /// Extends a string with string slices from a parallel iterator.
504 impl<'a> ParallelExtend<Cow<'a, str>> for String {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = Cow<'a, str>>,505     fn par_extend<I>(&mut self, par_iter: I)
506     where
507         I: IntoParallelIterator<Item = Cow<'a, str>>,
508     {
509         extend!(self, par_iter, string_extend);
510     }
511 }
512 
deque_extend<T, Item>(deque: &mut VecDeque<T>, list: LinkedList<Vec<Item>>) where VecDeque<T>: Extend<Item>,513 fn deque_extend<T, Item>(deque: &mut VecDeque<T>, list: LinkedList<Vec<Item>>)
514 where
515     VecDeque<T>: Extend<Item>,
516 {
517     deque.reserve(len(&list));
518     for vec in list {
519         deque.extend(vec);
520     }
521 }
522 
523 /// Extends a deque with items from a parallel iterator.
524 impl<T> ParallelExtend<T> for VecDeque<T>
525 where
526     T: Send,
527 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>,528     fn par_extend<I>(&mut self, par_iter: I)
529     where
530         I: IntoParallelIterator<Item = T>,
531     {
532         extend!(self, par_iter, deque_extend);
533     }
534 }
535 
536 /// Extends a deque with copied items from a parallel iterator.
537 impl<'a, T> ParallelExtend<&'a T> for VecDeque<T>
538 where
539     T: 'a + Copy + Send + Sync,
540 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a T>,541     fn par_extend<I>(&mut self, par_iter: I)
542     where
543         I: IntoParallelIterator<Item = &'a T>,
544     {
545         extend!(self, par_iter, deque_extend);
546     }
547 }
548 
vec_append<T>(vec: &mut Vec<T>, list: LinkedList<Vec<T>>)549 fn vec_append<T>(vec: &mut Vec<T>, list: LinkedList<Vec<T>>) {
550     vec.reserve(len(&list));
551     for mut other in list {
552         vec.append(&mut other);
553     }
554 }
555 
556 /// Extends a vector with items from a parallel iterator.
557 impl<T> ParallelExtend<T> for Vec<T>
558 where
559     T: Send,
560 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>,561     fn par_extend<I>(&mut self, par_iter: I)
562     where
563         I: IntoParallelIterator<Item = T>,
564     {
565         // See the vec_collect benchmarks in rayon-demo for different strategies.
566         let par_iter = par_iter.into_par_iter();
567         match par_iter.opt_len() {
568             Some(len) => {
569                 // When Rust gets specialization, we can get here for indexed iterators
570                 // without relying on `opt_len`.  Until then, `special_extend()` fakes
571                 // an unindexed mode on the promise that `opt_len()` is accurate.
572                 super::collect::special_extend(par_iter, len, self);
573             }
574             None => {
575                 // This works like `extend`, but `Vec::append` is more efficient.
576                 let list = par_iter.drive_unindexed(ListVecConsumer);
577                 vec_append(self, list);
578             }
579         }
580     }
581 }
582 
583 /// Extends a vector with copied items from a parallel iterator.
584 impl<'a, T> ParallelExtend<&'a T> for Vec<T>
585 where
586     T: 'a + Copy + Send + Sync,
587 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a T>,588     fn par_extend<I>(&mut self, par_iter: I)
589     where
590         I: IntoParallelIterator<Item = &'a T>,
591     {
592         self.par_extend(par_iter.into_par_iter().copied())
593     }
594 }
595 
596 /// Collapses all unit items from a parallel iterator into one.
597 impl ParallelExtend<()> for () {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = ()>,598     fn par_extend<I>(&mut self, par_iter: I)
599     where
600         I: IntoParallelIterator<Item = ()>,
601     {
602         par_iter.into_par_iter().drive_unindexed(NoopConsumer)
603     }
604 }
605