//! `ParallelExtend` implementations for the standard library collections,
//! strings, and the unit type.
1 use super::noop::NoopConsumer;
2 use super::plumbing::{Consumer, Folder, Reducer, UnindexedConsumer};
3 use super::{IntoParallelIterator, ParallelExtend, ParallelIterator};
4 
5 use either::Either;
6 use std::borrow::Cow;
7 use std::collections::LinkedList;
8 use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
9 use std::collections::{BinaryHeap, VecDeque};
10 use std::ffi::{OsStr, OsString};
11 use std::hash::{BuildHasher, Hash};
12 
/// Generic `par_extend` helper: gathers the parallel iterator with
/// `fast_collect`, then hands the collected chunks to the target's
/// sequential `extend`, one chunk at a time and in order.
///
/// Two forms are accepted:
/// - `extend!(target, par_iter)` collects `par_iter` first;
/// - `extend!(target <- chunks)` consumes an already collected `Either`.
macro_rules! extend {
    ($target:ident <- $chunks:expr) => {
        match $chunks {
            // A single vector: extend in one go.
            Either::Left(single) => $target.extend(single),
            // A list of vectors: extend chunk by chunk, front to back.
            Either::Right(pieces) => pieces
                .into_iter()
                .for_each(|piece| $target.extend(piece)),
        }
    };
    ($target:ident, $source:ident) => {
        extend!($target <- fast_collect($source))
    };
}
/// Like `extend!`, but calls `reserve` on the target before extending it.
///
/// `extend_reserved!(target, par_iter, measure)` passes the collected chunks
/// to `measure` to decide how much to reserve; the two-argument form measures
/// with `len`.
macro_rules! extend_reserved {
    ($target:ident, $source:ident) => {
        extend_reserved!($target, $source, len)
    };
    ($target:ident, $source:ident, $measure:ident) => {
        let collected = fast_collect($source);
        $target.reserve($measure(&collected));
        extend!($target <- collected)
    };
}
40 
41 /// Computes the total length of a `fast_collect` result.
len<T>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize42 fn len<T>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize {
43     match vecs {
44         Either::Left(vec) => vec.len(),
45         Either::Right(list) => list.iter().map(Vec::len).sum(),
46     }
47 }
48 
49 /// Computes the total string length of a `fast_collect` result.
string_len<T: AsRef<str>>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize50 fn string_len<T: AsRef<str>>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize {
51     let strs = match vecs {
52         Either::Left(vec) => Either::Left(vec.iter()),
53         Either::Right(list) => Either::Right(list.iter().flatten()),
54     };
55     strs.map(AsRef::as_ref).map(str::len).sum()
56 }
57 
58 /// Computes the total OS-string length of a `fast_collect` result.
osstring_len<T: AsRef<OsStr>>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize59 fn osstring_len<T: AsRef<OsStr>>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize {
60     let osstrs = match vecs {
61         Either::Left(vec) => Either::Left(vec.iter()),
62         Either::Right(list) => Either::Right(list.iter().flatten()),
63     };
64     osstrs.map(AsRef::as_ref).map(OsStr::len).sum()
65 }
66 
fast_collect<I, T>(pi: I) -> Either<Vec<T>, LinkedList<Vec<T>>> where I: IntoParallelIterator<Item = T>, T: Send,67 pub(super) fn fast_collect<I, T>(pi: I) -> Either<Vec<T>, LinkedList<Vec<T>>>
68 where
69     I: IntoParallelIterator<Item = T>,
70     T: Send,
71 {
72     let par_iter = pi.into_par_iter();
73     match par_iter.opt_len() {
74         Some(len) => {
75             // Pseudo-specialization. See impl of ParallelExtend for Vec for more details.
76             let mut vec = Vec::new();
77             super::collect::special_extend(par_iter, len, &mut vec);
78             Either::Left(vec)
79         }
80         None => Either::Right(par_iter.drive_unindexed(ListVecConsumer)),
81     }
82 }
83 
/// Stateless consumer whose result is a `LinkedList<Vec<T>>`; used by
/// `fast_collect` and the `Vec` extension when `opt_len()` returns `None`.
struct ListVecConsumer;
85 
/// Folder produced by `ListVecConsumer`; buffers consumed items in one `Vec`.
struct ListVecFolder<T> {
    /// Items consumed so far, appended in the order they were received.
    vec: Vec<T>,
}
89 
90 impl<T: Send> Consumer<T> for ListVecConsumer {
91     type Folder = ListVecFolder<T>;
92     type Reducer = ListReducer;
93     type Result = LinkedList<Vec<T>>;
94 
split_at(self, _index: usize) -> (Self, Self, Self::Reducer)95     fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
96         (Self, Self, ListReducer)
97     }
98 
into_folder(self) -> Self::Folder99     fn into_folder(self) -> Self::Folder {
100         ListVecFolder { vec: Vec::new() }
101     }
102 
full(&self) -> bool103     fn full(&self) -> bool {
104         false
105     }
106 }
107 
108 impl<T: Send> UnindexedConsumer<T> for ListVecConsumer {
split_off_left(&self) -> Self109     fn split_off_left(&self) -> Self {
110         Self
111     }
112 
to_reducer(&self) -> Self::Reducer113     fn to_reducer(&self) -> Self::Reducer {
114         ListReducer
115     }
116 }
117 
118 impl<T> Folder<T> for ListVecFolder<T> {
119     type Result = LinkedList<Vec<T>>;
120 
consume(mut self, item: T) -> Self121     fn consume(mut self, item: T) -> Self {
122         self.vec.push(item);
123         self
124     }
125 
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = T>,126     fn consume_iter<I>(mut self, iter: I) -> Self
127     where
128         I: IntoIterator<Item = T>,
129     {
130         self.vec.extend(iter);
131         self
132     }
133 
complete(self) -> Self::Result134     fn complete(self) -> Self::Result {
135         let mut list = LinkedList::new();
136         if !self.vec.is_empty() {
137             list.push_back(self.vec);
138         }
139         list
140     }
141 
full(&self) -> bool142     fn full(&self) -> bool {
143         false
144     }
145 }
146 
147 /// Extends a binary heap with items from a parallel iterator.
148 impl<T> ParallelExtend<T> for BinaryHeap<T>
149 where
150     T: Ord + Send,
151 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>,152     fn par_extend<I>(&mut self, par_iter: I)
153     where
154         I: IntoParallelIterator<Item = T>,
155     {
156         extend_reserved!(self, par_iter);
157     }
158 }
159 
160 /// Extends a binary heap with copied items from a parallel iterator.
161 impl<'a, T> ParallelExtend<&'a T> for BinaryHeap<T>
162 where
163     T: 'a + Copy + Ord + Send + Sync,
164 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a T>,165     fn par_extend<I>(&mut self, par_iter: I)
166     where
167         I: IntoParallelIterator<Item = &'a T>,
168     {
169         extend_reserved!(self, par_iter);
170     }
171 }
172 
173 /// Extends a B-tree map with items from a parallel iterator.
174 impl<K, V> ParallelExtend<(K, V)> for BTreeMap<K, V>
175 where
176     K: Ord + Send,
177     V: Send,
178 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = (K, V)>,179     fn par_extend<I>(&mut self, par_iter: I)
180     where
181         I: IntoParallelIterator<Item = (K, V)>,
182     {
183         extend!(self, par_iter);
184     }
185 }
186 
187 /// Extends a B-tree map with copied items from a parallel iterator.
188 impl<'a, K: 'a, V: 'a> ParallelExtend<(&'a K, &'a V)> for BTreeMap<K, V>
189 where
190     K: Copy + Ord + Send + Sync,
191     V: Copy + Send + Sync,
192 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = (&'a K, &'a V)>,193     fn par_extend<I>(&mut self, par_iter: I)
194     where
195         I: IntoParallelIterator<Item = (&'a K, &'a V)>,
196     {
197         extend!(self, par_iter);
198     }
199 }
200 
201 /// Extends a B-tree set with items from a parallel iterator.
202 impl<T> ParallelExtend<T> for BTreeSet<T>
203 where
204     T: Ord + Send,
205 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>,206     fn par_extend<I>(&mut self, par_iter: I)
207     where
208         I: IntoParallelIterator<Item = T>,
209     {
210         extend!(self, par_iter);
211     }
212 }
213 
214 /// Extends a B-tree set with copied items from a parallel iterator.
215 impl<'a, T> ParallelExtend<&'a T> for BTreeSet<T>
216 where
217     T: 'a + Copy + Ord + Send + Sync,
218 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a T>,219     fn par_extend<I>(&mut self, par_iter: I)
220     where
221         I: IntoParallelIterator<Item = &'a T>,
222     {
223         extend!(self, par_iter);
224     }
225 }
226 
227 /// Extends a hash map with items from a parallel iterator.
228 impl<K, V, S> ParallelExtend<(K, V)> for HashMap<K, V, S>
229 where
230     K: Eq + Hash + Send,
231     V: Send,
232     S: BuildHasher + Send,
233 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = (K, V)>,234     fn par_extend<I>(&mut self, par_iter: I)
235     where
236         I: IntoParallelIterator<Item = (K, V)>,
237     {
238         // See the map_collect benchmarks in rayon-demo for different strategies.
239         extend_reserved!(self, par_iter);
240     }
241 }
242 
243 /// Extends a hash map with copied items from a parallel iterator.
244 impl<'a, K: 'a, V: 'a, S> ParallelExtend<(&'a K, &'a V)> for HashMap<K, V, S>
245 where
246     K: Copy + Eq + Hash + Send + Sync,
247     V: Copy + Send + Sync,
248     S: BuildHasher + Send,
249 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = (&'a K, &'a V)>,250     fn par_extend<I>(&mut self, par_iter: I)
251     where
252         I: IntoParallelIterator<Item = (&'a K, &'a V)>,
253     {
254         extend_reserved!(self, par_iter);
255     }
256 }
257 
258 /// Extends a hash set with items from a parallel iterator.
259 impl<T, S> ParallelExtend<T> for HashSet<T, S>
260 where
261     T: Eq + Hash + Send,
262     S: BuildHasher + Send,
263 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>,264     fn par_extend<I>(&mut self, par_iter: I)
265     where
266         I: IntoParallelIterator<Item = T>,
267     {
268         extend_reserved!(self, par_iter);
269     }
270 }
271 
272 /// Extends a hash set with copied items from a parallel iterator.
273 impl<'a, T, S> ParallelExtend<&'a T> for HashSet<T, S>
274 where
275     T: 'a + Copy + Eq + Hash + Send + Sync,
276     S: BuildHasher + Send,
277 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a T>,278     fn par_extend<I>(&mut self, par_iter: I)
279     where
280         I: IntoParallelIterator<Item = &'a T>,
281     {
282         extend_reserved!(self, par_iter);
283     }
284 }
285 
286 /// Extends a linked list with items from a parallel iterator.
287 impl<T> ParallelExtend<T> for LinkedList<T>
288 where
289     T: Send,
290 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>,291     fn par_extend<I>(&mut self, par_iter: I)
292     where
293         I: IntoParallelIterator<Item = T>,
294     {
295         let mut list = par_iter.into_par_iter().drive_unindexed(ListConsumer);
296         self.append(&mut list);
297     }
298 }
299 
300 /// Extends a linked list with copied items from a parallel iterator.
301 impl<'a, T> ParallelExtend<&'a T> for LinkedList<T>
302 where
303     T: 'a + Copy + Send + Sync,
304 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a T>,305     fn par_extend<I>(&mut self, par_iter: I)
306     where
307         I: IntoParallelIterator<Item = &'a T>,
308     {
309         self.par_extend(par_iter.into_par_iter().copied())
310     }
311 }
312 
/// Stateless consumer whose result is a `LinkedList<T>`; drives the
/// `LinkedList` implementation of `ParallelExtend`.
struct ListConsumer;
314 
/// Folder produced by `ListConsumer`; accumulates items directly in a list.
struct ListFolder<T> {
    /// Items consumed so far, appended at the back as they are received.
    list: LinkedList<T>,
}
318 
/// Reducer that joins two partial `LinkedList`s by appending the right one
/// to the left one; shared by all list-producing consumers in this file.
struct ListReducer;
320 
321 impl<T: Send> Consumer<T> for ListConsumer {
322     type Folder = ListFolder<T>;
323     type Reducer = ListReducer;
324     type Result = LinkedList<T>;
325 
split_at(self, _index: usize) -> (Self, Self, Self::Reducer)326     fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
327         (Self, Self, ListReducer)
328     }
329 
into_folder(self) -> Self::Folder330     fn into_folder(self) -> Self::Folder {
331         ListFolder {
332             list: LinkedList::new(),
333         }
334     }
335 
full(&self) -> bool336     fn full(&self) -> bool {
337         false
338     }
339 }
340 
341 impl<T: Send> UnindexedConsumer<T> for ListConsumer {
split_off_left(&self) -> Self342     fn split_off_left(&self) -> Self {
343         Self
344     }
345 
to_reducer(&self) -> Self::Reducer346     fn to_reducer(&self) -> Self::Reducer {
347         ListReducer
348     }
349 }
350 
351 impl<T> Folder<T> for ListFolder<T> {
352     type Result = LinkedList<T>;
353 
consume(mut self, item: T) -> Self354     fn consume(mut self, item: T) -> Self {
355         self.list.push_back(item);
356         self
357     }
358 
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = T>,359     fn consume_iter<I>(mut self, iter: I) -> Self
360     where
361         I: IntoIterator<Item = T>,
362     {
363         self.list.extend(iter);
364         self
365     }
366 
complete(self) -> Self::Result367     fn complete(self) -> Self::Result {
368         self.list
369     }
370 
full(&self) -> bool371     fn full(&self) -> bool {
372         false
373     }
374 }
375 
376 impl<T> Reducer<LinkedList<T>> for ListReducer {
reduce(self, mut left: LinkedList<T>, mut right: LinkedList<T>) -> LinkedList<T>377     fn reduce(self, mut left: LinkedList<T>, mut right: LinkedList<T>) -> LinkedList<T> {
378         left.append(&mut right);
379         left
380     }
381 }
382 
383 /// Extends an OS-string with string slices from a parallel iterator.
384 impl<'a> ParallelExtend<&'a OsStr> for OsString {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a OsStr>,385     fn par_extend<I>(&mut self, par_iter: I)
386     where
387         I: IntoParallelIterator<Item = &'a OsStr>,
388     {
389         extend_reserved!(self, par_iter, osstring_len);
390     }
391 }
392 
393 /// Extends an OS-string with strings from a parallel iterator.
394 impl ParallelExtend<OsString> for OsString {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = OsString>,395     fn par_extend<I>(&mut self, par_iter: I)
396     where
397         I: IntoParallelIterator<Item = OsString>,
398     {
399         extend_reserved!(self, par_iter, osstring_len);
400     }
401 }
402 
403 /// Extends an OS-string with string slices from a parallel iterator.
404 impl<'a> ParallelExtend<Cow<'a, OsStr>> for OsString {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = Cow<'a, OsStr>>,405     fn par_extend<I>(&mut self, par_iter: I)
406     where
407         I: IntoParallelIterator<Item = Cow<'a, OsStr>>,
408     {
409         extend_reserved!(self, par_iter, osstring_len);
410     }
411 }
412 
413 /// Extends a string with characters from a parallel iterator.
414 impl ParallelExtend<char> for String {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = char>,415     fn par_extend<I>(&mut self, par_iter: I)
416     where
417         I: IntoParallelIterator<Item = char>,
418     {
419         // This is like `extend`, but `Vec<char>` is less efficient to deal
420         // with than `String`, so instead collect to `LinkedList<String>`.
421         let list = par_iter.into_par_iter().drive_unindexed(ListStringConsumer);
422         self.reserve(list.iter().map(String::len).sum());
423         self.extend(list);
424     }
425 }
426 
427 /// Extends a string with copied characters from a parallel iterator.
428 impl<'a> ParallelExtend<&'a char> for String {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a char>,429     fn par_extend<I>(&mut self, par_iter: I)
430     where
431         I: IntoParallelIterator<Item = &'a char>,
432     {
433         self.par_extend(par_iter.into_par_iter().copied())
434     }
435 }
436 
/// Stateless consumer of `char`s whose result is a `LinkedList<String>`;
/// drives the `char` implementation of `ParallelExtend` for `String`.
struct ListStringConsumer;
438 
/// Folder produced by `ListStringConsumer`; buffers characters in a `String`.
struct ListStringFolder {
    /// Characters consumed so far, appended in the order they were received.
    string: String,
}
442 
443 impl Consumer<char> for ListStringConsumer {
444     type Folder = ListStringFolder;
445     type Reducer = ListReducer;
446     type Result = LinkedList<String>;
447 
split_at(self, _index: usize) -> (Self, Self, Self::Reducer)448     fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
449         (Self, Self, ListReducer)
450     }
451 
into_folder(self) -> Self::Folder452     fn into_folder(self) -> Self::Folder {
453         ListStringFolder {
454             string: String::new(),
455         }
456     }
457 
full(&self) -> bool458     fn full(&self) -> bool {
459         false
460     }
461 }
462 
463 impl UnindexedConsumer<char> for ListStringConsumer {
split_off_left(&self) -> Self464     fn split_off_left(&self) -> Self {
465         Self
466     }
467 
to_reducer(&self) -> Self::Reducer468     fn to_reducer(&self) -> Self::Reducer {
469         ListReducer
470     }
471 }
472 
473 impl Folder<char> for ListStringFolder {
474     type Result = LinkedList<String>;
475 
consume(mut self, item: char) -> Self476     fn consume(mut self, item: char) -> Self {
477         self.string.push(item);
478         self
479     }
480 
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = char>,481     fn consume_iter<I>(mut self, iter: I) -> Self
482     where
483         I: IntoIterator<Item = char>,
484     {
485         self.string.extend(iter);
486         self
487     }
488 
complete(self) -> Self::Result489     fn complete(self) -> Self::Result {
490         let mut list = LinkedList::new();
491         if !self.string.is_empty() {
492             list.push_back(self.string);
493         }
494         list
495     }
496 
full(&self) -> bool497     fn full(&self) -> bool {
498         false
499     }
500 }
501 
502 /// Extends a string with string slices from a parallel iterator.
503 impl<'a> ParallelExtend<&'a str> for String {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a str>,504     fn par_extend<I>(&mut self, par_iter: I)
505     where
506         I: IntoParallelIterator<Item = &'a str>,
507     {
508         extend_reserved!(self, par_iter, string_len);
509     }
510 }
511 
512 /// Extends a string with strings from a parallel iterator.
513 impl ParallelExtend<String> for String {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = String>,514     fn par_extend<I>(&mut self, par_iter: I)
515     where
516         I: IntoParallelIterator<Item = String>,
517     {
518         extend_reserved!(self, par_iter, string_len);
519     }
520 }
521 
522 /// Extends a string with boxed strings from a parallel iterator.
523 impl ParallelExtend<Box<str>> for String {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = Box<str>>,524     fn par_extend<I>(&mut self, par_iter: I)
525     where
526         I: IntoParallelIterator<Item = Box<str>>,
527     {
528         extend_reserved!(self, par_iter, string_len);
529     }
530 }
531 
532 /// Extends a string with string slices from a parallel iterator.
533 impl<'a> ParallelExtend<Cow<'a, str>> for String {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = Cow<'a, str>>,534     fn par_extend<I>(&mut self, par_iter: I)
535     where
536         I: IntoParallelIterator<Item = Cow<'a, str>>,
537     {
538         extend_reserved!(self, par_iter, string_len);
539     }
540 }
541 
542 /// Extends a deque with items from a parallel iterator.
543 impl<T> ParallelExtend<T> for VecDeque<T>
544 where
545     T: Send,
546 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>,547     fn par_extend<I>(&mut self, par_iter: I)
548     where
549         I: IntoParallelIterator<Item = T>,
550     {
551         extend_reserved!(self, par_iter);
552     }
553 }
554 
555 /// Extends a deque with copied items from a parallel iterator.
556 impl<'a, T> ParallelExtend<&'a T> for VecDeque<T>
557 where
558     T: 'a + Copy + Send + Sync,
559 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a T>,560     fn par_extend<I>(&mut self, par_iter: I)
561     where
562         I: IntoParallelIterator<Item = &'a T>,
563     {
564         extend_reserved!(self, par_iter);
565     }
566 }
567 
568 /// Extends a vector with items from a parallel iterator.
569 impl<T> ParallelExtend<T> for Vec<T>
570 where
571     T: Send,
572 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>,573     fn par_extend<I>(&mut self, par_iter: I)
574     where
575         I: IntoParallelIterator<Item = T>,
576     {
577         // See the vec_collect benchmarks in rayon-demo for different strategies.
578         let par_iter = par_iter.into_par_iter();
579         match par_iter.opt_len() {
580             Some(len) => {
581                 // When Rust gets specialization, we can get here for indexed iterators
582                 // without relying on `opt_len`.  Until then, `special_extend()` fakes
583                 // an unindexed mode on the promise that `opt_len()` is accurate.
584                 super::collect::special_extend(par_iter, len, self);
585             }
586             None => {
587                 // This works like `extend`, but `Vec::append` is more efficient.
588                 let list = par_iter.drive_unindexed(ListVecConsumer);
589                 self.reserve(list.iter().map(Vec::len).sum());
590                 for mut other in list {
591                     self.append(&mut other);
592                 }
593             }
594         }
595     }
596 }
597 
598 /// Extends a vector with copied items from a parallel iterator.
599 impl<'a, T> ParallelExtend<&'a T> for Vec<T>
600 where
601     T: 'a + Copy + Send + Sync,
602 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = &'a T>,603     fn par_extend<I>(&mut self, par_iter: I)
604     where
605         I: IntoParallelIterator<Item = &'a T>,
606     {
607         self.par_extend(par_iter.into_par_iter().copied())
608     }
609 }
610 
611 /// Collapses all unit items from a parallel iterator into one.
612 impl ParallelExtend<()> for () {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = ()>,613     fn par_extend<I>(&mut self, par_iter: I)
614     where
615         I: IntoParallelIterator<Item = ()>,
616     {
617         par_iter.into_par_iter().drive_unindexed(NoopConsumer)
618     }
619 }
620