1 // Allow `unreachable_pub` warnings when sync is not enabled
2 // due to the usage of `Notify` within the `rt` feature set.
3 // When this module is compiled with `sync` enabled we will warn on
4 // this lint. When `rt` is enabled we use `pub(crate)` which
5 // triggers this warning but it is safe to ignore in this case.
6 #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
7
8 use crate::loom::cell::UnsafeCell;
9 use crate::loom::sync::atomic::AtomicUsize;
10 use crate::loom::sync::Mutex;
11 use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
12 use crate::util::WakeList;
13
14 use std::future::Future;
15 use std::marker::PhantomPinned;
16 use std::panic::{RefUnwindSafe, UnwindSafe};
17 use std::pin::Pin;
18 use std::ptr::NonNull;
19 use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst};
20 use std::task::{Context, Poll, Waker};
21
// Intrusive list of `Waiter` nodes. This is the list type stored behind the
// `waiters` mutex in `Notify`.
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
// Guarded counterpart of `WaitList`. It is produced from a `WaitList` via
// `into_guarded` in `NotifyWaitersList::new` and drained by
// `Notify::notify_waiters`.
type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
24
25 /// Notifies a single task to wake up.
26 ///
27 /// `Notify` provides a basic mechanism to notify a single task of an event.
28 /// `Notify` itself does not carry any data. Instead, it is to be used to signal
29 /// another task to perform an operation.
30 ///
31 /// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
32 /// [`notified().await`] method waits for a permit to become available, and
33 /// [`notify_one()`] sets a permit **if there currently are no available
34 /// permits**.
35 ///
36 /// The synchronization details of `Notify` are similar to
37 /// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
38 /// value contains a single permit. [`notified().await`] waits for the permit to
39 /// be made available, consumes the permit, and resumes. [`notify_one()`] sets
40 /// the permit, waking a pending task if there is one.
41 ///
42 /// If `notify_one()` is called **before** `notified().await`, then the next
43 /// call to `notified().await` will complete immediately, consuming the permit.
44 /// Any subsequent calls to `notified().await` will wait for a new permit.
45 ///
46 /// If `notify_one()` is called **multiple** times before `notified().await`,
47 /// only a **single** permit is stored. The next call to `notified().await` will
48 /// complete immediately, but the one after will wait for a new permit.
49 ///
50 /// # Examples
51 ///
52 /// Basic usage.
53 ///
54 /// ```
55 /// use tokio::sync::Notify;
56 /// use std::sync::Arc;
57 ///
58 /// #[tokio::main]
59 /// async fn main() {
60 /// let notify = Arc::new(Notify::new());
61 /// let notify2 = notify.clone();
62 ///
63 /// let handle = tokio::spawn(async move {
64 /// notify2.notified().await;
65 /// println!("received notification");
66 /// });
67 ///
68 /// println!("sending notification");
69 /// notify.notify_one();
70 ///
71 /// // Wait for task to receive notification.
72 /// handle.await.unwrap();
73 /// }
74 /// ```
75 ///
/// Unbounded multi-producer single-consumer (mpsc) channel.
77 ///
78 /// No wakeups can be lost when using this channel because the call to
79 /// `notify_one()` will store a permit in the `Notify`, which the following call
80 /// to `notified()` will consume.
81 ///
82 /// ```
83 /// use tokio::sync::Notify;
84 ///
85 /// use std::collections::VecDeque;
86 /// use std::sync::Mutex;
87 ///
88 /// struct Channel<T> {
89 /// values: Mutex<VecDeque<T>>,
90 /// notify: Notify,
91 /// }
92 ///
93 /// impl<T> Channel<T> {
94 /// pub fn send(&self, value: T) {
95 /// self.values.lock().unwrap()
96 /// .push_back(value);
97 ///
98 /// // Notify the consumer a value is available
99 /// self.notify.notify_one();
100 /// }
101 ///
102 /// // This is a single-consumer channel, so several concurrent calls to
103 /// // `recv` are not allowed.
104 /// pub async fn recv(&self) -> T {
105 /// loop {
106 /// // Drain values
107 /// if let Some(value) = self.values.lock().unwrap().pop_front() {
108 /// return value;
109 /// }
110 ///
111 /// // Wait for values to be available
112 /// self.notify.notified().await;
113 /// }
114 /// }
115 /// }
116 /// ```
117 ///
/// Unbounded multi-producer multi-consumer (mpmc) channel.
119 ///
120 /// The call to [`enable`] is important because otherwise if you have two
121 /// calls to `recv` and two calls to `send` in parallel, the following could
122 /// happen:
123 ///
124 /// 1. Both calls to `try_recv` return `None`.
125 /// 2. Both new elements are added to the vector.
126 /// 3. The `notify_one` method is called twice, adding only a single
127 /// permit to the `Notify`.
128 /// 4. Both calls to `recv` reach the `Notified` future. One of them
129 /// consumes the permit, and the other sleeps forever.
130 ///
131 /// By adding the `Notified` futures to the list by calling `enable` before
132 /// `try_recv`, the `notify_one` calls in step three would remove the
133 /// futures from the list and mark them notified instead of adding a permit
134 /// to the `Notify`. This ensures that both futures are woken.
135 ///
136 /// Notice that this failure can only happen if there are two concurrent calls
137 /// to `recv`. This is why the mpsc example above does not require a call to
138 /// `enable`.
139 ///
140 /// ```
141 /// use tokio::sync::Notify;
142 ///
143 /// use std::collections::VecDeque;
144 /// use std::sync::Mutex;
145 ///
146 /// struct Channel<T> {
147 /// messages: Mutex<VecDeque<T>>,
148 /// notify_on_sent: Notify,
149 /// }
150 ///
151 /// impl<T> Channel<T> {
152 /// pub fn send(&self, msg: T) {
153 /// let mut locked_queue = self.messages.lock().unwrap();
154 /// locked_queue.push_back(msg);
155 /// drop(locked_queue);
156 ///
157 /// // Send a notification to one of the calls currently
158 /// // waiting in a call to `recv`.
159 /// self.notify_on_sent.notify_one();
160 /// }
161 ///
162 /// pub fn try_recv(&self) -> Option<T> {
163 /// let mut locked_queue = self.messages.lock().unwrap();
164 /// locked_queue.pop_front()
165 /// }
166 ///
167 /// pub async fn recv(&self) -> T {
168 /// let future = self.notify_on_sent.notified();
169 /// tokio::pin!(future);
170 ///
171 /// loop {
172 /// // Make sure that no wakeup is lost if we get
173 /// // `None` from `try_recv`.
174 /// future.as_mut().enable();
175 ///
176 /// if let Some(msg) = self.try_recv() {
177 /// return msg;
178 /// }
179 ///
180 /// // Wait for a call to `notify_one`.
181 /// //
182 /// // This uses `.as_mut()` to avoid consuming the future,
183 /// // which lets us call `Pin::set` below.
184 /// future.as_mut().await;
185 ///
186 /// // Reset the future in case another call to
187 /// // `try_recv` got the message before us.
188 /// future.set(self.notify_on_sent.notified());
189 /// }
190 /// }
191 /// }
192 /// ```
193 ///
194 /// [park]: std::thread::park
195 /// [unpark]: std::thread::Thread::unpark
196 /// [`notified().await`]: Notify::notified()
197 /// [`notify_one()`]: Notify::notify_one()
198 /// [`enable`]: Notified::enable()
199 /// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
    // `state` uses 2 bits to store one of `EMPTY`,
    // `WAITING` or `NOTIFIED`. The rest of the bits
    // are used to store the number of times `notify_waiters`
    // was called.
    //
    // Throughout the code there are two assumptions:
    // - state can be transitioned *from* `WAITING` only if
    //   `waiters` lock is held
    // - number of times `notify_waiters` was called can
    //   be modified only if `waiters` lock is held
    state: AtomicUsize,
    // Queue of waiters that have not been notified yet. Waiters are pushed
    // at the front and notifications pop from the back, so they are served
    // in the order in which they were enqueued.
    waiters: Mutex<WaitList>,
}
215
/// A single node of the `Notify` wait queue.
///
/// Every `Notified` future embeds one `Waiter`, and `notify_waiters` creates
/// a temporary one to act as the guard node of its secondary list. Nodes are
/// linked into the list by address, so a `Waiter` must not move while it is
/// linked.
#[derive(Debug)]
struct Waiter {
    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Waiting task's waker. Depending on the value of `notification`,
    /// this field is either protected by the `waiters` lock in
    /// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
    waker: UnsafeCell<Option<Waker>>,

    /// Notification for this waiter.
    /// * if it's `None`, then `waker` is protected by the `waiters` lock.
    /// * if it's `Some`, then `waker` is exclusively owned by the
    ///   enclosing `Waiter` and can be accessed without locking.
    notification: AtomicNotification,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
235
236 impl Waiter {
new() -> Waiter237 fn new() -> Waiter {
238 Waiter {
239 pointers: linked_list::Pointers::new(),
240 waker: UnsafeCell::new(None),
241 notification: AtomicNotification::none(),
242 _p: PhantomPinned,
243 }
244 }
245 }
246
// Generates `Waiter::addr_of_pointers`, which maps a pointer to a `Waiter`
// onto a pointer to its `pointers` field.
// NOTE(review): presumably the macro expands the `&self.pointers` body into a
// raw field projection that never materializes a reference — confirm against
// the definition of `generate_addr_of_methods!`.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
254
/// Raw encoding of "no notification".
const NOTIFICATION_NONE: usize = 0;

/// Raw encoding of a notification delivered by `notify_one`.
const NOTIFICATION_ONE: usize = 1;

/// Raw encoding of a notification delivered by `notify_waiters`.
const NOTIFICATION_ALL: usize = 2;

/// Atomic slot holding the notification of a `Waiter`.
///
/// Logically this is an `Option<Notification>`; it is stored as one of the
/// `NOTIFICATION_*` values inside an `AtomicUsize` so that it can be read and
/// written atomically.
#[derive(Debug)]
struct AtomicNotification(AtomicUsize);

impl AtomicNotification {
    /// Creates a slot that holds no notification.
    fn none() -> Self {
        Self(AtomicUsize::new(NOTIFICATION_NONE))
    }

    /// Publishes `notification` with release ordering.
    /// This method should be called exactly once.
    fn store_release(&self, notification: Notification) {
        let raw = notification as usize;
        self.0.store(raw, Release);
    }

    /// Reads the slot with the given `ordering` and decodes it.
    ///
    /// Panics (via `unreachable!`) if the stored value is not one of the
    /// `NOTIFICATION_*` encodings.
    fn load(&self, ordering: Ordering) -> Option<Notification> {
        let raw = self.0.load(ordering);
        match raw {
            NOTIFICATION_ONE => Some(Notification::One),
            NOTIFICATION_ALL => Some(Notification::All),
            NOTIFICATION_NONE => None,
            _ => unreachable!(),
        }
    }

    /// Resets the slot to "no notification".
    ///
    /// A `Notified` future calls this to consume its notification. The store
    /// is relaxed, so it should only be used once the slot is no longer
    /// shared.
    fn clear(&self) {
        self.0.store(NOTIFICATION_NONE, Relaxed);
    }
}

/// Kind of notification a waiter has received; the discriminants are the
/// matching `NOTIFICATION_*` encodings.
#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum Notification {
    One = NOTIFICATION_ONE,
    All = NOTIFICATION_ALL,
}
305
/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
/// and gates the access to it on `notify.waiters` mutex. It also empties
/// the list on drop.
struct NotifyWaitersList<'a> {
    // Waiters taken out of `notify.waiters`, converted into a guarded list.
    list: GuardedWaitList,
    // Set once `pop_back_locked` has seen the list run empty; lets `drop`
    // skip taking the lock again.
    is_empty: bool,
    // The `Notify` whose `waiters` mutex protects `list`.
    notify: &'a Notify,
}
314
315 impl<'a> NotifyWaitersList<'a> {
new( unguarded_list: WaitList, guard: Pin<&'a Waiter>, notify: &'a Notify, ) -> NotifyWaitersList<'a>316 fn new(
317 unguarded_list: WaitList,
318 guard: Pin<&'a Waiter>,
319 notify: &'a Notify,
320 ) -> NotifyWaitersList<'a> {
321 let guard_ptr = NonNull::from(guard.get_ref());
322 let list = unguarded_list.into_guarded(guard_ptr);
323 NotifyWaitersList {
324 list,
325 is_empty: false,
326 notify,
327 }
328 }
329
330 /// Removes the last element from the guarded list. Modifying this list
331 /// requires an exclusive access to the main list in `Notify`.
pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>>332 fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> {
333 let result = self.list.pop_back();
334 if result.is_none() {
335 // Save information about emptiness to avoid waiting for lock
336 // in the destructor.
337 self.is_empty = true;
338 }
339 result
340 }
341 }
342
343 impl Drop for NotifyWaitersList<'_> {
drop(&mut self)344 fn drop(&mut self) {
345 // If the list is not empty, we unlink all waiters from it.
346 // We do not wake the waiters to avoid double panics.
347 if !self.is_empty {
348 let _lock_guard = self.notify.waiters.lock();
349 while let Some(waiter) = self.list.pop_back() {
350 // Safety: we never make mutable references to waiters.
351 let waiter = unsafe { waiter.as_ref() };
352 waiter.notification.store_release(Notification::All);
353 }
354 }
355 }
356 }
357
/// Future returned from [`Notify::notified()`].
///
/// This future is fused, so once it has completed, any future calls to poll
/// will immediately return `Poll::Ready`.
#[derive(Debug)]
pub struct Notified<'a> {
    /// The `Notify` being received on.
    notify: &'a Notify,

    /// The current state of the receiving process. Starts out as
    /// `State::Init`.
    state: State,

    /// Number of calls to `notify_waiters` at the time of creation. When
    /// polling finds that the live counter differs from this value, the
    /// future completes.
    notify_waiters_calls: usize,

    /// Entry in the waiter `LinkedList`. It is linked into `notify.waiters`
    /// by address while the future is waiting.
    waiter: Waiter,
}
376
// `Notified` embeds a `Waiter`, whose `waker` cell is documented as being
// either protected by the `waiters` lock in `Notify` or exclusively owned by
// the waiter once a notification has been stored.
// NOTE(review): these impls presumably rely on that access protocol for
// soundness — confirm before changing how `Waiter::waker` is accessed.
unsafe impl<'a> Send for Notified<'a> {}
unsafe impl<'a> Sync for Notified<'a> {}
379
/// Progress of a `Notified` future.
#[derive(Debug)]
enum State {
    /// Freshly created; the waiter has not been added to the wait list yet.
    Init,
    /// The waiter has been pushed onto the wait list and no notification has
    /// been consumed yet.
    Waiting,
    /// A permit or notification has been consumed.
    Done,
}
386
/// Number of low bits of the packed state word that hold the
/// `EMPTY` / `WAITING` / `NOTIFIED` tag.
const NOTIFY_WAITERS_SHIFT: usize = 2;
/// Mask selecting the state tag bits.
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
/// Mask selecting the bits that count calls to `notify_waiters`.
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;

/// Initial "idle" state.
const EMPTY: usize = 0;

/// One or more threads are currently waiting to be notified.
const WAITING: usize = 1;

/// Pending notification.
const NOTIFIED: usize = 2;

/// Returns `data` with its state tag replaced by `state`; the
/// `notify_waiters` call counter bits are left untouched.
fn set_state(data: usize, state: usize) -> usize {
    (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
}

/// Extracts the state tag from a packed state word.
fn get_state(data: usize) -> usize {
    data & STATE_MASK
}

/// Extracts the `notify_waiters` call counter from a packed state word.
fn get_num_notify_waiters_calls(data: usize) -> usize {
    (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
}

/// Returns `data` with the `notify_waiters` call counter incremented by one
/// and the state tag unchanged.
///
/// The counter wraps around on overflow. This matches
/// `atomic_inc_num_notify_waiters_calls`, whose `fetch_add` also wraps, and
/// avoids an overflow panic in builds with overflow checks enabled.
fn inc_num_notify_waiters_calls(data: usize) -> usize {
    // The increment is a multiple of `1 << NOTIFY_WAITERS_SHIFT`, so even
    // when it wraps it cannot disturb the low state bits.
    data.wrapping_add(1 << NOTIFY_WAITERS_SHIFT)
}

/// Atomically increments the `notify_waiters` call counter stored in `data`,
/// wrapping around on overflow.
fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
    data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
}
419
420 impl Notify {
421 /// Create a new `Notify`, initialized without a permit.
422 ///
423 /// # Examples
424 ///
425 /// ```
426 /// use tokio::sync::Notify;
427 ///
428 /// let notify = Notify::new();
429 /// ```
new() -> Notify430 pub fn new() -> Notify {
431 Notify {
432 state: AtomicUsize::new(0),
433 waiters: Mutex::new(LinkedList::new()),
434 }
435 }
436
    /// Create a new `Notify`, initialized without a permit.
    ///
    /// Unlike [`Notify::new`], this is a `const fn`, so it can be used to
    /// initialize a `static`. It is not compiled when both the `loom` and
    /// `test` cfgs are set.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::Notify;
    ///
    /// static NOTIFY: Notify = Notify::const_new();
    /// ```
    #[cfg(not(all(loom, test)))]
    pub const fn const_new() -> Notify {
        Notify {
            // Zeroed state word: no permit and a call count of zero.
            state: AtomicUsize::new(0),
            waiters: Mutex::const_new(LinkedList::new()),
        }
    }
453
454 /// Wait for a notification.
455 ///
456 /// Equivalent to:
457 ///
458 /// ```ignore
459 /// async fn notified(&self);
460 /// ```
461 ///
462 /// Each `Notify` value holds a single permit. If a permit is available from
463 /// an earlier call to [`notify_one()`], then `notified().await` will complete
464 /// immediately, consuming that permit. Otherwise, `notified().await` waits
465 /// for a permit to be made available by the next call to `notify_one()`.
466 ///
467 /// The `Notified` future is not guaranteed to receive wakeups from calls to
468 /// `notify_one()` if it has not yet been polled. See the documentation for
469 /// [`Notified::enable()`] for more details.
470 ///
471 /// The `Notified` future is guaranteed to receive wakeups from
472 /// `notify_waiters()` as soon as it has been created, even if it has not
473 /// yet been polled.
474 ///
475 /// [`notify_one()`]: Notify::notify_one
476 /// [`Notified::enable()`]: Notified::enable
477 ///
478 /// # Cancel safety
479 ///
480 /// This method uses a queue to fairly distribute notifications in the order
481 /// they were requested. Cancelling a call to `notified` makes you lose your
482 /// place in the queue.
483 ///
484 /// # Examples
485 ///
486 /// ```
487 /// use tokio::sync::Notify;
488 /// use std::sync::Arc;
489 ///
490 /// #[tokio::main]
491 /// async fn main() {
492 /// let notify = Arc::new(Notify::new());
493 /// let notify2 = notify.clone();
494 ///
495 /// tokio::spawn(async move {
496 /// notify2.notified().await;
497 /// println!("received notification");
498 /// });
499 ///
500 /// println!("sending notification");
501 /// notify.notify_one();
502 /// }
503 /// ```
notified(&self) -> Notified<'_>504 pub fn notified(&self) -> Notified<'_> {
505 // we load the number of times notify_waiters
506 // was called and store that in the future.
507 let state = self.state.load(SeqCst);
508 Notified {
509 notify: self,
510 state: State::Init,
511 notify_waiters_calls: get_num_notify_waiters_calls(state),
512 waiter: Waiter::new(),
513 }
514 }
515
516 /// Notifies a waiting task.
517 ///
518 /// If a task is currently waiting, that task is notified. Otherwise, a
519 /// permit is stored in this `Notify` value and the **next** call to
520 /// [`notified().await`] will complete immediately consuming the permit made
521 /// available by this call to `notify_one()`.
522 ///
523 /// At most one permit may be stored by `Notify`. Many sequential calls to
524 /// `notify_one` will result in a single permit being stored. The next call to
525 /// `notified().await` will complete immediately, but the one after that
526 /// will wait.
527 ///
528 /// [`notified().await`]: Notify::notified()
529 ///
530 /// # Examples
531 ///
532 /// ```
533 /// use tokio::sync::Notify;
534 /// use std::sync::Arc;
535 ///
536 /// #[tokio::main]
537 /// async fn main() {
538 /// let notify = Arc::new(Notify::new());
539 /// let notify2 = notify.clone();
540 ///
541 /// tokio::spawn(async move {
542 /// notify2.notified().await;
543 /// println!("received notification");
544 /// });
545 ///
546 /// println!("sending notification");
547 /// notify.notify_one();
548 /// }
549 /// ```
550 // Alias for old name in 0.x
551 #[cfg_attr(docsrs, doc(alias = "notify"))]
notify_one(&self)552 pub fn notify_one(&self) {
553 // Load the current state
554 let mut curr = self.state.load(SeqCst);
555
556 // If the state is `EMPTY`, transition to `NOTIFIED` and return.
557 while let EMPTY | NOTIFIED = get_state(curr) {
558 // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
559 // happens-before synchronization must happen between this atomic
560 // operation and a task calling `notified().await`.
561 let new = set_state(curr, NOTIFIED);
562 let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);
563
564 match res {
565 // No waiters, no further work to do
566 Ok(_) => return,
567 Err(actual) => {
568 curr = actual;
569 }
570 }
571 }
572
573 // There are waiters, the lock must be acquired to notify.
574 let mut waiters = self.waiters.lock();
575
576 // The state must be reloaded while the lock is held. The state may only
577 // transition out of WAITING while the lock is held.
578 curr = self.state.load(SeqCst);
579
580 if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) {
581 drop(waiters);
582 waker.wake();
583 }
584 }
585
586 /// Notifies all waiting tasks.
587 ///
588 /// If a task is currently waiting, that task is notified. Unlike with
589 /// `notify_one()`, no permit is stored to be used by the next call to
590 /// `notified().await`. The purpose of this method is to notify all
591 /// already registered waiters. Registering for notification is done by
592 /// acquiring an instance of the `Notified` future via calling `notified()`.
593 ///
594 /// # Examples
595 ///
596 /// ```
597 /// use tokio::sync::Notify;
598 /// use std::sync::Arc;
599 ///
600 /// #[tokio::main]
601 /// async fn main() {
602 /// let notify = Arc::new(Notify::new());
603 /// let notify2 = notify.clone();
604 ///
605 /// let notified1 = notify.notified();
606 /// let notified2 = notify.notified();
607 ///
608 /// let handle = tokio::spawn(async move {
609 /// println!("sending notifications");
610 /// notify2.notify_waiters();
611 /// });
612 ///
613 /// notified1.await;
614 /// notified2.await;
615 /// println!("received notifications");
616 /// }
617 /// ```
    pub fn notify_waiters(&self) {
        let mut waiters = self.waiters.lock();

        // The state must be loaded while the lock is held. The state may only
        // transition out of WAITING while the lock is held.
        let curr = self.state.load(SeqCst);

        if matches!(get_state(curr), EMPTY | NOTIFIED) {
            // There are no waiting tasks. All we need to do is increment the
            // number of times this method was called.
            atomic_inc_num_notify_waiters_calls(&self.state);
            return;
        }

        // Increment the number of times this method was called
        // and transition to empty.
        let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
        self.state.store(new_state, SeqCst);

        // It is critical for `GuardedLinkedList` safety that the guard node is
        // pinned in memory and is not dropped until the guarded list is dropped.
        let guard = Waiter::new();
        pin!(guard);

        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
        // underneath to allow every waiter to safely remove itself from it.
        //
        // * This list will be still guarded by the `waiters` lock.
        //   `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
        // * This wrapper will empty the list on drop. It is critical for safety
        //   that we will not leave any list entry with a pointer to the local
        //   guard node after this function returns / panics.
        let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self);

        // Wakers are collected in batches: the inner loop fills `wakers`
        // while it still has room, then the lock is released to wake the
        // batch and re-acquired to continue with the remaining waiters.
        let mut wakers = WakeList::new();
        'outer: loop {
            while wakers.can_push() {
                match list.pop_back_locked(&mut waiters) {
                    Some(waiter) => {
                        // Safety: we never make mutable references to waiters.
                        let waiter = unsafe { waiter.as_ref() };

                        // Safety: we hold the lock, so we can access the waker.
                        if let Some(waker) =
                            unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }
                        {
                            wakers.push(waker);
                        }

                        // This waiter is unlinked and will not be shared ever again, release it.
                        waiter.notification.store_release(Notification::All);
                    }
                    None => {
                        // The secondary list is exhausted; the wakers
                        // collected so far are woken after the loop.
                        break 'outer;
                    }
                }
            }

            // Release the lock before notifying.
            drop(waiters);

            // One of the wakers may panic, but the remaining waiters will still
            // be unlinked from the list in `NotifyWaitersList` destructor.
            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        wakers.wake_all();
    }
692 }
693
694 impl Default for Notify {
default() -> Notify695 fn default() -> Notify {
696 Notify::new()
697 }
698 }
699
// Explicitly mark `Notify` as unwind-safe, both by value and by reference.
// NOTE(review): presumably needed because the field types do not provide
// these auto traits on their own — confirm before removing.
impl UnwindSafe for Notify {}
impl RefUnwindSafe for Notify {}
702
notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker>703 fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> {
704 loop {
705 match get_state(curr) {
706 EMPTY | NOTIFIED => {
707 let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
708
709 match res {
710 Ok(_) => return None,
711 Err(actual) => {
712 let actual_state = get_state(actual);
713 assert!(actual_state == EMPTY || actual_state == NOTIFIED);
714 state.store(set_state(actual, NOTIFIED), SeqCst);
715 return None;
716 }
717 }
718 }
719 WAITING => {
720 // At this point, it is guaranteed that the state will not
721 // concurrently change as holding the lock is required to
722 // transition **out** of `WAITING`.
723 //
724 // Get a pending waiter
725 let waiter = waiters.pop_back().unwrap();
726
727 // Safety: we never make mutable references to waiters.
728 let waiter = unsafe { waiter.as_ref() };
729
730 // Safety: we hold the lock, so we can access the waker.
731 let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
732
733 // This waiter is unlinked and will not be shared ever again, release it.
734 waiter.notification.store_release(Notification::One);
735
736 if waiters.is_empty() {
737 // As this the **final** waiter in the list, the state
738 // must be transitioned to `EMPTY`. As transitioning
739 // **from** `WAITING` requires the lock to be held, a
740 // `store` is sufficient.
741 state.store(set_state(curr, EMPTY), SeqCst);
742 }
743
744 return waker;
745 }
746 _ => unreachable!(),
747 }
748 }
749 }
750
751 // ===== impl Notified =====
752
753 impl Notified<'_> {
754 /// Adds this future to the list of futures that are ready to receive
755 /// wakeups from calls to [`notify_one`].
756 ///
757 /// Polling the future also adds it to the list, so this method should only
758 /// be used if you want to add the future to the list before the first call
759 /// to `poll`. (In fact, this method is equivalent to calling `poll` except
760 /// that no `Waker` is registered.)
761 ///
762 /// This has no effect on notifications sent using [`notify_waiters`], which
763 /// are received as long as they happen after the creation of the `Notified`
764 /// regardless of whether `enable` or `poll` has been called.
765 ///
766 /// This method returns true if the `Notified` is ready. This happens in the
767 /// following situations:
768 ///
769 /// 1. The `notify_waiters` method was called between the creation of the
770 /// `Notified` and the call to this method.
771 /// 2. This is the first call to `enable` or `poll` on this future, and the
772 /// `Notify` was holding a permit from a previous call to `notify_one`.
773 /// The call consumes the permit in that case.
774 /// 3. The future has previously been enabled or polled, and it has since
775 /// then been marked ready by either consuming a permit from the
776 /// `Notify`, or by a call to `notify_one` or `notify_waiters` that
777 /// removed it from the list of futures ready to receive wakeups.
778 ///
779 /// If this method returns true, any future calls to poll on the same future
780 /// will immediately return `Poll::Ready`.
781 ///
782 /// # Examples
783 ///
    /// Unbounded multi-producer multi-consumer (mpmc) channel.
785 ///
786 /// The call to `enable` is important because otherwise if you have two
787 /// calls to `recv` and two calls to `send` in parallel, the following could
788 /// happen:
789 ///
790 /// 1. Both calls to `try_recv` return `None`.
791 /// 2. Both new elements are added to the vector.
792 /// 3. The `notify_one` method is called twice, adding only a single
793 /// permit to the `Notify`.
794 /// 4. Both calls to `recv` reach the `Notified` future. One of them
795 /// consumes the permit, and the other sleeps forever.
796 ///
797 /// By adding the `Notified` futures to the list by calling `enable` before
798 /// `try_recv`, the `notify_one` calls in step three would remove the
799 /// futures from the list and mark them notified instead of adding a permit
800 /// to the `Notify`. This ensures that both futures are woken.
801 ///
802 /// ```
803 /// use tokio::sync::Notify;
804 ///
805 /// use std::collections::VecDeque;
806 /// use std::sync::Mutex;
807 ///
808 /// struct Channel<T> {
809 /// messages: Mutex<VecDeque<T>>,
810 /// notify_on_sent: Notify,
811 /// }
812 ///
813 /// impl<T> Channel<T> {
814 /// pub fn send(&self, msg: T) {
815 /// let mut locked_queue = self.messages.lock().unwrap();
816 /// locked_queue.push_back(msg);
817 /// drop(locked_queue);
818 ///
819 /// // Send a notification to one of the calls currently
820 /// // waiting in a call to `recv`.
821 /// self.notify_on_sent.notify_one();
822 /// }
823 ///
824 /// pub fn try_recv(&self) -> Option<T> {
825 /// let mut locked_queue = self.messages.lock().unwrap();
826 /// locked_queue.pop_front()
827 /// }
828 ///
829 /// pub async fn recv(&self) -> T {
830 /// let future = self.notify_on_sent.notified();
831 /// tokio::pin!(future);
832 ///
833 /// loop {
834 /// // Make sure that no wakeup is lost if we get
835 /// // `None` from `try_recv`.
836 /// future.as_mut().enable();
837 ///
838 /// if let Some(msg) = self.try_recv() {
839 /// return msg;
840 /// }
841 ///
842 /// // Wait for a call to `notify_one`.
843 /// //
844 /// // This uses `.as_mut()` to avoid consuming the future,
845 /// // which lets us call `Pin::set` below.
846 /// future.as_mut().await;
847 ///
848 /// // Reset the future in case another call to
849 /// // `try_recv` got the message before us.
850 /// future.set(self.notify_on_sent.notified());
851 /// }
852 /// }
853 /// }
854 /// ```
855 ///
856 /// [`notify_one`]: Notify::notify_one()
857 /// [`notify_waiters`]: Notify::notify_waiters()
enable(self: Pin<&mut Self>) -> bool858 pub fn enable(self: Pin<&mut Self>) -> bool {
859 self.poll_notified(None).is_ready()
860 }
861
    /// A custom `project` implementation is used in place of `pin-project-lite`
    /// as a custom drop implementation is needed.
    ///
    /// Splits the pinned future into references to its individual fields, in
    /// this order: the `Notify`, the (mutable) receive state, the recorded
    /// `notify_waiters` call count, and the embedded waiter.
    fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter) {
        unsafe {
            // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
            // The `waiter` field is only handed out as a shared reference.

            // Compile-time checks backing the `Unpin` claim above.
            is_unpin::<&Notify>();
            is_unpin::<State>();
            is_unpin::<usize>();

            let me = self.get_unchecked_mut();
            (
                me.notify,
                &mut me.state,
                &me.notify_waiters_calls,
                &me.waiter,
            )
        }
    }
881
poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()>882 fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
883 use State::*;
884
885 let (notify, state, notify_waiters_calls, waiter) = self.project();
886
887 'outer_loop: loop {
888 match *state {
889 Init => {
890 let curr = notify.state.load(SeqCst);
891
892 // Optimistically try acquiring a pending notification
893 let res = notify.state.compare_exchange(
894 set_state(curr, NOTIFIED),
895 set_state(curr, EMPTY),
896 SeqCst,
897 SeqCst,
898 );
899
900 if res.is_ok() {
901 // Acquired the notification
902 *state = Done;
903 continue 'outer_loop;
904 }
905
                    // Clone the waker before locking, because cloning a waker
                    // can run arbitrary code.
908 let waker = waker.cloned();
909
910 // Acquire the lock and attempt to transition to the waiting
911 // state.
912 let mut waiters = notify.waiters.lock();
913
914 // Reload the state with the lock held
915 let mut curr = notify.state.load(SeqCst);
916
917 // if notify_waiters has been called after the future
918 // was created, then we are done
919 if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
920 *state = Done;
921 continue 'outer_loop;
922 }
923
924 // Transition the state to WAITING.
925 loop {
926 match get_state(curr) {
927 EMPTY => {
928 // Transition to WAITING
929 let res = notify.state.compare_exchange(
930 set_state(curr, EMPTY),
931 set_state(curr, WAITING),
932 SeqCst,
933 SeqCst,
934 );
935
936 if let Err(actual) = res {
937 assert_eq!(get_state(actual), NOTIFIED);
938 curr = actual;
939 } else {
940 break;
941 }
942 }
943 WAITING => break,
944 NOTIFIED => {
945 // Try consuming the notification
946 let res = notify.state.compare_exchange(
947 set_state(curr, NOTIFIED),
948 set_state(curr, EMPTY),
949 SeqCst,
950 SeqCst,
951 );
952
953 match res {
954 Ok(_) => {
955 // Acquired the notification
956 *state = Done;
957 continue 'outer_loop;
958 }
959 Err(actual) => {
960 assert_eq!(get_state(actual), EMPTY);
961 curr = actual;
962 }
963 }
964 }
965 _ => unreachable!(),
966 }
967 }
968
969 let mut old_waker = None;
970 if waker.is_some() {
971 // Safety: called while locked.
972 //
                        // The use of `old_waker` here is not necessary, as the field is always
                        // None when we reach this line.
975 unsafe {
976 old_waker =
977 waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker));
978 }
979 }
980
981 // Insert the waiter into the linked list
982 waiters.push_front(NonNull::from(waiter));
983
984 *state = Waiting;
985
986 drop(waiters);
987 drop(old_waker);
988
989 return Poll::Pending;
990 }
991 Waiting => {
992 #[cfg(tokio_taskdump)]
993 if let Some(waker) = waker {
994 let mut ctx = Context::from_waker(waker);
995 ready!(crate::trace::trace_leaf(&mut ctx));
996 }
997
998 if waiter.notification.load(Acquire).is_some() {
999 // Safety: waiter is already unlinked and will not be shared again,
1000 // so we have an exclusive access to `waker`.
1001 drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });
1002
1003 waiter.notification.clear();
1004 *state = Done;
1005 return Poll::Ready(());
1006 }
1007
1008 // Our waiter was not notified, implying it is still stored in a waiter
1009 // list (guarded by `notify.waiters`). In order to access the waker
1010 // fields, we must acquire the lock.
1011
1012 let mut old_waker = None;
1013 let mut waiters = notify.waiters.lock();
1014
1015 // We hold the lock and notifications are set only with the lock held,
1016 // so this can be relaxed, because the happens-before relationship is
1017 // established through the mutex.
1018 if waiter.notification.load(Relaxed).is_some() {
1019 // Safety: waiter is already unlinked and will not be shared again,
1020 // so we have an exclusive access to `waker`.
1021 old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
1022
1023 waiter.notification.clear();
1024
1025 // Drop the old waker after releasing the lock.
1026 drop(waiters);
1027 drop(old_waker);
1028
1029 *state = Done;
1030 return Poll::Ready(());
1031 }
1032
1033 // Load the state with the lock held.
1034 let curr = notify.state.load(SeqCst);
1035
1036 if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
1037 // Before we add a waiter to the list we check if these numbers are
1038 // different while holding the lock. If these numbers are different now,
1039 // it means that there is a call to `notify_waiters` in progress and this
1040 // waiter must be contained by a guarded list used in `notify_waiters`.
1041 // We can treat the waiter as notified and remove it from the list, as
1042 // it would have been notified in the `notify_waiters` call anyways.
1043
1044 // Safety: we hold the lock, so we can modify the waker.
1045 old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
1046
1047 // Safety: we hold the lock, so we have an exclusive access to the list.
1048 // The list is used in `notify_waiters`, so it must be guarded.
1049 unsafe { waiters.remove(NonNull::from(waiter)) };
1050
1051 *state = Done;
1052 } else {
1053 // Safety: we hold the lock, so we can modify the waker.
1054 unsafe {
1055 waiter.waker.with_mut(|v| {
1056 if let Some(waker) = waker {
1057 let should_update = match &*v {
1058 Some(current_waker) => !current_waker.will_wake(waker),
1059 None => true,
1060 };
1061 if should_update {
1062 old_waker = std::mem::replace(&mut *v, Some(waker.clone()));
1063 }
1064 }
1065 });
1066 }
1067
1068 // Drop the old waker after releasing the lock.
1069 drop(waiters);
1070 drop(old_waker);
1071
1072 return Poll::Pending;
1073 }
1074
1075 // Explicit drop of the lock to indicate the scope that the
1076 // lock is held. Because holding the lock is required to
1077 // ensure safe access to fields not held within the lock, it
1078 // is helpful to visualize the scope of the critical
1079 // section.
1080 drop(waiters);
1081
1082 // Drop the old waker after releasing the lock.
1083 drop(old_waker);
1084 }
1085 Done => {
1086 #[cfg(tokio_taskdump)]
1087 if let Some(waker) = waker {
1088 let mut ctx = Context::from_waker(waker);
1089 ready!(crate::trace::trace_leaf(&mut ctx));
1090 }
1091 return Poll::Ready(());
1092 }
1093 }
1094 }
1095 }
1096 }
1097
1098 impl Future for Notified<'_> {
1099 type Output = ();
1100
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>1101 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1102 self.poll_notified(Some(cx.waker()))
1103 }
1104 }
1105
1106 impl Drop for Notified<'_> {
drop(&mut self)1107 fn drop(&mut self) {
1108 use State::*;
1109
1110 // Safety: The type only transitions to a "Waiting" state when pinned.
1111 let (notify, state, _, waiter) = unsafe { Pin::new_unchecked(self).project() };
1112
1113 // This is where we ensure safety. The `Notified` value is being
1114 // dropped, which means we must ensure that the waiter entry is no
1115 // longer stored in the linked list.
1116 if matches!(*state, Waiting) {
1117 let mut waiters = notify.waiters.lock();
1118 let mut notify_state = notify.state.load(SeqCst);
1119
1120 // We hold the lock, so this field is not concurrently accessed by
1121 // `notify_*` functions and we can use the relaxed ordering.
1122 let notification = waiter.notification.load(Relaxed);
1123
1124 // remove the entry from the list (if not already removed)
1125 //
1126 // Safety: we hold the lock, so we have an exclusive access to every list the
1127 // waiter may be contained in. If the node is not contained in the `waiters`
1128 // list, then it is contained by a guarded list used by `notify_waiters`.
1129 unsafe { waiters.remove(NonNull::from(waiter)) };
1130
1131 if waiters.is_empty() && get_state(notify_state) == WAITING {
1132 notify_state = set_state(notify_state, EMPTY);
1133 notify.state.store(notify_state, SeqCst);
1134 }
1135
1136 // See if the node was notified but not received. In this case, if
1137 // the notification was triggered via `notify_one`, it must be sent
1138 // to the next waiter.
1139 if notification == Some(Notification::One) {
1140 if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) {
1141 drop(waiters);
1142 waker.wake();
1143 }
1144 }
1145 }
1146 }
1147 }
1148
1149 /// # Safety
1150 ///
1151 /// `Waiter` is forced to be !Unpin.
1152 unsafe impl linked_list::Link for Waiter {
1153 type Handle = NonNull<Waiter>;
1154 type Target = Waiter;
1155
as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter>1156 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1157 *handle
1158 }
1159
from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter>1160 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1161 ptr
1162 }
1163
pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>>1164 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1165 Waiter::addr_of_pointers(target)
1166 }
1167 }
1168
/// Compile-time check that `T` implements `Unpin`.
///
/// The body is empty and the call does nothing at runtime; a call such as
/// `is_unpin::<Foo>()` only type-checks when `Foo: Unpin`.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1170