1 use super::plumbing::*; 2 use super::*; 3 4 use std::fmt::{self, Debug}; 5 6 /// `FlatMap` maps each element to a parallel iterator, then flattens these iterators together. 7 /// This struct is created by the [`flat_map()`] method on [`ParallelIterator`] 8 /// 9 /// [`flat_map()`]: trait.ParallelIterator.html#method.flat_map 10 /// [`ParallelIterator`]: trait.ParallelIterator.html 11 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"] 12 #[derive(Clone)] 13 pub struct FlatMap<I: ParallelIterator, F> { 14 base: I, 15 map_op: F, 16 } 17 18 impl<I: ParallelIterator + Debug, F> Debug for FlatMap<I, F> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result19 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 20 f.debug_struct("FlatMap").field("base", &self.base).finish() 21 } 22 } 23 24 impl<I: ParallelIterator, F> FlatMap<I, F> { 25 /// Creates a new `FlatMap` iterator. new(base: I, map_op: F) -> Self26 pub(super) fn new(base: I, map_op: F) -> Self { 27 FlatMap { base, map_op } 28 } 29 } 30 31 impl<I, F, PI> ParallelIterator for FlatMap<I, F> 32 where 33 I: ParallelIterator, 34 F: Fn(I::Item) -> PI + Sync + Send, 35 PI: IntoParallelIterator, 36 { 37 type Item = PI::Item; 38 drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,39 fn drive_unindexed<C>(self, consumer: C) -> C::Result 40 where 41 C: UnindexedConsumer<Self::Item>, 42 { 43 let consumer = FlatMapConsumer::new(consumer, &self.map_op); 44 self.base.drive_unindexed(consumer) 45 } 46 } 47 48 /// //////////////////////////////////////////////////////////////////////// 49 /// Consumer implementation 50 51 struct FlatMapConsumer<'f, C, F> { 52 base: C, 53 map_op: &'f F, 54 } 55 56 impl<'f, C, F> FlatMapConsumer<'f, C, F> { new(base: C, map_op: &'f F) -> Self57 fn new(base: C, map_op: &'f F) -> Self { 58 FlatMapConsumer { base, map_op } 59 } 60 } 61 62 impl<'f, T, U, C, F> Consumer<T> for FlatMapConsumer<'f, C, F> 63 where 64 C: UnindexedConsumer<U::Item>, 65 F: Fn(T) -> U + Sync, 66 U: IntoParallelIterator, 67 { 68 type Folder = FlatMapFolder<'f, C, F, C::Result>; 69 type Reducer = C::Reducer; 70 type Result = C::Result; 71 split_at(self, index: usize) -> (Self, Self, C::Reducer)72 fn split_at(self, index: usize) -> (Self, Self, C::Reducer) { 73 let (left, right, reducer) = self.base.split_at(index); 74 ( 75 FlatMapConsumer::new(left, self.map_op), 76 FlatMapConsumer::new(right, self.map_op), 77 reducer, 78 ) 79 } 80 into_folder(self) -> Self::Folder81 fn into_folder(self) -> Self::Folder { 82 FlatMapFolder { 83 base: self.base, 84 map_op: self.map_op, 85 previous: None, 86 } 87 } 88 full(&self) -> bool89 fn full(&self) -> bool { 90 self.base.full() 91 } 92 } 93 94 impl<'f, T, U, C, F> UnindexedConsumer<T> for FlatMapConsumer<'f, C, F> 95 where 96 C: UnindexedConsumer<U::Item>, 97 F: Fn(T) -> U + Sync, 98 U: IntoParallelIterator, 99 { split_off_left(&self) -> Self100 fn split_off_left(&self) -> Self { 101 FlatMapConsumer::new(self.base.split_off_left(), self.map_op) 102 } 103 to_reducer(&self) -> Self::Reducer104 fn to_reducer(&self) -> Self::Reducer { 105 self.base.to_reducer() 106 } 107 } 108 109 struct FlatMapFolder<'f, C, F, R> { 110 base: C, 111 map_op: &'f F, 112 previous: Option<R>, 113 } 114 115 impl<'f, T, U, C, F> Folder<T> for FlatMapFolder<'f, C, F, C::Result> 116 where 117 C: UnindexedConsumer<U::Item>, 118 F: Fn(T) -> U + Sync, 119 U: IntoParallelIterator, 120 { 121 type Result = C::Result; 122 consume(self, item: T) -> Self123 fn consume(self, item: T) -> Self { 124 let map_op = self.map_op; 125 let par_iter = map_op(item).into_par_iter(); 126 let consumer = self.base.split_off_left(); 127 let result = par_iter.drive_unindexed(consumer); 128 129 let previous = match self.previous { 130 None => Some(result), 131 Some(previous) => { 132 let reducer = self.base.to_reducer(); 133 Some(reducer.reduce(previous, result)) 134 } 135 }; 136 137 FlatMapFolder { 138 base: self.base, 139 map_op, 140 previous, 141 } 142 } 143 complete(self) -> Self::Result144 fn complete(self) -> Self::Result { 145 match self.previous { 146 Some(previous) => previous, 147 None => self.base.into_folder().complete(), 148 } 149 } 150 full(&self) -> bool151 fn full(&self) -> bool { 152 self.base.full() 153 } 154 } 155