1 // Allow `unreachable_pub` warnings when sync is not enabled
2 // due to the usage of `Notify` within the `rt` feature set.
3 // When this module is compiled with `sync` enabled we will warn on
4 // this lint. When `rt` is enabled we use `pub(crate)` which
5 // triggers this warning but it is safe to ignore in this case.
6 #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
7
8 use crate::loom::sync::atomic::AtomicUsize;
9 use crate::loom::sync::Mutex;
10 use crate::util::linked_list::{self, LinkedList};
11 use crate::util::WakeList;
12
13 use std::cell::UnsafeCell;
14 use std::future::Future;
15 use std::marker::PhantomPinned;
16 use std::panic::{RefUnwindSafe, UnwindSafe};
17 use std::pin::Pin;
18 use std::ptr::NonNull;
19 use std::sync::atomic::Ordering::SeqCst;
20 use std::task::{Context, Poll, Waker};
21
// Intrusive linked list of `Waiter` nodes; this is the type stored behind
// `Notify::waiters`.
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
23
24 /// Notifies a single task to wake up.
25 ///
26 /// `Notify` provides a basic mechanism to notify a single task of an event.
27 /// `Notify` itself does not carry any data. Instead, it is to be used to signal
28 /// another task to perform an operation.
29 ///
30 /// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
31 /// [`notified().await`] method waits for a permit to become available, and
32 /// [`notify_one()`] sets a permit **if there currently are no available
33 /// permits**.
34 ///
35 /// The synchronization details of `Notify` are similar to
36 /// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
37 /// value contains a single permit. [`notified().await`] waits for the permit to
38 /// be made available, consumes the permit, and resumes. [`notify_one()`] sets
39 /// the permit, waking a pending task if there is one.
40 ///
41 /// If `notify_one()` is called **before** `notified().await`, then the next
42 /// call to `notified().await` will complete immediately, consuming the permit.
43 /// Any subsequent calls to `notified().await` will wait for a new permit.
44 ///
45 /// If `notify_one()` is called **multiple** times before `notified().await`,
46 /// only a **single** permit is stored. The next call to `notified().await` will
47 /// complete immediately, but the one after will wait for a new permit.
48 ///
49 /// # Examples
50 ///
51 /// Basic usage.
52 ///
53 /// ```
54 /// use tokio::sync::Notify;
55 /// use std::sync::Arc;
56 ///
57 /// #[tokio::main]
58 /// async fn main() {
59 /// let notify = Arc::new(Notify::new());
60 /// let notify2 = notify.clone();
61 ///
62 /// let handle = tokio::spawn(async move {
63 /// notify2.notified().await;
64 /// println!("received notification");
65 /// });
66 ///
67 /// println!("sending notification");
68 /// notify.notify_one();
69 ///
70 /// // Wait for task to receive notification.
71 /// handle.await.unwrap();
72 /// }
73 /// ```
74 ///
/// Unbounded multi-producer single-consumer (mpsc) channel.
76 ///
77 /// No wakeups can be lost when using this channel because the call to
78 /// `notify_one()` will store a permit in the `Notify`, which the following call
79 /// to `notified()` will consume.
80 ///
81 /// ```
82 /// use tokio::sync::Notify;
83 ///
84 /// use std::collections::VecDeque;
85 /// use std::sync::Mutex;
86 ///
87 /// struct Channel<T> {
88 /// values: Mutex<VecDeque<T>>,
89 /// notify: Notify,
90 /// }
91 ///
92 /// impl<T> Channel<T> {
93 /// pub fn send(&self, value: T) {
94 /// self.values.lock().unwrap()
95 /// .push_back(value);
96 ///
97 /// // Notify the consumer a value is available
98 /// self.notify.notify_one();
99 /// }
100 ///
101 /// // This is a single-consumer channel, so several concurrent calls to
102 /// // `recv` are not allowed.
103 /// pub async fn recv(&self) -> T {
104 /// loop {
105 /// // Drain values
106 /// if let Some(value) = self.values.lock().unwrap().pop_front() {
107 /// return value;
108 /// }
109 ///
110 /// // Wait for values to be available
111 /// self.notify.notified().await;
112 /// }
113 /// }
114 /// }
115 /// ```
116 ///
/// Unbounded multi-producer multi-consumer (mpmc) channel.
118 ///
119 /// The call to [`enable`] is important because otherwise if you have two
120 /// calls to `recv` and two calls to `send` in parallel, the following could
121 /// happen:
122 ///
123 /// 1. Both calls to `try_recv` return `None`.
124 /// 2. Both new elements are added to the vector.
125 /// 3. The `notify_one` method is called twice, adding only a single
126 /// permit to the `Notify`.
127 /// 4. Both calls to `recv` reach the `Notified` future. One of them
128 /// consumes the permit, and the other sleeps forever.
129 ///
130 /// By adding the `Notified` futures to the list by calling `enable` before
131 /// `try_recv`, the `notify_one` calls in step three would remove the
132 /// futures from the list and mark them notified instead of adding a permit
133 /// to the `Notify`. This ensures that both futures are woken.
134 ///
135 /// Notice that this failure can only happen if there are two concurrent calls
136 /// to `recv`. This is why the mpsc example above does not require a call to
137 /// `enable`.
138 ///
139 /// ```
140 /// use tokio::sync::Notify;
141 ///
142 /// use std::collections::VecDeque;
143 /// use std::sync::Mutex;
144 ///
145 /// struct Channel<T> {
146 /// messages: Mutex<VecDeque<T>>,
147 /// notify_on_sent: Notify,
148 /// }
149 ///
150 /// impl<T> Channel<T> {
151 /// pub fn send(&self, msg: T) {
152 /// let mut locked_queue = self.messages.lock().unwrap();
153 /// locked_queue.push_back(msg);
154 /// drop(locked_queue);
155 ///
156 /// // Send a notification to one of the calls currently
157 /// // waiting in a call to `recv`.
158 /// self.notify_on_sent.notify_one();
159 /// }
160 ///
161 /// pub fn try_recv(&self) -> Option<T> {
162 /// let mut locked_queue = self.messages.lock().unwrap();
163 /// locked_queue.pop_front()
164 /// }
165 ///
166 /// pub async fn recv(&self) -> T {
167 /// let future = self.notify_on_sent.notified();
168 /// tokio::pin!(future);
169 ///
170 /// loop {
171 /// // Make sure that no wakeup is lost if we get
172 /// // `None` from `try_recv`.
173 /// future.as_mut().enable();
174 ///
175 /// if let Some(msg) = self.try_recv() {
176 /// return msg;
177 /// }
178 ///
179 /// // Wait for a call to `notify_one`.
180 /// //
181 /// // This uses `.as_mut()` to avoid consuming the future,
182 /// // which lets us call `Pin::set` below.
183 /// future.as_mut().await;
184 ///
185 /// // Reset the future in case another call to
186 /// // `try_recv` got the message before us.
187 /// future.set(self.notify_on_sent.notified());
188 /// }
189 /// }
190 /// }
191 /// ```
192 ///
193 /// [park]: std::thread::park
194 /// [unpark]: std::thread::Thread::unpark
195 /// [`notified().await`]: Notify::notified()
196 /// [`notify_one()`]: Notify::notify_one()
197 /// [`enable`]: Notified::enable()
198 /// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
    // This uses 2 bits to store one of `EMPTY`,
    // `WAITING` or `NOTIFIED`. The rest of the bits
    // are used to store the number of times `notify_waiters`
    // was called.
    state: AtomicUsize,
    // List of waiters registered by `Notified` futures. The list is only
    // touched while this mutex is held, and `state` only leaves `WAITING`
    // while it is held.
    waiters: Mutex<WaitList>,
}
208
/// Records which `Notify` method delivered a notification to a waiter.
#[derive(Debug, Clone, Copy)]
enum NotificationType {
    /// Notification triggered by calling `notify_waiters`.
    AllWaiters,
    /// Notification triggered by calling `notify_one`. If the waiter is
    /// dropped before observing it, the notification is forwarded to another
    /// waiter (see `Drop for Notified`).
    OneWaiter,
}
216
/// A node in the `Notify` wait list, embedded inside each `Notified` future.
#[derive(Debug)]
struct Waiter {
    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Waiting task's waker. `None` until a waker has been registered (for
    /// example when the future was only `enable`d) or after it was taken by a
    /// notifier.
    waker: Option<Waker>,

    /// `Some` once a notification has been assigned to this waiter, recording
    /// whether it came from `notify_one` or `notify_waiters`; `None` while the
    /// waiter is still waiting.
    notified: Option<NotificationType>,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
231
// Generates `Waiter::addr_of_pointers`, which `Link::pointers` below uses to
// obtain a pointer to the `pointers` field from a `NonNull<Waiter>`.
// NOTE(review): the macro is defined elsewhere in the crate; presumably it
// projects the field through a raw pointer without creating an intermediate
// reference — confirm against the macro definition.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
239
240 /// Future returned from [`Notify::notified()`].
241 ///
242 /// This future is fused, so once it has completed, any future calls to poll
243 /// will immediately return `Poll::Ready`.
#[derive(Debug)]
pub struct Notified<'a> {
    /// The `Notify` being received on.
    notify: &'a Notify,

    /// The current state of the receiving process.
    state: State,

    /// Entry in the waiter `LinkedList`.
    ///
    /// Once this future is in the `Waiting` state, a pointer to this cell is
    /// stored in `notify.waiters`, so its contents are only accessed while
    /// that lock is held.
    waiter: UnsafeCell<Waiter>,
}
255
// `Notified` holds an `UnsafeCell<Waiter>`, so these impls are written by
// hand rather than derived automatically.
// NOTE(review): soundness presumably relies on every access to the waiter
// going through the `notify.waiters` lock once it has been linked into the
// list — confirm against `poll_notified` and `Drop`.
unsafe impl<'a> Send for Notified<'a> {}
unsafe impl<'a> Sync for Notified<'a> {}
258
/// Progress of a `Notified` future.
#[derive(Debug)]
enum State {
    /// Not yet registered. Carries the number of `notify_waiters` calls that
    /// had been recorded when the future was created.
    Init(usize),
    /// The waiter has been pushed onto the `Notify` wait list.
    Waiting,
    /// A notification was received; polling returns `Poll::Ready` from now on.
    Done,
}
265
// Layout of `Notify::state`: the low `NOTIFY_WAITERS_SHIFT` bits hold one of
// `EMPTY` / `WAITING` / `NOTIFIED`; the remaining high bits count calls to
// `notify_waiters`.
const NOTIFY_WAITERS_SHIFT: usize = 2;
// Selects the state bits.
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
// Selects the `notify_waiters` call-counter bits.
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;

/// Initial "idle" state.
const EMPTY: usize = 0;

/// One or more threads are currently waiting to be notified.
const WAITING: usize = 1;

/// Pending notification.
const NOTIFIED: usize = 2;
278
set_state(data: usize, state: usize) -> usize279 fn set_state(data: usize, state: usize) -> usize {
280 (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
281 }
282
get_state(data: usize) -> usize283 fn get_state(data: usize) -> usize {
284 data & STATE_MASK
285 }
286
get_num_notify_waiters_calls(data: usize) -> usize287 fn get_num_notify_waiters_calls(data: usize) -> usize {
288 (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
289 }
290
inc_num_notify_waiters_calls(data: usize) -> usize291 fn inc_num_notify_waiters_calls(data: usize) -> usize {
292 data + (1 << NOTIFY_WAITERS_SHIFT)
293 }
294
atomic_inc_num_notify_waiters_calls(data: &AtomicUsize)295 fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
296 data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
297 }
298
299 impl Notify {
300 /// Create a new `Notify`, initialized without a permit.
301 ///
302 /// # Examples
303 ///
304 /// ```
305 /// use tokio::sync::Notify;
306 ///
307 /// let notify = Notify::new();
308 /// ```
new() -> Notify309 pub fn new() -> Notify {
310 Notify {
311 state: AtomicUsize::new(0),
312 waiters: Mutex::new(LinkedList::new()),
313 }
314 }
315
316 /// Create a new `Notify`, initialized without a permit.
317 ///
318 /// # Examples
319 ///
320 /// ```
321 /// use tokio::sync::Notify;
322 ///
323 /// static NOTIFY: Notify = Notify::const_new();
324 /// ```
#[cfg(all(feature = "parking_lot", not(all(loom, test))))]
#[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
pub const fn const_new() -> Notify {
    // Same initial value as `Notify::new`, built from `const` constructors so
    // the result can be used in a `static`.
    Notify {
        waiters: Mutex::const_new(LinkedList::new()),
        state: AtomicUsize::new(EMPTY),
    }
}
333
334 /// Wait for a notification.
335 ///
336 /// Equivalent to:
337 ///
338 /// ```ignore
339 /// async fn notified(&self);
340 /// ```
341 ///
342 /// Each `Notify` value holds a single permit. If a permit is available from
343 /// an earlier call to [`notify_one()`], then `notified().await` will complete
344 /// immediately, consuming that permit. Otherwise, `notified().await` waits
345 /// for a permit to be made available by the next call to `notify_one()`.
346 ///
347 /// The `Notified` future is not guaranteed to receive wakeups from calls to
348 /// `notify_one()` if it has not yet been polled. See the documentation for
349 /// [`Notified::enable()`] for more details.
350 ///
351 /// The `Notified` future is guaranteed to receive wakeups from
352 /// `notify_waiters()` as soon as it has been created, even if it has not
353 /// yet been polled.
354 ///
355 /// [`notify_one()`]: Notify::notify_one
356 /// [`Notified::enable()`]: Notified::enable
357 ///
358 /// # Cancel safety
359 ///
360 /// This method uses a queue to fairly distribute notifications in the order
361 /// they were requested. Cancelling a call to `notified` makes you lose your
362 /// place in the queue.
363 ///
364 /// # Examples
365 ///
366 /// ```
367 /// use tokio::sync::Notify;
368 /// use std::sync::Arc;
369 ///
370 /// #[tokio::main]
371 /// async fn main() {
372 /// let notify = Arc::new(Notify::new());
373 /// let notify2 = notify.clone();
374 ///
375 /// tokio::spawn(async move {
376 /// notify2.notified().await;
377 /// println!("received notification");
378 /// });
379 ///
380 /// println!("sending notification");
381 /// notify.notify_one();
382 /// }
383 /// ```
notified(&self) -> Notified<'_>384 pub fn notified(&self) -> Notified<'_> {
385 // we load the number of times notify_waiters
386 // was called and store that in our initial state
387 let state = self.state.load(SeqCst);
388 Notified {
389 notify: self,
390 state: State::Init(state >> NOTIFY_WAITERS_SHIFT),
391 waiter: UnsafeCell::new(Waiter {
392 pointers: linked_list::Pointers::new(),
393 waker: None,
394 notified: None,
395 _p: PhantomPinned,
396 }),
397 }
398 }
399
400 /// Notifies a waiting task.
401 ///
402 /// If a task is currently waiting, that task is notified. Otherwise, a
403 /// permit is stored in this `Notify` value and the **next** call to
404 /// [`notified().await`] will complete immediately consuming the permit made
405 /// available by this call to `notify_one()`.
406 ///
407 /// At most one permit may be stored by `Notify`. Many sequential calls to
408 /// `notify_one` will result in a single permit being stored. The next call to
409 /// `notified().await` will complete immediately, but the one after that
410 /// will wait.
411 ///
412 /// [`notified().await`]: Notify::notified()
413 ///
414 /// # Examples
415 ///
416 /// ```
417 /// use tokio::sync::Notify;
418 /// use std::sync::Arc;
419 ///
420 /// #[tokio::main]
421 /// async fn main() {
422 /// let notify = Arc::new(Notify::new());
423 /// let notify2 = notify.clone();
424 ///
425 /// tokio::spawn(async move {
426 /// notify2.notified().await;
427 /// println!("received notification");
428 /// });
429 ///
430 /// println!("sending notification");
431 /// notify.notify_one();
432 /// }
433 /// ```
434 // Alias for old name in 0.x
435 #[cfg_attr(docsrs, doc(alias = "notify"))]
notify_one(&self)436 pub fn notify_one(&self) {
437 // Load the current state
438 let mut curr = self.state.load(SeqCst);
439
440 // If the state is `EMPTY`, transition to `NOTIFIED` and return.
441 while let EMPTY | NOTIFIED = get_state(curr) {
442 // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
443 // happens-before synchronization must happen between this atomic
444 // operation and a task calling `notified().await`.
445 let new = set_state(curr, NOTIFIED);
446 let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);
447
448 match res {
449 // No waiters, no further work to do
450 Ok(_) => return,
451 Err(actual) => {
452 curr = actual;
453 }
454 }
455 }
456
457 // There are waiters, the lock must be acquired to notify.
458 let mut waiters = self.waiters.lock();
459
460 // The state must be reloaded while the lock is held. The state may only
461 // transition out of WAITING while the lock is held.
462 curr = self.state.load(SeqCst);
463
464 if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) {
465 drop(waiters);
466 waker.wake();
467 }
468 }
469
470 /// Notifies all waiting tasks.
471 ///
/// Every task that is currently waiting is notified. Unlike with
473 /// `notify_one()`, no permit is stored to be used by the next call to
474 /// `notified().await`. The purpose of this method is to notify all
475 /// already registered waiters. Registering for notification is done by
476 /// acquiring an instance of the `Notified` future via calling `notified()`.
477 ///
478 /// # Examples
479 ///
480 /// ```
481 /// use tokio::sync::Notify;
482 /// use std::sync::Arc;
483 ///
484 /// #[tokio::main]
485 /// async fn main() {
486 /// let notify = Arc::new(Notify::new());
487 /// let notify2 = notify.clone();
488 ///
489 /// let notified1 = notify.notified();
490 /// let notified2 = notify.notified();
491 ///
492 /// let handle = tokio::spawn(async move {
493 /// println!("sending notifications");
494 /// notify2.notify_waiters();
495 /// });
496 ///
497 /// notified1.await;
498 /// notified2.await;
499 /// println!("received notifications");
500 /// }
501 /// ```
pub fn notify_waiters(&self) {
    // Wakers are collected here in batches and invoked only after the lock
    // has been released.
    let mut wakers = WakeList::new();

    // The lock is taken unconditionally: both inspecting the waiter list and
    // leaving the `WAITING` state require it.
    let mut waiters = self.waiters.lock();

    // The state must be loaded while the lock is held. The state may only
    // transition out of WAITING while the lock is held.
    let curr = self.state.load(SeqCst);

    if matches!(get_state(curr), EMPTY | NOTIFIED) {
        // There are no waiting tasks. All we need to do is increment the
        // number of times this method was called.
        atomic_inc_num_notify_waiters_calls(&self.state);
        return;
    }

    // At this point, it is guaranteed that the state will not
    // concurrently change, as holding the lock is required to
    // transition **out** of `WAITING`.
    'outer: loop {
        // Fill the current batch until it is full or the list is empty.
        while wakers.can_push() {
            match waiters.pop_back() {
                Some(mut waiter) => {
                    // Safety: `waiters` lock is still held.
                    let waiter = unsafe { waiter.as_mut() };

                    assert!(waiter.notified.is_none());

                    waiter.notified = Some(NotificationType::AllWaiters);

                    // A waiter that was only `enable`d may have no waker.
                    if let Some(waker) = waiter.waker.take() {
                        wakers.push(waker);
                    }
                }
                None => {
                    break 'outer;
                }
            }
        }

        // The batch is full: release the lock before waking.
        //
        // NOTE(review): while the lock is released here, other tasks can push
        // new waiters or drop existing ones, and those changes are visible to
        // the next iteration (and `curr` is not reloaded) — confirm this is
        // the intended behaviour.
        drop(waiters);

        wakers.wake_all();

        // Acquire the lock again.
        waiters = self.waiters.lock();
    }

    // All waiters will be notified, the state must be transitioned to
    // `EMPTY`. As transitioning **from** `WAITING` requires the lock to be
    // held, a `store` is sufficient.
    let new = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
    self.state.store(new, SeqCst);

    // Release the lock before notifying
    drop(waiters);

    // Wake whatever is left in the final, partially filled batch.
    wakers.wake_all();
}
562 }
563
564 impl Default for Notify {
default() -> Notify565 fn default() -> Notify {
566 Notify::new()
567 }
568 }
569
// Explicitly mark `Notify` as unwind safe.
// NOTE(review): presumably needed because the field types (the loom atomics
// and mutex wrapper) do not provide these auto traits on their own — confirm.
impl UnwindSafe for Notify {}
impl RefUnwindSafe for Notify {}
572
notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker>573 fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> {
574 loop {
575 match get_state(curr) {
576 EMPTY | NOTIFIED => {
577 let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
578
579 match res {
580 Ok(_) => return None,
581 Err(actual) => {
582 let actual_state = get_state(actual);
583 assert!(actual_state == EMPTY || actual_state == NOTIFIED);
584 state.store(set_state(actual, NOTIFIED), SeqCst);
585 return None;
586 }
587 }
588 }
589 WAITING => {
590 // At this point, it is guaranteed that the state will not
591 // concurrently change as holding the lock is required to
592 // transition **out** of `WAITING`.
593 //
594 // Get a pending waiter
595 let mut waiter = waiters.pop_back().unwrap();
596
597 // Safety: `waiters` lock is still held.
598 let waiter = unsafe { waiter.as_mut() };
599
600 assert!(waiter.notified.is_none());
601
602 waiter.notified = Some(NotificationType::OneWaiter);
603 let waker = waiter.waker.take();
604
605 if waiters.is_empty() {
606 // As this the **final** waiter in the list, the state
607 // must be transitioned to `EMPTY`. As transitioning
608 // **from** `WAITING` requires the lock to be held, a
609 // `store` is sufficient.
610 state.store(set_state(curr, EMPTY), SeqCst);
611 }
612
613 return waker;
614 }
615 _ => unreachable!(),
616 }
617 }
618 }
619
620 // ===== impl Notified =====
621
622 impl Notified<'_> {
623 /// Adds this future to the list of futures that are ready to receive
624 /// wakeups from calls to [`notify_one`].
625 ///
626 /// Polling the future also adds it to the list, so this method should only
627 /// be used if you want to add the future to the list before the first call
628 /// to `poll`. (In fact, this method is equivalent to calling `poll` except
629 /// that no `Waker` is registered.)
630 ///
631 /// This has no effect on notifications sent using [`notify_waiters`], which
632 /// are received as long as they happen after the creation of the `Notified`
633 /// regardless of whether `enable` or `poll` has been called.
634 ///
635 /// This method returns true if the `Notified` is ready. This happens in the
636 /// following situations:
637 ///
638 /// 1. The `notify_waiters` method was called between the creation of the
639 /// `Notified` and the call to this method.
640 /// 2. This is the first call to `enable` or `poll` on this future, and the
641 /// `Notify` was holding a permit from a previous call to `notify_one`.
642 /// The call consumes the permit in that case.
643 /// 3. The future has previously been enabled or polled, and it has since
644 /// then been marked ready by either consuming a permit from the
645 /// `Notify`, or by a call to `notify_one` or `notify_waiters` that
646 /// removed it from the list of futures ready to receive wakeups.
647 ///
648 /// If this method returns true, any future calls to poll on the same future
649 /// will immediately return `Poll::Ready`.
650 ///
651 /// # Examples
652 ///
/// Unbounded multi-producer multi-consumer (mpmc) channel.
654 ///
655 /// The call to `enable` is important because otherwise if you have two
656 /// calls to `recv` and two calls to `send` in parallel, the following could
657 /// happen:
658 ///
659 /// 1. Both calls to `try_recv` return `None`.
660 /// 2. Both new elements are added to the vector.
661 /// 3. The `notify_one` method is called twice, adding only a single
662 /// permit to the `Notify`.
663 /// 4. Both calls to `recv` reach the `Notified` future. One of them
664 /// consumes the permit, and the other sleeps forever.
665 ///
666 /// By adding the `Notified` futures to the list by calling `enable` before
667 /// `try_recv`, the `notify_one` calls in step three would remove the
668 /// futures from the list and mark them notified instead of adding a permit
669 /// to the `Notify`. This ensures that both futures are woken.
670 ///
671 /// ```
672 /// use tokio::sync::Notify;
673 ///
674 /// use std::collections::VecDeque;
675 /// use std::sync::Mutex;
676 ///
677 /// struct Channel<T> {
678 /// messages: Mutex<VecDeque<T>>,
679 /// notify_on_sent: Notify,
680 /// }
681 ///
682 /// impl<T> Channel<T> {
683 /// pub fn send(&self, msg: T) {
684 /// let mut locked_queue = self.messages.lock().unwrap();
685 /// locked_queue.push_back(msg);
686 /// drop(locked_queue);
687 ///
688 /// // Send a notification to one of the calls currently
689 /// // waiting in a call to `recv`.
690 /// self.notify_on_sent.notify_one();
691 /// }
692 ///
693 /// pub fn try_recv(&self) -> Option<T> {
694 /// let mut locked_queue = self.messages.lock().unwrap();
695 /// locked_queue.pop_front()
696 /// }
697 ///
698 /// pub async fn recv(&self) -> T {
699 /// let future = self.notify_on_sent.notified();
700 /// tokio::pin!(future);
701 ///
702 /// loop {
703 /// // Make sure that no wakeup is lost if we get
704 /// // `None` from `try_recv`.
705 /// future.as_mut().enable();
706 ///
707 /// if let Some(msg) = self.try_recv() {
708 /// return msg;
709 /// }
710 ///
711 /// // Wait for a call to `notify_one`.
712 /// //
713 /// // This uses `.as_mut()` to avoid consuming the future,
714 /// // which lets us call `Pin::set` below.
715 /// future.as_mut().await;
716 ///
717 /// // Reset the future in case another call to
718 /// // `try_recv` got the message before us.
719 /// future.set(self.notify_on_sent.notified());
720 /// }
721 /// }
722 /// }
723 /// ```
724 ///
725 /// [`notify_one`]: Notify::notify_one()
726 /// [`notify_waiters`]: Notify::notify_waiters()
enable(self: Pin<&mut Self>) -> bool727 pub fn enable(self: Pin<&mut Self>) -> bool {
728 self.poll_notified(None).is_ready()
729 }
730
731 /// A custom `project` implementation is used in place of `pin-project-lite`
732 /// as a custom drop implementation is needed.
project(self: Pin<&mut Self>) -> (&Notify, &mut State, &UnsafeCell<Waiter>)733 fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &UnsafeCell<Waiter>) {
734 unsafe {
735 // Safety: both `notify` and `state` are `Unpin`.
736
737 is_unpin::<&Notify>();
738 is_unpin::<AtomicUsize>();
739
740 let me = self.get_unchecked_mut();
741 (me.notify, &mut me.state, &me.waiter)
742 }
743 }
744
/// Shared implementation of `poll` and `enable`.
///
/// Drives the `Init` -> `Waiting` -> `Done` state machine. `waker` is the
/// waker to register for a later wakeup; `enable` passes `None`, in which
/// case the future is still added to the wait list but no waker is stored.
/// Returns `Poll::Ready(())` once a notification has been received.
fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
    use State::*;

    let (notify, state, waiter) = self.project();

    loop {
        match *state {
            Init(initial_notify_waiters_calls) => {
                let curr = notify.state.load(SeqCst);

                // Optimistically try acquiring a pending notification
                let res = notify.state.compare_exchange(
                    set_state(curr, NOTIFIED),
                    set_state(curr, EMPTY),
                    SeqCst,
                    SeqCst,
                );

                if res.is_ok() {
                    // Acquired the notification
                    *state = Done;
                    return Poll::Ready(());
                }

                // Clone the waker before locking, since cloning a waker can
                // run arbitrary code.
                let waker = waker.cloned();

                // Acquire the lock and attempt to transition to the waiting
                // state.
                let mut waiters = notify.waiters.lock();

                // Reload the state with the lock held
                let mut curr = notify.state.load(SeqCst);

                // if notify_waiters has been called after the future
                // was created, then we are done
                if get_num_notify_waiters_calls(curr) != initial_notify_waiters_calls {
                    *state = Done;
                    return Poll::Ready(());
                }

                // Transition the state to WAITING.
                loop {
                    match get_state(curr) {
                        EMPTY => {
                            // Transition to WAITING
                            let res = notify.state.compare_exchange(
                                set_state(curr, EMPTY),
                                set_state(curr, WAITING),
                                SeqCst,
                                SeqCst,
                            );

                            if let Err(actual) = res {
                                // With the lock held, the only possible
                                // concurrent change is a `notify_one`
                                // storing a permit.
                                assert_eq!(get_state(actual), NOTIFIED);
                                curr = actual;
                            } else {
                                break;
                            }
                        }
                        // Other waiters are already queued; just join them.
                        WAITING => break,
                        NOTIFIED => {
                            // Try consuming the notification
                            let res = notify.state.compare_exchange(
                                set_state(curr, NOTIFIED),
                                set_state(curr, EMPTY),
                                SeqCst,
                                SeqCst,
                            );

                            match res {
                                Ok(_) => {
                                    // Acquired the notification
                                    *state = Done;
                                    return Poll::Ready(());
                                }
                                Err(actual) => {
                                    // Someone else consumed the permit first.
                                    assert_eq!(get_state(actual), EMPTY);
                                    curr = actual;
                                }
                            }
                        }
                        _ => unreachable!(),
                    }
                }

                // Only overwrite the stored waker when one was supplied
                // (`enable` supplies none).
                if waker.is_some() {
                    // Safety: called while locked.
                    unsafe {
                        (*waiter.get()).waker = waker;
                    }
                }

                // Insert the waiter into the linked list
                //
                // safety: pointers from `UnsafeCell` are never null.
                waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) });

                *state = Waiting;

                return Poll::Pending;
            }
            Waiting => {
                // Currently in the "Waiting" state, implying the caller has
                // a waiter stored in the waiter list (guarded by
                // `notify.waiters`). In order to access the waker fields,
                // we must hold the lock.

                let waiters = notify.waiters.lock();

                // Safety: called while locked
                let w = unsafe { &mut *waiter.get() };

                if w.notified.is_some() {
                    // A notifier has marked this waiter (and already removed
                    // it from the list). Reset the fields and finish.
                    w.waker = None;
                    w.notified = None;

                    *state = Done;
                } else {
                    // Update the waker, if necessary.
                    //
                    // NOTE(review): unlike the `Init` arm, the clone here
                    // (and the drop of the replaced waker) runs while the
                    // lock is held — confirm that this cannot re-enter this
                    // `Notify`.
                    if let Some(waker) = waker {
                        let should_update = match w.waker.as_ref() {
                            Some(current_waker) => !current_waker.will_wake(waker),
                            None => true,
                        };
                        if should_update {
                            w.waker = Some(waker.clone());
                        }
                    }

                    return Poll::Pending;
                }

                // Explicit drop of the lock to indicate the scope that the
                // lock is held. Because holding the lock is required to
                // ensure safe access to fields not held within the lock, it
                // is helpful to visualize the scope of the critical
                // section.
                drop(waiters);
            }
            Done => {
                return Poll::Ready(());
            }
        }
    }
}
894 }
895
896 impl Future for Notified<'_> {
897 type Output = ();
898
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>899 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
900 self.poll_notified(Some(cx.waker()))
901 }
902 }
903
904 impl Drop for Notified<'_> {
drop(&mut self)905 fn drop(&mut self) {
906 use State::*;
907
908 // Safety: The type only transitions to a "Waiting" state when pinned.
909 let (notify, state, waiter) = unsafe { Pin::new_unchecked(self).project() };
910
911 // This is where we ensure safety. The `Notified` value is being
912 // dropped, which means we must ensure that the waiter entry is no
913 // longer stored in the linked list.
914 if matches!(*state, Waiting) {
915 let mut waiters = notify.waiters.lock();
916 let mut notify_state = notify.state.load(SeqCst);
917
918 // remove the entry from the list (if not already removed)
919 //
920 // safety: the waiter is only added to `waiters` by virtue of it
921 // being the only `LinkedList` available to the type.
922 unsafe { waiters.remove(NonNull::new_unchecked(waiter.get())) };
923
924 if waiters.is_empty() && get_state(notify_state) == WAITING {
925 notify_state = set_state(notify_state, EMPTY);
926 notify.state.store(notify_state, SeqCst);
927 }
928
929 // See if the node was notified but not received. In this case, if
930 // the notification was triggered via `notify_one`, it must be sent
931 // to the next waiter.
932 //
933 // Safety: with the entry removed from the linked list, there can be
934 // no concurrent access to the entry
935 if matches!(
936 unsafe { (*waiter.get()).notified },
937 Some(NotificationType::OneWaiter)
938 ) {
939 if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) {
940 drop(waiters);
941 waker.wake();
942 }
943 }
944 }
945 }
946 }
947
/// Lets `Waiter` be stored in the intrusive `LinkedList`.
///
/// # Safety
///
/// `Waiter` is forced to be !Unpin.
unsafe impl linked_list::Link for Waiter {
    // The list stores raw, non-owning pointers to the waiters.
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    // The handle already is the raw pointer, so it is simply copied.
    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    // Inverse of `as_raw`; the pointer is returned unchanged.
    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    // Locates the intrusive `pointers` field inside the waiter.
    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        Waiter::addr_of_pointers(target)
    }
}
967
/// Compile-time assertion that `T` is `Unpin`; does nothing at runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}
969