use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;

use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer};
use crate::iter::ParallelIterator;
use crate::{current_num_threads, current_thread_index};

/// Conversion trait to convert an `Iterator` to a `ParallelIterator`.
///
/// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items
/// across the Rayon thread pool. This has the advantage of being able to parallelize just about
/// anything, but the resulting `ParallelIterator` can be less efficient than if you started with
/// `par_iter` instead. However, it can still be useful for iterators that are difficult to
/// parallelize by other means, like channels or file or network I/O.
///
/// The resulting iterator is not guaranteed to keep the order of the original iterator.
///
/// # Examples
///
/// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can
/// use any of the `ParallelIterator` methods:
///
/// ```
/// use rayon::iter::ParallelBridge;
/// use rayon::prelude::ParallelIterator;
/// use std::sync::mpsc::channel;
///
/// let rx = {
///     let (tx, rx) = channel();
///
///     tx.send("one!");
///     tx.send("two!");
///     tx.send("three!");
///
///     rx
/// };
///
/// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
/// output.sort_unstable();
///
/// assert_eq!(&*output, &["one!", "three!", "two!"]);
/// ```
pub trait ParallelBridge: Sized {
    /// Creates a bridge from this type to a `ParallelIterator`.
    fn par_bridge(self) -> IterBridge<Self>;
}

impl<T: Iterator + Send> ParallelBridge for T
where
    T::Item: Send,
{
    fn par_bridge(self) -> IterBridge<Self> {
        IterBridge { iter: self }
    }
}

/// `IterBridge` is a parallel iterator that wraps a sequential iterator.
///
/// This type is created when using the `par_bridge` method on `ParallelBridge`. See the
/// [`ParallelBridge`] documentation for details.
///
/// [`ParallelBridge`]: trait.ParallelBridge.html
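///
/// # Examples
///
/// A minimal sketch of direct use; in practice you obtain an `IterBridge`
/// from `par_bridge` rather than constructing it yourself:
///
/// ```
/// use rayon::iter::{IterBridge, ParallelBridge};
/// use rayon::prelude::ParallelIterator;
///
/// let bridge: IterBridge<_> = (0..10u32).par_bridge();
/// let sum: u32 = bridge.map(|x| x * 2).sum();
/// assert_eq!(sum, 90);
/// ```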
#[derive(Debug, Clone)]
pub struct IterBridge<Iter> {
    iter: Iter,
}

impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
where
    Iter::Item: Send,
{
    type Item = Iter::Item;

    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        let num_threads = current_num_threads();
        // One flag per pool thread; `fold_with` uses these to detect
        // work-stealing recursion into the same producer.
        let threads_started: Vec<_> = (0..num_threads).map(|_| AtomicBool::new(false)).collect();

        bridge_unindexed(
            &IterParallelProducer {
                split_count: AtomicUsize::new(num_threads),
                iter: Mutex::new(self.iter.fuse()),
                threads_started: &threads_started,
            },
            consumer,
        )
    }
}

/// Shared state for the unindexed producer: a remaining-splits budget, the
/// fused source iterator behind a mutex, and one "started" flag per pool
/// thread (used to detect work-stealing recursion in `fold_with`).
struct IterParallelProducer<'a, Iter> {
    split_count: AtomicUsize,
    iter: Mutex<std::iter::Fuse<Iter>>,
    threads_started: &'a [AtomicBool],
}

impl<Iter: Iterator + Send> UnindexedProducer for &IterParallelProducer<'_, Iter> {
    type Item = Iter::Item;

    fn split(self) -> (Self, Option<Self>) {
        let mut count = self.split_count.load(Ordering::SeqCst);

        loop {
            // Check whether we still have budget for another split; the
            // counter starts at the thread count and each split consumes one.
            if let Some(new_count) = count.checked_sub(1) {
                match self.split_count.compare_exchange_weak(
                    count,
                    new_count,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                ) {
                    Ok(_) => return (self, Some(self)),
                    Err(last_count) => count = last_count,
                }
            } else {
                return (self, None);
            }
        }
    }

    fn fold_with<F>(self, mut folder: F) -> F
    where
        F: Folder<Self::Item>,
    {
        // Guard against work-stealing-induced recursion, in case `Iter::next()`
        // calls rayon internally, so we don't deadlock our mutex. We might also
        // be recursing via `folder` methods, which doesn't present a mutex hazard,
        // but it's lower overhead for us to just check this once, rather than
        // updating additional shared state on every mutex lock/unlock.
        // (If this isn't a rayon thread, then there's no work-stealing anyway...)
        if let Some(i) = current_thread_index() {
            // Note: If the number of threads in the pool ever grows dynamically, then
            // we'll end up sharing flags and may falsely detect recursion -- that's
            // still fine for overall correctness, just not optimal for parallelism.
            let thread_started = &self.threads_started[i % self.threads_started.len()];
            if thread_started.swap(true, Ordering::Relaxed) {
                // We can't make progress with a nested mutex, so just return and let
                // the outermost loop continue with the rest of the iterator items.
                return folder;
            }
        }

        loop {
            if let Ok(mut iter) = self.iter.lock() {
                if let Some(it) = iter.next() {
                    // Release the lock before consuming the item, so other
                    // threads can make progress on the iterator in parallel.
                    drop(iter);
                    folder = folder.consume(it);
                    if folder.full() {
                        return folder;
                    }
                } else {
                    return folder;
                }
            } else {
                // Any panics from other threads will have been caught by the pool,
                // and will be re-thrown when joined -- just exit.
                return folder;
            }
        }
    }
}
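
// The sketch below is an illustrative sanity test, not part of the original
// module: it assumes this file compiles inside the rayon crate (so the
// `crate::` imports above resolve) and exercises the documented behavior that
// `par_bridge` yields every item, though not necessarily in the source order.
#[cfg(test)]
mod tests {
    use super::ParallelBridge;
    use crate::iter::ParallelIterator;

    #[test]
    fn bridge_yields_all_items_in_some_order() {
        // Order across threads is unspecified, so sort before comparing.
        let mut out: Vec<u32> = (0..100u32).par_bridge().collect();
        out.sort_unstable();
        assert_eq!(out, (0..100).collect::<Vec<u32>>());
    }
}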