1 use super::plumbing::*;
2 use super::ParallelIterator;
3 use super::Try;
4
5 use std::ops::ControlFlow::{self, Break, Continue};
6 use std::sync::atomic::{AtomicBool, Ordering};
7
try_reduce<PI, R, ID, T>(pi: PI, identity: ID, reduce_op: R) -> T where PI: ParallelIterator<Item = T>, R: Fn(T::Output, T::Output) -> T + Sync, ID: Fn() -> T::Output + Sync, T: Try + Send,8 pub(super) fn try_reduce<PI, R, ID, T>(pi: PI, identity: ID, reduce_op: R) -> T
9 where
10 PI: ParallelIterator<Item = T>,
11 R: Fn(T::Output, T::Output) -> T + Sync,
12 ID: Fn() -> T::Output + Sync,
13 T: Try + Send,
14 {
15 let full = AtomicBool::new(false);
16 let consumer = TryReduceConsumer {
17 identity: &identity,
18 reduce_op: &reduce_op,
19 full: &full,
20 };
21 pi.drive_unindexed(consumer)
22 }
23
/// Consumer that `try_reduce` hands to the parallel iterator.
///
/// It stores nothing but shared references, so copying it on every split is
/// cheap.
struct TryReduceConsumer<'r, R, ID> {
    /// Called once per folder to produce its starting accumulator.
    identity: &'r ID,
    /// Combines two successful outputs; used by both folders and the reducer.
    reduce_op: &'r R,
    /// Shared flag that folders set to `true` once they hold a `Break`.
    full: &'r AtomicBool,
}
29
30 impl<'r, R, ID> Copy for TryReduceConsumer<'r, R, ID> {}
31
32 impl<'r, R, ID> Clone for TryReduceConsumer<'r, R, ID> {
clone(&self) -> Self33 fn clone(&self) -> Self {
34 *self
35 }
36 }
37
38 impl<'r, R, ID, T> Consumer<T> for TryReduceConsumer<'r, R, ID>
39 where
40 R: Fn(T::Output, T::Output) -> T + Sync,
41 ID: Fn() -> T::Output + Sync,
42 T: Try + Send,
43 {
44 type Folder = TryReduceFolder<'r, R, T>;
45 type Reducer = Self;
46 type Result = T;
47
split_at(self, _index: usize) -> (Self, Self, Self)48 fn split_at(self, _index: usize) -> (Self, Self, Self) {
49 (self, self, self)
50 }
51
into_folder(self) -> Self::Folder52 fn into_folder(self) -> Self::Folder {
53 TryReduceFolder {
54 reduce_op: self.reduce_op,
55 control: Continue((self.identity)()),
56 full: self.full,
57 }
58 }
59
full(&self) -> bool60 fn full(&self) -> bool {
61 self.full.load(Ordering::Relaxed)
62 }
63 }
64
65 impl<'r, R, ID, T> UnindexedConsumer<T> for TryReduceConsumer<'r, R, ID>
66 where
67 R: Fn(T::Output, T::Output) -> T + Sync,
68 ID: Fn() -> T::Output + Sync,
69 T: Try + Send,
70 {
split_off_left(&self) -> Self71 fn split_off_left(&self) -> Self {
72 *self
73 }
74
to_reducer(&self) -> Self::Reducer75 fn to_reducer(&self) -> Self::Reducer {
76 *self
77 }
78 }
79
80 impl<'r, R, ID, T> Reducer<T> for TryReduceConsumer<'r, R, ID>
81 where
82 R: Fn(T::Output, T::Output) -> T + Sync,
83 T: Try,
84 {
reduce(self, left: T, right: T) -> T85 fn reduce(self, left: T, right: T) -> T {
86 match (left.branch(), right.branch()) {
87 (Continue(left), Continue(right)) => (self.reduce_op)(left, right),
88 (Break(r), _) | (_, Break(r)) => T::from_residual(r),
89 }
90 }
91 }
92
93 struct TryReduceFolder<'r, R, T: Try> {
94 reduce_op: &'r R,
95 control: ControlFlow<T::Residual, T::Output>,
96 full: &'r AtomicBool,
97 }
98
99 impl<'r, R, T> Folder<T> for TryReduceFolder<'r, R, T>
100 where
101 R: Fn(T::Output, T::Output) -> T,
102 T: Try,
103 {
104 type Result = T;
105
consume(mut self, item: T) -> Self106 fn consume(mut self, item: T) -> Self {
107 let reduce_op = self.reduce_op;
108 self.control = match (self.control, item.branch()) {
109 (Continue(left), Continue(right)) => reduce_op(left, right).branch(),
110 (control @ Break(_), _) | (_, control @ Break(_)) => control,
111 };
112 if let Break(_) = self.control {
113 self.full.store(true, Ordering::Relaxed);
114 }
115 self
116 }
117
complete(self) -> T118 fn complete(self) -> T {
119 match self.control {
120 Continue(c) => T::from_output(c),
121 Break(r) => T::from_residual(r),
122 }
123 }
124
full(&self) -> bool125 fn full(&self) -> bool {
126 match self.control {
127 Break(_) => true,
128 _ => self.full.load(Ordering::Relaxed),
129 }
130 }
131 }
132