• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! This module defines an `IdleNotifiedSet`, which is a collection of elements.
2 //! Each element is intended to correspond to a task, and the collection will
3 //! keep track of which tasks have had their waker notified, and which have not.
4 //!
5 //! Each entry in the set holds some user-specified value. The value's type is
6 //! specified using the `T` parameter. It will usually be a `JoinHandle` or
7 //! similar.
8 
9 use std::marker::PhantomPinned;
10 use std::mem::ManuallyDrop;
11 use std::ptr::NonNull;
12 use std::task::{Context, Waker};
13 
14 use crate::loom::cell::UnsafeCell;
15 use crate::loom::sync::{Arc, Mutex};
16 use crate::util::linked_list::{self, Link};
17 use crate::util::{waker_ref, Wake};
18 
19 type LinkedList<T> =
20     linked_list::LinkedList<ListEntry<T>, <ListEntry<T> as linked_list::Link>::Target>;
21 
22 /// This is the main handle to the collection.
23 pub(crate) struct IdleNotifiedSet<T> {
24     lists: Arc<Lists<T>>,
25     length: usize,
26 }
27 
28 /// A handle to an entry that is guaranteed to be stored in the idle or notified
29 /// list of its `IdleNotifiedSet`. This value borrows the `IdleNotifiedSet`
30 /// mutably to prevent the entry from being moved to the `Neither` list, which
31 /// only the `IdleNotifiedSet` may do.
32 ///
33 /// The main consequence of being stored in one of the lists is that the `value`
34 /// field has not yet been consumed.
35 ///
36 /// Note: This entry can be moved from the idle to the notified list while this
37 /// object exists by waking its waker.
38 pub(crate) struct EntryInOneOfTheLists<'a, T> {
39     entry: Arc<ListEntry<T>>,
40     set: &'a mut IdleNotifiedSet<T>,
41 }
42 
43 type Lists<T> = Mutex<ListsInner<T>>;
44 
45 /// The linked lists hold strong references to the ListEntry items, and the
46 /// ListEntry items also hold a strong reference back to the Lists object, but
47 /// the destructor of the `IdleNotifiedSet` will clear the two lists, so once
48 /// that object is destroyed, no ref-cycles will remain.
49 struct ListsInner<T> {
50     notified: LinkedList<T>,
51     idle: LinkedList<T>,
52     /// Whenever an element in the `notified` list is woken, this waker will be
53     /// notified and consumed, if it exists.
54     waker: Option<Waker>,
55 }
56 
57 /// Which of the two lists in the shared Lists object is this entry stored in?
58 ///
59 /// If the value is `Idle`, then an entry's waker may move it to the notified
60 /// list. Otherwise, only the `IdleNotifiedSet` may move it.
61 ///
62 /// If the value is `Neither`, then it is still possible that the entry is in
63 /// some third external list (this happens in `drain`).
64 #[derive(Copy, Clone, Eq, PartialEq)]
65 enum List {
66     Notified,
67     Idle,
68     Neither,
69 }
70 
71 /// An entry in the list.
72 ///
73 /// # Safety
74 ///
75 /// The `my_list` field must only be accessed while holding the mutex in
76 /// `parent`. It is an invariant that the value of `my_list` corresponds to
77 /// which linked list in the `parent` holds this entry. Once this field takes
78 /// the value `Neither`, then it may never be modified again.
79 ///
80 /// If the value of `my_list` is `Notified` or `Idle`, then the `pointers` field
81 /// must only be accessed while holding the mutex. If the value of `my_list` is
82 /// `Neither`, then the `pointers` field may be accessed by the
83 /// `IdleNotifiedSet` (this happens inside `drain`).
84 ///
85 /// The `value` field is owned by the `IdleNotifiedSet` and may only be accessed
86 /// by the `IdleNotifiedSet`. The operation that sets the value of `my_list` to
87 /// `Neither` assumes ownership of the `value`, and it must either drop it or
88 /// move it out from this entry to prevent it from getting leaked. (Since the
89 /// two linked lists are emptied in the destructor of `IdleNotifiedSet`, the
90 /// value should not be leaked.)
91 struct ListEntry<T> {
92     /// The linked list pointers of the list this entry is in.
93     pointers: linked_list::Pointers<ListEntry<T>>,
94     /// Pointer to the shared `Lists` struct.
95     parent: Arc<Lists<T>>,
96     /// The value stored in this entry.
97     value: UnsafeCell<ManuallyDrop<T>>,
98     /// Used to remember which list this entry is in.
99     my_list: UnsafeCell<List>,
100     /// Required by the `linked_list::Pointers` field.
101     _pin: PhantomPinned,
102 }
103 
104 generate_addr_of_methods! {
105     impl<T> ListEntry<T> {
106         unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
107             &self.pointers
108         }
109     }
110 }
111 
112 // With mutable access to the `IdleNotifiedSet`, you can get mutable access to
113 // the values.
114 unsafe impl<T: Send> Send for IdleNotifiedSet<T> {}
115 // With the current API we strictly speaking don't even need `T: Sync`, but we
116 // require it anyway to support adding &self APIs that access the values in the
117 // future.
118 unsafe impl<T: Sync> Sync for IdleNotifiedSet<T> {}
119 
120 // These impls control when it is safe to create a Waker. Since the waker does
121 // not allow access to the value in any way (including its destructor), it is
122 // not necessary for `T` to be Send or Sync.
123 unsafe impl<T> Send for ListEntry<T> {}
124 unsafe impl<T> Sync for ListEntry<T> {}
125 
126 impl<T> IdleNotifiedSet<T> {
127     /// Create a new IdleNotifiedSet.
new() -> Self128     pub(crate) fn new() -> Self {
129         let lists = Mutex::new(ListsInner {
130             notified: LinkedList::new(),
131             idle: LinkedList::new(),
132             waker: None,
133         });
134 
135         IdleNotifiedSet {
136             lists: Arc::new(lists),
137             length: 0,
138         }
139     }
140 
len(&self) -> usize141     pub(crate) fn len(&self) -> usize {
142         self.length
143     }
144 
is_empty(&self) -> bool145     pub(crate) fn is_empty(&self) -> bool {
146         self.length == 0
147     }
148 
149     /// Insert the given value into the `idle` list.
insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T>150     pub(crate) fn insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T> {
151         self.length += 1;
152 
153         let entry = Arc::new(ListEntry {
154             parent: self.lists.clone(),
155             value: UnsafeCell::new(ManuallyDrop::new(value)),
156             my_list: UnsafeCell::new(List::Idle),
157             pointers: linked_list::Pointers::new(),
158             _pin: PhantomPinned,
159         });
160 
161         {
162             let mut lock = self.lists.lock();
163             lock.idle.push_front(entry.clone());
164         }
165 
166         // Safety: We just put the entry in the idle list, so it is in one of the lists.
167         EntryInOneOfTheLists { entry, set: self }
168     }
169 
170     /// Pop an entry from the notified list to poll it. The entry is moved to
171     /// the idle list atomically.
pop_notified(&mut self, waker: &Waker) -> Option<EntryInOneOfTheLists<'_, T>>172     pub(crate) fn pop_notified(&mut self, waker: &Waker) -> Option<EntryInOneOfTheLists<'_, T>> {
173         // We don't decrement the length because this call moves the entry to
174         // the idle list rather than removing it.
175         if self.length == 0 {
176             // Fast path.
177             return None;
178         }
179 
180         let mut lock = self.lists.lock();
181 
182         let should_update_waker = match lock.waker.as_mut() {
183             Some(cur_waker) => !waker.will_wake(cur_waker),
184             None => true,
185         };
186         if should_update_waker {
187             lock.waker = Some(waker.clone());
188         }
189 
190         // Pop the entry, returning None if empty.
191         let entry = lock.notified.pop_back()?;
192 
193         lock.idle.push_front(entry.clone());
194 
195         // Safety: We are holding the lock.
196         entry.my_list.with_mut(|ptr| unsafe {
197             *ptr = List::Idle;
198         });
199 
200         drop(lock);
201 
202         // Safety: We just put the entry in the idle list, so it is in one of the lists.
203         Some(EntryInOneOfTheLists { entry, set: self })
204     }
205 
206     /// Call a function on every element in this list.
for_each<F: FnMut(&mut T)>(&mut self, mut func: F)207     pub(crate) fn for_each<F: FnMut(&mut T)>(&mut self, mut func: F) {
208         fn get_ptrs<T>(list: &mut LinkedList<T>, ptrs: &mut Vec<*mut T>) {
209             let mut node = list.last();
210 
211             while let Some(entry) = node {
212                 ptrs.push(entry.value.with_mut(|ptr| {
213                     let ptr: *mut ManuallyDrop<T> = ptr;
214                     let ptr: *mut T = ptr.cast();
215                     ptr
216                 }));
217 
218                 let prev = entry.pointers.get_prev();
219                 node = prev.map(|prev| unsafe { &*prev.as_ptr() });
220             }
221         }
222 
223         // Atomically get a raw pointer to the value of every entry.
224         //
225         // Since this only locks the mutex once, it is not possible for a value
226         // to get moved from the idle list to the notified list during the
227         // operation, which would otherwise result in some value being listed
228         // twice.
229         let mut ptrs = Vec::with_capacity(self.len());
230         {
231             let mut lock = self.lists.lock();
232 
233             get_ptrs(&mut lock.idle, &mut ptrs);
234             get_ptrs(&mut lock.notified, &mut ptrs);
235         }
236         debug_assert_eq!(ptrs.len(), ptrs.capacity());
237 
238         for ptr in ptrs {
239             // Safety: When we grabbed the pointers, the entries were in one of
240             // the two lists. This means that their value was valid at the time,
241             // and it must still be valid because we are the IdleNotifiedSet,
242             // and only we can remove an entry from the two lists. (It's
243             // possible that an entry is moved from one list to the other during
244             // this loop, but that is ok.)
245             func(unsafe { &mut *ptr });
246         }
247     }
248 
249     /// Remove all entries in both lists, applying some function to each element.
250     ///
251     /// The closure is called on all elements even if it panics. Having it panic
252     /// twice is a double-panic, and will abort the application.
drain<F: FnMut(T)>(&mut self, func: F)253     pub(crate) fn drain<F: FnMut(T)>(&mut self, func: F) {
254         if self.length == 0 {
255             // Fast path.
256             return;
257         }
258         self.length = 0;
259 
260         // The LinkedList is not cleared on panic, so we use a bomb to clear it.
261         //
262         // This value has the invariant that any entry in its `all_entries` list
263         // has `my_list` set to `Neither` and that the value has not yet been
264         // dropped.
265         struct AllEntries<T, F: FnMut(T)> {
266             all_entries: LinkedList<T>,
267             func: F,
268         }
269 
270         impl<T, F: FnMut(T)> AllEntries<T, F> {
271             fn pop_next(&mut self) -> bool {
272                 if let Some(entry) = self.all_entries.pop_back() {
273                     // Safety: We just took this value from the list, so we can
274                     // destroy the value in the entry.
275                     entry
276                         .value
277                         .with_mut(|ptr| unsafe { (self.func)(ManuallyDrop::take(&mut *ptr)) });
278                     true
279                 } else {
280                     false
281                 }
282             }
283         }
284 
285         impl<T, F: FnMut(T)> Drop for AllEntries<T, F> {
286             fn drop(&mut self) {
287                 while self.pop_next() {}
288             }
289         }
290 
291         let mut all_entries = AllEntries {
292             all_entries: LinkedList::new(),
293             func,
294         };
295 
296         // Atomically move all entries to the new linked list in the AllEntries
297         // object.
298         {
299             let mut lock = self.lists.lock();
300             unsafe {
301                 // Safety: We are holding the lock and `all_entries` is a new
302                 // LinkedList.
303                 move_to_new_list(&mut lock.idle, &mut all_entries.all_entries);
304                 move_to_new_list(&mut lock.notified, &mut all_entries.all_entries);
305             }
306         }
307 
308         // Keep destroying entries in the list until it is empty.
309         //
310         // If the closure panics, then the destructor of the `AllEntries` bomb
311         // ensures that we keep running the destructor on the remaining values.
312         // A second panic will abort the program.
313         while all_entries.pop_next() {}
314     }
315 }
316 
317 /// # Safety
318 ///
319 /// The mutex for the entries must be held, and the target list must be such
320 /// that setting `my_list` to `Neither` is ok.
move_to_new_list<T>(from: &mut LinkedList<T>, to: &mut LinkedList<T>)321 unsafe fn move_to_new_list<T>(from: &mut LinkedList<T>, to: &mut LinkedList<T>) {
322     while let Some(entry) = from.pop_back() {
323         entry.my_list.with_mut(|ptr| {
324             *ptr = List::Neither;
325         });
326         to.push_front(entry);
327     }
328 }
329 
330 impl<'a, T> EntryInOneOfTheLists<'a, T> {
331     /// Remove this entry from the list it is in, returning the value associated
332     /// with the entry.
333     ///
334     /// This consumes the value, since it is no longer guaranteed to be in a
335     /// list.
remove(self) -> T336     pub(crate) fn remove(self) -> T {
337         self.set.length -= 1;
338 
339         {
340             let mut lock = self.set.lists.lock();
341 
342             // Safety: We are holding the lock so there is no race, and we will
343             // remove the entry afterwards to uphold invariants.
344             let old_my_list = self.entry.my_list.with_mut(|ptr| unsafe {
345                 let old_my_list = *ptr;
346                 *ptr = List::Neither;
347                 old_my_list
348             });
349 
350             let list = match old_my_list {
351                 List::Idle => &mut lock.idle,
352                 List::Notified => &mut lock.notified,
353                 // An entry in one of the lists is in one of the lists.
354                 List::Neither => unreachable!(),
355             };
356 
357             unsafe {
358                 // Safety: We just checked that the entry is in this particular
359                 // list.
360                 list.remove(ListEntry::as_raw(&self.entry)).unwrap();
361             }
362         }
363 
364         // By setting `my_list` to `Neither`, we have taken ownership of the
365         // value. We return it to the caller.
366         //
367         // Safety: We have a mutable reference to the `IdleNotifiedSet` that
368         // owns this entry, so we can use its permission to access the value.
369         self.entry
370             .value
371             .with_mut(|ptr| unsafe { ManuallyDrop::take(&mut *ptr) })
372     }
373 
374     /// Access the value in this entry together with a context for its waker.
with_value_and_context<F, U>(&mut self, func: F) -> U where F: FnOnce(&mut T, &mut Context<'_>) -> U, T: 'static,375     pub(crate) fn with_value_and_context<F, U>(&mut self, func: F) -> U
376     where
377         F: FnOnce(&mut T, &mut Context<'_>) -> U,
378         T: 'static,
379     {
380         let waker = waker_ref(&self.entry);
381 
382         let mut context = Context::from_waker(&waker);
383 
384         // Safety: We have a mutable reference to the `IdleNotifiedSet` that
385         // owns this entry, so we can use its permission to access the value.
386         self.entry
387             .value
388             .with_mut(|ptr| unsafe { func(&mut *ptr, &mut context) })
389     }
390 }
391 
392 impl<T> Drop for IdleNotifiedSet<T> {
drop(&mut self)393     fn drop(&mut self) {
394         // Clear both lists.
395         self.drain(drop);
396 
397         #[cfg(debug_assertions)]
398         if !std::thread::panicking() {
399             let lock = self.lists.lock();
400             assert!(lock.idle.is_empty());
401             assert!(lock.notified.is_empty());
402         }
403     }
404 }
405 
406 impl<T: 'static> Wake for ListEntry<T> {
wake_by_ref(me: &Arc<Self>)407     fn wake_by_ref(me: &Arc<Self>) {
408         let mut lock = me.parent.lock();
409 
410         // Safety: We are holding the lock and we will update the lists to
411         // maintain invariants.
412         let old_my_list = me.my_list.with_mut(|ptr| unsafe {
413             let old_my_list = *ptr;
414             if old_my_list == List::Idle {
415                 *ptr = List::Notified;
416             }
417             old_my_list
418         });
419 
420         if old_my_list == List::Idle {
421             // We move ourself to the notified list.
422             let me = unsafe {
423                 // Safety: We just checked that we are in this particular list.
424                 lock.idle.remove(ListEntry::as_raw(me)).unwrap()
425             };
426             lock.notified.push_front(me);
427 
428             if let Some(waker) = lock.waker.take() {
429                 drop(lock);
430                 waker.wake();
431             }
432         }
433     }
434 
wake(me: Arc<Self>)435     fn wake(me: Arc<Self>) {
436         Self::wake_by_ref(&me)
437     }
438 }
439 
440 /// # Safety
441 ///
442 /// `ListEntry` is forced to be !Unpin.
443 unsafe impl<T> linked_list::Link for ListEntry<T> {
444     type Handle = Arc<ListEntry<T>>;
445     type Target = ListEntry<T>;
446 
as_raw(handle: &Self::Handle) -> NonNull<ListEntry<T>>447     fn as_raw(handle: &Self::Handle) -> NonNull<ListEntry<T>> {
448         let ptr: *const ListEntry<T> = Arc::as_ptr(handle);
449         // Safety: We can't get a null pointer from `Arc::as_ptr`.
450         unsafe { NonNull::new_unchecked(ptr as *mut ListEntry<T>) }
451     }
452 
from_raw(ptr: NonNull<ListEntry<T>>) -> Arc<ListEntry<T>>453     unsafe fn from_raw(ptr: NonNull<ListEntry<T>>) -> Arc<ListEntry<T>> {
454         Arc::from_raw(ptr.as_ptr())
455     }
456 
pointers( target: NonNull<ListEntry<T>>, ) -> NonNull<linked_list::Pointers<ListEntry<T>>>457     unsafe fn pointers(
458         target: NonNull<ListEntry<T>>,
459     ) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
460         ListEntry::addr_of_pointers(target)
461     }
462 }
463 
464 #[cfg(all(test, not(loom)))]
465 mod tests {
466     use crate::runtime::Builder;
467     use crate::task::JoinSet;
468 
469     // A test that runs under miri.
470     //
471     // https://github.com/tokio-rs/tokio/pull/5693
472     #[test]
join_set_test()473     fn join_set_test() {
474         let rt = Builder::new_current_thread().build().unwrap();
475 
476         let mut set = JoinSet::new();
477         set.spawn_on(futures::future::ready(()), rt.handle());
478 
479         rt.block_on(set.join_next()).unwrap().unwrap();
480     }
481 }
482