• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator};
2 use std::mem::MaybeUninit;
3 use std::slice;
4 
5 mod consumer;
6 use self::consumer::CollectConsumer;
7 use self::consumer::CollectResult;
8 use super::unzip::unzip_indexed;
9 
10 mod test;
11 
12 /// Collects the results of the exact iterator into the specified vector.
13 ///
14 /// This is called by `IndexedParallelIterator::collect_into_vec`.
collect_into_vec<I, T>(pi: I, v: &mut Vec<T>) where I: IndexedParallelIterator<Item = T>, T: Send,15 pub(super) fn collect_into_vec<I, T>(pi: I, v: &mut Vec<T>)
16 where
17     I: IndexedParallelIterator<Item = T>,
18     T: Send,
19 {
20     v.truncate(0); // clear any old data
21     let len = pi.len();
22     Collect::new(v, len).with_consumer(|consumer| pi.drive(consumer));
23 }
24 
25 /// Collects the results of the iterator into the specified vector.
26 ///
27 /// Technically, this only works for `IndexedParallelIterator`, but we're faking a
28 /// bit of specialization here until Rust can do that natively.  Callers are
29 /// using `opt_len` to find the length before calling this, and only exact
30 /// iterators will return anything but `None` there.
31 ///
32 /// Since the type system doesn't understand that contract, we have to allow
33 /// *any* `ParallelIterator` here, and `CollectConsumer` has to also implement
34 /// `UnindexedConsumer`.  That implementation panics `unreachable!` in case
35 /// there's a bug where we actually do try to use this unindexed.
special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>) where I: ParallelIterator<Item = T>, T: Send,36 fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>)
37 where
38     I: ParallelIterator<Item = T>,
39     T: Send,
40 {
41     Collect::new(v, len).with_consumer(|consumer| pi.drive_unindexed(consumer));
42 }
43 
44 /// Unzips the results of the exact iterator into the specified vectors.
45 ///
46 /// This is called by `IndexedParallelIterator::unzip_into_vecs`.
unzip_into_vecs<I, A, B>(pi: I, left: &mut Vec<A>, right: &mut Vec<B>) where I: IndexedParallelIterator<Item = (A, B)>, A: Send, B: Send,47 pub(super) fn unzip_into_vecs<I, A, B>(pi: I, left: &mut Vec<A>, right: &mut Vec<B>)
48 where
49     I: IndexedParallelIterator<Item = (A, B)>,
50     A: Send,
51     B: Send,
52 {
53     // clear any old data
54     left.truncate(0);
55     right.truncate(0);
56 
57     let len = pi.len();
58     Collect::new(right, len).with_consumer(|right_consumer| {
59         let mut right_result = None;
60         Collect::new(left, len).with_consumer(|left_consumer| {
61             let (left_r, right_r) = unzip_indexed(pi, left_consumer, right_consumer);
62             right_result = Some(right_r);
63             left_r
64         });
65         right_result.unwrap()
66     });
67 }
68 
/// Manages the vector that a parallel collect writes into.
struct Collect<'c, T>
where
    T: Send,
{
    /// Destination vector; collected items are appended after its current contents.
    vec: &'c mut Vec<T>,
    /// Number of items that are expected to be appended.
    len: usize,
}
74 
75 impl<'c, T: Send + 'c> Collect<'c, T> {
new(vec: &'c mut Vec<T>, len: usize) -> Self76     fn new(vec: &'c mut Vec<T>, len: usize) -> Self {
77         Collect { vec, len }
78     }
79 
80     /// Create a consumer on the slice of memory we are collecting into.
81     ///
82     /// The consumer needs to be used inside the scope function, and the
83     /// complete collect result passed back.
84     ///
85     /// This method will verify the collect result, and panic if the slice
86     /// was not fully written into. Otherwise, in the successful case,
87     /// the vector is complete with the collected result.
with_consumer<F>(mut self, scope_fn: F) where F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>,88     fn with_consumer<F>(mut self, scope_fn: F)
89     where
90         F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>,
91     {
92         let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len);
93         let result = scope_fn(CollectConsumer::new(slice));
94 
95         // The CollectResult represents a contiguous part of the
96         // slice, that has been written to.
97         // On unwind here, the CollectResult will be dropped.
98         // If some producers on the way did not produce enough elements,
99         // partial CollectResults may have been dropped without
100         // being reduced to the final result, and we will see
101         // that as the length coming up short.
102         //
103         // Here, we assert that `slice` is fully initialized. This is
104         // checked by the following assert, which verifies if a
105         // complete CollectResult was produced; if the length is
106         // correct, it is necessarily covering the target slice.
107         // Since we know that the consumer cannot have escaped from
108         // `drive` (by parametricity, essentially), we know that any
109         // stores that will happen, have happened. Unless some code is buggy,
110         // that means we should have seen `len` total writes.
111         let actual_writes = result.len();
112         assert!(
113             actual_writes == self.len,
114             "expected {} total writes, but got {}",
115             self.len,
116             actual_writes
117         );
118 
119         // Release the result's mutable borrow and "proxy ownership"
120         // of the elements, before the vector takes it over.
121         result.release_ownership();
122 
123         let new_len = self.vec.len() + self.len;
124 
125         unsafe {
126             self.vec.set_len(new_len);
127         }
128     }
129 
130     /// Reserve space for `len` more elements in the vector,
131     /// and return a slice to the uninitialized tail of the vector
reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [MaybeUninit<T>]132     fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [MaybeUninit<T>] {
133         // Reserve the new space.
134         vec.reserve(len);
135 
136         // TODO: use `Vec::spare_capacity_mut` instead
137         // SAFETY: `MaybeUninit<T>` is guaranteed to have the same layout
138         // as `T`, and we already made sure to have the additional space.
139         let start = vec.len();
140         let tail_ptr = vec[start..].as_mut_ptr() as *mut MaybeUninit<T>;
141         unsafe { slice::from_raw_parts_mut(tail_ptr, len) }
142     }
143 }
144 
145 /// Extends a vector with items from a parallel iterator.
146 impl<T> ParallelExtend<T> for Vec<T>
147 where
148     T: Send,
149 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>,150     fn par_extend<I>(&mut self, par_iter: I)
151     where
152         I: IntoParallelIterator<Item = T>,
153     {
154         // See the vec_collect benchmarks in rayon-demo for different strategies.
155         let par_iter = par_iter.into_par_iter();
156         match par_iter.opt_len() {
157             Some(len) => {
158                 // When Rust gets specialization, we can get here for indexed iterators
159                 // without relying on `opt_len`.  Until then, `special_extend()` fakes
160                 // an unindexed mode on the promise that `opt_len()` is accurate.
161                 special_extend(par_iter, len, self);
162             }
163             None => {
164                 // This works like `extend`, but `Vec::append` is more efficient.
165                 let list = super::extend::collect(par_iter);
166                 self.reserve(super::extend::len(&list));
167                 for mut vec in list {
168                     self.append(&mut vec);
169                 }
170             }
171         }
172     }
173 }
174