• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Inject queue used to send wakeups to a work-stealing scheduler
2 
3 use crate::loom::sync::Mutex;
4 use crate::runtime::task;
5 
6 mod pop;
7 pub(crate) use pop::Pop;
8 
9 mod shared;
10 pub(crate) use shared::Shared;
11 
12 mod synced;
13 pub(crate) use synced::Synced;
14 
15 cfg_rt_multi_thread! {
16     mod rt_multi_thread;
17 }
18 
19 cfg_metrics! {
20     mod metrics;
21 }
22 
23 /// Growable, MPMC queue used to inject new tasks into the scheduler and as an
24 /// overflow queue when the local, fixed-size, array queue overflows.
25 pub(crate) struct Inject<T: 'static> {
26     shared: Shared<T>,
27     synced: Mutex<Synced>,
28 }
29 
30 impl<T: 'static> Inject<T> {
new() -> Inject<T>31     pub(crate) fn new() -> Inject<T> {
32         let (shared, synced) = Shared::new();
33 
34         Inject {
35             shared,
36             synced: Mutex::new(synced),
37         }
38     }
39 
40     // Kind of annoying to have to include the cfg here
41     #[cfg(tokio_taskdump)]
is_closed(&self) -> bool42     pub(crate) fn is_closed(&self) -> bool {
43         let synced = self.synced.lock();
44         self.shared.is_closed(&synced)
45     }
46 
47     /// Closes the injection queue, returns `true` if the queue is open when the
48     /// transition is made.
close(&self) -> bool49     pub(crate) fn close(&self) -> bool {
50         let mut synced = self.synced.lock();
51         self.shared.close(&mut synced)
52     }
53 
54     /// Pushes a value into the queue.
55     ///
56     /// This does nothing if the queue is closed.
push(&self, task: task::Notified<T>)57     pub(crate) fn push(&self, task: task::Notified<T>) {
58         let mut synced = self.synced.lock();
59         // safety: passing correct `Synced`
60         unsafe { self.shared.push(&mut synced, task) }
61     }
62 
pop(&self) -> Option<task::Notified<T>>63     pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
64         if self.shared.is_empty() {
65             return None;
66         }
67 
68         let mut synced = self.synced.lock();
69         // safety: passing correct `Synced`
70         unsafe { self.shared.pop(&mut synced) }
71     }
72 }
73