• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use super::plumbing::*;
2 use super::*;
3 use std::cell::Cell;
4 use std::sync::atomic::{AtomicUsize, Ordering};
5 
6 #[cfg(test)]
7 mod test;
8 
// The key optimization for find_first is that a consumer can stop its search if
// some consumer to its left already found a match (and similarly for consumers
// to the right for find_last). To make this work, all consumers need some
// notion of their position in the data relative to other consumers, including
// unindexed consumers that have no built-in notion of position.
//
// To solve this, we assign each consumer a lower and upper bound for an
// imaginary "range" of data that it consumes. The initial consumer starts with
// the range 0..usize::max_value(). The first split divides this range in half
// so that one resulting consumer has the range 0..(usize::max_value() / 2), and
// the other has (usize::max_value() / 2)..usize::max_value(). Every subsequent
// split divides the range in half again until it cannot be halved anymore
// (i.e. the midpoint equals the lower bound), at which point a split can return
// two consumers that compare against the same position. In that case both
// consumers will continue to consume all their data regardless of whether a
// better match is found, but the reducer will still return the correct answer.
25 
/// Which end of the data a search is trying to reach.
#[derive(Clone, Copy)]
enum MatchPosition {
    /// Prefer the match at the smallest position (used by `find_first`).
    Leftmost,
    /// Prefer the match at the largest position (used by `find_last`).
    Rightmost,
}
31 
32 /// Returns true if pos1 is a better match than pos2 according to MatchPosition
33 #[inline]
better_position(pos1: usize, pos2: usize, mp: MatchPosition) -> bool34 fn better_position(pos1: usize, pos2: usize, mp: MatchPosition) -> bool {
35     match mp {
36         MatchPosition::Leftmost => pos1 < pos2,
37         MatchPosition::Rightmost => pos1 > pos2,
38     }
39 }
40 
find_first<I, P>(pi: I, find_op: P) -> Option<I::Item> where I: ParallelIterator, P: Fn(&I::Item) -> bool + Sync,41 pub(super) fn find_first<I, P>(pi: I, find_op: P) -> Option<I::Item>
42 where
43     I: ParallelIterator,
44     P: Fn(&I::Item) -> bool + Sync,
45 {
46     let best_found = AtomicUsize::new(usize::max_value());
47     let consumer = FindConsumer::new(&find_op, MatchPosition::Leftmost, &best_found);
48     pi.drive_unindexed(consumer)
49 }
50 
find_last<I, P>(pi: I, find_op: P) -> Option<I::Item> where I: ParallelIterator, P: Fn(&I::Item) -> bool + Sync,51 pub(super) fn find_last<I, P>(pi: I, find_op: P) -> Option<I::Item>
52 where
53     I: ParallelIterator,
54     P: Fn(&I::Item) -> bool + Sync,
55 {
56     let best_found = AtomicUsize::new(0);
57     let consumer = FindConsumer::new(&find_op, MatchPosition::Rightmost, &best_found);
58     pi.drive_unindexed(consumer)
59 }
60 
61 struct FindConsumer<'p, P> {
62     find_op: &'p P,
63     lower_bound: Cell<usize>,
64     upper_bound: usize,
65     match_position: MatchPosition,
66     best_found: &'p AtomicUsize,
67 }
68 
69 impl<'p, P> FindConsumer<'p, P> {
new(find_op: &'p P, match_position: MatchPosition, best_found: &'p AtomicUsize) -> Self70     fn new(find_op: &'p P, match_position: MatchPosition, best_found: &'p AtomicUsize) -> Self {
71         FindConsumer {
72             find_op,
73             lower_bound: Cell::new(0),
74             upper_bound: usize::max_value(),
75             match_position,
76             best_found,
77         }
78     }
79 
current_index(&self) -> usize80     fn current_index(&self) -> usize {
81         match self.match_position {
82             MatchPosition::Leftmost => self.lower_bound.get(),
83             MatchPosition::Rightmost => self.upper_bound,
84         }
85     }
86 }
87 
88 impl<'p, T, P> Consumer<T> for FindConsumer<'p, P>
89 where
90     T: Send,
91     P: Fn(&T) -> bool + Sync,
92 {
93     type Folder = FindFolder<'p, T, P>;
94     type Reducer = FindReducer;
95     type Result = Option<T>;
96 
split_at(self, _index: usize) -> (Self, Self, Self::Reducer)97     fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
98         let dir = self.match_position;
99         (
100             self.split_off_left(),
101             self,
102             FindReducer {
103                 match_position: dir,
104             },
105         )
106     }
107 
into_folder(self) -> Self::Folder108     fn into_folder(self) -> Self::Folder {
109         FindFolder {
110             find_op: self.find_op,
111             boundary: self.current_index(),
112             match_position: self.match_position,
113             best_found: self.best_found,
114             item: None,
115         }
116     }
117 
full(&self) -> bool118     fn full(&self) -> bool {
119         // can stop consuming if the best found index so far is *strictly*
120         // better than anything this consumer will find
121         better_position(
122             self.best_found.load(Ordering::Relaxed),
123             self.current_index(),
124             self.match_position,
125         )
126     }
127 }
128 
129 impl<'p, T, P> UnindexedConsumer<T> for FindConsumer<'p, P>
130 where
131     T: Send,
132     P: Fn(&T) -> bool + Sync,
133 {
split_off_left(&self) -> Self134     fn split_off_left(&self) -> Self {
135         // Upper bound for one consumer will be lower bound for the other. This
136         // overlap is okay, because only one of the bounds will be used for
137         // comparing against best_found; the other is kept only to be able to
138         // divide the range in half.
139         //
140         // When the resolution of usize has been exhausted (i.e. when
141         // upper_bound = lower_bound), both results of this split will have the
142         // same range. When that happens, we lose the ability to tell one
143         // consumer to stop working when the other finds a better match, but the
144         // reducer ensures that the best answer is still returned (see the test
145         // above).
146         let old_lower_bound = self.lower_bound.get();
147         let median = old_lower_bound + ((self.upper_bound - old_lower_bound) / 2);
148         self.lower_bound.set(median);
149 
150         FindConsumer {
151             find_op: self.find_op,
152             lower_bound: Cell::new(old_lower_bound),
153             upper_bound: median,
154             match_position: self.match_position,
155             best_found: self.best_found,
156         }
157     }
158 
to_reducer(&self) -> Self::Reducer159     fn to_reducer(&self) -> Self::Reducer {
160         FindReducer {
161             match_position: self.match_position,
162         }
163     }
164 }
165 
166 struct FindFolder<'p, T, P> {
167     find_op: &'p P,
168     boundary: usize,
169     match_position: MatchPosition,
170     best_found: &'p AtomicUsize,
171     item: Option<T>,
172 }
173 
174 impl<'p, P: 'p + Fn(&T) -> bool, T> Folder<T> for FindFolder<'p, T, P> {
175     type Result = Option<T>;
176 
consume(mut self, item: T) -> Self177     fn consume(mut self, item: T) -> Self {
178         let found_best_in_range = match self.match_position {
179             MatchPosition::Leftmost => self.item.is_some(),
180             MatchPosition::Rightmost => false,
181         };
182 
183         if !found_best_in_range && (self.find_op)(&item) {
184             // Continuously try to set best_found until we succeed or we
185             // discover a better match was already found.
186             let mut current = self.best_found.load(Ordering::Relaxed);
187             loop {
188                 if better_position(current, self.boundary, self.match_position) {
189                     break;
190                 }
191                 match self.best_found.compare_exchange_weak(
192                     current,
193                     self.boundary,
194                     Ordering::Relaxed,
195                     Ordering::Relaxed,
196                 ) {
197                     Ok(_) => {
198                         self.item = Some(item);
199                         break;
200                     }
201                     Err(v) => current = v,
202                 }
203             }
204         }
205         self
206     }
207 
complete(self) -> Self::Result208     fn complete(self) -> Self::Result {
209         self.item
210     }
211 
full(&self) -> bool212     fn full(&self) -> bool {
213         let found_best_in_range = match self.match_position {
214             MatchPosition::Leftmost => self.item.is_some(),
215             MatchPosition::Rightmost => false,
216         };
217 
218         found_best_in_range
219             || better_position(
220                 self.best_found.load(Ordering::Relaxed),
221                 self.boundary,
222                 self.match_position,
223             )
224     }
225 }
226 
227 struct FindReducer {
228     match_position: MatchPosition,
229 }
230 
231 impl<T> Reducer<Option<T>> for FindReducer {
reduce(self, left: Option<T>, right: Option<T>) -> Option<T>232     fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> {
233         match self.match_position {
234             MatchPosition::Leftmost => left.or(right),
235             MatchPosition::Rightmost => right.or(left),
236         }
237     }
238 }
239