• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::iter::Sum;
15 use std::ops::Add;
16 use std::pin::Pin;
17 
18 use super::core::core;
19 use crate::error::ScheduleError;
20 
21 mod map;
22 use map::Map;
23 
24 mod filter;
25 use filter::Filter;
26 
27 mod zip;
28 use zip::Zip;
29 
30 mod for_each;
31 
32 mod sum;
33 /// A parallel version of the standard iterator trait.
34 pub trait ParallelIterator: Sized + Send {
35     /// The type of the elements the parallel iterator has.
36     type Item;
37 
38     /// The type of the std iterator that the parallel iterator generate.
39     type Iter: Iterator<Item = Self::Item>;
40 
41     /// Splits one parallel iterator into two parts,
split(self) -> (Self, Option<Self>)42     fn split(self) -> (Self, Option<Self>);
43 
44     /// Returns the number of elements in this parallel iterator.
len(&self) -> usize45     fn len(&self) -> usize;
46 
47     /// Removes redundant elements, used in zip function, so that the two parts
48     /// has the same length.
reduce(self, len: usize) -> Self49     fn reduce(self, len: usize) -> Self;
50 
51     /// Returns the std iterator.
iter(self) -> Self::Iter52     fn iter(self) -> Self::Iter;
53 
54     /// Returns true if the parallel iterator data has a length of 0
is_empty(&self) -> bool55     fn is_empty(&self) -> bool {
56         self.len() == 0
57     }
58     /// cache a map_op in the parallel iterator
map<F, B>(self, map_op: F) -> Map<Self, F> where F: Fn(Self::Item) -> B,59     fn map<F, B>(self, map_op: F) -> Map<Self, F>
60     where
61         F: Fn(Self::Item) -> B,
62     {
63         map::map(self, map_op)
64     }
65 
66     /// Parallel version of the std iterator filter
filter<F>(self, predicate: F) -> Filter<Self, F> where F: Fn(&Self::Item) -> bool,67     fn filter<F>(self, predicate: F) -> Filter<Self, F>
68     where
69         F: Fn(&Self::Item) -> bool,
70     {
71         filter::filter(self, predicate)
72     }
73 
74     /// Parallel version of the std iterator zip
zip<B>(self, another: B) -> Zip<Self, B> where B: ParallelIterator,75     fn zip<B>(self, another: B) -> Zip<Self, B>
76     where
77         B: ParallelIterator,
78     {
79         zip::zip(self, another)
80     }
81 
82     /// Execute the OP in parallel on each element produced by the parallel
83     /// iterator.
for_each<'a, F>( self, f: F, ) -> Pin<Box<dyn std::future::Future<Output = Result<(), ScheduleError>> + Send + 'a>> where F: Fn(Self::Item) + Send + Copy + Sync + 'a, Self: 'a,84     fn for_each<'a, F>(
85         self,
86         f: F,
87     ) -> Pin<Box<dyn std::future::Future<Output = Result<(), ScheduleError>> + Send + 'a>>
88     where
89         F: Fn(Self::Item) + Send + Copy + Sync + 'a,
90         Self: 'a,
91     {
92         let fut = async move { for_each::for_each(self, f).await };
93         Box::pin(fut)
94     }
95 
96     /// Parallel version of the std iterator sum.
sum<'a>( self, ) -> Pin<Box<dyn std::future::Future<Output = Result<Self::Item, ScheduleError>> + Send + 'a>> where Self: 'a, Self::Item: Add<Output = Self::Item> + Sum + Send,97     fn sum<'a>(
98         self,
99     ) -> Pin<Box<dyn std::future::Future<Output = Result<Self::Item, ScheduleError>> + Send + 'a>>
100     where
101         Self: 'a,
102         Self::Item: Add<Output = Self::Item> + Sum + Send,
103     {
104         let fut = async move { sum::sum(self).await };
105         Box::pin(fut)
106     }
107 
108     /// Consumes the parallel iterator.
drive<'a, C>( self, consumer: C, ) -> Pin<Box<dyn std::future::Future<Output = Result<C::Output, ScheduleError>> + Send + 'a>> where Self: 'a, C: Consumer<Self> + Send + Sync + 'a,109     fn drive<'a, C>(
110         self,
111         consumer: C,
112     ) -> Pin<Box<dyn std::future::Future<Output = Result<C::Output, ScheduleError>> + Send + 'a>>
113     where
114         Self: 'a,
115         C: Consumer<Self> + Send + Sync + 'a,
116     {
117         let fut = async move { core(self, consumer).await };
118         Box::pin(fut)
119     }
120 }
121 
122 /// Consumer that comsume a parallel iterator and returns the result.
123 pub trait Consumer<P: ParallelIterator> {
124     /// Type that the consumer return
125     type Output: Send;
126 
127     /// Consume a parallel iterator
consume(&self, par_iter: P) -> Self::Output128     fn consume(&self, par_iter: P) -> Self::Output;
129 
130     /// Returns the result obtained by merging the two split executions
combine(a: Self::Output, b: Self::Output) -> Self::Output131     fn combine(a: Self::Output, b: Self::Output) -> Self::Output;
132 }
133