• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::raw::Bucket;
2 use crate::raw::{Allocator, Global, RawIter, RawIterRange, RawTable};
3 use crate::scopeguard::guard;
4 use alloc::alloc::dealloc;
5 use core::marker::PhantomData;
6 use core::mem;
7 use core::ptr::NonNull;
8 use rayon::iter::{
9     plumbing::{self, Folder, UnindexedConsumer, UnindexedProducer},
10     ParallelIterator,
11 };
12 
13 /// Parallel iterator which returns a raw pointer to every full bucket in the table.
14 pub struct RawParIter<T> {
15     iter: RawIterRange<T>,
16 }
17 
18 impl<T> RawParIter<T> {
19     #[cfg_attr(feature = "inline-more", inline)]
iter(&self) -> RawIterRange<T>20     pub(super) unsafe fn iter(&self) -> RawIterRange<T> {
21         self.iter.clone()
22     }
23 }
24 
25 impl<T> Clone for RawParIter<T> {
26     #[cfg_attr(feature = "inline-more", inline)]
clone(&self) -> Self27     fn clone(&self) -> Self {
28         Self {
29             iter: self.iter.clone(),
30         }
31     }
32 }
33 
34 impl<T> From<RawIter<T>> for RawParIter<T> {
from(it: RawIter<T>) -> Self35     fn from(it: RawIter<T>) -> Self {
36         RawParIter { iter: it.iter }
37     }
38 }
39 
40 impl<T> ParallelIterator for RawParIter<T> {
41     type Item = Bucket<T>;
42 
43     #[cfg_attr(feature = "inline-more", inline)]
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,44     fn drive_unindexed<C>(self, consumer: C) -> C::Result
45     where
46         C: UnindexedConsumer<Self::Item>,
47     {
48         let producer = ParIterProducer { iter: self.iter };
49         plumbing::bridge_unindexed(producer, consumer)
50     }
51 }
52 
53 /// Producer which returns a `Bucket<T>` for every element.
54 struct ParIterProducer<T> {
55     iter: RawIterRange<T>,
56 }
57 
58 impl<T> UnindexedProducer for ParIterProducer<T> {
59     type Item = Bucket<T>;
60 
61     #[cfg_attr(feature = "inline-more", inline)]
split(self) -> (Self, Option<Self>)62     fn split(self) -> (Self, Option<Self>) {
63         let (left, right) = self.iter.split();
64         let left = ParIterProducer { iter: left };
65         let right = right.map(|right| ParIterProducer { iter: right });
66         (left, right)
67     }
68 
69     #[cfg_attr(feature = "inline-more", inline)]
fold_with<F>(self, folder: F) -> F where F: Folder<Self::Item>,70     fn fold_with<F>(self, folder: F) -> F
71     where
72         F: Folder<Self::Item>,
73     {
74         folder.consume_iter(self.iter)
75     }
76 }
77 
78 /// Parallel iterator which consumes a table and returns elements.
79 pub struct RawIntoParIter<T, A: Allocator + Clone = Global> {
80     table: RawTable<T, A>,
81 }
82 
83 impl<T, A: Allocator + Clone> RawIntoParIter<T, A> {
84     #[cfg_attr(feature = "inline-more", inline)]
par_iter(&self) -> RawParIter<T>85     pub(super) unsafe fn par_iter(&self) -> RawParIter<T> {
86         self.table.par_iter()
87     }
88 }
89 
90 impl<T: Send, A: Allocator + Clone + Send> ParallelIterator for RawIntoParIter<T, A> {
91     type Item = T;
92 
93     #[cfg_attr(feature = "inline-more", inline)]
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,94     fn drive_unindexed<C>(self, consumer: C) -> C::Result
95     where
96         C: UnindexedConsumer<Self::Item>,
97     {
98         let iter = unsafe { self.table.iter().iter };
99         let _guard = guard(self.table.into_allocation(), |alloc| {
100             if let Some((ptr, layout)) = *alloc {
101                 unsafe {
102                     dealloc(ptr.as_ptr(), layout);
103                 }
104             }
105         });
106         let producer = ParDrainProducer { iter };
107         plumbing::bridge_unindexed(producer, consumer)
108     }
109 }
110 
111 /// Parallel iterator which consumes elements without freeing the table storage.
112 pub struct RawParDrain<'a, T, A: Allocator + Clone = Global> {
113     // We don't use a &'a mut RawTable<T> because we want RawParDrain to be
114     // covariant over T.
115     table: NonNull<RawTable<T, A>>,
116     marker: PhantomData<&'a RawTable<T, A>>,
117 }
118 
119 unsafe impl<T: Send, A: Allocator + Clone> Send for RawParDrain<'_, T, A> {}
120 
121 impl<T, A: Allocator + Clone> RawParDrain<'_, T, A> {
122     #[cfg_attr(feature = "inline-more", inline)]
par_iter(&self) -> RawParIter<T>123     pub(super) unsafe fn par_iter(&self) -> RawParIter<T> {
124         self.table.as_ref().par_iter()
125     }
126 }
127 
128 impl<T: Send, A: Allocator + Clone> ParallelIterator for RawParDrain<'_, T, A> {
129     type Item = T;
130 
131     #[cfg_attr(feature = "inline-more", inline)]
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,132     fn drive_unindexed<C>(self, consumer: C) -> C::Result
133     where
134         C: UnindexedConsumer<Self::Item>,
135     {
136         let _guard = guard(self.table, |table| unsafe {
137             table.as_mut().clear_no_drop();
138         });
139         let iter = unsafe { self.table.as_ref().iter().iter };
140         mem::forget(self);
141         let producer = ParDrainProducer { iter };
142         plumbing::bridge_unindexed(producer, consumer)
143     }
144 }
145 
146 impl<T, A: Allocator + Clone> Drop for RawParDrain<'_, T, A> {
drop(&mut self)147     fn drop(&mut self) {
148         // If drive_unindexed is not called then simply clear the table.
149         unsafe {
150             self.table.as_mut().clear();
151         }
152     }
153 }
154 
155 /// Producer which will consume all elements in the range, even if it is dropped
156 /// halfway through.
157 struct ParDrainProducer<T> {
158     iter: RawIterRange<T>,
159 }
160 
161 impl<T: Send> UnindexedProducer for ParDrainProducer<T> {
162     type Item = T;
163 
164     #[cfg_attr(feature = "inline-more", inline)]
split(self) -> (Self, Option<Self>)165     fn split(self) -> (Self, Option<Self>) {
166         let (left, right) = self.iter.clone().split();
167         mem::forget(self);
168         let left = ParDrainProducer { iter: left };
169         let right = right.map(|right| ParDrainProducer { iter: right });
170         (left, right)
171     }
172 
173     #[cfg_attr(feature = "inline-more", inline)]
fold_with<F>(mut self, mut folder: F) -> F where F: Folder<Self::Item>,174     fn fold_with<F>(mut self, mut folder: F) -> F
175     where
176         F: Folder<Self::Item>,
177     {
178         // Make sure to modify the iterator in-place so that any remaining
179         // elements are processed in our Drop impl.
180         for item in &mut self.iter {
181             folder = folder.consume(unsafe { item.read() });
182             if folder.full() {
183                 return folder;
184             }
185         }
186 
187         // If we processed all elements then we don't need to run the drop.
188         mem::forget(self);
189         folder
190     }
191 }
192 
193 impl<T> Drop for ParDrainProducer<T> {
194     #[cfg_attr(feature = "inline-more", inline)]
drop(&mut self)195     fn drop(&mut self) {
196         // Drop all remaining elements
197         if mem::needs_drop::<T>() {
198             for item in &mut self.iter {
199                 unsafe {
200                     item.drop();
201                 }
202             }
203         }
204     }
205 }
206 
207 impl<T, A: Allocator + Clone> RawTable<T, A> {
208     /// Returns a parallel iterator over the elements in a `RawTable`.
209     #[cfg_attr(feature = "inline-more", inline)]
par_iter(&self) -> RawParIter<T>210     pub unsafe fn par_iter(&self) -> RawParIter<T> {
211         RawParIter {
212             iter: self.iter().iter,
213         }
214     }
215 
216     /// Returns a parallel iterator over the elements in a `RawTable`.
217     #[cfg_attr(feature = "inline-more", inline)]
into_par_iter(self) -> RawIntoParIter<T, A>218     pub fn into_par_iter(self) -> RawIntoParIter<T, A> {
219         RawIntoParIter { table: self }
220     }
221 
222     /// Returns a parallel iterator which consumes all elements of a `RawTable`
223     /// without freeing its memory allocation.
224     #[cfg_attr(feature = "inline-more", inline)]
par_drain(&mut self) -> RawParDrain<'_, T, A>225     pub fn par_drain(&mut self) -> RawParDrain<'_, T, A> {
226         RawParDrain {
227             table: NonNull::from(self),
228             marker: PhantomData,
229         }
230     }
231 }
232