1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::iter::Sum; 15 use std::ops::Add; 16 use std::pin::Pin; 17 18 use super::core::core; 19 use crate::error::ScheduleError; 20 21 mod map; 22 use map::Map; 23 24 mod filter; 25 use filter::Filter; 26 27 mod zip; 28 use zip::Zip; 29 30 mod for_each; 31 32 mod sum; 33 /// A parallel version of the standard iterator trait. 34 pub trait ParallelIterator: Sized + Send { 35 /// The type of the elements the parallel iterator has. 36 type Item; 37 38 /// The type of the std iterator that the parallel iterator generate. 39 type Iter: Iterator<Item = Self::Item>; 40 41 /// Splits one parallel iterator into two parts, split(self) -> (Self, Option<Self>)42 fn split(self) -> (Self, Option<Self>); 43 44 /// Returns the number of elements in this parallel iterator. len(&self) -> usize45 fn len(&self) -> usize; 46 47 /// Removes redundant elements, used in zip function, so that the two parts 48 /// has the same length. reduce(self, len: usize) -> Self49 fn reduce(self, len: usize) -> Self; 50 51 /// Returns the std iterator. iter(self) -> Self::Iter52 fn iter(self) -> Self::Iter; 53 54 /// Returns true if the parallel iterator data has a length of 0 is_empty(&self) -> bool55 fn is_empty(&self) -> bool { 56 self.len() == 0 57 } 58 /// cache a map_op in the parallel iterator map<F, B>(self, map_op: F) -> Map<Self, F> where F: Fn(Self::Item) -> B,59 fn map<F, B>(self, map_op: F) -> Map<Self, F> 60 where 61 F: Fn(Self::Item) -> B, 62 { 63 map::map(self, map_op) 64 } 65 66 /// Parallel version of the std iterator filter filter<F>(self, predicate: F) -> Filter<Self, F> where F: Fn(&Self::Item) -> bool,67 fn filter<F>(self, predicate: F) -> Filter<Self, F> 68 where 69 F: Fn(&Self::Item) -> bool, 70 { 71 filter::filter(self, predicate) 72 } 73 74 /// Parallel version of the std iterator zip zip<B>(self, another: B) -> Zip<Self, B> where B: ParallelIterator,75 fn zip<B>(self, another: B) -> Zip<Self, B> 76 where 77 B: ParallelIterator, 78 { 79 zip::zip(self, another) 80 } 81 82 /// Execute the OP in parallel on each element produced by the parallel 83 /// iterator. for_each<'a, F>( self, f: F, ) -> Pin<Box<dyn std::future::Future<Output = Result<(), ScheduleError>> + Send + 'a>> where F: Fn(Self::Item) + Send + Copy + Sync + 'a, Self: 'a,84 fn for_each<'a, F>( 85 self, 86 f: F, 87 ) -> Pin<Box<dyn std::future::Future<Output = Result<(), ScheduleError>> + Send + 'a>> 88 where 89 F: Fn(Self::Item) + Send + Copy + Sync + 'a, 90 Self: 'a, 91 { 92 let fut = async move { for_each::for_each(self, f).await }; 93 Box::pin(fut) 94 } 95 96 /// Parallel version of the std iterator sum. sum<'a>( self, ) -> Pin<Box<dyn std::future::Future<Output = Result<Self::Item, ScheduleError>> + Send + 'a>> where Self: 'a, Self::Item: Add<Output = Self::Item> + Sum + Send,97 fn sum<'a>( 98 self, 99 ) -> Pin<Box<dyn std::future::Future<Output = Result<Self::Item, ScheduleError>> + Send + 'a>> 100 where 101 Self: 'a, 102 Self::Item: Add<Output = Self::Item> + Sum + Send, 103 { 104 let fut = async move { sum::sum(self).await }; 105 Box::pin(fut) 106 } 107 108 /// Consumes the parallel iterator. drive<'a, C>( self, consumer: C, ) -> Pin<Box<dyn std::future::Future<Output = Result<C::Output, ScheduleError>> + Send + 'a>> where Self: 'a, C: Consumer<Self> + Send + Sync + 'a,109 fn drive<'a, C>( 110 self, 111 consumer: C, 112 ) -> Pin<Box<dyn std::future::Future<Output = Result<C::Output, ScheduleError>> + Send + 'a>> 113 where 114 Self: 'a, 115 C: Consumer<Self> + Send + Sync + 'a, 116 { 117 let fut = async move { core(self, consumer).await }; 118 Box::pin(fut) 119 } 120 } 121 122 /// Consumer that comsume a parallel iterator and returns the result. 123 pub trait Consumer<P: ParallelIterator> { 124 /// Type that the consumer return 125 type Output: Send; 126 127 /// Consume a parallel iterator consume(&self, par_iter: P) -> Self::Output128 fn consume(&self, par_iter: P) -> Self::Output; 129 130 /// Returns the result obtained by merging the two split executions combine(a: Self::Output, b: Self::Output) -> Self::Output131 fn combine(a: Self::Output, b: Self::Output) -> Self::Output; 132 } 133