• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use super::task::Task;
2 use super::FuturesUnordered;
3 use core::marker::PhantomData;
4 use core::pin::Pin;
5 use core::sync::atomic::Ordering::Relaxed;
6 
7 /// Mutable iterator over all futures in the unordered set.
8 #[derive(Debug)]
9 pub struct IterPinMut<'a, Fut> {
10     pub(super) task: *const Task<Fut>,
11     pub(super) len: usize,
12     pub(super) _marker: PhantomData<&'a mut FuturesUnordered<Fut>>,
13 }
14 
15 /// Mutable iterator over all futures in the unordered set.
16 #[derive(Debug)]
17 pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>);
18 
19 /// Immutable iterator over all futures in the unordered set.
20 #[derive(Debug)]
21 pub struct IterPinRef<'a, Fut> {
22     pub(super) task: *const Task<Fut>,
23     pub(super) len: usize,
24     pub(super) pending_next_all: *mut Task<Fut>,
25     pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>,
26 }
27 
28 /// Immutable iterator over all the futures in the unordered set.
29 #[derive(Debug)]
30 pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>);
31 
32 /// Owned iterator over all futures in the unordered set.
33 #[derive(Debug)]
34 pub struct IntoIter<Fut: Unpin> {
35     pub(super) len: usize,
36     pub(super) inner: FuturesUnordered<Fut>,
37 }
38 
39 impl<Fut: Unpin> Iterator for IntoIter<Fut> {
40     type Item = Fut;
41 
next(&mut self) -> Option<Self::Item>42     fn next(&mut self) -> Option<Self::Item> {
43         // `head_all` can be accessed directly and we don't need to spin on
44         // `Task::next_all` since we have exclusive access to the set.
45         let task = self.inner.head_all.get_mut();
46 
47         if (*task).is_null() {
48             return None;
49         }
50 
51         unsafe {
52             // Moving out of the future is safe because it is `Unpin`
53             let future = (*(**task).future.get()).take().unwrap();
54 
55             // Mutable access to a previously shared `FuturesUnordered` implies
56             // that the other threads already released the object before the
57             // current thread acquired it, so relaxed ordering can be used and
58             // valid `next_all` checks can be skipped.
59             let next = (**task).next_all.load(Relaxed);
60             *task = next;
61             self.len -= 1;
62             Some(future)
63         }
64     }
65 
size_hint(&self) -> (usize, Option<usize>)66     fn size_hint(&self) -> (usize, Option<usize>) {
67         (self.len, Some(self.len))
68     }
69 }
70 
71 impl<Fut: Unpin> ExactSizeIterator for IntoIter<Fut> {}
72 
73 impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
74     type Item = Pin<&'a mut Fut>;
75 
next(&mut self) -> Option<Self::Item>76     fn next(&mut self) -> Option<Self::Item> {
77         if self.task.is_null() {
78             return None;
79         }
80 
81         unsafe {
82             let future = (*(*self.task).future.get()).as_mut().unwrap();
83 
84             // Mutable access to a previously shared `FuturesUnordered` implies
85             // that the other threads already released the object before the
86             // current thread acquired it, so relaxed ordering can be used and
87             // valid `next_all` checks can be skipped.
88             let next = (*self.task).next_all.load(Relaxed);
89             self.task = next;
90             self.len -= 1;
91             Some(Pin::new_unchecked(future))
92         }
93     }
94 
size_hint(&self) -> (usize, Option<usize>)95     fn size_hint(&self) -> (usize, Option<usize>) {
96         (self.len, Some(self.len))
97     }
98 }
99 
100 impl<Fut> ExactSizeIterator for IterPinMut<'_, Fut> {}
101 
102 impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> {
103     type Item = &'a mut Fut;
104 
next(&mut self) -> Option<Self::Item>105     fn next(&mut self) -> Option<Self::Item> {
106         self.0.next().map(Pin::get_mut)
107     }
108 
size_hint(&self) -> (usize, Option<usize>)109     fn size_hint(&self) -> (usize, Option<usize>) {
110         self.0.size_hint()
111     }
112 }
113 
114 impl<Fut: Unpin> ExactSizeIterator for IterMut<'_, Fut> {}
115 
116 impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
117     type Item = Pin<&'a Fut>;
118 
next(&mut self) -> Option<Self::Item>119     fn next(&mut self) -> Option<Self::Item> {
120         if self.task.is_null() {
121             return None;
122         }
123 
124         unsafe {
125             let future = (*(*self.task).future.get()).as_ref().unwrap();
126 
127             // Relaxed ordering can be used since acquire ordering when
128             // `head_all` was initially read for this iterator implies acquire
129             // ordering for all previously inserted nodes (and we don't need to
130             // read `len_all` again for any other nodes).
131             let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed);
132             self.task = next;
133             self.len -= 1;
134             Some(Pin::new_unchecked(future))
135         }
136     }
137 
size_hint(&self) -> (usize, Option<usize>)138     fn size_hint(&self) -> (usize, Option<usize>) {
139         (self.len, Some(self.len))
140     }
141 }
142 
143 impl<Fut> ExactSizeIterator for IterPinRef<'_, Fut> {}
144 
145 impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
146     type Item = &'a Fut;
147 
next(&mut self) -> Option<Self::Item>148     fn next(&mut self) -> Option<Self::Item> {
149         self.0.next().map(Pin::get_ref)
150     }
151 
size_hint(&self) -> (usize, Option<usize>)152     fn size_hint(&self) -> (usize, Option<usize>) {
153         self.0.size_hint()
154     }
155 }
156 
157 impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {}
158 
159 // SAFETY: we do nothing thread-local and there is no interior mutability,
160 // so the usual structural `Send`/`Sync` apply.
161 unsafe impl<Fut: Send> Send for IterPinRef<'_, Fut> {}
162 unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {}
163 
164 unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {}
165 unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {}
166 
167 unsafe impl<Fut: Send + Unpin> Send for IntoIter<Fut> {}
168 unsafe impl<Fut: Sync + Unpin> Sync for IntoIter<Fut> {}
169