1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use super::{Consumer, ParallelIterator};
15 use crate::error::ScheduleError;
16
for_each<P, F>(par_iter: P, f: F) -> Result<(), ScheduleError> where P: ParallelIterator + Send, F: Fn(P::Item) + Copy + Sync + Send,17 pub async fn for_each<P, F>(par_iter: P, f: F) -> Result<(), ScheduleError>
18 where
19 P: ParallelIterator + Send,
20 F: Fn(P::Item) + Copy + Sync + Send,
21 {
22 let consumer = ForEachConsumer::new(f);
23 par_iter.drive(consumer).await
24 }
25
/// Consumer that holds the closure invoked for each item of a parallel
/// iterator.
pub struct ForEachConsumer<F> {
    /// Closure applied to every item handed to this consumer.
    f: F,
}

impl<F> ForEachConsumer<F> {
    /// Wraps `f` in a new consumer.
    fn new(f: F) -> Self {
        ForEachConsumer { f }
    }
}
35
36 impl<F, P> Consumer<P> for ForEachConsumer<F>
37 where
38 P: ParallelIterator,
39 F: Fn(P::Item),
40 {
41 type Output = ();
consume(&self, par_iter: P) -> Self::Output42 fn consume(&self, par_iter: P) -> Self::Output {
43 par_iter.iter().for_each(&self.f)
44 }
45
combine(_a: Self::Output, _b: Self::Output) -> Self::Output46 fn combine(_a: Self::Output, _b: Self::Output) -> Self::Output {}
47 }
48