1 use super::{Pop, Synced}; 2 3 use crate::loom::sync::atomic::AtomicUsize; 4 use crate::runtime::task; 5 6 use std::marker::PhantomData; 7 use std::sync::atomic::Ordering::{Acquire, Release}; 8 9 pub(crate) struct Shared<T: 'static> { 10 /// Number of pending tasks in the queue. This helps prevent unnecessary 11 /// locking in the hot path. 12 pub(super) len: AtomicUsize, 13 14 _p: PhantomData<T>, 15 } 16 17 unsafe impl<T> Send for Shared<T> {} 18 unsafe impl<T> Sync for Shared<T> {} 19 20 impl<T: 'static> Shared<T> { new() -> (Shared<T>, Synced)21 pub(crate) fn new() -> (Shared<T>, Synced) { 22 let inject = Shared { 23 len: AtomicUsize::new(0), 24 _p: PhantomData, 25 }; 26 27 let synced = Synced { 28 is_closed: false, 29 head: None, 30 tail: None, 31 }; 32 33 (inject, synced) 34 } 35 is_empty(&self) -> bool36 pub(crate) fn is_empty(&self) -> bool { 37 self.len() == 0 38 } 39 40 // Kind of annoying to have to include the cfg here 41 #[cfg(any( 42 tokio_taskdump, 43 all(feature = "rt-multi-thread", not(target_os = "wasi")) 44 ))] is_closed(&self, synced: &Synced) -> bool45 pub(crate) fn is_closed(&self, synced: &Synced) -> bool { 46 synced.is_closed 47 } 48 49 /// Closes the injection queue, returns `true` if the queue is open when the 50 /// transition is made. close(&self, synced: &mut Synced) -> bool51 pub(crate) fn close(&self, synced: &mut Synced) -> bool { 52 if synced.is_closed { 53 return false; 54 } 55 56 synced.is_closed = true; 57 true 58 } 59 len(&self) -> usize60 pub(crate) fn len(&self) -> usize { 61 self.len.load(Acquire) 62 } 63 64 /// Pushes a value into the queue. 65 /// 66 /// This does nothing if the queue is closed. 67 /// 68 /// # Safety 69 /// 70 /// Must be called with the same `Synced` instance returned by `Inject::new` push(&self, synced: &mut Synced, task: task::Notified<T>)71 pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) { 72 if synced.is_closed { 73 return; 74 } 75 76 // safety: only mutated with the lock held 77 let len = self.len.unsync_load(); 78 let task = task.into_raw(); 79 80 // The next pointer should already be null 81 debug_assert!(unsafe { task.get_queue_next().is_none() }); 82 83 if let Some(tail) = synced.tail { 84 // safety: Holding the Notified for a task guarantees exclusive 85 // access to the `queue_next` field. 86 unsafe { tail.set_queue_next(Some(task)) }; 87 } else { 88 synced.head = Some(task); 89 } 90 91 synced.tail = Some(task); 92 self.len.store(len + 1, Release); 93 } 94 95 /// Pop a value from the queue. 96 /// 97 /// # Safety 98 /// 99 /// Must be called with the same `Synced` instance returned by `Inject::new` pop(&self, synced: &mut Synced) -> Option<task::Notified<T>>100 pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> { 101 self.pop_n(synced, 1).next() 102 } 103 104 /// Pop `n` values from the queue 105 /// 106 /// # Safety 107 /// 108 /// Must be called with the same `Synced` instance returned by `Inject::new` pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T>109 pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> { 110 use std::cmp; 111 112 debug_assert!(n > 0); 113 114 // safety: All updates to the len atomic are guarded by the mutex. As 115 // such, a non-atomic load followed by a store is safe. 116 let len = self.len.unsync_load(); 117 let n = cmp::min(n, len); 118 119 // Decrement the count. 120 self.len.store(len - n, Release); 121 122 Pop::new(n, synced) 123 } 124 } 125