1 use super::plumbing::*;
2 use super::*;
3 use std::cell::Cell;
4 use std::sync::atomic::{AtomicUsize, Ordering};
5
6 #[cfg(test)]
7 mod test;
8
9 // The key optimization for find_first is that a consumer can stop its search if
10 // some consumer to its left already found a match (and similarly for consumers
11 // to the right for find_last). To make this work, all consumers need some
12 // notion of their position in the data relative to other consumers, including
13 // unindexed consumers that have no built-in notion of position.
14 //
15 // To solve this, we assign each consumer a lower and upper bound for an
16 // imaginary "range" of data that it consumes. The initial consumer starts with
17 // the range 0..usize::max_value(). The split divides this range in half so that
18 // one resulting consumer has the range 0..(usize::max_value() / 2), and the
19 // other has (usize::max_value() / 2)..usize::max_value(). Every subsequent
20 // split divides the range in half again until it cannot be split anymore
21 // (i.e. its length is 1), in which case the split returns two consumers with
22 // the same range. In that case both consumers will continue to consume all
23 // their data regardless of whether a better match is found, but the reducer
24 // will still return the correct answer.
25
/// Which end of the data a search prefers: `Leftmost` for `find_first`,
/// `Rightmost` for `find_last`. Determines which of two candidate
/// positions counts as the "better" match throughout this module.
#[derive(Copy, Clone)]
enum MatchPosition {
    Leftmost,
    Rightmost,
}
31
32 /// Returns true if pos1 is a better match than pos2 according to MatchPosition
33 #[inline]
better_position(pos1: usize, pos2: usize, mp: MatchPosition) -> bool34 fn better_position(pos1: usize, pos2: usize, mp: MatchPosition) -> bool {
35 match mp {
36 MatchPosition::Leftmost => pos1 < pos2,
37 MatchPosition::Rightmost => pos1 > pos2,
38 }
39 }
40
find_first<I, P>(pi: I, find_op: P) -> Option<I::Item> where I: ParallelIterator, P: Fn(&I::Item) -> bool + Sync,41 pub(super) fn find_first<I, P>(pi: I, find_op: P) -> Option<I::Item>
42 where
43 I: ParallelIterator,
44 P: Fn(&I::Item) -> bool + Sync,
45 {
46 let best_found = AtomicUsize::new(usize::max_value());
47 let consumer = FindConsumer::new(&find_op, MatchPosition::Leftmost, &best_found);
48 pi.drive_unindexed(consumer)
49 }
50
find_last<I, P>(pi: I, find_op: P) -> Option<I::Item> where I: ParallelIterator, P: Fn(&I::Item) -> bool + Sync,51 pub(super) fn find_last<I, P>(pi: I, find_op: P) -> Option<I::Item>
52 where
53 I: ParallelIterator,
54 P: Fn(&I::Item) -> bool + Sync,
55 {
56 let best_found = AtomicUsize::new(0);
57 let consumer = FindConsumer::new(&find_op, MatchPosition::Rightmost, &best_found);
58 pi.drive_unindexed(consumer)
59 }
60
/// Consumer that searches for a match while tracking its position via an
/// imaginary `lower_bound..upper_bound` range (see module comment).
struct FindConsumer<'p, P> {
    // Predicate tested against each item.
    find_op: &'p P,
    // Lower edge of this consumer's imaginary range. Stored in a `Cell`
    // because `split_off_left` updates it through a shared reference.
    lower_bound: Cell<usize>,
    // Upper edge of this consumer's imaginary range.
    upper_bound: usize,
    // Search direction shared by all consumers in one search.
    match_position: MatchPosition,
    // Best (directional) position found so far across *all* consumers.
    best_found: &'p AtomicUsize,
}
68
69 impl<'p, P> FindConsumer<'p, P> {
new(find_op: &'p P, match_position: MatchPosition, best_found: &'p AtomicUsize) -> Self70 fn new(find_op: &'p P, match_position: MatchPosition, best_found: &'p AtomicUsize) -> Self {
71 FindConsumer {
72 find_op,
73 lower_bound: Cell::new(0),
74 upper_bound: usize::max_value(),
75 match_position,
76 best_found,
77 }
78 }
79
current_index(&self) -> usize80 fn current_index(&self) -> usize {
81 match self.match_position {
82 MatchPosition::Leftmost => self.lower_bound.get(),
83 MatchPosition::Rightmost => self.upper_bound,
84 }
85 }
86 }
87
impl<'p, T, P> Consumer<T> for FindConsumer<'p, P>
where
    T: Send,
    P: Fn(&T) -> bool + Sync,
{
    type Folder = FindFolder<'p, T, P>;
    type Reducer = FindReducer;
    type Result = Option<T>;

    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
        // The real index is ignored: position is tracked with the imaginary
        // lower/upper bounds instead (see module comment).
        let dir = self.match_position;
        (
            // Order matters: `split_off_left` mutates `self.lower_bound`
            // through its `Cell` before `self` is moved into the tuple as
            // the right-hand consumer.
            self.split_off_left(),
            self,
            FindReducer {
                match_position: dir,
            },
        )
    }

    fn into_folder(self) -> Self::Folder {
        FindFolder {
            find_op: self.find_op,
            // The folder only needs the one bound relevant to the search
            // direction; it publishes this value to `best_found` on a match.
            boundary: self.current_index(),
            match_position: self.match_position,
            best_found: self.best_found,
            item: None,
        }
    }

    fn full(&self) -> bool {
        // can stop consuming if the best found index so far is *strictly*
        // better than anything this consumer will find
        better_position(
            self.best_found.load(Ordering::Relaxed),
            self.current_index(),
            self.match_position,
        )
    }
}
128
impl<'p, T, P> UnindexedConsumer<T> for FindConsumer<'p, P>
where
    T: Send,
    P: Fn(&T) -> bool + Sync,
{
    fn split_off_left(&self) -> Self {
        // Upper bound for one consumer will be lower bound for the other. This
        // overlap is okay, because only one of the bounds will be used for
        // comparing against best_found; the other is kept only to be able to
        // divide the range in half.
        //
        // When the resolution of usize has been exhausted (i.e. when
        // upper_bound = lower_bound), both results of this split will have the
        // same range. When that happens, we lose the ability to tell one
        // consumer to stop working when the other finds a better match, but the
        // reducer ensures that the best answer is still returned (see the
        // module comment above).
        let old_lower_bound = self.lower_bound.get();
        // Midpoint computed as lower + (upper - lower) / 2 so the addition
        // cannot overflow even with bounds near usize::max_value().
        let median = old_lower_bound + ((self.upper_bound - old_lower_bound) / 2);
        // `self` keeps the right half [median, upper_bound] ...
        self.lower_bound.set(median);

        // ... and the returned consumer takes the left half [old_lower_bound, median].
        FindConsumer {
            find_op: self.find_op,
            lower_bound: Cell::new(old_lower_bound),
            upper_bound: median,
            match_position: self.match_position,
            best_found: self.best_found,
        }
    }

    fn to_reducer(&self) -> Self::Reducer {
        FindReducer {
            match_position: self.match_position,
        }
    }
}
165
/// Folder produced by `FindConsumer::into_folder`; tests each item against
/// the predicate and races to publish its position into `best_found`.
struct FindFolder<'p, T, P> {
    // Predicate tested against each item.
    find_op: &'p P,
    // This folder's position (the owning consumer's directional bound),
    // published to `best_found` when a match is found.
    boundary: usize,
    // Search direction shared by all folders in one search.
    match_position: MatchPosition,
    // Best (directional) position found so far across *all* consumers.
    best_found: &'p AtomicUsize,
    // The best matching item found by this folder, if any.
    item: Option<T>,
}
173
impl<'p, P: 'p + Fn(&T) -> bool, T> Folder<T> for FindFolder<'p, T, P> {
    type Result = Option<T>;

    fn consume(mut self, item: T) -> Self {
        // For a leftmost search, the first match within this folder's range
        // is the best it can do, so later items can be skipped. For a
        // rightmost search, every later match is at least as good, so keep
        // testing and let later matches overwrite `self.item`.
        let found_best_in_range = match self.match_position {
            MatchPosition::Leftmost => self.item.is_some(),
            MatchPosition::Rightmost => false,
        };

        if !found_best_in_range && (self.find_op)(&item) {
            // Continuously try to set best_found until we succeed or we
            // discover a better match was already found.
            let mut current = self.best_found.load(Ordering::Relaxed);
            loop {
                // A strictly better global match already exists; drop this
                // one without recording it.
                if better_position(current, self.boundary, self.match_position) {
                    break;
                }
                // CAS our boundary in; `_weak` may fail spuriously, but the
                // loop simply retries with the freshly observed value.
                match self.best_found.compare_exchange_weak(
                    current,
                    self.boundary,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // We won the race: remember the item locally so
                        // `complete` can return it.
                        self.item = Some(item);
                        break;
                    }
                    Err(v) => current = v,
                }
            }
        }
        self
    }

    fn complete(self) -> Self::Result {
        self.item
    }

    fn full(&self) -> bool {
        // Same early-exit logic as `consume`: a leftmost folder is full once
        // it holds a match; a rightmost folder never is on that basis alone.
        let found_best_in_range = match self.match_position {
            MatchPosition::Leftmost => self.item.is_some(),
            MatchPosition::Rightmost => false,
        };

        // Also full if some other consumer already found a strictly better
        // position than this folder could possibly report.
        found_best_in_range
            || better_position(
                self.best_found.load(Ordering::Relaxed),
                self.boundary,
                self.match_position,
            )
    }
}
226
/// Reducer that combines the results of two sibling consumers, preferring
/// the side that matches the search direction.
struct FindReducer {
    // Search direction: decides whether the left or right result wins.
    match_position: MatchPosition,
}
230
231 impl<T> Reducer<Option<T>> for FindReducer {
reduce(self, left: Option<T>, right: Option<T>) -> Option<T>232 fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> {
233 match self.match_position {
234 MatchPosition::Leftmost => left.or(right),
235 MatchPosition::Rightmost => right.or(left),
236 }
237 }
238 }
239