1 use crate::raw::Bucket; 2 use crate::raw::{Allocator, Global, RawIter, RawIterRange, RawTable}; 3 use crate::scopeguard::guard; 4 use alloc::alloc::dealloc; 5 use core::marker::PhantomData; 6 use core::mem; 7 use core::ptr::NonNull; 8 use rayon::iter::{ 9 plumbing::{self, Folder, UnindexedConsumer, UnindexedProducer}, 10 ParallelIterator, 11 }; 12 13 /// Parallel iterator which returns a raw pointer to every full bucket in the table. 14 pub struct RawParIter<T> { 15 iter: RawIterRange<T>, 16 } 17 18 impl<T> RawParIter<T> { 19 #[cfg_attr(feature = "inline-more", inline)] iter(&self) -> RawIterRange<T>20 pub(super) unsafe fn iter(&self) -> RawIterRange<T> { 21 self.iter.clone() 22 } 23 } 24 25 impl<T> Clone for RawParIter<T> { 26 #[cfg_attr(feature = "inline-more", inline)] clone(&self) -> Self27 fn clone(&self) -> Self { 28 Self { 29 iter: self.iter.clone(), 30 } 31 } 32 } 33 34 impl<T> From<RawIter<T>> for RawParIter<T> { from(it: RawIter<T>) -> Self35 fn from(it: RawIter<T>) -> Self { 36 RawParIter { iter: it.iter } 37 } 38 } 39 40 impl<T> ParallelIterator for RawParIter<T> { 41 type Item = Bucket<T>; 42 43 #[cfg_attr(feature = "inline-more", inline)] drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,44 fn drive_unindexed<C>(self, consumer: C) -> C::Result 45 where 46 C: UnindexedConsumer<Self::Item>, 47 { 48 let producer = ParIterProducer { iter: self.iter }; 49 plumbing::bridge_unindexed(producer, consumer) 50 } 51 } 52 53 /// Producer which returns a `Bucket<T>` for every element. 54 struct ParIterProducer<T> { 55 iter: RawIterRange<T>, 56 } 57 58 impl<T> UnindexedProducer for ParIterProducer<T> { 59 type Item = Bucket<T>; 60 61 #[cfg_attr(feature = "inline-more", inline)] split(self) -> (Self, Option<Self>)62 fn split(self) -> (Self, Option<Self>) { 63 let (left, right) = self.iter.split(); 64 let left = ParIterProducer { iter: left }; 65 let right = right.map(|right| ParIterProducer { iter: right }); 66 (left, right) 67 } 68 69 #[cfg_attr(feature = "inline-more", inline)] fold_with<F>(self, folder: F) -> F where F: Folder<Self::Item>,70 fn fold_with<F>(self, folder: F) -> F 71 where 72 F: Folder<Self::Item>, 73 { 74 folder.consume_iter(self.iter) 75 } 76 } 77 78 /// Parallel iterator which consumes a table and returns elements. 79 pub struct RawIntoParIter<T, A: Allocator + Clone = Global> { 80 table: RawTable<T, A>, 81 } 82 83 impl<T, A: Allocator + Clone> RawIntoParIter<T, A> { 84 #[cfg_attr(feature = "inline-more", inline)] par_iter(&self) -> RawParIter<T>85 pub(super) unsafe fn par_iter(&self) -> RawParIter<T> { 86 self.table.par_iter() 87 } 88 } 89 90 impl<T: Send, A: Allocator + Clone> ParallelIterator for RawIntoParIter<T, A> { 91 type Item = T; 92 93 #[cfg_attr(feature = "inline-more", inline)] drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,94 fn drive_unindexed<C>(self, consumer: C) -> C::Result 95 where 96 C: UnindexedConsumer<Self::Item>, 97 { 98 let iter = unsafe { self.table.iter().iter }; 99 let _guard = guard(self.table.into_allocation(), |alloc| { 100 if let Some((ptr, layout)) = *alloc { 101 unsafe { 102 dealloc(ptr.as_ptr(), layout); 103 } 104 } 105 }); 106 let producer = ParDrainProducer { iter }; 107 plumbing::bridge_unindexed(producer, consumer) 108 } 109 } 110 111 /// Parallel iterator which consumes elements without freeing the table storage. 112 pub struct RawParDrain<'a, T, A: Allocator + Clone = Global> { 113 // We don't use a &'a mut RawTable<T> because we want RawParDrain to be 114 // covariant over T. 115 table: NonNull<RawTable<T, A>>, 116 marker: PhantomData<&'a RawTable<T, A>>, 117 } 118 119 unsafe impl<T, A: Allocator + Clone> Send for RawParDrain<'_, T, A> {} 120 121 impl<T, A: Allocator + Clone> RawParDrain<'_, T, A> { 122 #[cfg_attr(feature = "inline-more", inline)] par_iter(&self) -> RawParIter<T>123 pub(super) unsafe fn par_iter(&self) -> RawParIter<T> { 124 self.table.as_ref().par_iter() 125 } 126 } 127 128 impl<T: Send, A: Allocator + Clone> ParallelIterator for RawParDrain<'_, T, A> { 129 type Item = T; 130 131 #[cfg_attr(feature = "inline-more", inline)] drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,132 fn drive_unindexed<C>(self, consumer: C) -> C::Result 133 where 134 C: UnindexedConsumer<Self::Item>, 135 { 136 let _guard = guard(self.table, |table| unsafe { 137 table.as_mut().clear_no_drop() 138 }); 139 let iter = unsafe { self.table.as_ref().iter().iter }; 140 mem::forget(self); 141 let producer = ParDrainProducer { iter }; 142 plumbing::bridge_unindexed(producer, consumer) 143 } 144 } 145 146 impl<T, A: Allocator + Clone> Drop for RawParDrain<'_, T, A> { drop(&mut self)147 fn drop(&mut self) { 148 // If drive_unindexed is not called then simply clear the table. 149 unsafe { self.table.as_mut().clear() } 150 } 151 } 152 153 /// Producer which will consume all elements in the range, even if it is dropped 154 /// halfway through. 155 struct ParDrainProducer<T> { 156 iter: RawIterRange<T>, 157 } 158 159 impl<T: Send> UnindexedProducer for ParDrainProducer<T> { 160 type Item = T; 161 162 #[cfg_attr(feature = "inline-more", inline)] split(self) -> (Self, Option<Self>)163 fn split(self) -> (Self, Option<Self>) { 164 let (left, right) = self.iter.clone().split(); 165 mem::forget(self); 166 let left = ParDrainProducer { iter: left }; 167 let right = right.map(|right| ParDrainProducer { iter: right }); 168 (left, right) 169 } 170 171 #[cfg_attr(feature = "inline-more", inline)] fold_with<F>(mut self, mut folder: F) -> F where F: Folder<Self::Item>,172 fn fold_with<F>(mut self, mut folder: F) -> F 173 where 174 F: Folder<Self::Item>, 175 { 176 // Make sure to modify the iterator in-place so that any remaining 177 // elements are processed in our Drop impl. 178 while let Some(item) = self.iter.next() { 179 folder = folder.consume(unsafe { item.read() }); 180 if folder.full() { 181 return folder; 182 } 183 } 184 185 // If we processed all elements then we don't need to run the drop. 186 mem::forget(self); 187 folder 188 } 189 } 190 191 impl<T> Drop for ParDrainProducer<T> { 192 #[cfg_attr(feature = "inline-more", inline)] drop(&mut self)193 fn drop(&mut self) { 194 // Drop all remaining elements 195 if mem::needs_drop::<T>() { 196 while let Some(item) = self.iter.next() { 197 unsafe { 198 item.drop(); 199 } 200 } 201 } 202 } 203 } 204 205 impl<T, A: Allocator + Clone> RawTable<T, A> { 206 /// Returns a parallel iterator over the elements in a `RawTable`. 207 #[cfg_attr(feature = "inline-more", inline)] par_iter(&self) -> RawParIter<T>208 pub unsafe fn par_iter(&self) -> RawParIter<T> { 209 RawParIter { 210 iter: self.iter().iter, 211 } 212 } 213 214 /// Returns a parallel iterator over the elements in a `RawTable`. 215 #[cfg_attr(feature = "inline-more", inline)] into_par_iter(self) -> RawIntoParIter<T, A>216 pub fn into_par_iter(self) -> RawIntoParIter<T, A> { 217 RawIntoParIter { table: self } 218 } 219 220 /// Returns a parallel iterator which consumes all elements of a `RawTable` 221 /// without freeing its memory allocation. 222 #[cfg_attr(feature = "inline-more", inline)] par_drain(&mut self) -> RawParDrain<'_, T, A>223 pub fn par_drain(&mut self) -> RawParDrain<'_, T, A> { 224 RawParDrain { 225 table: NonNull::from(self), 226 marker: PhantomData, 227 } 228 } 229 } 230