• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Allow `unreachable_pub` and `dead_code` warnings when `sync` is not enabled,
// due to the usage of `Notify` within the `rt` feature set.
// When this module is compiled with `sync` enabled, these lints are reported
// as usual. When only `rt` is enabled we use `pub(crate)`, which triggers
// these warnings, but they are safe to ignore in that case.
6 #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
7 
8 use crate::loom::sync::atomic::AtomicUsize;
9 use crate::loom::sync::Mutex;
10 use crate::util::linked_list::{self, LinkedList};
11 use crate::util::WakeList;
12 
13 use std::cell::UnsafeCell;
14 use std::future::Future;
15 use std::marker::PhantomPinned;
16 use std::panic::{RefUnwindSafe, UnwindSafe};
17 use std::pin::Pin;
18 use std::ptr::NonNull;
19 use std::sync::atomic::Ordering::SeqCst;
20 use std::task::{Context, Poll, Waker};
21 
22 type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
23 
24 /// Notifies a single task to wake up.
25 ///
26 /// `Notify` provides a basic mechanism to notify a single task of an event.
27 /// `Notify` itself does not carry any data. Instead, it is to be used to signal
28 /// another task to perform an operation.
29 ///
30 /// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
31 /// [`notified().await`] method waits for a permit to become available, and
32 /// [`notify_one()`] sets a permit **if there currently are no available
33 /// permits**.
34 ///
35 /// The synchronization details of `Notify` are similar to
36 /// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
37 /// value contains a single permit. [`notified().await`] waits for the permit to
38 /// be made available, consumes the permit, and resumes.  [`notify_one()`] sets
39 /// the permit, waking a pending task if there is one.
40 ///
41 /// If `notify_one()` is called **before** `notified().await`, then the next
42 /// call to `notified().await` will complete immediately, consuming the permit.
43 /// Any subsequent calls to `notified().await` will wait for a new permit.
44 ///
45 /// If `notify_one()` is called **multiple** times before `notified().await`,
46 /// only a **single** permit is stored. The next call to `notified().await` will
47 /// complete immediately, but the one after will wait for a new permit.
48 ///
49 /// # Examples
50 ///
51 /// Basic usage.
52 ///
53 /// ```
54 /// use tokio::sync::Notify;
55 /// use std::sync::Arc;
56 ///
57 /// #[tokio::main]
58 /// async fn main() {
59 ///     let notify = Arc::new(Notify::new());
60 ///     let notify2 = notify.clone();
61 ///
62 ///     let handle = tokio::spawn(async move {
63 ///         notify2.notified().await;
64 ///         println!("received notification");
65 ///     });
66 ///
67 ///     println!("sending notification");
68 ///     notify.notify_one();
69 ///
70 ///     // Wait for task to receive notification.
71 ///     handle.await.unwrap();
72 /// }
73 /// ```
74 ///
/// Unbounded multi-producer single-consumer (mpsc) channel.
76 ///
77 /// No wakeups can be lost when using this channel because the call to
78 /// `notify_one()` will store a permit in the `Notify`, which the following call
79 /// to `notified()` will consume.
80 ///
81 /// ```
82 /// use tokio::sync::Notify;
83 ///
84 /// use std::collections::VecDeque;
85 /// use std::sync::Mutex;
86 ///
87 /// struct Channel<T> {
88 ///     values: Mutex<VecDeque<T>>,
89 ///     notify: Notify,
90 /// }
91 ///
92 /// impl<T> Channel<T> {
93 ///     pub fn send(&self, value: T) {
94 ///         self.values.lock().unwrap()
95 ///             .push_back(value);
96 ///
97 ///         // Notify the consumer a value is available
98 ///         self.notify.notify_one();
99 ///     }
100 ///
101 ///     // This is a single-consumer channel, so several concurrent calls to
102 ///     // `recv` are not allowed.
103 ///     pub async fn recv(&self) -> T {
104 ///         loop {
105 ///             // Drain values
106 ///             if let Some(value) = self.values.lock().unwrap().pop_front() {
107 ///                 return value;
108 ///             }
109 ///
110 ///             // Wait for values to be available
111 ///             self.notify.notified().await;
112 ///         }
113 ///     }
114 /// }
115 /// ```
116 ///
/// Unbounded multi-producer multi-consumer (mpmc) channel.
118 ///
119 /// The call to [`enable`] is important because otherwise if you have two
120 /// calls to `recv` and two calls to `send` in parallel, the following could
121 /// happen:
122 ///
123 ///  1. Both calls to `try_recv` return `None`.
124 ///  2. Both new elements are added to the vector.
125 ///  3. The `notify_one` method is called twice, adding only a single
126 ///     permit to the `Notify`.
127 ///  4. Both calls to `recv` reach the `Notified` future. One of them
128 ///     consumes the permit, and the other sleeps forever.
129 ///
130 /// By adding the `Notified` futures to the list by calling `enable` before
131 /// `try_recv`, the `notify_one` calls in step three would remove the
132 /// futures from the list and mark them notified instead of adding a permit
133 /// to the `Notify`. This ensures that both futures are woken.
134 ///
135 /// Notice that this failure can only happen if there are two concurrent calls
136 /// to `recv`. This is why the mpsc example above does not require a call to
137 /// `enable`.
138 ///
139 /// ```
140 /// use tokio::sync::Notify;
141 ///
142 /// use std::collections::VecDeque;
143 /// use std::sync::Mutex;
144 ///
145 /// struct Channel<T> {
146 ///     messages: Mutex<VecDeque<T>>,
147 ///     notify_on_sent: Notify,
148 /// }
149 ///
150 /// impl<T> Channel<T> {
151 ///     pub fn send(&self, msg: T) {
152 ///         let mut locked_queue = self.messages.lock().unwrap();
153 ///         locked_queue.push_back(msg);
154 ///         drop(locked_queue);
155 ///
156 ///         // Send a notification to one of the calls currently
157 ///         // waiting in a call to `recv`.
158 ///         self.notify_on_sent.notify_one();
159 ///     }
160 ///
161 ///     pub fn try_recv(&self) -> Option<T> {
162 ///         let mut locked_queue = self.messages.lock().unwrap();
163 ///         locked_queue.pop_front()
164 ///     }
165 ///
166 ///     pub async fn recv(&self) -> T {
167 ///         let future = self.notify_on_sent.notified();
168 ///         tokio::pin!(future);
169 ///
170 ///         loop {
171 ///             // Make sure that no wakeup is lost if we get
172 ///             // `None` from `try_recv`.
173 ///             future.as_mut().enable();
174 ///
175 ///             if let Some(msg) = self.try_recv() {
176 ///                 return msg;
177 ///             }
178 ///
179 ///             // Wait for a call to `notify_one`.
180 ///             //
181 ///             // This uses `.as_mut()` to avoid consuming the future,
182 ///             // which lets us call `Pin::set` below.
183 ///             future.as_mut().await;
184 ///
185 ///             // Reset the future in case another call to
186 ///             // `try_recv` got the message before us.
187 ///             future.set(self.notify_on_sent.notified());
188 ///         }
189 ///     }
190 /// }
191 /// ```
192 ///
193 /// [park]: std::thread::park
194 /// [unpark]: std::thread::Thread::unpark
195 /// [`notified().await`]: Notify::notified()
196 /// [`notify_one()`]: Notify::notify_one()
197 /// [`enable`]: Notified::enable()
198 /// [`Semaphore`]: crate::sync::Semaphore
199 #[derive(Debug)]
200 pub struct Notify {
201     // This uses 2 bits to store one of `EMPTY`,
202     // `WAITING` or `NOTIFIED`. The rest of the bits
203     // are used to store the number of times `notify_waiters`
204     // was called.
205     state: AtomicUsize,
206     waiters: Mutex<WaitList>,
207 }
208 
/// Records which `Notify` method assigned a notification to a waiter.
#[derive(Debug, Clone, Copy)]
enum NotificationType {
    /// Notification triggered by calling `notify_waiters`.
    AllWaiters,
    /// Notification triggered by calling `notify_one`.
    OneWaiter,
}
216 
217 #[derive(Debug)]
218 struct Waiter {
219     /// Intrusive linked-list pointers.
220     pointers: linked_list::Pointers<Waiter>,
221 
222     /// Waiting task's waker.
223     waker: Option<Waker>,
224 
225     /// `true` if the notification has been assigned to this waiter.
226     notified: Option<NotificationType>,
227 
228     /// Should not be `Unpin`.
229     _p: PhantomPinned,
230 }
231 
232 generate_addr_of_methods! {
233     impl<> Waiter {
234         unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
235             &self.pointers
236         }
237     }
238 }
239 
240 /// Future returned from [`Notify::notified()`].
241 ///
242 /// This future is fused, so once it has completed, any future calls to poll
243 /// will immediately return `Poll::Ready`.
244 #[derive(Debug)]
245 pub struct Notified<'a> {
246     /// The `Notify` being received on.
247     notify: &'a Notify,
248 
249     /// The current state of the receiving process.
250     state: State,
251 
252     /// Entry in the waiter `LinkedList`.
253     waiter: UnsafeCell<Waiter>,
254 }
255 
256 unsafe impl<'a> Send for Notified<'a> {}
257 unsafe impl<'a> Sync for Notified<'a> {}
258 
/// Progress of a `Notified` future.
#[derive(Debug)]
enum State {
    /// The future has not been added to the waiter list yet. Holds the
    /// number of `notify_waiters` calls observed when the future was created.
    Init(usize),
    /// The future's waiter has been pushed onto the waiter list.
    Waiting,
    /// The future has completed; polling again returns `Poll::Ready`.
    Done,
}
265 
/// Number of low bits of the state word used for the state tag; the
/// `notify_waiters` call counter starts at this bit.
const NOTIFY_WAITERS_SHIFT: usize = 2;
/// Mask selecting the state tag bits.
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
/// Mask selecting the `notify_waiters` call counter bits.
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;

/// Initial "idle" state.
const EMPTY: usize = 0;

/// One or more tasks are currently waiting to be notified.
const WAITING: usize = 1;

/// Pending notification.
const NOTIFIED: usize = 2;
278 
set_state(data: usize, state: usize) -> usize279 fn set_state(data: usize, state: usize) -> usize {
280     (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
281 }
282 
get_state(data: usize) -> usize283 fn get_state(data: usize) -> usize {
284     data & STATE_MASK
285 }
286 
get_num_notify_waiters_calls(data: usize) -> usize287 fn get_num_notify_waiters_calls(data: usize) -> usize {
288     (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
289 }
290 
inc_num_notify_waiters_calls(data: usize) -> usize291 fn inc_num_notify_waiters_calls(data: usize) -> usize {
292     data + (1 << NOTIFY_WAITERS_SHIFT)
293 }
294 
atomic_inc_num_notify_waiters_calls(data: &AtomicUsize)295 fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
296     data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
297 }
298 
299 impl Notify {
300     /// Create a new `Notify`, initialized without a permit.
301     ///
302     /// # Examples
303     ///
304     /// ```
305     /// use tokio::sync::Notify;
306     ///
307     /// let notify = Notify::new();
308     /// ```
new() -> Notify309     pub fn new() -> Notify {
310         Notify {
311             state: AtomicUsize::new(0),
312             waiters: Mutex::new(LinkedList::new()),
313         }
314     }
315 
316     /// Create a new `Notify`, initialized without a permit.
317     ///
318     /// # Examples
319     ///
320     /// ```
321     /// use tokio::sync::Notify;
322     ///
323     /// static NOTIFY: Notify = Notify::const_new();
324     /// ```
325     #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
326     #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
const_new() -> Notify327     pub const fn const_new() -> Notify {
328         Notify {
329             state: AtomicUsize::new(0),
330             waiters: Mutex::const_new(LinkedList::new()),
331         }
332     }
333 
334     /// Wait for a notification.
335     ///
336     /// Equivalent to:
337     ///
338     /// ```ignore
339     /// async fn notified(&self);
340     /// ```
341     ///
342     /// Each `Notify` value holds a single permit. If a permit is available from
343     /// an earlier call to [`notify_one()`], then `notified().await` will complete
344     /// immediately, consuming that permit. Otherwise, `notified().await` waits
345     /// for a permit to be made available by the next call to `notify_one()`.
346     ///
347     /// The `Notified` future is not guaranteed to receive wakeups from calls to
348     /// `notify_one()` if it has not yet been polled. See the documentation for
349     /// [`Notified::enable()`] for more details.
350     ///
351     /// The `Notified` future is guaranteed to receive wakeups from
352     /// `notify_waiters()` as soon as it has been created, even if it has not
353     /// yet been polled.
354     ///
355     /// [`notify_one()`]: Notify::notify_one
356     /// [`Notified::enable()`]: Notified::enable
357     ///
358     /// # Cancel safety
359     ///
360     /// This method uses a queue to fairly distribute notifications in the order
361     /// they were requested. Cancelling a call to `notified` makes you lose your
362     /// place in the queue.
363     ///
364     /// # Examples
365     ///
366     /// ```
367     /// use tokio::sync::Notify;
368     /// use std::sync::Arc;
369     ///
370     /// #[tokio::main]
371     /// async fn main() {
372     ///     let notify = Arc::new(Notify::new());
373     ///     let notify2 = notify.clone();
374     ///
375     ///     tokio::spawn(async move {
376     ///         notify2.notified().await;
377     ///         println!("received notification");
378     ///     });
379     ///
380     ///     println!("sending notification");
381     ///     notify.notify_one();
382     /// }
383     /// ```
notified(&self) -> Notified<'_>384     pub fn notified(&self) -> Notified<'_> {
385         // we load the number of times notify_waiters
386         // was called and store that in our initial state
387         let state = self.state.load(SeqCst);
388         Notified {
389             notify: self,
390             state: State::Init(state >> NOTIFY_WAITERS_SHIFT),
391             waiter: UnsafeCell::new(Waiter {
392                 pointers: linked_list::Pointers::new(),
393                 waker: None,
394                 notified: None,
395                 _p: PhantomPinned,
396             }),
397         }
398     }
399 
400     /// Notifies a waiting task.
401     ///
402     /// If a task is currently waiting, that task is notified. Otherwise, a
403     /// permit is stored in this `Notify` value and the **next** call to
404     /// [`notified().await`] will complete immediately consuming the permit made
405     /// available by this call to `notify_one()`.
406     ///
407     /// At most one permit may be stored by `Notify`. Many sequential calls to
408     /// `notify_one` will result in a single permit being stored. The next call to
409     /// `notified().await` will complete immediately, but the one after that
410     /// will wait.
411     ///
412     /// [`notified().await`]: Notify::notified()
413     ///
414     /// # Examples
415     ///
416     /// ```
417     /// use tokio::sync::Notify;
418     /// use std::sync::Arc;
419     ///
420     /// #[tokio::main]
421     /// async fn main() {
422     ///     let notify = Arc::new(Notify::new());
423     ///     let notify2 = notify.clone();
424     ///
425     ///     tokio::spawn(async move {
426     ///         notify2.notified().await;
427     ///         println!("received notification");
428     ///     });
429     ///
430     ///     println!("sending notification");
431     ///     notify.notify_one();
432     /// }
433     /// ```
434     // Alias for old name in 0.x
435     #[cfg_attr(docsrs, doc(alias = "notify"))]
notify_one(&self)436     pub fn notify_one(&self) {
437         // Load the current state
438         let mut curr = self.state.load(SeqCst);
439 
440         // If the state is `EMPTY`, transition to `NOTIFIED` and return.
441         while let EMPTY | NOTIFIED = get_state(curr) {
442             // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
443             // happens-before synchronization must happen between this atomic
444             // operation and a task calling `notified().await`.
445             let new = set_state(curr, NOTIFIED);
446             let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);
447 
448             match res {
449                 // No waiters, no further work to do
450                 Ok(_) => return,
451                 Err(actual) => {
452                     curr = actual;
453                 }
454             }
455         }
456 
457         // There are waiters, the lock must be acquired to notify.
458         let mut waiters = self.waiters.lock();
459 
460         // The state must be reloaded while the lock is held. The state may only
461         // transition out of WAITING while the lock is held.
462         curr = self.state.load(SeqCst);
463 
464         if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) {
465             drop(waiters);
466             waker.wake();
467         }
468     }
469 
470     /// Notifies all waiting tasks.
471     ///
472     /// If a task is currently waiting, that task is notified. Unlike with
473     /// `notify_one()`, no permit is stored to be used by the next call to
474     /// `notified().await`. The purpose of this method is to notify all
475     /// already registered waiters. Registering for notification is done by
476     /// acquiring an instance of the `Notified` future via calling `notified()`.
477     ///
478     /// # Examples
479     ///
480     /// ```
481     /// use tokio::sync::Notify;
482     /// use std::sync::Arc;
483     ///
484     /// #[tokio::main]
485     /// async fn main() {
486     ///     let notify = Arc::new(Notify::new());
487     ///     let notify2 = notify.clone();
488     ///
489     ///     let notified1 = notify.notified();
490     ///     let notified2 = notify.notified();
491     ///
492     ///     let handle = tokio::spawn(async move {
493     ///         println!("sending notifications");
494     ///         notify2.notify_waiters();
495     ///     });
496     ///
497     ///     notified1.await;
498     ///     notified2.await;
499     ///     println!("received notifications");
500     /// }
501     /// ```
notify_waiters(&self)502     pub fn notify_waiters(&self) {
503         let mut wakers = WakeList::new();
504 
505         // There are waiters, the lock must be acquired to notify.
506         let mut waiters = self.waiters.lock();
507 
508         // The state must be reloaded while the lock is held. The state may only
509         // transition out of WAITING while the lock is held.
510         let curr = self.state.load(SeqCst);
511 
512         if matches!(get_state(curr), EMPTY | NOTIFIED) {
513             // There are no waiting tasks. All we need to do is increment the
514             // number of times this method was called.
515             atomic_inc_num_notify_waiters_calls(&self.state);
516             return;
517         }
518 
519         // At this point, it is guaranteed that the state will not
520         // concurrently change, as holding the lock is required to
521         // transition **out** of `WAITING`.
522         'outer: loop {
523             while wakers.can_push() {
524                 match waiters.pop_back() {
525                     Some(mut waiter) => {
526                         // Safety: `waiters` lock is still held.
527                         let waiter = unsafe { waiter.as_mut() };
528 
529                         assert!(waiter.notified.is_none());
530 
531                         waiter.notified = Some(NotificationType::AllWaiters);
532 
533                         if let Some(waker) = waiter.waker.take() {
534                             wakers.push(waker);
535                         }
536                     }
537                     None => {
538                         break 'outer;
539                     }
540                 }
541             }
542 
543             drop(waiters);
544 
545             wakers.wake_all();
546 
547             // Acquire the lock again.
548             waiters = self.waiters.lock();
549         }
550 
551         // All waiters will be notified, the state must be transitioned to
552         // `EMPTY`. As transitioning **from** `WAITING` requires the lock to be
553         // held, a `store` is sufficient.
554         let new = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
555         self.state.store(new, SeqCst);
556 
557         // Release the lock before notifying
558         drop(waiters);
559 
560         wakers.wake_all();
561     }
562 }
563 
564 impl Default for Notify {
default() -> Notify565     fn default() -> Notify {
566         Notify::new()
567     }
568 }
569 
570 impl UnwindSafe for Notify {}
571 impl RefUnwindSafe for Notify {}
572 
notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker>573 fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> {
574     loop {
575         match get_state(curr) {
576             EMPTY | NOTIFIED => {
577                 let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
578 
579                 match res {
580                     Ok(_) => return None,
581                     Err(actual) => {
582                         let actual_state = get_state(actual);
583                         assert!(actual_state == EMPTY || actual_state == NOTIFIED);
584                         state.store(set_state(actual, NOTIFIED), SeqCst);
585                         return None;
586                     }
587                 }
588             }
589             WAITING => {
590                 // At this point, it is guaranteed that the state will not
591                 // concurrently change as holding the lock is required to
592                 // transition **out** of `WAITING`.
593                 //
594                 // Get a pending waiter
595                 let mut waiter = waiters.pop_back().unwrap();
596 
597                 // Safety: `waiters` lock is still held.
598                 let waiter = unsafe { waiter.as_mut() };
599 
600                 assert!(waiter.notified.is_none());
601 
602                 waiter.notified = Some(NotificationType::OneWaiter);
603                 let waker = waiter.waker.take();
604 
605                 if waiters.is_empty() {
606                     // As this the **final** waiter in the list, the state
607                     // must be transitioned to `EMPTY`. As transitioning
608                     // **from** `WAITING` requires the lock to be held, a
609                     // `store` is sufficient.
610                     state.store(set_state(curr, EMPTY), SeqCst);
611                 }
612 
613                 return waker;
614             }
615             _ => unreachable!(),
616         }
617     }
618 }
619 
620 // ===== impl Notified =====
621 
622 impl Notified<'_> {
623     /// Adds this future to the list of futures that are ready to receive
624     /// wakeups from calls to [`notify_one`].
625     ///
626     /// Polling the future also adds it to the list, so this method should only
627     /// be used if you want to add the future to the list before the first call
628     /// to `poll`. (In fact, this method is equivalent to calling `poll` except
629     /// that no `Waker` is registered.)
630     ///
631     /// This has no effect on notifications sent using [`notify_waiters`], which
632     /// are received as long as they happen after the creation of the `Notified`
633     /// regardless of whether `enable` or `poll` has been called.
634     ///
635     /// This method returns true if the `Notified` is ready. This happens in the
636     /// following situations:
637     ///
638     ///  1. The `notify_waiters` method was called between the creation of the
639     ///     `Notified` and the call to this method.
640     ///  2. This is the first call to `enable` or `poll` on this future, and the
641     ///     `Notify` was holding a permit from a previous call to `notify_one`.
642     ///     The call consumes the permit in that case.
643     ///  3. The future has previously been enabled or polled, and it has since
644     ///     then been marked ready by either consuming a permit from the
645     ///     `Notify`, or by a call to `notify_one` or `notify_waiters` that
646     ///     removed it from the list of futures ready to receive wakeups.
647     ///
648     /// If this method returns true, any future calls to poll on the same future
649     /// will immediately return `Poll::Ready`.
650     ///
651     /// # Examples
652     ///
653     /// Unbound multi-producer multi-consumer (mpmc) channel.
654     ///
655     /// The call to `enable` is important because otherwise if you have two
656     /// calls to `recv` and two calls to `send` in parallel, the following could
657     /// happen:
658     ///
659     ///  1. Both calls to `try_recv` return `None`.
660     ///  2. Both new elements are added to the vector.
661     ///  3. The `notify_one` method is called twice, adding only a single
662     ///     permit to the `Notify`.
663     ///  4. Both calls to `recv` reach the `Notified` future. One of them
664     ///     consumes the permit, and the other sleeps forever.
665     ///
666     /// By adding the `Notified` futures to the list by calling `enable` before
667     /// `try_recv`, the `notify_one` calls in step three would remove the
668     /// futures from the list and mark them notified instead of adding a permit
669     /// to the `Notify`. This ensures that both futures are woken.
670     ///
671     /// ```
672     /// use tokio::sync::Notify;
673     ///
674     /// use std::collections::VecDeque;
675     /// use std::sync::Mutex;
676     ///
677     /// struct Channel<T> {
678     ///     messages: Mutex<VecDeque<T>>,
679     ///     notify_on_sent: Notify,
680     /// }
681     ///
682     /// impl<T> Channel<T> {
683     ///     pub fn send(&self, msg: T) {
684     ///         let mut locked_queue = self.messages.lock().unwrap();
685     ///         locked_queue.push_back(msg);
686     ///         drop(locked_queue);
687     ///
688     ///         // Send a notification to one of the calls currently
689     ///         // waiting in a call to `recv`.
690     ///         self.notify_on_sent.notify_one();
691     ///     }
692     ///
693     ///     pub fn try_recv(&self) -> Option<T> {
694     ///         let mut locked_queue = self.messages.lock().unwrap();
695     ///         locked_queue.pop_front()
696     ///     }
697     ///
698     ///     pub async fn recv(&self) -> T {
699     ///         let future = self.notify_on_sent.notified();
700     ///         tokio::pin!(future);
701     ///
702     ///         loop {
703     ///             // Make sure that no wakeup is lost if we get
704     ///             // `None` from `try_recv`.
705     ///             future.as_mut().enable();
706     ///
707     ///             if let Some(msg) = self.try_recv() {
708     ///                 return msg;
709     ///             }
710     ///
711     ///             // Wait for a call to `notify_one`.
712     ///             //
713     ///             // This uses `.as_mut()` to avoid consuming the future,
714     ///             // which lets us call `Pin::set` below.
715     ///             future.as_mut().await;
716     ///
717     ///             // Reset the future in case another call to
718     ///             // `try_recv` got the message before us.
719     ///             future.set(self.notify_on_sent.notified());
720     ///         }
721     ///     }
722     /// }
723     /// ```
724     ///
725     /// [`notify_one`]: Notify::notify_one()
726     /// [`notify_waiters`]: Notify::notify_waiters()
enable(self: Pin<&mut Self>) -> bool727     pub fn enable(self: Pin<&mut Self>) -> bool {
728         self.poll_notified(None).is_ready()
729     }
730 
731     /// A custom `project` implementation is used in place of `pin-project-lite`
732     /// as a custom drop implementation is needed.
project(self: Pin<&mut Self>) -> (&Notify, &mut State, &UnsafeCell<Waiter>)733     fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &UnsafeCell<Waiter>) {
734         unsafe {
735             // Safety: both `notify` and `state` are `Unpin`.
736 
737             is_unpin::<&Notify>();
738             is_unpin::<AtomicUsize>();
739 
740             let me = self.get_unchecked_mut();
741             (me.notify, &mut me.state, &me.waiter)
742         }
743     }
744 
poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()>745     fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
746         use State::*;
747 
748         let (notify, state, waiter) = self.project();
749 
750         loop {
751             match *state {
752                 Init(initial_notify_waiters_calls) => {
753                     let curr = notify.state.load(SeqCst);
754 
755                     // Optimistically try acquiring a pending notification
756                     let res = notify.state.compare_exchange(
757                         set_state(curr, NOTIFIED),
758                         set_state(curr, EMPTY),
759                         SeqCst,
760                         SeqCst,
761                     );
762 
763                     if res.is_ok() {
764                         // Acquired the notification
765                         *state = Done;
766                         return Poll::Ready(());
767                     }
768 
769                     // Clone the waker before locking, a waker clone can be
770                     // triggering arbitrary code.
771                     let waker = waker.cloned();
772 
773                     // Acquire the lock and attempt to transition to the waiting
774                     // state.
775                     let mut waiters = notify.waiters.lock();
776 
777                     // Reload the state with the lock held
778                     let mut curr = notify.state.load(SeqCst);
779 
780                     // if notify_waiters has been called after the future
781                     // was created, then we are done
782                     if get_num_notify_waiters_calls(curr) != initial_notify_waiters_calls {
783                         *state = Done;
784                         return Poll::Ready(());
785                     }
786 
787                     // Transition the state to WAITING.
788                     loop {
789                         match get_state(curr) {
790                             EMPTY => {
791                                 // Transition to WAITING
792                                 let res = notify.state.compare_exchange(
793                                     set_state(curr, EMPTY),
794                                     set_state(curr, WAITING),
795                                     SeqCst,
796                                     SeqCst,
797                                 );
798 
799                                 if let Err(actual) = res {
800                                     assert_eq!(get_state(actual), NOTIFIED);
801                                     curr = actual;
802                                 } else {
803                                     break;
804                                 }
805                             }
806                             WAITING => break,
807                             NOTIFIED => {
808                                 // Try consuming the notification
809                                 let res = notify.state.compare_exchange(
810                                     set_state(curr, NOTIFIED),
811                                     set_state(curr, EMPTY),
812                                     SeqCst,
813                                     SeqCst,
814                                 );
815 
816                                 match res {
817                                     Ok(_) => {
818                                         // Acquired the notification
819                                         *state = Done;
820                                         return Poll::Ready(());
821                                     }
822                                     Err(actual) => {
823                                         assert_eq!(get_state(actual), EMPTY);
824                                         curr = actual;
825                                     }
826                                 }
827                             }
828                             _ => unreachable!(),
829                         }
830                     }
831 
832                     if waker.is_some() {
833                         // Safety: called while locked.
834                         unsafe {
835                             (*waiter.get()).waker = waker;
836                         }
837                     }
838 
839                     // Insert the waiter into the linked list
840                     //
841                     // safety: pointers from `UnsafeCell` are never null.
842                     waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
843 
844                     *state = Waiting;
845 
846                     return Poll::Pending;
847                 }
848                 Waiting => {
849                     // Currently in the "Waiting" state, implying the caller has
850                     // a waiter stored in the waiter list (guarded by
851                     // `notify.waiters`). In order to access the waker fields,
852                     // we must hold the lock.
853 
854                     let waiters = notify.waiters.lock();
855 
856                     // Safety: called while locked
857                     let w = unsafe { &mut *waiter.get() };
858 
859                     if w.notified.is_some() {
860                         // Our waker has been notified. Reset the fields and
861                         // remove it from the list.
862                         w.waker = None;
863                         w.notified = None;
864 
865                         *state = Done;
866                     } else {
867                         // Update the waker, if necessary.
868                         if let Some(waker) = waker {
869                             let should_update = match w.waker.as_ref() {
870                                 Some(current_waker) => !current_waker.will_wake(waker),
871                                 None => true,
872                             };
873                             if should_update {
874                                 w.waker = Some(waker.clone());
875                             }
876                         }
877 
878                         return Poll::Pending;
879                     }
880 
881                     // Explicit drop of the lock to indicate the scope that the
882                     // lock is held. Because holding the lock is required to
883                     // ensure safe access to fields not held within the lock, it
884                     // is helpful to visualize the scope of the critical
885                     // section.
886                     drop(waiters);
887                 }
888                 Done => {
889                     return Poll::Ready(());
890                 }
891             }
892         }
893     }
894 }
895 
896 impl Future for Notified<'_> {
897     type Output = ();
898 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>899     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
900         self.poll_notified(Some(cx.waker()))
901     }
902 }
903 
904 impl Drop for Notified<'_> {
drop(&mut self)905     fn drop(&mut self) {
906         use State::*;
907 
908         // Safety: The type only transitions to a "Waiting" state when pinned.
909         let (notify, state, waiter) = unsafe { Pin::new_unchecked(self).project() };
910 
911         // This is where we ensure safety. The `Notified` value is being
912         // dropped, which means we must ensure that the waiter entry is no
913         // longer stored in the linked list.
914         if matches!(*state, Waiting) {
915             let mut waiters = notify.waiters.lock();
916             let mut notify_state = notify.state.load(SeqCst);
917 
918             // remove the entry from the list (if not already removed)
919             //
920             // safety: the waiter is only added to `waiters` by virtue of it
921             // being the only `LinkedList` available to the type.
922             unsafe { waiters.remove(NonNull::new_unchecked(waiter.get())) };
923 
924             if waiters.is_empty() && get_state(notify_state) == WAITING {
925                 notify_state = set_state(notify_state, EMPTY);
926                 notify.state.store(notify_state, SeqCst);
927             }
928 
929             // See if the node was notified but not received. In this case, if
930             // the notification was triggered via `notify_one`, it must be sent
931             // to the next waiter.
932             //
933             // Safety: with the entry removed from the linked list, there can be
934             // no concurrent access to the entry
935             if matches!(
936                 unsafe { (*waiter.get()).notified },
937                 Some(NotificationType::OneWaiter)
938             ) {
939                 if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
940                     drop(waiters);
941                     waker.wake();
942                 }
943             }
944         }
945     }
946 }
947 
948 /// # Safety
949 ///
950 /// `Waiter` is forced to be !Unpin.
951 unsafe impl linked_list::Link for Waiter {
952     type Handle = NonNull<Waiter>;
953     type Target = Waiter;
954 
as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter>955     fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
956         *handle
957     }
958 
from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter>959     unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
960         ptr
961     }
962 
pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>>963     unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
964         Waiter::addr_of_pointers(target)
965     }
966 }
967 
/// Compile-time check that `T` implements `Unpin`.
///
/// The body is empty, so a call does nothing at runtime; instantiating it with
/// a type that is not `Unpin` is rejected by the compiler.
fn is_unpin<T: Unpin>() {}
969