1 // Allow `unreachable_pub` warnings when sync is not enabled
2 // due to the usage of `Notify` within the `rt` feature set.
3 // When this module is compiled with `sync` enabled we will warn on
4 // this lint. When `rt` is enabled we use `pub(crate)` which
5 // triggers this warning but it is safe to ignore in this case.
6 #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
7
8 use crate::loom::sync::atomic::AtomicUsize;
9 use crate::loom::sync::Mutex;
10 use crate::util::linked_list::{self, LinkedList};
11 use crate::util::WakeList;
12
13 use std::cell::UnsafeCell;
14 use std::future::Future;
15 use std::marker::PhantomPinned;
16 use std::pin::Pin;
17 use std::ptr::NonNull;
18 use std::sync::atomic::Ordering::SeqCst;
19 use std::task::{Context, Poll, Waker};
20
/// List of waiters queued on a `Notify`, linked through the `pointers` field
/// embedded in each `Waiter`.
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
22
23 /// Notifies a single task to wake up.
24 ///
25 /// `Notify` provides a basic mechanism to notify a single task of an event.
26 /// `Notify` itself does not carry any data. Instead, it is to be used to signal
27 /// another task to perform an operation.
28 ///
29 /// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits.
30 /// [`notified().await`] waits for a permit to become available, and [`notify_one()`]
31 /// sets a permit **if there currently are no available permits**.
32 ///
33 /// The synchronization details of `Notify` are similar to
34 /// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
35 /// value contains a single permit. [`notified().await`] waits for the permit to
36 /// be made available, consumes the permit, and resumes. [`notify_one()`] sets the
37 /// permit, waking a pending task if there is one.
38 ///
39 /// If `notify_one()` is called **before** `notified().await`, then the next call to
40 /// `notified().await` will complete immediately, consuming the permit. Any
41 /// subsequent calls to `notified().await` will wait for a new permit.
42 ///
43 /// If `notify_one()` is called **multiple** times before `notified().await`, only a
44 /// **single** permit is stored. The next call to `notified().await` will
45 /// complete immediately, but the one after will wait for a new permit.
46 ///
47 /// # Examples
48 ///
49 /// Basic usage.
50 ///
51 /// ```
52 /// use tokio::sync::Notify;
53 /// use std::sync::Arc;
54 ///
55 /// #[tokio::main]
56 /// async fn main() {
57 /// let notify = Arc::new(Notify::new());
58 /// let notify2 = notify.clone();
59 ///
60 /// let handle = tokio::spawn(async move {
61 /// notify2.notified().await;
62 /// println!("received notification");
63 /// });
64 ///
65 /// println!("sending notification");
66 /// notify.notify_one();
67 ///
68 /// // Wait for task to receive notification.
69 /// handle.await.unwrap();
70 /// }
71 /// ```
72 ///
/// Unbounded mpsc channel.
74 ///
75 /// ```
76 /// use tokio::sync::Notify;
77 ///
78 /// use std::collections::VecDeque;
79 /// use std::sync::Mutex;
80 ///
81 /// struct Channel<T> {
82 /// values: Mutex<VecDeque<T>>,
83 /// notify: Notify,
84 /// }
85 ///
86 /// impl<T> Channel<T> {
87 /// pub fn send(&self, value: T) {
88 /// self.values.lock().unwrap()
89 /// .push_back(value);
90 ///
91 /// // Notify the consumer a value is available
92 /// self.notify.notify_one();
93 /// }
94 ///
95 /// pub async fn recv(&self) -> T {
96 /// loop {
97 /// // Drain values
98 /// if let Some(value) = self.values.lock().unwrap().pop_front() {
99 /// return value;
100 /// }
101 ///
102 /// // Wait for values to be available
103 /// self.notify.notified().await;
104 /// }
105 /// }
106 /// }
107 /// ```
108 ///
109 /// [park]: std::thread::park
110 /// [unpark]: std::thread::Thread::unpark
111 /// [`notified().await`]: Notify::notified()
112 /// [`notify_one()`]: Notify::notify_one()
113 /// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
    // Packed state word. The low 2 bits store one of `EMPTY`, `WAITING` or
    // `NOTIFIED`. The remaining bits store the number of times
    // `notify_waiters` was called.
    state: AtomicUsize,
    // Waiters registered by pending `Notified` futures, guarded by the mutex.
    // Entries are pushed at the front and popped from the back.
    waiters: Mutex<WaitList>,
}
123
/// Records which kind of call assigned a notification to a waiter.
///
/// `Notified`'s `Drop` impl inspects this to decide whether a notification
/// that was assigned but never received must be passed on to another waiter.
#[derive(Debug, Clone, Copy)]
enum NotificationType {
    /// Notification triggered by calling `notify_waiters`.
    AllWaiters,
    /// Notification triggered by calling `notify_one`.
    OneWaiter,
}
131
/// Entry in a `Notify` wait list; one is embedded in every `Notified` future.
#[derive(Debug)]
struct Waiter {
    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Waiting task's waker.
    waker: Option<Waker>,

    /// `Some` once a notification has been assigned to this waiter, recording
    /// which kind of call assigned it; `None` while still waiting.
    notified: Option<NotificationType>,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
146
/// Future returned from [`Notify::notified()`].
///
/// Completes with `()` once it consumes a stored permit, is assigned a
/// notification by `notify_one` / `notify_waiters`, or observes that
/// `notify_waiters` has been called since the future was created.
#[derive(Debug)]
pub struct Notified<'a> {
    /// The `Notify` being received on.
    notify: &'a Notify,

    /// The current state of the receiving process.
    state: State,

    /// Entry in the waiter `LinkedList`. Stored in an `UnsafeCell` because it
    /// is mutated through raw pointers held by the list.
    waiter: UnsafeCell<Waiter>,
}
159
// NOTE(review): the safety argument appears to be that, within this file, the
// `waiter` cell is only accessed while the `notify.waiters` lock is held (see
// `poll`), or in `drop` after the entry has been removed from the list.
// TODO: confirm this covers every access before relying on it.
unsafe impl<'a> Send for Notified<'a> {}
unsafe impl<'a> Sync for Notified<'a> {}
162
/// Progress of a `Notified` future.
#[derive(Debug)]
enum State {
    /// Not yet registered in the wait list. Carries the `notify_waiters` call
    /// count read from the `Notify` when the future was created.
    Init(usize),
    /// The waiter entry has been pushed onto the `Notify` wait list.
    Waiting,
    /// The notification has been received; polling returns `Ready`.
    Done,
}
169
/// Number of low bits of the state word reserved for the waiter state.
const NOTIFY_WAITERS_SHIFT: usize = 2;
/// Mask selecting the waiter-state bits of the state word.
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
/// Mask selecting the `notify_waiters` call-counter bits of the state word.
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;

/// Initial "idle" state.
const EMPTY: usize = 0;

/// One or more threads are currently waiting to be notified.
const WAITING: usize = 1;

/// Pending notification.
const NOTIFIED: usize = 2;

/// Returns `data` with its state bits replaced by `state`, leaving the
/// `notify_waiters` call counter untouched.
fn set_state(data: usize, state: usize) -> usize {
    (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
}

/// Extracts the state bits (`EMPTY`, `WAITING` or `NOTIFIED`) from `data`.
fn get_state(data: usize) -> usize {
    data & STATE_MASK
}

/// Extracts the `notify_waiters` call counter from `data`.
fn get_num_notify_waiters_calls(data: usize) -> usize {
    (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
}

/// Returns `data` with the `notify_waiters` call counter incremented by one.
///
/// The counter is only ever compared for inequality, so it is allowed to wrap
/// around. `wrapping_add` makes that explicit and avoids an overflow panic in
/// builds with overflow checks enabled. The increment is a multiple of
/// `1 << NOTIFY_WAITERS_SHIFT`, so the state bits are never disturbed.
fn inc_num_notify_waiters_calls(data: usize) -> usize {
    data.wrapping_add(1 << NOTIFY_WAITERS_SHIFT)
}
198
atomic_inc_num_notify_waiters_calls(data: &AtomicUsize)199 fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
200 data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
201 }
202
203 impl Notify {
204 /// Create a new `Notify`, initialized without a permit.
205 ///
206 /// # Examples
207 ///
208 /// ```
209 /// use tokio::sync::Notify;
210 ///
211 /// let notify = Notify::new();
212 /// ```
new() -> Notify213 pub fn new() -> Notify {
214 Notify {
215 state: AtomicUsize::new(0),
216 waiters: Mutex::new(LinkedList::new()),
217 }
218 }
219
220 /// Create a new `Notify`, initialized without a permit.
221 ///
222 /// # Examples
223 ///
224 /// ```
225 /// use tokio::sync::Notify;
226 ///
227 /// static NOTIFY: Notify = Notify::const_new();
228 /// ```
229 #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
230 #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
const_new() -> Notify231 pub const fn const_new() -> Notify {
232 Notify {
233 state: AtomicUsize::new(0),
234 waiters: Mutex::const_new(LinkedList::new()),
235 }
236 }
237
238 /// Wait for a notification.
239 ///
240 /// Equivalent to:
241 ///
242 /// ```ignore
243 /// async fn notified(&self);
244 /// ```
245 ///
246 /// Each `Notify` value holds a single permit. If a permit is available from
247 /// an earlier call to [`notify_one()`], then `notified().await` will complete
248 /// immediately, consuming that permit. Otherwise, `notified().await` waits
249 /// for a permit to be made available by the next call to `notify_one()`.
250 ///
251 /// [`notify_one()`]: Notify::notify_one
252 ///
253 /// # Cancel safety
254 ///
255 /// This method uses a queue to fairly distribute notifications in the order
256 /// they were requested. Cancelling a call to `notified` makes you lose your
257 /// place in the queue.
258 ///
259 /// # Examples
260 ///
261 /// ```
262 /// use tokio::sync::Notify;
263 /// use std::sync::Arc;
264 ///
265 /// #[tokio::main]
266 /// async fn main() {
267 /// let notify = Arc::new(Notify::new());
268 /// let notify2 = notify.clone();
269 ///
270 /// tokio::spawn(async move {
271 /// notify2.notified().await;
272 /// println!("received notification");
273 /// });
274 ///
275 /// println!("sending notification");
276 /// notify.notify_one();
277 /// }
278 /// ```
notified(&self) -> Notified<'_>279 pub fn notified(&self) -> Notified<'_> {
280 // we load the number of times notify_waiters
281 // was called and store that in our initial state
282 let state = self.state.load(SeqCst);
283 Notified {
284 notify: self,
285 state: State::Init(state >> NOTIFY_WAITERS_SHIFT),
286 waiter: UnsafeCell::new(Waiter {
287 pointers: linked_list::Pointers::new(),
288 waker: None,
289 notified: None,
290 _p: PhantomPinned,
291 }),
292 }
293 }
294
295 /// Notifies a waiting task.
296 ///
297 /// If a task is currently waiting, that task is notified. Otherwise, a
298 /// permit is stored in this `Notify` value and the **next** call to
299 /// [`notified().await`] will complete immediately consuming the permit made
300 /// available by this call to `notify_one()`.
301 ///
302 /// At most one permit may be stored by `Notify`. Many sequential calls to
303 /// `notify_one` will result in a single permit being stored. The next call to
304 /// `notified().await` will complete immediately, but the one after that
305 /// will wait.
306 ///
307 /// [`notified().await`]: Notify::notified()
308 ///
309 /// # Examples
310 ///
311 /// ```
312 /// use tokio::sync::Notify;
313 /// use std::sync::Arc;
314 ///
315 /// #[tokio::main]
316 /// async fn main() {
317 /// let notify = Arc::new(Notify::new());
318 /// let notify2 = notify.clone();
319 ///
320 /// tokio::spawn(async move {
321 /// notify2.notified().await;
322 /// println!("received notification");
323 /// });
324 ///
325 /// println!("sending notification");
326 /// notify.notify_one();
327 /// }
328 /// ```
329 // Alias for old name in 0.x
330 #[cfg_attr(docsrs, doc(alias = "notify"))]
notify_one(&self)331 pub fn notify_one(&self) {
332 // Load the current state
333 let mut curr = self.state.load(SeqCst);
334
335 // If the state is `EMPTY`, transition to `NOTIFIED` and return.
336 while let EMPTY | NOTIFIED = get_state(curr) {
337 // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
338 // happens-before synchronization must happen between this atomic
339 // operation and a task calling `notified().await`.
340 let new = set_state(curr, NOTIFIED);
341 let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);
342
343 match res {
344 // No waiters, no further work to do
345 Ok(_) => return,
346 Err(actual) => {
347 curr = actual;
348 }
349 }
350 }
351
352 // There are waiters, the lock must be acquired to notify.
353 let mut waiters = self.waiters.lock();
354
355 // The state must be reloaded while the lock is held. The state may only
356 // transition out of WAITING while the lock is held.
357 curr = self.state.load(SeqCst);
358
359 if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) {
360 drop(waiters);
361 waker.wake();
362 }
363 }
364
365 /// Notifies all waiting tasks.
366 ///
367 /// If a task is currently waiting, that task is notified. Unlike with
368 /// `notify_one()`, no permit is stored to be used by the next call to
369 /// `notified().await`. The purpose of this method is to notify all
370 /// already registered waiters. Registering for notification is done by
371 /// acquiring an instance of the `Notified` future via calling `notified()`.
372 ///
373 /// # Examples
374 ///
375 /// ```
376 /// use tokio::sync::Notify;
377 /// use std::sync::Arc;
378 ///
379 /// #[tokio::main]
380 /// async fn main() {
381 /// let notify = Arc::new(Notify::new());
382 /// let notify2 = notify.clone();
383 ///
384 /// let notified1 = notify.notified();
385 /// let notified2 = notify.notified();
386 ///
387 /// let handle = tokio::spawn(async move {
388 /// println!("sending notifications");
389 /// notify2.notify_waiters();
390 /// });
391 ///
392 /// notified1.await;
393 /// notified2.await;
394 /// println!("received notifications");
395 /// }
396 /// ```
pub fn notify_waiters(&self) {
    // Wakers are collected here so they can be invoked after the lock has
    // been released.
    let mut wakers = WakeList::new();

    // The lock must be acquired before the state is inspected.
    let mut waiters = self.waiters.lock();

    // The state must be loaded while the lock is held. The state may only
    // transition out of WAITING while the lock is held.
    let curr = self.state.load(SeqCst);

    if let EMPTY | NOTIFIED = get_state(curr) {
        // There are no waiting tasks. All we need to do is increment the
        // number of times this method was called.
        atomic_inc_num_notify_waiters_calls(&self.state);
        return;
    }

    // At this point, it is guaranteed that the state will not
    // concurrently change, as holding the lock is required to
    // transition **out** of `WAITING`.
    //
    // NOTE(review): the lock is released and re-acquired inside the loop
    // below while the state word still reads `WAITING`, and the final store
    // reuses the `curr` snapshot taken above. Confirm that a concurrent
    // `notify_one`, `notify_waiters` or `Notified` drop running in that
    // window cannot have its state change overwritten, and cannot observe
    // `WAITING` together with an empty list.
    'outer: loop {
        // Drain waiters in batches: fill `wakers` until it reports that it
        // cannot take more, then wake that batch outside the lock.
        while wakers.can_push() {
            match waiters.pop_back() {
                Some(mut waiter) => {
                    // Safety: `waiters` lock is still held.
                    let waiter = unsafe { waiter.as_mut() };

                    // A waiter still in the list must not have been
                    // notified yet.
                    assert!(waiter.notified.is_none());

                    waiter.notified = Some(NotificationType::AllWaiters);

                    if let Some(waker) = waiter.waker.take() {
                        wakers.push(waker);
                    }
                }
                None => {
                    // List drained; leave both loops with the lock held.
                    break 'outer;
                }
            }
        }

        // Release the lock before waking the current batch.
        drop(waiters);

        wakers.wake_all();

        // Acquire the lock again.
        waiters = self.waiters.lock();
    }

    // All waiters will be notified, the state must be transitioned to
    // `EMPTY`. As transitioning **from** `WAITING` requires the lock to be
    // held, a `store` is sufficient. The call counter is bumped in the same
    // store.
    let new = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
    self.state.store(new, SeqCst);

    // Release the lock before notifying
    drop(waiters);

    // Wake whatever is left from the final (partial) batch.
    wakers.wake_all();
}
457 }
458
459 impl Default for Notify {
default() -> Notify460 fn default() -> Notify {
461 Notify::new()
462 }
463 }
464
notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker>465 fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> {
466 loop {
467 match get_state(curr) {
468 EMPTY | NOTIFIED => {
469 let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
470
471 match res {
472 Ok(_) => return None,
473 Err(actual) => {
474 let actual_state = get_state(actual);
475 assert!(actual_state == EMPTY || actual_state == NOTIFIED);
476 state.store(set_state(actual, NOTIFIED), SeqCst);
477 return None;
478 }
479 }
480 }
481 WAITING => {
482 // At this point, it is guaranteed that the state will not
483 // concurrently change as holding the lock is required to
484 // transition **out** of `WAITING`.
485 //
486 // Get a pending waiter
487 let mut waiter = waiters.pop_back().unwrap();
488
489 // Safety: `waiters` lock is still held.
490 let waiter = unsafe { waiter.as_mut() };
491
492 assert!(waiter.notified.is_none());
493
494 waiter.notified = Some(NotificationType::OneWaiter);
495 let waker = waiter.waker.take();
496
497 if waiters.is_empty() {
498 // As this the **final** waiter in the list, the state
499 // must be transitioned to `EMPTY`. As transitioning
500 // **from** `WAITING` requires the lock to be held, a
501 // `store` is sufficient.
502 state.store(set_state(curr, EMPTY), SeqCst);
503 }
504
505 return waker;
506 }
507 _ => unreachable!(),
508 }
509 }
510 }
511
512 // ===== impl Notified =====
513
514 impl Notified<'_> {
515 /// A custom `project` implementation is used in place of `pin-project-lite`
516 /// as a custom drop implementation is needed.
project(self: Pin<&mut Self>) -> (&Notify, &mut State, &UnsafeCell<Waiter>)517 fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &UnsafeCell<Waiter>) {
518 unsafe {
519 // Safety: both `notify` and `state` are `Unpin`.
520
521 is_unpin::<&Notify>();
522 is_unpin::<AtomicUsize>();
523
524 let me = self.get_unchecked_mut();
525 (me.notify, &mut me.state, &me.waiter)
526 }
527 }
528 }
529
impl Future for Notified<'_> {
    type Output = ();

    /// Drives the future through `Init` -> `Waiting` -> `Done`.
    ///
    /// Returns `Poll::Ready(())` once a notification has been obtained, and
    /// `Poll::Pending` after registering (or refreshing) the task's waker
    /// otherwise.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        use State::*;

        let (notify, state, waiter) = self.project();

        // Looping lets a `Waiting` -> `Done` transition fall through to the
        // `Done` arm, which returns `Ready`.
        loop {
            match *state {
                Init(initial_notify_waiters_calls) => {
                    let curr = notify.state.load(SeqCst);

                    // Optimistically try acquiring a pending notification
                    let res = notify.state.compare_exchange(
                        set_state(curr, NOTIFIED),
                        set_state(curr, EMPTY),
                        SeqCst,
                        SeqCst,
                    );

                    if res.is_ok() {
                        // Acquired the notification
                        *state = Done;
                        return Poll::Ready(());
                    }

                    // Clone the waker before locking, as cloning a waker can
                    // run arbitrary code.
                    let waker = cx.waker().clone();

                    // Acquire the lock and attempt to transition to the waiting
                    // state.
                    let mut waiters = notify.waiters.lock();

                    // Reload the state with the lock held
                    let mut curr = notify.state.load(SeqCst);

                    // If `notify_waiters` has been called after the future
                    // was created, then we are done.
                    if get_num_notify_waiters_calls(curr) != initial_notify_waiters_calls {
                        *state = Done;
                        return Poll::Ready(());
                    }

                    // Transition the state to WAITING, or consume a permit
                    // that was stored in the meantime.
                    loop {
                        match get_state(curr) {
                            EMPTY => {
                                // Transition to WAITING
                                let res = notify.state.compare_exchange(
                                    set_state(curr, EMPTY),
                                    set_state(curr, WAITING),
                                    SeqCst,
                                    SeqCst,
                                );

                                if let Err(actual) = res {
                                    // The only change expected here is a
                                    // permit stored concurrently; retry and
                                    // consume it.
                                    assert_eq!(get_state(actual), NOTIFIED);
                                    curr = actual;
                                } else {
                                    break;
                                }
                            }
                            // Other waiters are already queued; just join
                            // the list.
                            WAITING => break,
                            NOTIFIED => {
                                // Try consuming the notification
                                let res = notify.state.compare_exchange(
                                    set_state(curr, NOTIFIED),
                                    set_state(curr, EMPTY),
                                    SeqCst,
                                    SeqCst,
                                );

                                match res {
                                    Ok(_) => {
                                        // Acquired the notification
                                        *state = Done;
                                        return Poll::Ready(());
                                    }
                                    Err(actual) => {
                                        // The permit was taken by someone
                                        // else; retry from `EMPTY`.
                                        assert_eq!(get_state(actual), EMPTY);
                                        curr = actual;
                                    }
                                }
                            }
                            _ => unreachable!(),
                        }
                    }

                    // Safety: called while locked.
                    unsafe {
                        (*waiter.get()).waker = Some(waker);
                    }

                    // Insert the waiter into the linked list
                    //
                    // safety: pointers from `UnsafeCell` are never null.
                    waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) });

                    *state = Waiting;

                    return Poll::Pending;
                }
                Waiting => {
                    // Currently in the "Waiting" state, implying the caller has
                    // a waiter stored in the waiter list (guarded by
                    // `notify.waiters`). In order to access the waker fields,
                    // we must hold the lock.

                    let waiters = notify.waiters.lock();

                    // Safety: called while locked
                    let w = unsafe { &mut *waiter.get() };

                    if w.notified.is_some() {
                        // Our waiter has been notified; the notifier popped
                        // it from the list when assigning the notification.
                        // Reset the fields.
                        w.waker = None;
                        w.notified = None;

                        *state = Done;
                    } else {
                        // Update the waker, if necessary.
                        if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
                            w.waker = Some(cx.waker().clone());
                        }

                        return Poll::Pending;
                    }

                    // Explicit drop of the lock to indicate the scope that the
                    // lock is held. Because holding the lock is required to
                    // ensure safe access to fields not held within the lock, it
                    // is helpful to visualize the scope of the critical
                    // section.
                    drop(waiters);
                }
                Done => {
                    return Poll::Ready(());
                }
            }
        }
    }
}
675
676 impl Drop for Notified<'_> {
drop(&mut self)677 fn drop(&mut self) {
678 use State::*;
679
680 // Safety: The type only transitions to a "Waiting" state when pinned.
681 let (notify, state, waiter) = unsafe { Pin::new_unchecked(self).project() };
682
683 // This is where we ensure safety. The `Notified` value is being
684 // dropped, which means we must ensure that the waiter entry is no
685 // longer stored in the linked list.
686 if let Waiting = *state {
687 let mut waiters = notify.waiters.lock();
688 let mut notify_state = notify.state.load(SeqCst);
689
690 // remove the entry from the list (if not already removed)
691 //
692 // safety: the waiter is only added to `waiters` by virtue of it
693 // being the only `LinkedList` available to the type.
694 unsafe { waiters.remove(NonNull::new_unchecked(waiter.get())) };
695
696 if waiters.is_empty() {
697 if let WAITING = get_state(notify_state) {
698 notify_state = set_state(notify_state, EMPTY);
699 notify.state.store(notify_state, SeqCst);
700 }
701 }
702
703 // See if the node was notified but not received. In this case, if
704 // the notification was triggered via `notify_one`, it must be sent
705 // to the next waiter.
706 //
707 // Safety: with the entry removed from the linked list, there can be
708 // no concurrent access to the entry
709 if let Some(NotificationType::OneWaiter) = unsafe { (*waiter.get()).notified } {
710 if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) {
711 drop(waiters);
712 waker.wake();
713 }
714 }
715 }
716 }
717 }
718
719 /// # Safety
720 ///
721 /// `Waiter` is forced to be !Unpin.
722 unsafe impl linked_list::Link for Waiter {
723 type Handle = NonNull<Waiter>;
724 type Target = Waiter;
725
as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter>726 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
727 *handle
728 }
729
from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter>730 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
731 ptr
732 }
733
pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>>734 unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
735 NonNull::from(&mut target.as_mut().pointers)
736 }
737 }
738
/// Compile-time assertion that `T` implements `Unpin`; does nothing at runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}
740