• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use super::{Pop, Synced};
2 
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::runtime::task;
5 
6 use std::marker::PhantomData;
7 use std::sync::atomic::Ordering::{Acquire, Release};
8 
9 pub(crate) struct Shared<T: 'static> {
10     /// Number of pending tasks in the queue. This helps prevent unnecessary
11     /// locking in the hot path.
12     pub(super) len: AtomicUsize,
13 
14     _p: PhantomData<T>,
15 }
16 
17 unsafe impl<T> Send for Shared<T> {}
18 unsafe impl<T> Sync for Shared<T> {}
19 
20 impl<T: 'static> Shared<T> {
new() -> (Shared<T>, Synced)21     pub(crate) fn new() -> (Shared<T>, Synced) {
22         let inject = Shared {
23             len: AtomicUsize::new(0),
24             _p: PhantomData,
25         };
26 
27         let synced = Synced {
28             is_closed: false,
29             head: None,
30             tail: None,
31         };
32 
33         (inject, synced)
34     }
35 
is_empty(&self) -> bool36     pub(crate) fn is_empty(&self) -> bool {
37         self.len() == 0
38     }
39 
40     // Kind of annoying to have to include the cfg here
41     #[cfg(any(
42         tokio_taskdump,
43         all(feature = "rt-multi-thread", not(target_os = "wasi"))
44     ))]
is_closed(&self, synced: &Synced) -> bool45     pub(crate) fn is_closed(&self, synced: &Synced) -> bool {
46         synced.is_closed
47     }
48 
49     /// Closes the injection queue, returns `true` if the queue is open when the
50     /// transition is made.
close(&self, synced: &mut Synced) -> bool51     pub(crate) fn close(&self, synced: &mut Synced) -> bool {
52         if synced.is_closed {
53             return false;
54         }
55 
56         synced.is_closed = true;
57         true
58     }
59 
len(&self) -> usize60     pub(crate) fn len(&self) -> usize {
61         self.len.load(Acquire)
62     }
63 
64     /// Pushes a value into the queue.
65     ///
66     /// This does nothing if the queue is closed.
67     ///
68     /// # Safety
69     ///
70     /// Must be called with the same `Synced` instance returned by `Inject::new`
push(&self, synced: &mut Synced, task: task::Notified<T>)71     pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) {
72         if synced.is_closed {
73             return;
74         }
75 
76         // safety: only mutated with the lock held
77         let len = self.len.unsync_load();
78         let task = task.into_raw();
79 
80         // The next pointer should already be null
81         debug_assert!(unsafe { task.get_queue_next().is_none() });
82 
83         if let Some(tail) = synced.tail {
84             // safety: Holding the Notified for a task guarantees exclusive
85             // access to the `queue_next` field.
86             unsafe { tail.set_queue_next(Some(task)) };
87         } else {
88             synced.head = Some(task);
89         }
90 
91         synced.tail = Some(task);
92         self.len.store(len + 1, Release);
93     }
94 
95     /// Pop a value from the queue.
96     ///
97     /// # Safety
98     ///
99     /// Must be called with the same `Synced` instance returned by `Inject::new`
pop(&self, synced: &mut Synced) -> Option<task::Notified<T>>100     pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> {
101         self.pop_n(synced, 1).next()
102     }
103 
104     /// Pop `n` values from the queue
105     ///
106     /// # Safety
107     ///
108     /// Must be called with the same `Synced` instance returned by `Inject::new`
pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T>109     pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
110         use std::cmp;
111 
112         debug_assert!(n > 0);
113 
114         // safety: All updates to the len atomic are guarded by the mutex. As
115         // such, a non-atomic load followed by a store is safe.
116         let len = self.len.unsync_load();
117         let n = cmp::min(n, len);
118 
119         // Decrement the count.
120         self.len.store(len - n, Release);
121 
122         Pop::new(n, synced)
123     }
124 }
125