1 use crate::task::AtomicWaker; 2 use alloc::sync::Arc; 3 use core::cell::UnsafeCell; 4 use core::ptr; 5 use core::sync::atomic::AtomicPtr; 6 use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; 7 8 use super::abort::abort; 9 use super::task::Task; 10 11 pub(super) enum Dequeue<Fut> { 12 Data(*const Task<Fut>), 13 Empty, 14 Inconsistent, 15 } 16 17 pub(super) struct ReadyToRunQueue<Fut> { 18 // The waker of the task using `FuturesUnordered`. 19 pub(super) waker: AtomicWaker, 20 21 // Head/tail of the readiness queue 22 pub(super) head: AtomicPtr<Task<Fut>>, 23 pub(super) tail: UnsafeCell<*const Task<Fut>>, 24 pub(super) stub: Arc<Task<Fut>>, 25 } 26 27 /// An MPSC queue into which the tasks containing the futures are inserted 28 /// whenever the future inside is scheduled for polling. 29 impl<Fut> ReadyToRunQueue<Fut> { 30 // FIXME: this takes raw pointer without safety conditions. 31 32 /// The enqueue function from the 1024cores intrusive MPSC queue algorithm. enqueue(&self, task: *const Task<Fut>)33 pub(super) fn enqueue(&self, task: *const Task<Fut>) { 34 unsafe { 35 debug_assert!((*task).queued.load(Relaxed)); 36 37 // This action does not require any coordination 38 (*task).next_ready_to_run.store(ptr::null_mut(), Relaxed); 39 40 // Note that these atomic orderings come from 1024cores 41 let task = task as *mut _; 42 let prev = self.head.swap(task, AcqRel); 43 (*prev).next_ready_to_run.store(task, Release); 44 } 45 } 46 47 /// The dequeue function from the 1024cores intrusive MPSC queue algorithm 48 /// 49 /// Note that this is unsafe as it required mutual exclusion (only one 50 /// thread can call this) to be guaranteed elsewhere. dequeue(&self) -> Dequeue<Fut>51 pub(super) unsafe fn dequeue(&self) -> Dequeue<Fut> { 52 unsafe { 53 let mut tail = *self.tail.get(); 54 let mut next = (*tail).next_ready_to_run.load(Acquire); 55 56 if tail == self.stub() { 57 if next.is_null() { 58 return Dequeue::Empty; 59 } 60 61 *self.tail.get() = next; 62 tail = next; 63 next = (*next).next_ready_to_run.load(Acquire); 64 } 65 66 if !next.is_null() { 67 *self.tail.get() = next; 68 debug_assert!(tail != self.stub()); 69 return Dequeue::Data(tail); 70 } 71 72 if self.head.load(Acquire) as *const _ != tail { 73 return Dequeue::Inconsistent; 74 } 75 76 self.enqueue(self.stub()); 77 78 next = (*tail).next_ready_to_run.load(Acquire); 79 80 if !next.is_null() { 81 *self.tail.get() = next; 82 return Dequeue::Data(tail); 83 } 84 85 Dequeue::Inconsistent 86 } 87 } 88 stub(&self) -> *const Task<Fut>89 pub(super) fn stub(&self) -> *const Task<Fut> { 90 Arc::as_ptr(&self.stub) 91 } 92 } 93 94 impl<Fut> Drop for ReadyToRunQueue<Fut> { drop(&mut self)95 fn drop(&mut self) { 96 // Once we're in the destructor for `Inner<Fut>` we need to clear out 97 // the ready to run queue of tasks if there's anything left in there. 98 // 99 // Note that each task has a strong reference count associated with it 100 // which is owned by the ready to run queue. All tasks should have had 101 // their futures dropped already by the `FuturesUnordered` destructor 102 // above, so we're just pulling out tasks and dropping their refcounts. 103 unsafe { 104 loop { 105 match self.dequeue() { 106 Dequeue::Empty => break, 107 Dequeue::Inconsistent => abort("inconsistent in drop"), 108 Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), 109 } 110 } 111 } 112 } 113 } 114