• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::task::AtomicWaker;
2 use alloc::sync::Arc;
3 use core::cell::UnsafeCell;
4 use core::ptr;
5 use core::sync::atomic::AtomicPtr;
6 use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
7 
8 use super::abort::abort;
9 use super::task::Task;
10 
11 pub(super) enum Dequeue<Fut> {
12     Data(*const Task<Fut>),
13     Empty,
14     Inconsistent,
15 }
16 
17 pub(super) struct ReadyToRunQueue<Fut> {
18     // The waker of the task using `FuturesUnordered`.
19     pub(super) waker: AtomicWaker,
20 
21     // Head/tail of the readiness queue
22     pub(super) head: AtomicPtr<Task<Fut>>,
23     pub(super) tail: UnsafeCell<*const Task<Fut>>,
24     pub(super) stub: Arc<Task<Fut>>,
25 }
26 
27 /// An MPSC queue into which the tasks containing the futures are inserted
28 /// whenever the future inside is scheduled for polling.
29 impl<Fut> ReadyToRunQueue<Fut> {
30     // FIXME: this takes raw pointer without safety conditions.
31 
32     /// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
enqueue(&self, task: *const Task<Fut>)33     pub(super) fn enqueue(&self, task: *const Task<Fut>) {
34         unsafe {
35             debug_assert!((*task).queued.load(Relaxed));
36 
37             // This action does not require any coordination
38             (*task).next_ready_to_run.store(ptr::null_mut(), Relaxed);
39 
40             // Note that these atomic orderings come from 1024cores
41             let task = task as *mut _;
42             let prev = self.head.swap(task, AcqRel);
43             (*prev).next_ready_to_run.store(task, Release);
44         }
45     }
46 
47     /// The dequeue function from the 1024cores intrusive MPSC queue algorithm
48     ///
49     /// Note that this is unsafe as it required mutual exclusion (only one
50     /// thread can call this) to be guaranteed elsewhere.
dequeue(&self) -> Dequeue<Fut>51     pub(super) unsafe fn dequeue(&self) -> Dequeue<Fut> {
52         unsafe {
53             let mut tail = *self.tail.get();
54             let mut next = (*tail).next_ready_to_run.load(Acquire);
55 
56             if tail == self.stub() {
57                 if next.is_null() {
58                     return Dequeue::Empty;
59                 }
60 
61                 *self.tail.get() = next;
62                 tail = next;
63                 next = (*next).next_ready_to_run.load(Acquire);
64             }
65 
66             if !next.is_null() {
67                 *self.tail.get() = next;
68                 debug_assert!(tail != self.stub());
69                 return Dequeue::Data(tail);
70             }
71 
72             if self.head.load(Acquire) as *const _ != tail {
73                 return Dequeue::Inconsistent;
74             }
75 
76             self.enqueue(self.stub());
77 
78             next = (*tail).next_ready_to_run.load(Acquire);
79 
80             if !next.is_null() {
81                 *self.tail.get() = next;
82                 return Dequeue::Data(tail);
83             }
84 
85             Dequeue::Inconsistent
86         }
87     }
88 
stub(&self) -> *const Task<Fut>89     pub(super) fn stub(&self) -> *const Task<Fut> {
90         Arc::as_ptr(&self.stub)
91     }
92 }
93 
94 impl<Fut> Drop for ReadyToRunQueue<Fut> {
drop(&mut self)95     fn drop(&mut self) {
96         // Once we're in the destructor for `Inner<Fut>` we need to clear out
97         // the ready to run queue of tasks if there's anything left in there.
98         //
99         // Note that each task has a strong reference count associated with it
100         // which is owned by the ready to run queue. All tasks should have had
101         // their futures dropped already by the `FuturesUnordered` destructor
102         // above, so we're just pulling out tasks and dropping their refcounts.
103         unsafe {
104             loop {
105                 match self.dequeue() {
106                     Dequeue::Empty => break,
107                     Dequeue::Inconsistent => abort("inconsistent in drop"),
108                     Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
109                 }
110             }
111         }
112     }
113 }
114