• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Allow `unreachable_pub` warnings when sync is not enabled
2 // due to the usage of `Notify` within the `rt` feature set.
3 // When this module is compiled with `sync` enabled we will warn on
4 // this lint. When `rt` is enabled we use `pub(crate)` which
5 // triggers this warning but it is safe to ignore in this case.
6 #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
7 
8 use crate::loom::cell::UnsafeCell;
9 use crate::loom::sync::atomic::AtomicUsize;
10 use crate::loom::sync::Mutex;
11 use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
12 use crate::util::WakeList;
13 
14 use std::future::Future;
15 use std::marker::PhantomPinned;
16 use std::panic::{RefUnwindSafe, UnwindSafe};
17 use std::pin::Pin;
18 use std::ptr::NonNull;
19 use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst};
20 use std::task::{Context, Poll, Waker};
21 
/// Intrusive list of `Waiter` nodes; this is the list type stored behind the
/// `waiters` mutex in `Notify`.
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// Guarded form of the waiter list. `NotifyWaitersList::new` obtains it from a
/// `WaitList` through `into_guarded`.
type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
24 
25 /// Notifies a single task to wake up.
26 ///
27 /// `Notify` provides a basic mechanism to notify a single task of an event.
28 /// `Notify` itself does not carry any data. Instead, it is to be used to signal
29 /// another task to perform an operation.
30 ///
31 /// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
32 /// [`notified().await`] method waits for a permit to become available, and
33 /// [`notify_one()`] sets a permit **if there currently are no available
34 /// permits**.
35 ///
36 /// The synchronization details of `Notify` are similar to
37 /// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
38 /// value contains a single permit. [`notified().await`] waits for the permit to
39 /// be made available, consumes the permit, and resumes.  [`notify_one()`] sets
40 /// the permit, waking a pending task if there is one.
41 ///
42 /// If `notify_one()` is called **before** `notified().await`, then the next
43 /// call to `notified().await` will complete immediately, consuming the permit.
44 /// Any subsequent calls to `notified().await` will wait for a new permit.
45 ///
46 /// If `notify_one()` is called **multiple** times before `notified().await`,
47 /// only a **single** permit is stored. The next call to `notified().await` will
48 /// complete immediately, but the one after will wait for a new permit.
49 ///
50 /// # Examples
51 ///
52 /// Basic usage.
53 ///
54 /// ```
55 /// use tokio::sync::Notify;
56 /// use std::sync::Arc;
57 ///
58 /// #[tokio::main]
59 /// async fn main() {
60 ///     let notify = Arc::new(Notify::new());
61 ///     let notify2 = notify.clone();
62 ///
63 ///     let handle = tokio::spawn(async move {
64 ///         notify2.notified().await;
65 ///         println!("received notification");
66 ///     });
67 ///
68 ///     println!("sending notification");
69 ///     notify.notify_one();
70 ///
71 ///     // Wait for task to receive notification.
72 ///     handle.await.unwrap();
73 /// }
74 /// ```
75 ///
/// Unbounded multi-producer single-consumer (mpsc) channel.
77 ///
78 /// No wakeups can be lost when using this channel because the call to
79 /// `notify_one()` will store a permit in the `Notify`, which the following call
80 /// to `notified()` will consume.
81 ///
82 /// ```
83 /// use tokio::sync::Notify;
84 ///
85 /// use std::collections::VecDeque;
86 /// use std::sync::Mutex;
87 ///
88 /// struct Channel<T> {
89 ///     values: Mutex<VecDeque<T>>,
90 ///     notify: Notify,
91 /// }
92 ///
93 /// impl<T> Channel<T> {
94 ///     pub fn send(&self, value: T) {
95 ///         self.values.lock().unwrap()
96 ///             .push_back(value);
97 ///
98 ///         // Notify the consumer a value is available
99 ///         self.notify.notify_one();
100 ///     }
101 ///
102 ///     // This is a single-consumer channel, so several concurrent calls to
103 ///     // `recv` are not allowed.
104 ///     pub async fn recv(&self) -> T {
105 ///         loop {
106 ///             // Drain values
107 ///             if let Some(value) = self.values.lock().unwrap().pop_front() {
108 ///                 return value;
109 ///             }
110 ///
111 ///             // Wait for values to be available
112 ///             self.notify.notified().await;
113 ///         }
114 ///     }
115 /// }
116 /// ```
117 ///
/// Unbounded multi-producer multi-consumer (mpmc) channel.
119 ///
120 /// The call to [`enable`] is important because otherwise if you have two
121 /// calls to `recv` and two calls to `send` in parallel, the following could
122 /// happen:
123 ///
124 ///  1. Both calls to `try_recv` return `None`.
125 ///  2. Both new elements are added to the vector.
126 ///  3. The `notify_one` method is called twice, adding only a single
127 ///     permit to the `Notify`.
128 ///  4. Both calls to `recv` reach the `Notified` future. One of them
129 ///     consumes the permit, and the other sleeps forever.
130 ///
131 /// By adding the `Notified` futures to the list by calling `enable` before
132 /// `try_recv`, the `notify_one` calls in step three would remove the
133 /// futures from the list and mark them notified instead of adding a permit
134 /// to the `Notify`. This ensures that both futures are woken.
135 ///
136 /// Notice that this failure can only happen if there are two concurrent calls
137 /// to `recv`. This is why the mpsc example above does not require a call to
138 /// `enable`.
139 ///
140 /// ```
141 /// use tokio::sync::Notify;
142 ///
143 /// use std::collections::VecDeque;
144 /// use std::sync::Mutex;
145 ///
146 /// struct Channel<T> {
147 ///     messages: Mutex<VecDeque<T>>,
148 ///     notify_on_sent: Notify,
149 /// }
150 ///
151 /// impl<T> Channel<T> {
152 ///     pub fn send(&self, msg: T) {
153 ///         let mut locked_queue = self.messages.lock().unwrap();
154 ///         locked_queue.push_back(msg);
155 ///         drop(locked_queue);
156 ///
157 ///         // Send a notification to one of the calls currently
158 ///         // waiting in a call to `recv`.
159 ///         self.notify_on_sent.notify_one();
160 ///     }
161 ///
162 ///     pub fn try_recv(&self) -> Option<T> {
163 ///         let mut locked_queue = self.messages.lock().unwrap();
164 ///         locked_queue.pop_front()
165 ///     }
166 ///
167 ///     pub async fn recv(&self) -> T {
168 ///         let future = self.notify_on_sent.notified();
169 ///         tokio::pin!(future);
170 ///
171 ///         loop {
172 ///             // Make sure that no wakeup is lost if we get
173 ///             // `None` from `try_recv`.
174 ///             future.as_mut().enable();
175 ///
176 ///             if let Some(msg) = self.try_recv() {
177 ///                 return msg;
178 ///             }
179 ///
180 ///             // Wait for a call to `notify_one`.
181 ///             //
182 ///             // This uses `.as_mut()` to avoid consuming the future,
183 ///             // which lets us call `Pin::set` below.
184 ///             future.as_mut().await;
185 ///
186 ///             // Reset the future in case another call to
187 ///             // `try_recv` got the message before us.
188 ///             future.set(self.notify_on_sent.notified());
189 ///         }
190 ///     }
191 /// }
192 /// ```
193 ///
194 /// [park]: std::thread::park
195 /// [unpark]: std::thread::Thread::unpark
196 /// [`notified().await`]: Notify::notified()
197 /// [`notify_one()`]: Notify::notify_one()
198 /// [`enable`]: Notified::enable()
199 /// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
    // `state` uses its 2 low bits to store one of `EMPTY`,
    // `WAITING` or `NOTIFIED`. The rest of the bits
    // are used to store the number of times `notify_waiters`
    // was called.
    //
    // Throughout the code there are two assumptions:
    // - state can be transitioned *from* `WAITING` only if
    //   `waiters` lock is held
    // - number of times `notify_waiters` was called can
    //   be modified only if `waiters` lock is held
    state: AtomicUsize,
    // List of waiters to notify. `notify_one` pops a single waiter from it,
    // `notify_waiters` takes the whole list; both do so with this mutex held.
    waiters: Mutex<WaitList>,
}
215 
/// Node of the intrusive waiter list.
///
/// One `Waiter` is embedded in every `Notified` future, and `notify_waiters`
/// creates an additional one to serve as the guard node of its guarded list.
#[derive(Debug)]
struct Waiter {
    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Waiting task's waker. Depending on the value of `notification`,
    /// this field is either protected by the `waiters` lock in
    /// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
    waker: UnsafeCell<Option<Waker>>,

    /// Notification for this waiter.
    /// * if it's `None`, then `waker` is protected by the `waiters` lock.
    /// * if it's `Some`, then `waker` is exclusively owned by the
    ///   enclosing `Waiter` and can be accessed without locking.
    notification: AtomicNotification,

    /// Should not be `Unpin`.
    // NOTE(review): presumably because list entries point at this node, so it
    // must not move while linked — confirm against the linked-list contract.
    _p: PhantomPinned,
}
235 
236 impl Waiter {
new() -> Waiter237     fn new() -> Waiter {
238         Waiter {
239             pointers: linked_list::Pointers::new(),
240             waker: UnsafeCell::new(None),
241             notification: AtomicNotification::none(),
242             _p: PhantomPinned,
243         }
244     }
245 }
246 
// Generates `Waiter::addr_of_pointers`, which maps a `NonNull<Waiter>` to a
// `NonNull` pointing at its `pointers` field.
// NOTE(review): presumably the macro performs the projection without creating
// an intermediate `&Waiter` reference — confirm against the macro definition.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
254 
// Raw values stored inside `AtomicNotification`. The two non-zero values are
// also the discriminants of the `Notification` enum.

// No notification.
const NOTIFICATION_NONE: usize = 0;

// Notification type used by `notify_one`.
const NOTIFICATION_ONE: usize = 1;

// Notification type used by `notify_waiters`.
const NOTIFICATION_ALL: usize = 2;
263 
/// Notification for a `Waiter`.
/// This struct is equivalent to `Option<Notification>`, but uses
/// `AtomicUsize` inside for atomic operations.
///
/// The wrapped integer holds one of `NOTIFICATION_NONE`, `NOTIFICATION_ONE`
/// or `NOTIFICATION_ALL`.
#[derive(Debug)]
struct AtomicNotification(AtomicUsize);
269 
270 impl AtomicNotification {
none() -> Self271     fn none() -> Self {
272         AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE))
273     }
274 
275     /// Store-release a notification.
276     /// This method should be called exactly once.
store_release(&self, notification: Notification)277     fn store_release(&self, notification: Notification) {
278         self.0.store(notification as usize, Release);
279     }
280 
load(&self, ordering: Ordering) -> Option<Notification>281     fn load(&self, ordering: Ordering) -> Option<Notification> {
282         match self.0.load(ordering) {
283             NOTIFICATION_NONE => None,
284             NOTIFICATION_ONE => Some(Notification::One),
285             NOTIFICATION_ALL => Some(Notification::All),
286             _ => unreachable!(),
287         }
288     }
289 
290     /// Clears the notification.
291     /// This method is used by a `Notified` future to consume the
292     /// notification. It uses relaxed ordering and should be only
293     /// used once the atomic notification is no longer shared.
clear(&self)294     fn clear(&self) {
295         self.0.store(NOTIFICATION_NONE, Relaxed);
296     }
297 }
298 
/// Kind of notification delivered to a `Waiter`.
///
/// The discriminants are the raw `NOTIFICATION_*` values, so a variant can be
/// stored in `AtomicNotification` with an `as usize` cast.
#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum Notification {
    /// Stored by `notify_locked`, the helper behind `notify_one`.
    One = NOTIFICATION_ONE,
    /// Stored by `notify_waiters` and by the `NotifyWaitersList` destructor.
    All = NOTIFICATION_ALL,
}
305 
/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
/// and gates the access to it on `notify.waiters` mutex. It also empties
/// the list on drop.
struct NotifyWaitersList<'a> {
    /// Waiters taken out of `Notify::waiters`, converted to a guarded list.
    list: GuardedWaitList,
    /// Set once `pop_back_locked` has found the list empty, so that `drop`
    /// can skip acquiring the lock.
    is_empty: bool,
    /// The `Notify` whose `waiters` mutex protects `list`.
    notify: &'a Notify,
}
314 
315 impl<'a> NotifyWaitersList<'a> {
new( unguarded_list: WaitList, guard: Pin<&'a Waiter>, notify: &'a Notify, ) -> NotifyWaitersList<'a>316     fn new(
317         unguarded_list: WaitList,
318         guard: Pin<&'a Waiter>,
319         notify: &'a Notify,
320     ) -> NotifyWaitersList<'a> {
321         let guard_ptr = NonNull::from(guard.get_ref());
322         let list = unguarded_list.into_guarded(guard_ptr);
323         NotifyWaitersList {
324             list,
325             is_empty: false,
326             notify,
327         }
328     }
329 
330     /// Removes the last element from the guarded list. Modifying this list
331     /// requires an exclusive access to the main list in `Notify`.
pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>>332     fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> {
333         let result = self.list.pop_back();
334         if result.is_none() {
335             // Save information about emptiness to avoid waiting for lock
336             // in the destructor.
337             self.is_empty = true;
338         }
339         result
340     }
341 }
342 
343 impl Drop for NotifyWaitersList<'_> {
drop(&mut self)344     fn drop(&mut self) {
345         // If the list is not empty, we unlink all waiters from it.
346         // We do not wake the waiters to avoid double panics.
347         if !self.is_empty {
348             let _lock_guard = self.notify.waiters.lock();
349             while let Some(waiter) = self.list.pop_back() {
350                 // Safety: we never make mutable references to waiters.
351                 let waiter = unsafe { waiter.as_ref() };
352                 waiter.notification.store_release(Notification::All);
353             }
354         }
355     }
356 }
357 
358 /// Future returned from [`Notify::notified()`].
359 ///
360 /// This future is fused, so once it has completed, any future calls to poll
361 /// will immediately return `Poll::Ready`.
362 #[derive(Debug)]
363 pub struct Notified<'a> {
364     /// The `Notify` being received on.
365     notify: &'a Notify,
366 
367     /// The current state of the receiving process.
368     state: State,
369 
370     /// Number of calls to `notify_waiters` at the time of creation.
371     notify_waiters_calls: usize,
372 
373     /// Entry in the waiter `LinkedList`.
374     waiter: Waiter,
375 }
376 
// NOTE(review): `Notified` embeds a `Waiter`, which contains an `UnsafeCell`
// and intrusive list pointers, so these impls are presumably needed because
// the auto traits are not derived. Per the `Waiter` field docs, the waker cell
// is either protected by the `waiters` lock or exclusively owned by the
// waiter — confirm that this is the complete safety argument for both impls.
unsafe impl<'a> Send for Notified<'a> {}
unsafe impl<'a> Sync for Notified<'a> {}
379 
/// Progress of a `Notified` future (its `state` field).
#[derive(Debug)]
enum State {
    /// Value assigned by `Notify::notified()` when the future is created.
    Init,
    // NOTE(review): presumably the future's waiter has been queued and it is
    // awaiting a notification — confirm against `poll_notified`.
    Waiting,
    // NOTE(review): presumably the future has completed and further polls
    // return `Poll::Ready` — confirm against `poll_notified`.
    Done,
}
386 
/// Number of low bits of the state word reserved for the state value; the
/// `notify_waiters` call counter occupies the bits above them.
const NOTIFY_WAITERS_SHIFT: usize = 2;
/// Selects the state bits (the low `NOTIFY_WAITERS_SHIFT` bits).
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
/// Selects the `notify_waiters` call counter (every bit above the state bits).
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;

/// Initial "idle" state.
const EMPTY: usize = 0;

/// One or more threads are currently waiting to be notified.
const WAITING: usize = 1;

/// Pending notification: `notify_one` stored a permit that has not been
/// consumed yet.
const NOTIFIED: usize = 2;
399 
set_state(data: usize, state: usize) -> usize400 fn set_state(data: usize, state: usize) -> usize {
401     (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
402 }
403 
get_state(data: usize) -> usize404 fn get_state(data: usize) -> usize {
405     data & STATE_MASK
406 }
407 
get_num_notify_waiters_calls(data: usize) -> usize408 fn get_num_notify_waiters_calls(data: usize) -> usize {
409     (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
410 }
411 
inc_num_notify_waiters_calls(data: usize) -> usize412 fn inc_num_notify_waiters_calls(data: usize) -> usize {
413     data + (1 << NOTIFY_WAITERS_SHIFT)
414 }
415 
atomic_inc_num_notify_waiters_calls(data: &AtomicUsize)416 fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
417     data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
418 }
419 
420 impl Notify {
421     /// Create a new `Notify`, initialized without a permit.
422     ///
423     /// # Examples
424     ///
425     /// ```
426     /// use tokio::sync::Notify;
427     ///
428     /// let notify = Notify::new();
429     /// ```
new() -> Notify430     pub fn new() -> Notify {
431         Notify {
432             state: AtomicUsize::new(0),
433             waiters: Mutex::new(LinkedList::new()),
434         }
435     }
436 
437     /// Create a new `Notify`, initialized without a permit.
438     ///
439     /// # Examples
440     ///
441     /// ```
442     /// use tokio::sync::Notify;
443     ///
444     /// static NOTIFY: Notify = Notify::const_new();
445     /// ```
446     #[cfg(not(all(loom, test)))]
const_new() -> Notify447     pub const fn const_new() -> Notify {
448         Notify {
449             state: AtomicUsize::new(0),
450             waiters: Mutex::const_new(LinkedList::new()),
451         }
452     }
453 
454     /// Wait for a notification.
455     ///
456     /// Equivalent to:
457     ///
458     /// ```ignore
459     /// async fn notified(&self);
460     /// ```
461     ///
462     /// Each `Notify` value holds a single permit. If a permit is available from
463     /// an earlier call to [`notify_one()`], then `notified().await` will complete
464     /// immediately, consuming that permit. Otherwise, `notified().await` waits
465     /// for a permit to be made available by the next call to `notify_one()`.
466     ///
467     /// The `Notified` future is not guaranteed to receive wakeups from calls to
468     /// `notify_one()` if it has not yet been polled. See the documentation for
469     /// [`Notified::enable()`] for more details.
470     ///
471     /// The `Notified` future is guaranteed to receive wakeups from
472     /// `notify_waiters()` as soon as it has been created, even if it has not
473     /// yet been polled.
474     ///
475     /// [`notify_one()`]: Notify::notify_one
476     /// [`Notified::enable()`]: Notified::enable
477     ///
478     /// # Cancel safety
479     ///
480     /// This method uses a queue to fairly distribute notifications in the order
481     /// they were requested. Cancelling a call to `notified` makes you lose your
482     /// place in the queue.
483     ///
484     /// # Examples
485     ///
486     /// ```
487     /// use tokio::sync::Notify;
488     /// use std::sync::Arc;
489     ///
490     /// #[tokio::main]
491     /// async fn main() {
492     ///     let notify = Arc::new(Notify::new());
493     ///     let notify2 = notify.clone();
494     ///
495     ///     tokio::spawn(async move {
496     ///         notify2.notified().await;
497     ///         println!("received notification");
498     ///     });
499     ///
500     ///     println!("sending notification");
501     ///     notify.notify_one();
502     /// }
503     /// ```
notified(&self) -> Notified<'_>504     pub fn notified(&self) -> Notified<'_> {
505         // we load the number of times notify_waiters
506         // was called and store that in the future.
507         let state = self.state.load(SeqCst);
508         Notified {
509             notify: self,
510             state: State::Init,
511             notify_waiters_calls: get_num_notify_waiters_calls(state),
512             waiter: Waiter::new(),
513         }
514     }
515 
516     /// Notifies a waiting task.
517     ///
518     /// If a task is currently waiting, that task is notified. Otherwise, a
519     /// permit is stored in this `Notify` value and the **next** call to
520     /// [`notified().await`] will complete immediately consuming the permit made
521     /// available by this call to `notify_one()`.
522     ///
523     /// At most one permit may be stored by `Notify`. Many sequential calls to
524     /// `notify_one` will result in a single permit being stored. The next call to
525     /// `notified().await` will complete immediately, but the one after that
526     /// will wait.
527     ///
528     /// [`notified().await`]: Notify::notified()
529     ///
530     /// # Examples
531     ///
532     /// ```
533     /// use tokio::sync::Notify;
534     /// use std::sync::Arc;
535     ///
536     /// #[tokio::main]
537     /// async fn main() {
538     ///     let notify = Arc::new(Notify::new());
539     ///     let notify2 = notify.clone();
540     ///
541     ///     tokio::spawn(async move {
542     ///         notify2.notified().await;
543     ///         println!("received notification");
544     ///     });
545     ///
546     ///     println!("sending notification");
547     ///     notify.notify_one();
548     /// }
549     /// ```
550     // Alias for old name in 0.x
551     #[cfg_attr(docsrs, doc(alias = "notify"))]
notify_one(&self)552     pub fn notify_one(&self) {
553         // Load the current state
554         let mut curr = self.state.load(SeqCst);
555 
556         // If the state is `EMPTY`, transition to `NOTIFIED` and return.
557         while let EMPTY | NOTIFIED = get_state(curr) {
558             // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
559             // happens-before synchronization must happen between this atomic
560             // operation and a task calling `notified().await`.
561             let new = set_state(curr, NOTIFIED);
562             let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);
563 
564             match res {
565                 // No waiters, no further work to do
566                 Ok(_) => return,
567                 Err(actual) => {
568                     curr = actual;
569                 }
570             }
571         }
572 
573         // There are waiters, the lock must be acquired to notify.
574         let mut waiters = self.waiters.lock();
575 
576         // The state must be reloaded while the lock is held. The state may only
577         // transition out of WAITING while the lock is held.
578         curr = self.state.load(SeqCst);
579 
580         if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) {
581             drop(waiters);
582             waker.wake();
583         }
584     }
585 
586     /// Notifies all waiting tasks.
587     ///
588     /// If a task is currently waiting, that task is notified. Unlike with
589     /// `notify_one()`, no permit is stored to be used by the next call to
590     /// `notified().await`. The purpose of this method is to notify all
591     /// already registered waiters. Registering for notification is done by
592     /// acquiring an instance of the `Notified` future via calling `notified()`.
593     ///
594     /// # Examples
595     ///
596     /// ```
597     /// use tokio::sync::Notify;
598     /// use std::sync::Arc;
599     ///
600     /// #[tokio::main]
601     /// async fn main() {
602     ///     let notify = Arc::new(Notify::new());
603     ///     let notify2 = notify.clone();
604     ///
605     ///     let notified1 = notify.notified();
606     ///     let notified2 = notify.notified();
607     ///
608     ///     let handle = tokio::spawn(async move {
609     ///         println!("sending notifications");
610     ///         notify2.notify_waiters();
611     ///     });
612     ///
613     ///     notified1.await;
614     ///     notified2.await;
615     ///     println!("received notifications");
616     /// }
617     /// ```
    pub fn notify_waiters(&self) {
        let mut waiters = self.waiters.lock();

        // The state must be loaded while the lock is held. The state may only
        // transition out of WAITING while the lock is held.
        let curr = self.state.load(SeqCst);

        if matches!(get_state(curr), EMPTY | NOTIFIED) {
            // There are no waiting tasks. All we need to do is increment the
            // number of times this method was called.
            //
            // An atomic read-modify-write is used because the state bits may
            // still be changed concurrently by the lock-free path of
            // `notify_one`.
            atomic_inc_num_notify_waiters_calls(&self.state);
            return;
        }

        // Increment the number of times this method was called
        // and transition to empty.
        //
        // The state is `WAITING` here and leaving `WAITING` requires the lock
        // we hold, so a plain store is enough.
        let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
        self.state.store(new_state, SeqCst);

        // It is critical for `GuardedLinkedList` safety that the guard node is
        // pinned in memory and is not dropped until the guarded list is dropped.
        let guard = Waiter::new();
        pin!(guard);

        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
        // underneath to allow every waiter to safely remove itself from it.
        //
        // * This list will be still guarded by the `waiters` lock.
        //   `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
        // * This wrapper will empty the list on drop. It is critical for safety
        //   that we will not leave any list entry with a pointer to the local
        //   guard node after this function returns / panics.
        let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self);

        // Wakers are collected in batches: the inner loop fills `wakers` while
        // the lock is held, then the lock is released to wake the batch.
        let mut wakers = WakeList::new();
        'outer: loop {
            // Keep collecting for as long as the `WakeList` reports room.
            while wakers.can_push() {
                match list.pop_back_locked(&mut waiters) {
                    Some(waiter) => {
                        // Safety: we never make mutable references to waiters.
                        let waiter = unsafe { waiter.as_ref() };

                        // Safety: we hold the lock, so we can access the waker.
                        if let Some(waker) =
                            unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }
                        {
                            wakers.push(waker);
                        }

                        // This waiter is unlinked and will not be shared ever again, release it.
                        waiter.notification.store_release(Notification::All);
                    }
                    None => {
                        // The list is drained; the final batch is woken after
                        // the loop.
                        break 'outer;
                    }
                }
            }

            // Release the lock before notifying.
            drop(waiters);

            // One of the wakers may panic, but the remaining waiters will still
            // be unlinked from the list in `NotifyWaitersList` destructor.
            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        wakers.wake_all();
    }
692 }
693 
694 impl Default for Notify {
default() -> Notify695     fn default() -> Notify {
696         Notify::new()
697     }
698 }
699 
// Explicitly mark `Notify` as unwind safe.
// NOTE(review): presumably needed because the atomic/mutex fields prevent the
// auto traits from being derived — confirm.
impl UnwindSafe for Notify {}
impl RefUnwindSafe for Notify {}
702 
notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker>703 fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> {
704     loop {
705         match get_state(curr) {
706             EMPTY | NOTIFIED => {
707                 let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
708 
709                 match res {
710                     Ok(_) => return None,
711                     Err(actual) => {
712                         let actual_state = get_state(actual);
713                         assert!(actual_state == EMPTY || actual_state == NOTIFIED);
714                         state.store(set_state(actual, NOTIFIED), SeqCst);
715                         return None;
716                     }
717                 }
718             }
719             WAITING => {
720                 // At this point, it is guaranteed that the state will not
721                 // concurrently change as holding the lock is required to
722                 // transition **out** of `WAITING`.
723                 //
724                 // Get a pending waiter
725                 let waiter = waiters.pop_back().unwrap();
726 
727                 // Safety: we never make mutable references to waiters.
728                 let waiter = unsafe { waiter.as_ref() };
729 
730                 // Safety: we hold the lock, so we can access the waker.
731                 let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
732 
733                 // This waiter is unlinked and will not be shared ever again, release it.
734                 waiter.notification.store_release(Notification::One);
735 
736                 if waiters.is_empty() {
737                     // As this the **final** waiter in the list, the state
738                     // must be transitioned to `EMPTY`. As transitioning
739                     // **from** `WAITING` requires the lock to be held, a
740                     // `store` is sufficient.
741                     state.store(set_state(curr, EMPTY), SeqCst);
742                 }
743 
744                 return waker;
745             }
746             _ => unreachable!(),
747         }
748     }
749 }
750 
751 // ===== impl Notified =====
752 
753 impl Notified<'_> {
754     /// Adds this future to the list of futures that are ready to receive
755     /// wakeups from calls to [`notify_one`].
756     ///
757     /// Polling the future also adds it to the list, so this method should only
758     /// be used if you want to add the future to the list before the first call
759     /// to `poll`. (In fact, this method is equivalent to calling `poll` except
760     /// that no `Waker` is registered.)
761     ///
762     /// This has no effect on notifications sent using [`notify_waiters`], which
763     /// are received as long as they happen after the creation of the `Notified`
764     /// regardless of whether `enable` or `poll` has been called.
765     ///
766     /// This method returns true if the `Notified` is ready. This happens in the
767     /// following situations:
768     ///
769     ///  1. The `notify_waiters` method was called between the creation of the
770     ///     `Notified` and the call to this method.
771     ///  2. This is the first call to `enable` or `poll` on this future, and the
772     ///     `Notify` was holding a permit from a previous call to `notify_one`.
773     ///     The call consumes the permit in that case.
774     ///  3. The future has previously been enabled or polled, and it has since
775     ///     then been marked ready by either consuming a permit from the
776     ///     `Notify`, or by a call to `notify_one` or `notify_waiters` that
777     ///     removed it from the list of futures ready to receive wakeups.
778     ///
779     /// If this method returns true, any future calls to poll on the same future
780     /// will immediately return `Poll::Ready`.
781     ///
782     /// # Examples
783     ///
    /// Unbounded multi-producer multi-consumer (mpmc) channel.
785     ///
786     /// The call to `enable` is important because otherwise if you have two
787     /// calls to `recv` and two calls to `send` in parallel, the following could
788     /// happen:
789     ///
790     ///  1. Both calls to `try_recv` return `None`.
791     ///  2. Both new elements are added to the vector.
792     ///  3. The `notify_one` method is called twice, adding only a single
793     ///     permit to the `Notify`.
794     ///  4. Both calls to `recv` reach the `Notified` future. One of them
795     ///     consumes the permit, and the other sleeps forever.
796     ///
797     /// By adding the `Notified` futures to the list by calling `enable` before
798     /// `try_recv`, the `notify_one` calls in step three would remove the
799     /// futures from the list and mark them notified instead of adding a permit
800     /// to the `Notify`. This ensures that both futures are woken.
801     ///
802     /// ```
803     /// use tokio::sync::Notify;
804     ///
805     /// use std::collections::VecDeque;
806     /// use std::sync::Mutex;
807     ///
808     /// struct Channel<T> {
809     ///     messages: Mutex<VecDeque<T>>,
810     ///     notify_on_sent: Notify,
811     /// }
812     ///
813     /// impl<T> Channel<T> {
814     ///     pub fn send(&self, msg: T) {
815     ///         let mut locked_queue = self.messages.lock().unwrap();
816     ///         locked_queue.push_back(msg);
817     ///         drop(locked_queue);
818     ///
819     ///         // Send a notification to one of the calls currently
820     ///         // waiting in a call to `recv`.
821     ///         self.notify_on_sent.notify_one();
822     ///     }
823     ///
824     ///     pub fn try_recv(&self) -> Option<T> {
825     ///         let mut locked_queue = self.messages.lock().unwrap();
826     ///         locked_queue.pop_front()
827     ///     }
828     ///
829     ///     pub async fn recv(&self) -> T {
830     ///         let future = self.notify_on_sent.notified();
831     ///         tokio::pin!(future);
832     ///
833     ///         loop {
834     ///             // Make sure that no wakeup is lost if we get
835     ///             // `None` from `try_recv`.
836     ///             future.as_mut().enable();
837     ///
838     ///             if let Some(msg) = self.try_recv() {
839     ///                 return msg;
840     ///             }
841     ///
842     ///             // Wait for a call to `notify_one`.
843     ///             //
844     ///             // This uses `.as_mut()` to avoid consuming the future,
845     ///             // which lets us call `Pin::set` below.
846     ///             future.as_mut().await;
847     ///
848     ///             // Reset the future in case another call to
849     ///             // `try_recv` got the message before us.
850     ///             future.set(self.notify_on_sent.notified());
851     ///         }
852     ///     }
853     /// }
854     /// ```
855     ///
856     /// [`notify_one`]: Notify::notify_one()
857     /// [`notify_waiters`]: Notify::notify_waiters()
enable(self: Pin<&mut Self>) -> bool858     pub fn enable(self: Pin<&mut Self>) -> bool {
859         self.poll_notified(None).is_ready()
860     }
861 
862     /// A custom `project` implementation is used in place of `pin-project-lite`
863     /// as a custom drop implementation is needed.
project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter)864     fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter) {
865         unsafe {
866             // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
867 
868             is_unpin::<&Notify>();
869             is_unpin::<State>();
870             is_unpin::<usize>();
871 
872             let me = self.get_unchecked_mut();
873             (
874                 me.notify,
875                 &mut me.state,
876                 &me.notify_waiters_calls,
877                 &me.waiter,
878             )
879         }
880     }
881 
    /// Drives the `Notified` state machine and reports whether the future has
    /// completed.
    ///
    /// `waker` is `Some` when this is called from `Future::poll` and `None`
    /// when it is called from `enable`. In the latter case the waiter is still
    /// inserted into the waiter list, but no waker is stored for it.
    ///
    /// Behavior per state:
    ///
    ///  * `Init`: tries to consume a pending `NOTIFIED` permit. If there is
    ///    none, and the `notify_waiters` call counter still matches the value
    ///    recorded in this future, the waiter is pushed onto the list, the
    ///    state becomes `Waiting` and `Poll::Pending` is returned.
    ///  * `Waiting`: completes if the waiter has received a notification or if
    ///    the `notify_waiters` call counter has changed; otherwise the stored
    ///    waker is refreshed (when a waker was supplied) and `Poll::Pending`
    ///    is returned.
    ///  * `Done`: returns `Poll::Ready(())`.
    fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
        use State::*;

        let (notify, state, notify_waiters_calls, waiter) = self.project();

        // Every path that sets `*state = Done` without returning loops back
        // here so that the `Done` arm produces the final `Poll::Ready`.
        'outer_loop: loop {
            match *state {
                Init => {
                    let curr = notify.state.load(SeqCst);

                    // Optimistically try acquiring a pending notification
                    // without taking the lock. This only succeeds if the
                    // state is currently `NOTIFIED`.
                    let res = notify.state.compare_exchange(
                        set_state(curr, NOTIFIED),
                        set_state(curr, EMPTY),
                        SeqCst,
                        SeqCst,
                    );

                    if res.is_ok() {
                        // Acquired the notification
                        *state = Done;
                        continue 'outer_loop;
                    }

                    // Clone the waker before locking, since cloning a waker
                    // can run arbitrary code.
                    let waker = waker.cloned();

                    // Acquire the lock and attempt to transition to the waiting
                    // state.
                    let mut waiters = notify.waiters.lock();

                    // Reload the state with the lock held
                    let mut curr = notify.state.load(SeqCst);

                    // if notify_waiters has been called after the future
                    // was created, then we are done
                    if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
                        *state = Done;
                        continue 'outer_loop;
                    }

                    // Transition the state to WAITING.
                    //
                    // The loop retries with the freshly observed value
                    // whenever a compare-exchange fails.
                    loop {
                        match get_state(curr) {
                            EMPTY => {
                                // Transition to WAITING
                                let res = notify.state.compare_exchange(
                                    set_state(curr, EMPTY),
                                    set_state(curr, WAITING),
                                    SeqCst,
                                    SeqCst,
                                );

                                if let Err(actual) = res {
                                    // The only state expected after a failed
                                    // exchange here is `NOTIFIED`; retry and
                                    // let the `NOTIFIED` arm consume it.
                                    assert_eq!(get_state(actual), NOTIFIED);
                                    curr = actual;
                                } else {
                                    break;
                                }
                            }
                            // Already in the waiting state: just enqueue below.
                            WAITING => break,
                            NOTIFIED => {
                                // Try consuming the notification
                                let res = notify.state.compare_exchange(
                                    set_state(curr, NOTIFIED),
                                    set_state(curr, EMPTY),
                                    SeqCst,
                                    SeqCst,
                                );

                                match res {
                                    Ok(_) => {
                                        // Acquired the notification
                                        *state = Done;
                                        continue 'outer_loop;
                                    }
                                    Err(actual) => {
                                        // The only state expected after a
                                        // failed exchange here is `EMPTY`;
                                        // retry via the `EMPTY` arm.
                                        assert_eq!(get_state(actual), EMPTY);
                                        curr = actual;
                                    }
                                }
                            }
                            _ => unreachable!(),
                        }
                    }

                    let mut old_waker = None;
                    if waker.is_some() {
                        // Safety: called while locked.
                        //
                        // The use of `old_waker` here is not necessary, as the field is always
                        // None when we reach this line.
                        unsafe {
                            old_waker =
                                waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker));
                        }
                    }

                    // Insert the waiter into the linked list. This happens even
                    // when no waker was supplied (the `enable` path).
                    waiters.push_front(NonNull::from(waiter));

                    *state = Waiting;

                    // Release the lock before dropping the previous waker.
                    drop(waiters);
                    drop(old_waker);

                    return Poll::Pending;
                }
                Waiting => {
                    #[cfg(tokio_taskdump)]
                    if let Some(waker) = waker {
                        let mut ctx = Context::from_waker(waker);
                        ready!(crate::trace::trace_leaf(&mut ctx));
                    }

                    // Fast path: check for a notification without taking the
                    // lock.
                    if waiter.notification.load(Acquire).is_some() {
                        // Safety: waiter is already unlinked and will not be shared again,
                        // so we have an exclusive access to `waker`.
                        drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });

                        waiter.notification.clear();
                        *state = Done;
                        return Poll::Ready(());
                    }

                    // Our waiter was not notified, implying it is still stored in a waiter
                    // list (guarded by `notify.waiters`). In order to access the waker
                    // fields, we must acquire the lock.

                    let mut old_waker = None;
                    let mut waiters = notify.waiters.lock();

                    // We hold the lock and notifications are set only with the lock held,
                    // so this can be relaxed, because the happens-before relationship is
                    // established through the mutex.
                    if waiter.notification.load(Relaxed).is_some() {
                        // Safety: waiter is already unlinked and will not be shared again,
                        // so we have an exclusive access to `waker`.
                        old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

                        waiter.notification.clear();

                        // Drop the old waker after releasing the lock.
                        drop(waiters);
                        drop(old_waker);

                        *state = Done;
                        return Poll::Ready(());
                    }

                    // Load the state with the lock held.
                    let curr = notify.state.load(SeqCst);

                    if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
                        // Before we add a waiter to the list we check if these numbers are
                        // different while holding the lock. If these numbers are different now,
                        // it means that there is a call to `notify_waiters` in progress and this
                        // waiter must be contained by a guarded list used in `notify_waiters`.
                        // We can treat the waiter as notified and remove it from the list, as
                        // it would have been notified in the `notify_waiters` call anyways.

                        // Safety: we hold the lock, so we can modify the waker.
                        old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

                        // Safety: we hold the lock, so we have an exclusive access to the list.
                        // The list is used in `notify_waiters`, so it must be guarded.
                        unsafe { waiters.remove(NonNull::from(waiter)) };

                        // No return here: control falls through to the drops
                        // below and the next loop iteration hits the `Done` arm.
                        *state = Done;
                    } else {
                        // Still waiting. Store the supplied waker unless the
                        // one already stored would wake the same task. When no
                        // waker was supplied, the stored one is left untouched.
                        //
                        // Safety: we hold the lock, so we can modify the waker.
                        unsafe {
                            waiter.waker.with_mut(|v| {
                                if let Some(waker) = waker {
                                    let should_update = match &*v {
                                        Some(current_waker) => !current_waker.will_wake(waker),
                                        None => true,
                                    };
                                    if should_update {
                                        old_waker = std::mem::replace(&mut *v, Some(waker.clone()));
                                    }
                                }
                            });
                        }

                        // Drop the old waker after releasing the lock.
                        drop(waiters);
                        drop(old_waker);

                        return Poll::Pending;
                    }

                    // Explicit drop of the lock to indicate the scope that the
                    // lock is held. Because holding the lock is required to
                    // ensure safe access to fields not held within the lock, it
                    // is helpful to visualize the scope of the critical
                    // section.
                    drop(waiters);

                    // Drop the old waker after releasing the lock.
                    drop(old_waker);
                }
                Done => {
                    #[cfg(tokio_taskdump)]
                    if let Some(waker) = waker {
                        let mut ctx = Context::from_waker(waker);
                        ready!(crate::trace::trace_leaf(&mut ctx));
                    }
                    return Poll::Ready(());
                }
            }
        }
    }
1096 }
1097 
1098 impl Future for Notified<'_> {
1099     type Output = ();
1100 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>1101     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1102         self.poll_notified(Some(cx.waker()))
1103     }
1104 }
1105 
1106 impl Drop for Notified<'_> {
drop(&mut self)1107     fn drop(&mut self) {
1108         use State::*;
1109 
1110         // Safety: The type only transitions to a "Waiting" state when pinned.
1111         let (notify, state, _, waiter) = unsafe { Pin::new_unchecked(self).project() };
1112 
1113         // This is where we ensure safety. The `Notified` value is being
1114         // dropped, which means we must ensure that the waiter entry is no
1115         // longer stored in the linked list.
1116         if matches!(*state, Waiting) {
1117             let mut waiters = notify.waiters.lock();
1118             let mut notify_state = notify.state.load(SeqCst);
1119 
1120             // We hold the lock, so this field is not concurrently accessed by
1121             // `notify_*` functions and we can use the relaxed ordering.
1122             let notification = waiter.notification.load(Relaxed);
1123 
1124             // remove the entry from the list (if not already removed)
1125             //
1126             // Safety: we hold the lock, so we have an exclusive access to every list the
1127             // waiter may be contained in. If the node is not contained in the `waiters`
1128             // list, then it is contained by a guarded list used by `notify_waiters`.
1129             unsafe { waiters.remove(NonNull::from(waiter)) };
1130 
1131             if waiters.is_empty() && get_state(notify_state) == WAITING {
1132                 notify_state = set_state(notify_state, EMPTY);
1133                 notify.state.store(notify_state, SeqCst);
1134             }
1135 
1136             // See if the node was notified but not received. In this case, if
1137             // the notification was triggered via `notify_one`, it must be sent
1138             // to the next waiter.
1139             if notification == Some(Notification::One) {
1140                 if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
1141                     drop(waiters);
1142                     waker.wake();
1143                 }
1144             }
1145         }
1146     }
1147 }
1148 
1149 /// # Safety
1150 ///
1151 /// `Waiter` is forced to be !Unpin.
1152 unsafe impl linked_list::Link for Waiter {
1153     type Handle = NonNull<Waiter>;
1154     type Target = Waiter;
1155 
as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter>1156     fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1157         *handle
1158     }
1159 
from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter>1160     unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1161         ptr
1162     }
1163 
pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>>1164     unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1165         Waiter::addr_of_pointers(target)
1166     }
1167 }
1168 
/// Compile-time assertion that `T` implements `Unpin`; does nothing at
/// runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1170