1 //! Inject queue used to send wakeups to a work-stealing scheduler 2 3 use crate::loom::sync::Mutex; 4 use crate::runtime::task; 5 6 mod pop; 7 pub(crate) use pop::Pop; 8 9 mod shared; 10 pub(crate) use shared::Shared; 11 12 mod synced; 13 pub(crate) use synced::Synced; 14 15 cfg_rt_multi_thread! { 16 mod rt_multi_thread; 17 } 18 19 mod metrics; 20 21 /// Growable, MPMC queue used to inject new tasks into the scheduler and as an 22 /// overflow queue when the local, fixed-size, array queue overflows. 23 pub(crate) struct Inject<T: 'static> { 24 shared: Shared<T>, 25 synced: Mutex<Synced>, 26 } 27 28 impl<T: 'static> Inject<T> { new() -> Inject<T>29 pub(crate) fn new() -> Inject<T> { 30 let (shared, synced) = Shared::new(); 31 32 Inject { 33 shared, 34 synced: Mutex::new(synced), 35 } 36 } 37 38 // Kind of annoying to have to include the cfg here 39 #[cfg(tokio_taskdump)] is_closed(&self) -> bool40 pub(crate) fn is_closed(&self) -> bool { 41 let synced = self.synced.lock(); 42 self.shared.is_closed(&synced) 43 } 44 45 /// Closes the injection queue, returns `true` if the queue is open when the 46 /// transition is made. close(&self) -> bool47 pub(crate) fn close(&self) -> bool { 48 let mut synced = self.synced.lock(); 49 self.shared.close(&mut synced) 50 } 51 52 /// Pushes a value into the queue. 53 /// 54 /// This does nothing if the queue is closed. push(&self, task: task::Notified<T>)55 pub(crate) fn push(&self, task: task::Notified<T>) { 56 let mut synced = self.synced.lock(); 57 // safety: passing correct `Synced` 58 unsafe { self.shared.push(&mut synced, task) } 59 } 60 pop(&self) -> Option<task::Notified<T>>61 pub(crate) fn pop(&self) -> Option<task::Notified<T>> { 62 if self.shared.is_empty() { 63 return None; 64 } 65 66 let mut synced = self.synced.lock(); 67 // safety: passing correct `Synced` 68 unsafe { self.shared.pop(&mut synced) } 69 } 70 } 71