• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crossbeam_deque::{Steal, Stealer, Worker};
2 
3 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4 use std::sync::{Mutex, TryLockError};
5 use std::thread::yield_now;
6 
7 use crate::current_num_threads;
8 use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer};
9 use crate::iter::ParallelIterator;
10 
11 /// Conversion trait to convert an `Iterator` to a `ParallelIterator`.
12 ///
13 /// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items
14 /// across the Rayon thread pool. This has the advantage of being able to parallelize just about
15 /// anything, but the resulting `ParallelIterator` can be less efficient than if you started with
16 /// `par_iter` instead. However, it can still be useful for iterators that are difficult to
17 /// parallelize by other means, like channels or file or network I/O.
18 ///
19 /// The resulting iterator is not guaranteed to keep the order of the original iterator.
20 ///
21 /// # Examples
22 ///
23 /// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can
24 /// use any of the `ParallelIterator` methods:
25 ///
26 /// ```
27 /// use rayon::iter::ParallelBridge;
28 /// use rayon::prelude::ParallelIterator;
29 /// use std::sync::mpsc::channel;
30 ///
31 /// let rx = {
32 ///     let (tx, rx) = channel();
33 ///
34 ///     tx.send("one!");
35 ///     tx.send("two!");
36 ///     tx.send("three!");
37 ///
38 ///     rx
39 /// };
40 ///
41 /// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
42 /// output.sort_unstable();
43 ///
44 /// assert_eq!(&*output, &["one!", "three!", "two!"]);
45 /// ```
46 pub trait ParallelBridge: Sized {
47     /// Creates a bridge from this type to a `ParallelIterator`.
par_bridge(self) -> IterBridge<Self>48     fn par_bridge(self) -> IterBridge<Self>;
49 }
50 
51 impl<T: Iterator + Send> ParallelBridge for T
52 where
53     T::Item: Send,
54 {
par_bridge(self) -> IterBridge<Self>55     fn par_bridge(self) -> IterBridge<Self> {
56         IterBridge { iter: self }
57     }
58 }
59 
/// `IterBridge` is a parallel iterator that wraps a sequential iterator.
///
/// This type is created when using the `par_bridge` method on `ParallelBridge`. See the
/// [`ParallelBridge`] documentation for details.
///
/// The wrapped iterator is only stored here; it is not advanced until the bridge is driven
/// by a consumer, so a discarded `IterBridge` performs no work.
///
/// [`ParallelBridge`]: trait.ParallelBridge.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct IterBridge<Iter> {
    /// The sequential iterator whose items will be distributed.
    iter: Iter,
}
70 
71 impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
72 where
73     Iter::Item: Send,
74 {
75     type Item = Iter::Item;
76 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,77     fn drive_unindexed<C>(self, consumer: C) -> C::Result
78     where
79         C: UnindexedConsumer<Self::Item>,
80     {
81         let split_count = AtomicUsize::new(current_num_threads());
82         let worker = Worker::new_fifo();
83         let stealer = worker.stealer();
84         let done = AtomicBool::new(false);
85         let iter = Mutex::new((self.iter, worker));
86 
87         bridge_unindexed(
88             IterParallelProducer {
89                 split_count: &split_count,
90                 done: &done,
91                 iter: &iter,
92                 items: stealer,
93             },
94             consumer,
95         )
96     }
97 }
98 
99 struct IterParallelProducer<'a, Iter: Iterator> {
100     split_count: &'a AtomicUsize,
101     done: &'a AtomicBool,
102     iter: &'a Mutex<(Iter, Worker<Iter::Item>)>,
103     items: Stealer<Iter::Item>,
104 }
105 
106 // manual clone because T doesn't need to be Clone, but the derive assumes it should be
107 impl<'a, Iter: Iterator + 'a> Clone for IterParallelProducer<'a, Iter> {
clone(&self) -> Self108     fn clone(&self) -> Self {
109         IterParallelProducer {
110             split_count: self.split_count,
111             done: self.done,
112             iter: self.iter,
113             items: self.items.clone(),
114         }
115     }
116 }
117 
118 impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<'a, Iter>
119 where
120     Iter::Item: Send,
121 {
122     type Item = Iter::Item;
123 
split(self) -> (Self, Option<Self>)124     fn split(self) -> (Self, Option<Self>) {
125         let mut count = self.split_count.load(Ordering::SeqCst);
126 
127         loop {
128             let done = self.done.load(Ordering::SeqCst);
129             match count.checked_sub(1) {
130                 Some(new_count) if !done => {
131                     let last_count =
132                         self.split_count
133                             .compare_and_swap(count, new_count, Ordering::SeqCst);
134                     if last_count == count {
135                         return (self.clone(), Some(self));
136                     } else {
137                         count = last_count;
138                     }
139                 }
140                 _ => {
141                     return (self, None);
142                 }
143             }
144         }
145     }
146 
fold_with<F>(self, mut folder: F) -> F where F: Folder<Self::Item>,147     fn fold_with<F>(self, mut folder: F) -> F
148     where
149         F: Folder<Self::Item>,
150     {
151         loop {
152             match self.items.steal() {
153                 Steal::Success(it) => {
154                     folder = folder.consume(it);
155                     if folder.full() {
156                         return folder;
157                     }
158                 }
159                 Steal::Empty => {
160                     if self.done.load(Ordering::SeqCst) {
161                         // the iterator is out of items, no use in continuing
162                         return folder;
163                     } else {
164                         // our cache is out of items, time to load more from the iterator
165                         match self.iter.try_lock() {
166                             Ok(mut guard) => {
167                                 let count = current_num_threads();
168                                 let count = (count * count) * 2;
169 
170                                 let (ref mut iter, ref worker) = *guard;
171 
172                                 // while worker.len() < count {
173                                 // FIXME the new deque doesn't let us count items.  We can just
174                                 // push a number of items, but that doesn't consider active
175                                 // stealers elsewhere.
176                                 for _ in 0..count {
177                                     if let Some(it) = iter.next() {
178                                         worker.push(it);
179                                     } else {
180                                         self.done.store(true, Ordering::SeqCst);
181                                         break;
182                                     }
183                                 }
184                             }
185                             Err(TryLockError::WouldBlock) => {
186                                 // someone else has the mutex, just sit tight until it's ready
187                                 yield_now(); //TODO: use a thread=pool-aware yield? (#548)
188                             }
189                             Err(TryLockError::Poisoned(_)) => {
190                                 // any panics from other threads will have been caught by the pool,
191                                 // and will be re-thrown when joined - just exit
192                                 return folder;
193                             }
194                         }
195                     }
196                 }
197                 Steal::Retry => (),
198             }
199         }
200     }
201 }
202