use super::task::Task;
use super::FuturesUnordered;
use core::marker::PhantomData;
use core::pin::Pin;
use core::sync::atomic::Ordering::Relaxed;

/// Mutable iterator over all futures in the unordered set.
#[derive(Debug)]
pub struct IterPinMut<'a, Fut> {
    pub(super) task: *const Task<Fut>,
    pub(super) len: usize,
    pub(super) _marker: PhantomData<&'a mut FuturesUnordered<Fut>>,
}

/// Mutable iterator over all futures in the unordered set.
#[derive(Debug)]
pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>);

/// Immutable iterator over all futures in the unordered set.
#[derive(Debug)]
pub struct IterPinRef<'a, Fut> {
    pub(super) task: *const Task<Fut>,
    pub(super) len: usize,
    pub(super) pending_next_all: *mut Task<Fut>,
    pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>,
}

/// Immutable iterator over all the futures in the unordered set.
#[derive(Debug)]
pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>);

/// Owned iterator over all futures in the unordered set.
#[derive(Debug)]
pub struct IntoIter<Fut: Unpin> {
    pub(super) len: usize,
    pub(super) inner: FuturesUnordered<Fut>,
}
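// Orientation note (not in the upstream file): these types are handed out by
// the parent module's public API as exposed through the `futures` crate —
// `FuturesUnordered::iter`/`iter_mut` return `Iter`/`IterMut` when the
// futures are `Unpin`, `iter_pin_ref`/`iter_pin_mut` on pinned handles
// return the `Pin`-yielding variants, and `IntoIterator` on an owned set
// produces `IntoIter`.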
impl<Fut: Unpin> Iterator for IntoIter<Fut> {
    type Item = Fut;

    fn next(&mut self) -> Option<Self::Item> {
        // `head_all` can be accessed directly and we don't need to spin on
        // `Task::next_all` since we have exclusive access to the set.
        let task = self.inner.head_all.get_mut();

        if (*task).is_null() {
            return None;
        }

        unsafe {
            // Moving out of the future is safe because it is `Unpin`
            let future = (*(**task).future.get()).take().unwrap();

            // Mutable access to a previously shared `FuturesUnordered` implies
            // that the other threads already released the object before the
            // current thread acquired it, so relaxed ordering can be used and
            // valid `next_all` checks can be skipped.
            let next = (**task).next_all.load(Relaxed);
            *task = next;
            self.len -= 1;
            Some(future)
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.len, Some(self.len))
    }
}

impl<Fut: Unpin> ExactSizeIterator for IntoIter<Fut> {}

impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
    type Item = Pin<&'a mut Fut>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.task.is_null() {
            return None;
        }

        unsafe {
            let future = (*(*self.task).future.get()).as_mut().unwrap();

            // Mutable access to a previously shared `FuturesUnordered` implies
            // that the other threads already released the object before the
            // current thread acquired it, so relaxed ordering can be used and
            // valid `next_all` checks can be skipped.
            let next = (*self.task).next_all.load(Relaxed);
            self.task = next;
            self.len -= 1;
            Some(Pin::new_unchecked(future))
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.len, Some(self.len))
    }
}

impl<Fut> ExactSizeIterator for IterPinMut<'_, Fut> {}

impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> {
    type Item = &'a mut Fut;

    fn next(&mut self) -> Option<Self::Item> {
        self.0.next().map(Pin::get_mut)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.size_hint()
    }
}

impl<Fut: Unpin> ExactSizeIterator for IterMut<'_, Fut> {}

impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
    type Item = Pin<&'a Fut>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.task.is_null() {
            return None;
        }

        unsafe {
            let future = (*(*self.task).future.get()).as_ref().unwrap();

            // Relaxed ordering can be used since acquire ordering when
            // `head_all` was initially read for this iterator implies acquire
            // ordering for all previously inserted nodes (and we don't need to
            // read `len_all` again for any other nodes).
            let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed);
            self.task = next;
            self.len -= 1;
            Some(Pin::new_unchecked(future))
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.len, Some(self.len))
    }
}

impl<Fut> ExactSizeIterator for IterPinRef<'_, Fut> {}

impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
    type Item = &'a Fut;

    fn next(&mut self) -> Option<Self::Item> {
        self.0.next().map(Pin::get_ref)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.size_hint()
    }
}

impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {}

// SAFETY: we do nothing thread-local and there is no interior mutability,
// so the usual structural `Send`/`Sync` apply.
unsafe impl<Fut: Send> Send for IterPinRef<'_, Fut> {}
unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {}

unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {}
unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {}

unsafe impl<Fut: Send + Unpin> Send for IntoIter<Fut> {}
unsafe impl<Fut: Sync + Unpin> Sync for IntoIter<Fut> {}
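// A minimal usage sketch, not part of the upstream file: it exercises the
// iterators above through the parent module's public API (assumed here to be
// `FuturesUnordered::{new, push, iter, iter_mut}` plus `IntoIterator`, as in
// the `futures` 0.3 crate), and assumes tests run with `std` available.
#[cfg(test)]
mod tests {
    use super::FuturesUnordered;
    use core::future::{ready, Ready};

    #[test]
    fn iterators_visit_every_future() {
        let mut set: FuturesUnordered<Ready<i32>> = FuturesUnordered::new();
        for i in 0..3 {
            set.push(ready(i));
        }

        // `Iter`/`IterMut` report an exact length via the `len` field.
        assert_eq!(set.iter().len(), 3);
        assert_eq!(set.iter_mut().count(), 3);

        // `IntoIter` moves each `Unpin` future out of the set in turn,
        // decrementing its recorded length as it goes.
        let mut owned = set.into_iter();
        assert_eq!(owned.len(), 3);
        assert!(owned.next().is_some());
        assert_eq!(owned.len(), 2);
    }
}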