1 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; 2 use std::sync::Mutex; 3 4 use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer}; 5 use crate::iter::ParallelIterator; 6 use crate::{current_num_threads, current_thread_index}; 7 8 /// Conversion trait to convert an `Iterator` to a `ParallelIterator`. 9 /// 10 /// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items 11 /// across the Rayon thread pool. This has the advantage of being able to parallelize just about 12 /// anything, but the resulting `ParallelIterator` can be less efficient than if you started with 13 /// `par_iter` instead. However, it can still be useful for iterators that are difficult to 14 /// parallelize by other means, like channels or file or network I/O. 15 /// 16 /// The resulting iterator is not guaranteed to keep the order of the original iterator. 17 /// 18 /// # Examples 19 /// 20 /// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can 21 /// use any of the `ParallelIterator` methods: 22 /// 23 /// ``` 24 /// use rayon::iter::ParallelBridge; 25 /// use rayon::prelude::ParallelIterator; 26 /// use std::sync::mpsc::channel; 27 /// 28 /// let rx = { 29 /// let (tx, rx) = channel(); 30 /// 31 /// tx.send("one!"); 32 /// tx.send("two!"); 33 /// tx.send("three!"); 34 /// 35 /// rx 36 /// }; 37 /// 38 /// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect(); 39 /// output.sort_unstable(); 40 /// 41 /// assert_eq!(&*output, &["one!", "three!", "two!"]); 42 /// ``` 43 pub trait ParallelBridge: Sized { 44 /// Creates a bridge from this type to a `ParallelIterator`. par_bridge(self) -> IterBridge<Self>45 fn par_bridge(self) -> IterBridge<Self>; 46 } 47 48 impl<T: Iterator + Send> ParallelBridge for T 49 where 50 T::Item: Send, 51 { par_bridge(self) -> IterBridge<Self>52 fn par_bridge(self) -> IterBridge<Self> { 53 IterBridge { iter: self } 54 } 55 } 56 57 /// `IterBridge` is a parallel iterator that wraps a sequential iterator. 58 /// 59 /// This type is created when using the `par_bridge` method on `ParallelBridge`. See the 60 /// [`ParallelBridge`] documentation for details. 61 /// 62 /// [`ParallelBridge`]: trait.ParallelBridge.html 63 #[derive(Debug, Clone)] 64 pub struct IterBridge<Iter> { 65 iter: Iter, 66 } 67 68 impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter> 69 where 70 Iter::Item: Send, 71 { 72 type Item = Iter::Item; 73 drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,74 fn drive_unindexed<C>(self, consumer: C) -> C::Result 75 where 76 C: UnindexedConsumer<Self::Item>, 77 { 78 let num_threads = current_num_threads(); 79 let threads_started: Vec<_> = (0..num_threads).map(|_| AtomicBool::new(false)).collect(); 80 81 bridge_unindexed( 82 &IterParallelProducer { 83 split_count: AtomicUsize::new(num_threads), 84 iter: Mutex::new(self.iter.fuse()), 85 threads_started: &threads_started, 86 }, 87 consumer, 88 ) 89 } 90 } 91 92 struct IterParallelProducer<'a, Iter> { 93 split_count: AtomicUsize, 94 iter: Mutex<std::iter::Fuse<Iter>>, 95 threads_started: &'a [AtomicBool], 96 } 97 98 impl<Iter: Iterator + Send> UnindexedProducer for &IterParallelProducer<'_, Iter> { 99 type Item = Iter::Item; 100 split(self) -> (Self, Option<Self>)101 fn split(self) -> (Self, Option<Self>) { 102 let mut count = self.split_count.load(Ordering::SeqCst); 103 104 loop { 105 // Check if the iterator is exhausted 106 if let Some(new_count) = count.checked_sub(1) { 107 match self.split_count.compare_exchange_weak( 108 count, 109 new_count, 110 Ordering::SeqCst, 111 Ordering::SeqCst, 112 ) { 113 Ok(_) => return (self, Some(self)), 114 Err(last_count) => count = last_count, 115 } 116 } else { 117 return (self, None); 118 } 119 } 120 } 121 fold_with<F>(self, mut folder: F) -> F where F: Folder<Self::Item>,122 fn fold_with<F>(self, mut folder: F) -> F 123 where 124 F: Folder<Self::Item>, 125 { 126 // Guard against work-stealing-induced recursion, in case `Iter::next()` 127 // calls rayon internally, so we don't deadlock our mutex. We might also 128 // be recursing via `folder` methods, which doesn't present a mutex hazard, 129 // but it's lower overhead for us to just check this once, rather than 130 // updating additional shared state on every mutex lock/unlock. 131 // (If this isn't a rayon thread, then there's no work-stealing anyway...) 132 if let Some(i) = current_thread_index() { 133 // Note: If the number of threads in the pool ever grows dynamically, then 134 // we'll end up sharing flags and may falsely detect recursion -- that's 135 // still fine for overall correctness, just not optimal for parallelism. 136 let thread_started = &self.threads_started[i % self.threads_started.len()]; 137 if thread_started.swap(true, Ordering::Relaxed) { 138 // We can't make progress with a nested mutex, so just return and let 139 // the outermost loop continue with the rest of the iterator items. 140 return folder; 141 } 142 } 143 144 loop { 145 if let Ok(mut iter) = self.iter.lock() { 146 if let Some(it) = iter.next() { 147 drop(iter); 148 folder = folder.consume(it); 149 if folder.full() { 150 return folder; 151 } 152 } else { 153 return folder; 154 } 155 } else { 156 // any panics from other threads will have been caught by the pool, 157 // and will be re-thrown when joined - just exit 158 return folder; 159 } 160 } 161 } 162 } 163