• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use super::plumbing::*;
2 use super::ParallelIterator;
3 use super::Try;
4 
5 use std::ops::ControlFlow::{self, Break, Continue};
6 use std::sync::atomic::{AtomicBool, Ordering};
7 
try_reduce<PI, R, ID, T>(pi: PI, identity: ID, reduce_op: R) -> T where PI: ParallelIterator<Item = T>, R: Fn(T::Output, T::Output) -> T + Sync, ID: Fn() -> T::Output + Sync, T: Try + Send,8 pub(super) fn try_reduce<PI, R, ID, T>(pi: PI, identity: ID, reduce_op: R) -> T
9 where
10     PI: ParallelIterator<Item = T>,
11     R: Fn(T::Output, T::Output) -> T + Sync,
12     ID: Fn() -> T::Output + Sync,
13     T: Try + Send,
14 {
15     let full = AtomicBool::new(false);
16     let consumer = TryReduceConsumer {
17         identity: &identity,
18         reduce_op: &reduce_op,
19         full: &full,
20     };
21     pi.drive_unindexed(consumer)
22 }
23 
/// Consumer state for `try_reduce`: borrowed handles to the two closures plus
/// the shared short-circuit flag.
struct TryReduceConsumer<'r, R, ID> {
    /// Produces the output value a new folder starts from.
    identity: &'r ID,
    /// Combines two output values.
    reduce_op: &'r R,
    /// Shared flag reported by `full()`.
    full: &'r AtomicBool,
}
29 
30 impl<'r, R, ID> Copy for TryReduceConsumer<'r, R, ID> {}
31 
32 impl<'r, R, ID> Clone for TryReduceConsumer<'r, R, ID> {
clone(&self) -> Self33     fn clone(&self) -> Self {
34         *self
35     }
36 }
37 
38 impl<'r, R, ID, T> Consumer<T> for TryReduceConsumer<'r, R, ID>
39 where
40     R: Fn(T::Output, T::Output) -> T + Sync,
41     ID: Fn() -> T::Output + Sync,
42     T: Try + Send,
43 {
44     type Folder = TryReduceFolder<'r, R, T>;
45     type Reducer = Self;
46     type Result = T;
47 
split_at(self, _index: usize) -> (Self, Self, Self)48     fn split_at(self, _index: usize) -> (Self, Self, Self) {
49         (self, self, self)
50     }
51 
into_folder(self) -> Self::Folder52     fn into_folder(self) -> Self::Folder {
53         TryReduceFolder {
54             reduce_op: self.reduce_op,
55             control: Continue((self.identity)()),
56             full: self.full,
57         }
58     }
59 
full(&self) -> bool60     fn full(&self) -> bool {
61         self.full.load(Ordering::Relaxed)
62     }
63 }
64 
65 impl<'r, R, ID, T> UnindexedConsumer<T> for TryReduceConsumer<'r, R, ID>
66 where
67     R: Fn(T::Output, T::Output) -> T + Sync,
68     ID: Fn() -> T::Output + Sync,
69     T: Try + Send,
70 {
split_off_left(&self) -> Self71     fn split_off_left(&self) -> Self {
72         *self
73     }
74 
to_reducer(&self) -> Self::Reducer75     fn to_reducer(&self) -> Self::Reducer {
76         *self
77     }
78 }
79 
80 impl<'r, R, ID, T> Reducer<T> for TryReduceConsumer<'r, R, ID>
81 where
82     R: Fn(T::Output, T::Output) -> T + Sync,
83     T: Try,
84 {
reduce(self, left: T, right: T) -> T85     fn reduce(self, left: T, right: T) -> T {
86         match (left.branch(), right.branch()) {
87             (Continue(left), Continue(right)) => (self.reduce_op)(left, right),
88             (Break(r), _) | (_, Break(r)) => T::from_residual(r),
89         }
90     }
91 }
92 
93 struct TryReduceFolder<'r, R, T: Try> {
94     reduce_op: &'r R,
95     control: ControlFlow<T::Residual, T::Output>,
96     full: &'r AtomicBool,
97 }
98 
99 impl<'r, R, T> Folder<T> for TryReduceFolder<'r, R, T>
100 where
101     R: Fn(T::Output, T::Output) -> T,
102     T: Try,
103 {
104     type Result = T;
105 
consume(mut self, item: T) -> Self106     fn consume(mut self, item: T) -> Self {
107         let reduce_op = self.reduce_op;
108         self.control = match (self.control, item.branch()) {
109             (Continue(left), Continue(right)) => reduce_op(left, right).branch(),
110             (control @ Break(_), _) | (_, control @ Break(_)) => control,
111         };
112         if let Break(_) = self.control {
113             self.full.store(true, Ordering::Relaxed);
114         }
115         self
116     }
117 
complete(self) -> T118     fn complete(self) -> T {
119         match self.control {
120             Continue(c) => T::from_output(c),
121             Break(r) => T::from_residual(r),
122         }
123     }
124 
full(&self) -> bool125     fn full(&self) -> bool {
126         match self.control {
127             Break(_) => true,
128             _ => self.full.load(Ordering::Relaxed),
129         }
130     }
131 }
132