1 use super::super::plumbing::*; 2 use std::marker::PhantomData; 3 use std::ptr; 4 use std::slice; 5 6 pub(super) struct CollectConsumer<'c, T: Send> { 7 /// A slice covering the target memory, not yet initialized! 8 target: &'c mut [T], 9 } 10 11 pub(super) struct CollectFolder<'c, T: Send> { 12 /// The folder writes into `result` and must extend the result 13 /// up to exactly this number of elements. 14 final_len: usize, 15 16 /// The current written-to part of our slice of the target 17 result: CollectResult<'c, T>, 18 } 19 20 impl<'c, T: Send + 'c> CollectConsumer<'c, T> { 21 /// The target memory is considered uninitialized, and will be 22 /// overwritten without reading or dropping existing values. new(target: &'c mut [T]) -> Self23 pub(super) fn new(target: &'c mut [T]) -> Self { 24 CollectConsumer { target } 25 } 26 } 27 28 /// CollectResult represents an initialized part of the target slice. 29 /// 30 /// This is a proxy owner of the elements in the slice; when it drops, 31 /// the elements will be dropped, unless its ownership is released before then. 32 #[must_use] 33 pub(super) struct CollectResult<'c, T> { 34 start: *mut T, 35 len: usize, 36 invariant_lifetime: PhantomData<&'c mut &'c mut [T]>, 37 } 38 39 unsafe impl<'c, T> Send for CollectResult<'c, T> where T: Send {} 40 41 impl<'c, T> CollectResult<'c, T> { 42 /// The current length of the collect result len(&self) -> usize43 pub(super) fn len(&self) -> usize { 44 self.len 45 } 46 47 /// Release ownership of the slice of elements, and return the length release_ownership(mut self) -> usize48 pub(super) fn release_ownership(mut self) -> usize { 49 let ret = self.len; 50 self.len = 0; 51 ret 52 } 53 } 54 55 impl<'c, T> Drop for CollectResult<'c, T> { drop(&mut self)56 fn drop(&mut self) { 57 // Drop the first `self.len` elements, which have been recorded 58 // to be initialized by the folder. 59 unsafe { 60 ptr::drop_in_place(slice::from_raw_parts_mut(self.start, self.len)); 61 } 62 } 63 } 64 65 impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> { 66 type Folder = CollectFolder<'c, T>; 67 type Reducer = CollectReducer; 68 type Result = CollectResult<'c, T>; 69 split_at(self, index: usize) -> (Self, Self, CollectReducer)70 fn split_at(self, index: usize) -> (Self, Self, CollectReducer) { 71 let CollectConsumer { target } = self; 72 73 // Produce new consumers. Normal slicing ensures that the 74 // memory range given to each consumer is disjoint. 75 let (left, right) = target.split_at_mut(index); 76 ( 77 CollectConsumer::new(left), 78 CollectConsumer::new(right), 79 CollectReducer, 80 ) 81 } 82 into_folder(self) -> CollectFolder<'c, T>83 fn into_folder(self) -> CollectFolder<'c, T> { 84 // Create a folder that consumes values and writes them 85 // into target. The initial result has length 0. 86 CollectFolder { 87 final_len: self.target.len(), 88 result: CollectResult { 89 start: self.target.as_mut_ptr(), 90 len: 0, 91 invariant_lifetime: PhantomData, 92 }, 93 } 94 } 95 full(&self) -> bool96 fn full(&self) -> bool { 97 false 98 } 99 } 100 101 impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> { 102 type Result = CollectResult<'c, T>; 103 consume(mut self, item: T) -> CollectFolder<'c, T>104 fn consume(mut self, item: T) -> CollectFolder<'c, T> { 105 if self.result.len >= self.final_len { 106 panic!("too many values pushed to consumer"); 107 } 108 109 // Compute target pointer and write to it, and 110 // extend the current result by one element 111 unsafe { 112 self.result.start.add(self.result.len).write(item); 113 self.result.len += 1; 114 } 115 116 self 117 } 118 complete(self) -> Self::Result119 fn complete(self) -> Self::Result { 120 // NB: We don't explicitly check that the local writes were complete, 121 // but Collect will assert the total result length in the end. 122 self.result 123 } 124 full(&self) -> bool125 fn full(&self) -> bool { 126 false 127 } 128 } 129 130 /// Pretend to be unindexed for `special_collect_into_vec`, 131 /// but we should never actually get used that way... 132 impl<'c, T: Send + 'c> UnindexedConsumer<T> for CollectConsumer<'c, T> { split_off_left(&self) -> Self133 fn split_off_left(&self) -> Self { 134 unreachable!("CollectConsumer must be indexed!") 135 } to_reducer(&self) -> Self::Reducer136 fn to_reducer(&self) -> Self::Reducer { 137 CollectReducer 138 } 139 } 140 141 /// CollectReducer combines adjacent chunks; the result must always 142 /// be contiguous so that it is one combined slice. 143 pub(super) struct CollectReducer; 144 145 impl<'c, T> Reducer<CollectResult<'c, T>> for CollectReducer { reduce( self, mut left: CollectResult<'c, T>, right: CollectResult<'c, T>, ) -> CollectResult<'c, T>146 fn reduce( 147 self, 148 mut left: CollectResult<'c, T>, 149 right: CollectResult<'c, T>, 150 ) -> CollectResult<'c, T> { 151 // Merge if the CollectResults are adjacent and in left to right order 152 // else: drop the right piece now and total length will end up short in the end, 153 // when the correctness of the collected result is asserted. 154 if left.start.wrapping_add(left.len) == right.start { 155 left.len += right.release_ownership(); 156 } 157 left 158 } 159 } 160