• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Inject queue used to send wakeups to a work-stealing scheduler
2 
3 use crate::loom::sync::Mutex;
4 use crate::runtime::task;
5 
6 mod pop;
7 pub(crate) use pop::Pop;
8 
9 mod shared;
10 pub(crate) use shared::Shared;
11 
12 mod synced;
13 pub(crate) use synced::Synced;
14 
15 cfg_rt_multi_thread! {
16     mod rt_multi_thread;
17 }
18 
19 mod metrics;
20 
21 /// Growable, MPMC queue used to inject new tasks into the scheduler and as an
22 /// overflow queue when the local, fixed-size, array queue overflows.
23 pub(crate) struct Inject<T: 'static> {
24     shared: Shared<T>,
25     synced: Mutex<Synced>,
26 }
27 
28 impl<T: 'static> Inject<T> {
new() -> Inject<T>29     pub(crate) fn new() -> Inject<T> {
30         let (shared, synced) = Shared::new();
31 
32         Inject {
33             shared,
34             synced: Mutex::new(synced),
35         }
36     }
37 
38     // Kind of annoying to have to include the cfg here
39     #[cfg(tokio_taskdump)]
is_closed(&self) -> bool40     pub(crate) fn is_closed(&self) -> bool {
41         let synced = self.synced.lock();
42         self.shared.is_closed(&synced)
43     }
44 
45     /// Closes the injection queue, returns `true` if the queue is open when the
46     /// transition is made.
close(&self) -> bool47     pub(crate) fn close(&self) -> bool {
48         let mut synced = self.synced.lock();
49         self.shared.close(&mut synced)
50     }
51 
52     /// Pushes a value into the queue.
53     ///
54     /// This does nothing if the queue is closed.
push(&self, task: task::Notified<T>)55     pub(crate) fn push(&self, task: task::Notified<T>) {
56         let mut synced = self.synced.lock();
57         // safety: passing correct `Synced`
58         unsafe { self.shared.push(&mut synced, task) }
59     }
60 
pop(&self) -> Option<task::Notified<T>>61     pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
62         if self.shared.is_empty() {
63             return None;
64         }
65 
66         let mut synced = self.synced.lock();
67         // safety: passing correct `Synced`
68         unsafe { self.shared.pop(&mut synced) }
69     }
70 }
71