1 use crate::raw::Bucket; 2 use crate::raw::{Allocator, Global, RawIter, RawIterRange, RawTable}; 3 use crate::scopeguard::guard; 4 use alloc::alloc::dealloc; 5 use core::marker::PhantomData; 6 use core::mem; 7 use core::ptr::NonNull; 8 use rayon::iter::{ 9 plumbing::{self, Folder, UnindexedConsumer, UnindexedProducer}, 10 ParallelIterator, 11 }; 12 13 /// Parallel iterator which returns a raw pointer to every full bucket in the table. 14 pub struct RawParIter<T> { 15 iter: RawIterRange<T>, 16 } 17 18 impl<T> RawParIter<T> { 19 #[cfg_attr(feature = "inline-more", inline)] iter(&self) -> RawIterRange<T>20 pub(super) unsafe fn iter(&self) -> RawIterRange<T> { 21 self.iter.clone() 22 } 23 } 24 25 impl<T> Clone for RawParIter<T> { 26 #[cfg_attr(feature = "inline-more", inline)] clone(&self) -> Self27 fn clone(&self) -> Self { 28 Self { 29 iter: self.iter.clone(), 30 } 31 } 32 } 33 34 impl<T> From<RawIter<T>> for RawParIter<T> { from(it: RawIter<T>) -> Self35 fn from(it: RawIter<T>) -> Self { 36 RawParIter { iter: it.iter } 37 } 38 } 39 40 impl<T> ParallelIterator for RawParIter<T> { 41 type Item = Bucket<T>; 42 43 #[cfg_attr(feature = "inline-more", inline)] drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,44 fn drive_unindexed<C>(self, consumer: C) -> C::Result 45 where 46 C: UnindexedConsumer<Self::Item>, 47 { 48 let producer = ParIterProducer { iter: self.iter }; 49 plumbing::bridge_unindexed(producer, consumer) 50 } 51 } 52 53 /// Producer which returns a `Bucket<T>` for every element. 54 struct ParIterProducer<T> { 55 iter: RawIterRange<T>, 56 } 57 58 impl<T> UnindexedProducer for ParIterProducer<T> { 59 type Item = Bucket<T>; 60 61 #[cfg_attr(feature = "inline-more", inline)] split(self) -> (Self, Option<Self>)62 fn split(self) -> (Self, Option<Self>) { 63 let (left, right) = self.iter.split(); 64 let left = ParIterProducer { iter: left }; 65 let right = right.map(|right| ParIterProducer { iter: right }); 66 (left, right) 67 } 68 69 #[cfg_attr(feature = "inline-more", inline)] fold_with<F>(self, folder: F) -> F where F: Folder<Self::Item>,70 fn fold_with<F>(self, folder: F) -> F 71 where 72 F: Folder<Self::Item>, 73 { 74 folder.consume_iter(self.iter) 75 } 76 } 77 78 /// Parallel iterator which consumes a table and returns elements. 79 pub struct RawIntoParIter<T, A: Allocator + Clone = Global> { 80 table: RawTable<T, A>, 81 } 82 83 impl<T, A: Allocator + Clone> RawIntoParIter<T, A> { 84 #[cfg_attr(feature = "inline-more", inline)] par_iter(&self) -> RawParIter<T>85 pub(super) unsafe fn par_iter(&self) -> RawParIter<T> { 86 self.table.par_iter() 87 } 88 } 89 90 impl<T: Send, A: Allocator + Clone + Send> ParallelIterator for RawIntoParIter<T, A> { 91 type Item = T; 92 93 #[cfg_attr(feature = "inline-more", inline)] drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,94 fn drive_unindexed<C>(self, consumer: C) -> C::Result 95 where 96 C: UnindexedConsumer<Self::Item>, 97 { 98 let iter = unsafe { self.table.iter().iter }; 99 let _guard = guard(self.table.into_allocation(), |alloc| { 100 if let Some((ptr, layout)) = *alloc { 101 unsafe { 102 dealloc(ptr.as_ptr(), layout); 103 } 104 } 105 }); 106 let producer = ParDrainProducer { iter }; 107 plumbing::bridge_unindexed(producer, consumer) 108 } 109 } 110 111 /// Parallel iterator which consumes elements without freeing the table storage. 112 pub struct RawParDrain<'a, T, A: Allocator + Clone = Global> { 113 // We don't use a &'a mut RawTable<T> because we want RawParDrain to be 114 // covariant over T. 115 table: NonNull<RawTable<T, A>>, 116 marker: PhantomData<&'a RawTable<T, A>>, 117 } 118 119 unsafe impl<T: Send, A: Allocator + Clone> Send for RawParDrain<'_, T, A> {} 120 121 impl<T, A: Allocator + Clone> RawParDrain<'_, T, A> { 122 #[cfg_attr(feature = "inline-more", inline)] par_iter(&self) -> RawParIter<T>123 pub(super) unsafe fn par_iter(&self) -> RawParIter<T> { 124 self.table.as_ref().par_iter() 125 } 126 } 127 128 impl<T: Send, A: Allocator + Clone> ParallelIterator for RawParDrain<'_, T, A> { 129 type Item = T; 130 131 #[cfg_attr(feature = "inline-more", inline)] drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,132 fn drive_unindexed<C>(self, consumer: C) -> C::Result 133 where 134 C: UnindexedConsumer<Self::Item>, 135 { 136 let _guard = guard(self.table, |table| unsafe { 137 table.as_mut().clear_no_drop(); 138 }); 139 let iter = unsafe { self.table.as_ref().iter().iter }; 140 mem::forget(self); 141 let producer = ParDrainProducer { iter }; 142 plumbing::bridge_unindexed(producer, consumer) 143 } 144 } 145 146 impl<T, A: Allocator + Clone> Drop for RawParDrain<'_, T, A> { drop(&mut self)147 fn drop(&mut self) { 148 // If drive_unindexed is not called then simply clear the table. 149 unsafe { 150 self.table.as_mut().clear(); 151 } 152 } 153 } 154 155 /// Producer which will consume all elements in the range, even if it is dropped 156 /// halfway through. 157 struct ParDrainProducer<T> { 158 iter: RawIterRange<T>, 159 } 160 161 impl<T: Send> UnindexedProducer for ParDrainProducer<T> { 162 type Item = T; 163 164 #[cfg_attr(feature = "inline-more", inline)] split(self) -> (Self, Option<Self>)165 fn split(self) -> (Self, Option<Self>) { 166 let (left, right) = self.iter.clone().split(); 167 mem::forget(self); 168 let left = ParDrainProducer { iter: left }; 169 let right = right.map(|right| ParDrainProducer { iter: right }); 170 (left, right) 171 } 172 173 #[cfg_attr(feature = "inline-more", inline)] fold_with<F>(mut self, mut folder: F) -> F where F: Folder<Self::Item>,174 fn fold_with<F>(mut self, mut folder: F) -> F 175 where 176 F: Folder<Self::Item>, 177 { 178 // Make sure to modify the iterator in-place so that any remaining 179 // elements are processed in our Drop impl. 180 for item in &mut self.iter { 181 folder = folder.consume(unsafe { item.read() }); 182 if folder.full() { 183 return folder; 184 } 185 } 186 187 // If we processed all elements then we don't need to run the drop. 188 mem::forget(self); 189 folder 190 } 191 } 192 193 impl<T> Drop for ParDrainProducer<T> { 194 #[cfg_attr(feature = "inline-more", inline)] drop(&mut self)195 fn drop(&mut self) { 196 // Drop all remaining elements 197 if mem::needs_drop::<T>() { 198 for item in &mut self.iter { 199 unsafe { 200 item.drop(); 201 } 202 } 203 } 204 } 205 } 206 207 impl<T, A: Allocator + Clone> RawTable<T, A> { 208 /// Returns a parallel iterator over the elements in a `RawTable`. 209 #[cfg_attr(feature = "inline-more", inline)] par_iter(&self) -> RawParIter<T>210 pub unsafe fn par_iter(&self) -> RawParIter<T> { 211 RawParIter { 212 iter: self.iter().iter, 213 } 214 } 215 216 /// Returns a parallel iterator over the elements in a `RawTable`. 217 #[cfg_attr(feature = "inline-more", inline)] into_par_iter(self) -> RawIntoParIter<T, A>218 pub fn into_par_iter(self) -> RawIntoParIter<T, A> { 219 RawIntoParIter { table: self } 220 } 221 222 /// Returns a parallel iterator which consumes all elements of a `RawTable` 223 /// without freeing its memory allocation. 224 #[cfg_attr(feature = "inline-more", inline)] par_drain(&mut self) -> RawParDrain<'_, T, A>225 pub fn par_drain(&mut self) -> RawParDrain<'_, T, A> { 226 RawParDrain { 227 table: NonNull::from(self), 228 marker: PhantomData, 229 } 230 } 231 } 232