1 //! Inject queue used to send wakeups to a work-stealing scheduler 2 3 use crate::loom::sync::Mutex; 4 use crate::runtime::task; 5 6 mod pop; 7 pub(crate) use pop::Pop; 8 9 mod shared; 10 pub(crate) use shared::Shared; 11 12 mod synced; 13 pub(crate) use synced::Synced; 14 15 cfg_rt_multi_thread! { 16 mod rt_multi_thread; 17 } 18 19 cfg_metrics! { 20 mod metrics; 21 } 22 23 /// Growable, MPMC queue used to inject new tasks into the scheduler and as an 24 /// overflow queue when the local, fixed-size, array queue overflows. 25 pub(crate) struct Inject<T: 'static> { 26 shared: Shared<T>, 27 synced: Mutex<Synced>, 28 } 29 30 impl<T: 'static> Inject<T> { new() -> Inject<T>31 pub(crate) fn new() -> Inject<T> { 32 let (shared, synced) = Shared::new(); 33 34 Inject { 35 shared, 36 synced: Mutex::new(synced), 37 } 38 } 39 40 // Kind of annoying to have to include the cfg here 41 #[cfg(tokio_taskdump)] is_closed(&self) -> bool42 pub(crate) fn is_closed(&self) -> bool { 43 let synced = self.synced.lock(); 44 self.shared.is_closed(&synced) 45 } 46 47 /// Closes the injection queue, returns `true` if the queue is open when the 48 /// transition is made. close(&self) -> bool49 pub(crate) fn close(&self) -> bool { 50 let mut synced = self.synced.lock(); 51 self.shared.close(&mut synced) 52 } 53 54 /// Pushes a value into the queue. 55 /// 56 /// This does nothing if the queue is closed. push(&self, task: task::Notified<T>)57 pub(crate) fn push(&self, task: task::Notified<T>) { 58 let mut synced = self.synced.lock(); 59 // safety: passing correct `Synced` 60 unsafe { self.shared.push(&mut synced, task) } 61 } 62 pop(&self) -> Option<task::Notified<T>>63 pub(crate) fn pop(&self) -> Option<task::Notified<T>> { 64 if self.shared.is_empty() { 65 return None; 66 } 67 68 let mut synced = self.synced.lock(); 69 // safety: passing correct `Synced` 70 unsafe { self.shared.pop(&mut synced) } 71 } 72 } 73