• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2 
3 //! A single-producer, multi-consumer channel that only retains the *last* sent
4 //! value.
5 //!
6 //! This channel is useful for watching for changes to a value from multiple
7 //! points in the code base, for example, changes to configuration values.
8 //!
9 //! # Usage
10 //!
11 //! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
12 //! and consumer halves of the channel. The channel is created with an initial
13 //! value. The **latest** value stored in the channel is accessed with
14 //! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
15 //! value to be sent by the [`Sender`] half.
16 //!
17 //! # Examples
18 //!
19 //! ```
20 //! use tokio::sync::watch;
21 //!
22 //! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
23 //! let (tx, mut rx) = watch::channel("hello");
24 //!
25 //! tokio::spawn(async move {
26 //!     while rx.changed().await.is_ok() {
27 //!         println!("received = {:?}", *rx.borrow());
28 //!     }
29 //! });
30 //!
31 //! tx.send("world")?;
32 //! # Ok(())
33 //! # }
34 //! ```
35 //!
36 //! # Closing
37 //!
38 //! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
39 //! when all [`Receiver`] handles have been dropped. This indicates that there
40 //! is no further interest in the values being produced and work can be stopped.
41 //!
42 //! The value in the channel will not be dropped until the sender and all receivers
43 //! have been dropped.
44 //!
45 //! # Thread safety
46 //!
47 //! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
48 //! threads and can be used in a concurrent environment. Clones of [`Receiver`]
49 //! handles may be moved to separate threads and also used concurrently.
50 //!
51 //! [`Sender`]: crate::sync::watch::Sender
52 //! [`Receiver`]: crate::sync::watch::Receiver
53 //! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
54 //! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
55 //! [`channel`]: crate::sync::watch::channel
56 //! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
57 //! [`Sender::closed`]: crate::sync::watch::Sender::closed
58 
59 use crate::sync::notify::Notify;
60 
61 use crate::loom::sync::atomic::AtomicUsize;
62 use crate::loom::sync::atomic::Ordering::Relaxed;
63 use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
64 use std::fmt;
65 use std::mem;
66 use std::ops;
67 use std::panic;
68 
69 /// Receives values from the associated [`Sender`](struct@Sender).
70 ///
71 /// Instances are created by the [`channel`](fn@channel) function.
72 ///
73 /// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
74 /// wrapper.
75 ///
76 /// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
77 #[derive(Debug)]
78 pub struct Receiver<T> {
79     /// Pointer to the shared state
80     shared: Arc<Shared<T>>,
81 
82     /// Last observed version
83     version: Version,
84 }
85 
86 /// Sends values to the associated [`Receiver`](struct@Receiver).
87 ///
88 /// Instances are created by the [`channel`](fn@channel) function.
89 #[derive(Debug)]
90 pub struct Sender<T> {
91     shared: Arc<Shared<T>>,
92 }
93 
94 /// Returns a reference to the inner value.
95 ///
96 /// Outstanding borrows hold a read lock on the inner value. This means that
97 /// long-lived borrows could cause the producer half to block. It is recommended
98 /// to keep the borrow as short-lived as possible. Additionally, if you are
99 /// running in an environment that allows `!Send` futures, you must ensure that
100 /// the returned `Ref` type is never held alive across an `.await` point,
101 /// otherwise, it can lead to a deadlock.
102 ///
103 /// The priority policy of the lock is dependent on the underlying lock
104 /// implementation, and this type does not guarantee that any particular policy
105 /// will be used. In particular, a producer which is waiting to acquire the lock
106 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
107 ///
108 /// <details><summary>Potential deadlock example</summary>
109 ///
110 /// ```text
111 /// // Task 1 (on thread A)    |  // Task 2 (on thread B)
112 /// let _ref1 = rx.borrow();   |
113 ///                            |  // will block
114 ///                            |  let _ = tx.send(());
115 /// // may deadlock            |
116 /// let _ref2 = rx.borrow();   |
117 /// ```
118 /// </details>
#[derive(Debug)]
pub struct Ref<'a, T> {
    /// Read guard on the channel's value; the read lock is held for as long
    /// as this `Ref` is alive.
    inner: RwLockReadGuard<'a, T>,

    /// Whether the value had not yet been marked as seen when this `Ref` was
    /// created.
    has_changed: bool,
}
124 
125 impl<'a, T> Ref<'a, T> {
126     /// Indicates if the borrowed value is considered as _changed_ since the last
127     /// time it has been marked as seen.
128     ///
129     /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
130     ///
131     /// When borrowed from the [`Sender`] this function will always return `false`.
132     ///
133     /// # Examples
134     ///
135     /// ```
136     /// use tokio::sync::watch;
137     ///
138     /// #[tokio::main]
139     /// async fn main() {
140     ///     let (tx, mut rx) = watch::channel("hello");
141     ///
142     ///     tx.send("goodbye").unwrap();
143     ///     // The sender does never consider the value as changed.
144     ///     assert!(!tx.borrow().has_changed());
145     ///
146     ///     // Drop the sender immediately, just for testing purposes.
147     ///     drop(tx);
148     ///
149     ///     // Even if the sender has already been dropped...
150     ///     assert!(rx.has_changed().is_err());
151     ///     // ...the modified value is still readable and detected as changed.
152     ///     assert_eq!(*rx.borrow(), "goodbye");
153     ///     assert!(rx.borrow().has_changed());
154     ///
155     ///     // Read the changed value and mark it as seen.
156     ///     {
157     ///         let received = rx.borrow_and_update();
158     ///         assert_eq!(*received, "goodbye");
159     ///         assert!(received.has_changed());
160     ///         // Release the read lock when leaving this scope.
161     ///     }
162     ///
163     ///     // Now the value has already been marked as seen and could
164     ///     // never be modified again (after the sender has been dropped).
165     ///     assert!(!rx.borrow().has_changed());
166     /// }
167     /// ```
has_changed(&self) -> bool168     pub fn has_changed(&self) -> bool {
169         self.has_changed
170     }
171 }
172 
173 struct Shared<T> {
174     /// The most recent value.
175     value: RwLock<T>,
176 
177     /// The current version.
178     ///
179     /// The lowest bit represents a "closed" state. The rest of the bits
180     /// represent the current version.
181     state: AtomicState,
182 
183     /// Tracks the number of `Receiver` instances.
184     ref_count_rx: AtomicUsize,
185 
186     /// Notifies waiting receivers that the value changed.
187     notify_rx: big_notify::BigNotify,
188 
189     /// Notifies any task listening for `Receiver` dropped events.
190     notify_tx: Notify,
191 }
192 
193 impl<T: fmt::Debug> fmt::Debug for Shared<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result194     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195         let state = self.state.load();
196         f.debug_struct("Shared")
197             .field("value", &self.value)
198             .field("version", &state.version())
199             .field("is_closed", &state.is_closed())
200             .field("ref_count_rx", &self.ref_count_rx)
201             .finish()
202     }
203 }
204 
pub mod error {
    //! Watch error types.

    use std::error::Error;
    use std::fmt;

    /// Error produced when sending a value fails.
    ///
    /// The public field holds the value that was handed back to the caller.
    #[derive(PartialEq, Eq, Clone, Copy)]
    pub struct SendError<T>(pub T);

    // ===== impl SendError =====

    // Written by hand instead of derived so that no `T: Debug` bound is
    // needed; as a consequence the payload itself is not printed.
    impl<T> fmt::Debug for SendError<T> {
        fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
            out.debug_struct("SendError").finish_non_exhaustive()
        }
    }

    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
            out.write_str("channel closed")
        }
    }

    impl<T> Error for SendError<T> {}

    /// Error produced when receiving a change notification.
    ///
    /// The private unit field keeps construction restricted to the parent
    /// module.
    #[derive(Debug, Clone)]
    pub struct RecvError(pub(super) ());

    // ===== impl RecvError =====

    impl fmt::Display for RecvError {
        fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
            out.write_str("channel closed")
        }
    }

    impl Error for RecvError {}
}
245 
246 mod big_notify {
247     use super::*;
248     use crate::sync::notify::Notified;
249 
250     // To avoid contention on the lock inside the `Notify`, we store multiple
251     // copies of it. Then, we use either circular access or randomness to spread
252     // out threads over different `Notify` objects.
253     //
254     // Some simple benchmarks show that randomness performs slightly better than
255     // circular access (probably due to contention on `next`), so we prefer to
256     // use randomness when Tokio is compiled with a random number generator.
257     //
258     // When the random number generator is not available, we fall back to
259     // circular access.
260 
261     pub(super) struct BigNotify {
262         #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
263         next: AtomicUsize,
264         inner: [Notify; 8],
265     }
266 
267     impl BigNotify {
new() -> Self268         pub(super) fn new() -> Self {
269             Self {
270                 #[cfg(not(all(
271                     not(loom),
272                     feature = "sync",
273                     any(feature = "rt", feature = "macros")
274                 )))]
275                 next: AtomicUsize::new(0),
276                 inner: Default::default(),
277             }
278         }
279 
notify_waiters(&self)280         pub(super) fn notify_waiters(&self) {
281             for notify in &self.inner {
282                 notify.notify_waiters();
283             }
284         }
285 
286         /// This function implements the case where randomness is not available.
287         #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
notified(&self) -> Notified<'_>288         pub(super) fn notified(&self) -> Notified<'_> {
289             let i = self.next.fetch_add(1, Relaxed) % 8;
290             self.inner[i].notified()
291         }
292 
293         /// This function implements the case where randomness is available.
294         #[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
notified(&self) -> Notified<'_>295         pub(super) fn notified(&self) -> Notified<'_> {
296             let i = crate::runtime::context::thread_rng_n(8) as usize;
297             self.inner[i].notified()
298         }
299     }
300 }
301 
302 use self::state::{AtomicState, Version};
303 mod state {
304     use crate::loom::sync::atomic::AtomicUsize;
305     use crate::loom::sync::atomic::Ordering::SeqCst;
306 
307     const CLOSED: usize = 1;
308 
309     /// The version part of the state. The lowest bit is always zero.
310     #[derive(Copy, Clone, Debug, Eq, PartialEq)]
311     pub(super) struct Version(usize);
312 
313     /// Snapshot of the state. The first bit is used as the CLOSED bit.
314     /// The remaining bits are used as the version.
315     ///
316     /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
317     /// receivers does not set it.
318     #[derive(Copy, Clone, Debug)]
319     pub(super) struct StateSnapshot(usize);
320 
321     /// The state stored in an atomic integer.
322     #[derive(Debug)]
323     pub(super) struct AtomicState(AtomicUsize);
324 
325     impl Version {
326         /// Get the initial version when creating the channel.
initial() -> Self327         pub(super) fn initial() -> Self {
328             Version(0)
329         }
330     }
331 
332     impl StateSnapshot {
333         /// Extract the version from the state.
version(self) -> Version334         pub(super) fn version(self) -> Version {
335             Version(self.0 & !CLOSED)
336         }
337 
338         /// Is the closed bit set?
is_closed(self) -> bool339         pub(super) fn is_closed(self) -> bool {
340             (self.0 & CLOSED) == CLOSED
341         }
342     }
343 
344     impl AtomicState {
345         /// Create a new `AtomicState` that is not closed and which has the
346         /// version set to `Version::initial()`.
new() -> Self347         pub(super) fn new() -> Self {
348             AtomicState(AtomicUsize::new(0))
349         }
350 
351         /// Load the current value of the state.
load(&self) -> StateSnapshot352         pub(super) fn load(&self) -> StateSnapshot {
353             StateSnapshot(self.0.load(SeqCst))
354         }
355 
356         /// Increment the version counter.
increment_version(&self)357         pub(super) fn increment_version(&self) {
358             // Increment by two to avoid touching the CLOSED bit.
359             self.0.fetch_add(2, SeqCst);
360         }
361 
362         /// Set the closed bit in the state.
set_closed(&self)363         pub(super) fn set_closed(&self) {
364             self.0.fetch_or(CLOSED, SeqCst);
365         }
366     }
367 }
368 
369 /// Creates a new watch channel, returning the "send" and "receive" handles.
370 ///
371 /// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
372 /// Only the last value sent is made available to the [`Receiver`] half. All
373 /// intermediate values are dropped.
374 ///
375 /// # Examples
376 ///
377 /// ```
378 /// use tokio::sync::watch;
379 ///
380 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
381 ///     let (tx, mut rx) = watch::channel("hello");
382 ///
383 ///     tokio::spawn(async move {
384 ///         while rx.changed().await.is_ok() {
385 ///             println!("received = {:?}", *rx.borrow());
386 ///         }
387 ///     });
388 ///
389 ///     tx.send("world")?;
390 /// # Ok(())
391 /// # }
392 /// ```
393 ///
394 /// [`Sender`]: struct@Sender
395 /// [`Receiver`]: struct@Receiver
channel<T>(init: T) -> (Sender<T>, Receiver<T>)396 pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
397     let shared = Arc::new(Shared {
398         value: RwLock::new(init),
399         state: AtomicState::new(),
400         ref_count_rx: AtomicUsize::new(1),
401         notify_rx: big_notify::BigNotify::new(),
402         notify_tx: Notify::new(),
403     });
404 
405     let tx = Sender {
406         shared: shared.clone(),
407     };
408 
409     let rx = Receiver {
410         shared,
411         version: Version::initial(),
412     };
413 
414     (tx, rx)
415 }
416 
417 impl<T> Receiver<T> {
from_shared(version: Version, shared: Arc<Shared<T>>) -> Self418     fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
419         // No synchronization necessary as this is only used as a counter and
420         // not memory access.
421         shared.ref_count_rx.fetch_add(1, Relaxed);
422 
423         Self { shared, version }
424     }
425 
426     /// Returns a reference to the most recently sent value.
427     ///
428     /// This method does not mark the returned value as seen, so future calls to
429     /// [`changed`] may return immediately even if you have already seen the
430     /// value with a call to `borrow`.
431     ///
432     /// Outstanding borrows hold a read lock on the inner value. This means that
433     /// long-lived borrows could cause the producer half to block. It is recommended
434     /// to keep the borrow as short-lived as possible. Additionally, if you are
435     /// running in an environment that allows `!Send` futures, you must ensure that
436     /// the returned `Ref` type is never held alive across an `.await` point,
437     /// otherwise, it can lead to a deadlock.
438     ///
439     /// The priority policy of the lock is dependent on the underlying lock
440     /// implementation, and this type does not guarantee that any particular policy
441     /// will be used. In particular, a producer which is waiting to acquire the lock
442     /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
443     ///
444     /// <details><summary>Potential deadlock example</summary>
445     ///
446     /// ```text
447     /// // Task 1 (on thread A)    |  // Task 2 (on thread B)
448     /// let _ref1 = rx.borrow();   |
449     ///                            |  // will block
450     ///                            |  let _ = tx.send(());
451     /// // may deadlock            |
452     /// let _ref2 = rx.borrow();   |
453     /// ```
454     /// </details>
455     ///
456     /// [`changed`]: Receiver::changed
457     ///
458     /// # Examples
459     ///
460     /// ```
461     /// use tokio::sync::watch;
462     ///
463     /// let (_, rx) = watch::channel("hello");
464     /// assert_eq!(*rx.borrow(), "hello");
465     /// ```
borrow(&self) -> Ref<'_, T>466     pub fn borrow(&self) -> Ref<'_, T> {
467         let inner = self.shared.value.read().unwrap();
468 
469         // After obtaining a read-lock no concurrent writes could occur
470         // and the loaded version matches that of the borrowed reference.
471         let new_version = self.shared.state.load().version();
472         let has_changed = self.version != new_version;
473 
474         Ref { inner, has_changed }
475     }
476 
477     /// Returns a reference to the most recently sent value and marks that value
478     /// as seen.
479     ///
480     /// This method marks the current value as seen. Subsequent calls to [`changed`]
481     /// will not return immediately until the [`Sender`] has modified the shared
482     /// value again.
483     ///
484     /// Outstanding borrows hold a read lock on the inner value. This means that
485     /// long-lived borrows could cause the producer half to block. It is recommended
486     /// to keep the borrow as short-lived as possible. Additionally, if you are
487     /// running in an environment that allows `!Send` futures, you must ensure that
488     /// the returned `Ref` type is never held alive across an `.await` point,
489     /// otherwise, it can lead to a deadlock.
490     ///
491     /// The priority policy of the lock is dependent on the underlying lock
492     /// implementation, and this type does not guarantee that any particular policy
493     /// will be used. In particular, a producer which is waiting to acquire the lock
494     /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
495     ///
496     /// <details><summary>Potential deadlock example</summary>
497     ///
498     /// ```text
499     /// // Task 1 (on thread A)                |  // Task 2 (on thread B)
500     /// let _ref1 = rx1.borrow_and_update();   |
501     ///                                        |  // will block
502     ///                                        |  let _ = tx.send(());
503     /// // may deadlock                        |
504     /// let _ref2 = rx2.borrow_and_update();   |
505     /// ```
506     /// </details>
507     ///
508     /// [`changed`]: Receiver::changed
borrow_and_update(&mut self) -> Ref<'_, T>509     pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
510         let inner = self.shared.value.read().unwrap();
511 
512         // After obtaining a read-lock no concurrent writes could occur
513         // and the loaded version matches that of the borrowed reference.
514         let new_version = self.shared.state.load().version();
515         let has_changed = self.version != new_version;
516 
517         // Mark the shared value as seen by updating the version
518         self.version = new_version;
519 
520         Ref { inner, has_changed }
521     }
522 
523     /// Checks if this channel contains a message that this receiver has not yet
524     /// seen. The new value is not marked as seen.
525     ///
526     /// Although this method is called `has_changed`, it does not check new
527     /// messages for equality, so this call will return true even if the new
528     /// message is equal to the old message.
529     ///
530     /// Returns an error if the channel has been closed.
531     /// # Examples
532     ///
533     /// ```
534     /// use tokio::sync::watch;
535     ///
536     /// #[tokio::main]
537     /// async fn main() {
538     ///     let (tx, mut rx) = watch::channel("hello");
539     ///
540     ///     tx.send("goodbye").unwrap();
541     ///
542     ///     assert!(rx.has_changed().unwrap());
543     ///     assert_eq!(*rx.borrow_and_update(), "goodbye");
544     ///
545     ///     // The value has been marked as seen
546     ///     assert!(!rx.has_changed().unwrap());
547     ///
548     ///     drop(tx);
549     ///     // The `tx` handle has been dropped
550     ///     assert!(rx.has_changed().is_err());
551     /// }
552     /// ```
has_changed(&self) -> Result<bool, error::RecvError>553     pub fn has_changed(&self) -> Result<bool, error::RecvError> {
554         // Load the version from the state
555         let state = self.shared.state.load();
556         if state.is_closed() {
557             // The sender has dropped.
558             return Err(error::RecvError(()));
559         }
560         let new_version = state.version();
561 
562         Ok(self.version != new_version)
563     }
564 
565     /// Waits for a change notification, then marks the newest value as seen.
566     ///
567     /// If the newest value in the channel has not yet been marked seen when
568     /// this method is called, the method marks that value seen and returns
569     /// immediately. If the newest value has already been marked seen, then the
570     /// method sleeps until a new message is sent by the [`Sender`] connected to
571     /// this `Receiver`, or until the [`Sender`] is dropped.
572     ///
573     /// This method returns an error if and only if the [`Sender`] is dropped.
574     ///
575     /// # Cancel safety
576     ///
577     /// This method is cancel safe. If you use it as the event in a
578     /// [`tokio::select!`](crate::select) statement and some other branch
579     /// completes first, then it is guaranteed that no values have been marked
580     /// seen by this call to `changed`.
581     ///
582     /// [`Sender`]: struct@Sender
583     ///
584     /// # Examples
585     ///
586     /// ```
587     /// use tokio::sync::watch;
588     ///
589     /// #[tokio::main]
590     /// async fn main() {
591     ///     let (tx, mut rx) = watch::channel("hello");
592     ///
593     ///     tokio::spawn(async move {
594     ///         tx.send("goodbye").unwrap();
595     ///     });
596     ///
597     ///     assert!(rx.changed().await.is_ok());
598     ///     assert_eq!(*rx.borrow(), "goodbye");
599     ///
600     ///     // The `tx` handle has been dropped
601     ///     assert!(rx.changed().await.is_err());
602     /// }
603     /// ```
changed(&mut self) -> Result<(), error::RecvError>604     pub async fn changed(&mut self) -> Result<(), error::RecvError> {
605         changed_impl(&self.shared, &mut self.version).await
606     }
607 
    /// Waits for a value that satisfies the provided condition.
609     ///
610     /// This method will call the provided closure whenever something is sent on
611     /// the channel. Once the closure returns `true`, this method will return a
612     /// reference to the value that was passed to the closure.
613     ///
614     /// Before `wait_for` starts waiting for changes, it will call the closure
615     /// on the current value. If the closure returns `true` when given the
616     /// current value, then `wait_for` will immediately return a reference to
617     /// the current value. This is the case even if the current value is already
618     /// considered seen.
619     ///
620     /// The watch channel only keeps track of the most recent value, so if
621     /// several messages are sent faster than `wait_for` is able to call the
622     /// closure, then it may skip some updates. Whenever the closure is called,
623     /// it will be called with the most recent value.
624     ///
625     /// When this function returns, the value that was passed to the closure
626     /// when it returned `true` will be considered seen.
627     ///
628     /// If the channel is closed, then `wait_for` will return a `RecvError`.
629     /// Once this happens, no more messages can ever be sent on the channel.
630     /// When an error is returned, it is guaranteed that the closure has been
631     /// called on the last value, and that it returned `false` for that value.
632     /// (If the closure returned `true`, then the last value would have been
633     /// returned instead of the error.)
634     ///
635     /// Like the `borrow` method, the returned borrow holds a read lock on the
636     /// inner value. This means that long-lived borrows could cause the producer
637     /// half to block. It is recommended to keep the borrow as short-lived as
638     /// possible. See the documentation of `borrow` for more information on
639     /// this.
640     ///
641     /// [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
642     ///
643     /// # Examples
644     ///
645     /// ```
646     /// use tokio::sync::watch;
647     ///
648     /// #[tokio::main]
649     ///
650     /// async fn main() {
651     ///     let (tx, _rx) = watch::channel("hello");
652     ///
653     ///     tx.send("goodbye").unwrap();
654     ///
655     ///     // here we subscribe to a second receiver
656     ///     // now in case of using `changed` we would have
657     ///     // to first check the current value and then wait
658     ///     // for changes or else `changed` would hang.
659     ///     let mut rx2 = tx.subscribe();
660     ///
    ///     // instead of `changed` we use `wait_for`, which
    ///     // automatically checks the current value and then
    ///     // waits for changes until the closure returns true.
664     ///     assert!(rx2.wait_for(|val| *val == "goodbye").await.is_ok());
665     ///     assert_eq!(*rx2.borrow(), "goodbye");
666     /// }
667     /// ```
wait_for( &mut self, mut f: impl FnMut(&T) -> bool, ) -> Result<Ref<'_, T>, error::RecvError>668     pub async fn wait_for(
669         &mut self,
670         mut f: impl FnMut(&T) -> bool,
671     ) -> Result<Ref<'_, T>, error::RecvError> {
672         let mut closed = false;
673         loop {
674             {
675                 let inner = self.shared.value.read().unwrap();
676 
677                 let new_version = self.shared.state.load().version();
678                 let has_changed = self.version != new_version;
679                 self.version = new_version;
680 
681                 if (!closed || has_changed) && f(&inner) {
682                     return Ok(Ref { inner, has_changed });
683                 }
684             }
685 
686             if closed {
687                 return Err(error::RecvError(()));
688             }
689 
690             // Wait for the value to change.
691             closed = changed_impl(&self.shared, &mut self.version).await.is_err();
692         }
693     }
694 
695     /// Returns `true` if receivers belong to the same channel.
696     ///
697     /// # Examples
698     ///
699     /// ```
700     /// let (tx, rx) = tokio::sync::watch::channel(true);
701     /// let rx2 = rx.clone();
702     /// assert!(rx.same_channel(&rx2));
703     ///
704     /// let (tx3, rx3) = tokio::sync::watch::channel(true);
705     /// assert!(!rx3.same_channel(&rx2));
706     /// ```
same_channel(&self, other: &Self) -> bool707     pub fn same_channel(&self, other: &Self) -> bool {
708         Arc::ptr_eq(&self.shared, &other.shared)
709     }
710 
711     cfg_process_driver! {
712         pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
713             maybe_changed(&self.shared, &mut self.version)
714         }
715     }
716 }
717 
maybe_changed<T>( shared: &Shared<T>, version: &mut Version, ) -> Option<Result<(), error::RecvError>>718 fn maybe_changed<T>(
719     shared: &Shared<T>,
720     version: &mut Version,
721 ) -> Option<Result<(), error::RecvError>> {
722     // Load the version from the state
723     let state = shared.state.load();
724     let new_version = state.version();
725 
726     if *version != new_version {
727         // Observe the new version and return
728         *version = new_version;
729         return Some(Ok(()));
730     }
731 
732     if state.is_closed() {
733         // All receivers have dropped.
734         return Some(Err(error::RecvError(())));
735     }
736 
737     None
738 }
739 
changed_impl<T>( shared: &Shared<T>, version: &mut Version, ) -> Result<(), error::RecvError>740 async fn changed_impl<T>(
741     shared: &Shared<T>,
742     version: &mut Version,
743 ) -> Result<(), error::RecvError> {
744     crate::trace::async_trace_leaf().await;
745 
746     loop {
747         // In order to avoid a race condition, we first request a notification,
748         // **then** check the current value's version. If a new version exists,
749         // the notification request is dropped.
750         let notified = shared.notify_rx.notified();
751 
752         if let Some(ret) = maybe_changed(shared, version) {
753             return ret;
754         }
755 
756         notified.await;
757         // loop around again in case the wake-up was spurious
758     }
759 }
760 
761 impl<T> Clone for Receiver<T> {
clone(&self) -> Self762     fn clone(&self) -> Self {
763         let version = self.version;
764         let shared = self.shared.clone();
765 
766         Self::from_shared(version, shared)
767     }
768 }
769 
770 impl<T> Drop for Receiver<T> {
drop(&mut self)771     fn drop(&mut self) {
772         // No synchronization necessary as this is only used as a counter and
773         // not memory access.
774         if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
775             // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
776             self.shared.notify_tx.notify_waiters();
777         }
778     }
779 }
780 
781 impl<T> Sender<T> {
782     /// Sends a new value via the channel, notifying all receivers.
783     ///
784     /// This method fails if the channel is closed, which is the case when
785     /// every receiver has been dropped. It is possible to reopen the channel
786     /// using the [`subscribe`] method. However, when `send` fails, the value
787     /// isn't made available for future receivers (but returned with the
788     /// [`SendError`]).
789     ///
790     /// To always make a new value available for future receivers, even if no
791     /// receiver currently exists, one of the other send methods
792     /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
793     /// used instead.
794     ///
795     /// [`subscribe`]: Sender::subscribe
796     /// [`SendError`]: error::SendError
797     /// [`send_if_modified`]: Sender::send_if_modified
798     /// [`send_modify`]: Sender::send_modify
799     /// [`send_replace`]: Sender::send_replace
send(&self, value: T) -> Result<(), error::SendError<T>>800     pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
801         // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
802         if 0 == self.receiver_count() {
803             return Err(error::SendError(value));
804         }
805 
806         self.send_replace(value);
807         Ok(())
808     }
809 
810     /// Modifies the watched value **unconditionally** in-place,
811     /// notifying all receivers.
812     ///
813     /// This can be useful for modifying the watched value, without
814     /// having to allocate a new instance. Additionally, this
815     /// method permits sending values even when there are no receivers.
816     ///
817     /// Prefer to use the more versatile function [`Self::send_if_modified()`]
818     /// if the value is only modified conditionally during the mutable borrow
819     /// to prevent unneeded change notifications for unmodified values.
820     ///
821     /// # Panics
822     ///
823     /// This function panics when the invocation of the `modify` closure panics.
824     /// No receivers are notified when panicking. All changes of the watched
825     /// value applied by the closure before panicking will be visible in
826     /// subsequent calls to `borrow`.
827     ///
828     /// # Examples
829     ///
830     /// ```
831     /// use tokio::sync::watch;
832     ///
833     /// struct State {
834     ///     counter: usize,
835     /// }
836     /// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
837     /// state_tx.send_modify(|state| state.counter += 1);
838     /// assert_eq!(state_rx.borrow().counter, 1);
839     /// ```
send_modify<F>(&self, modify: F) where F: FnOnce(&mut T),840     pub fn send_modify<F>(&self, modify: F)
841     where
842         F: FnOnce(&mut T),
843     {
844         self.send_if_modified(|value| {
845             modify(value);
846             true
847         });
848     }
849 
850     /// Modifies the watched value **conditionally** in-place,
851     /// notifying all receivers only if modified.
852     ///
853     /// This can be useful for modifying the watched value, without
854     /// having to allocate a new instance. Additionally, this
855     /// method permits sending values even when there are no receivers.
856     ///
857     /// The `modify` closure must return `true` if the value has actually
858     /// been modified during the mutable borrow. It should only return `false`
859     /// if the value is guaranteed to be unmodified despite the mutable
860     /// borrow.
861     ///
862     /// Receivers are only notified if the closure returned `true`. If the
863     /// closure has modified the value but returned `false` this results
864     /// in a *silent modification*, i.e. the modified value will be visible
865     /// in subsequent calls to `borrow`, but receivers will not receive
866     /// a change notification.
867     ///
868     /// Returns the result of the closure, i.e. `true` if the value has
869     /// been modified and `false` otherwise.
870     ///
871     /// # Panics
872     ///
873     /// This function panics when the invocation of the `modify` closure panics.
874     /// No receivers are notified when panicking. All changes of the watched
875     /// value applied by the closure before panicking will be visible in
876     /// subsequent calls to `borrow`.
877     ///
878     /// # Examples
879     ///
880     /// ```
881     /// use tokio::sync::watch;
882     ///
883     /// struct State {
884     ///     counter: usize,
885     /// }
886     /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
887     /// let inc_counter_if_odd = |state: &mut State| {
888     ///     if state.counter % 2 == 1 {
889     ///         state.counter += 1;
890     ///         return true;
891     ///     }
892     ///     false
893     /// };
894     ///
895     /// assert_eq!(state_rx.borrow().counter, 1);
896     ///
897     /// assert!(!state_rx.has_changed().unwrap());
898     /// assert!(state_tx.send_if_modified(inc_counter_if_odd));
899     /// assert!(state_rx.has_changed().unwrap());
900     /// assert_eq!(state_rx.borrow_and_update().counter, 2);
901     ///
902     /// assert!(!state_rx.has_changed().unwrap());
903     /// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
904     /// assert!(!state_rx.has_changed().unwrap());
905     /// assert_eq!(state_rx.borrow_and_update().counter, 2);
906     /// ```
send_if_modified<F>(&self, modify: F) -> bool where F: FnOnce(&mut T) -> bool,907     pub fn send_if_modified<F>(&self, modify: F) -> bool
908     where
909         F: FnOnce(&mut T) -> bool,
910     {
911         {
912             // Acquire the write lock and update the value.
913             let mut lock = self.shared.value.write().unwrap();
914 
915             // Update the value and catch possible panic inside func.
916             let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
917             match result {
918                 Ok(modified) => {
919                     if !modified {
920                         // Abort, i.e. don't notify receivers if unmodified
921                         return false;
922                     }
923                     // Continue if modified
924                 }
925                 Err(panicked) => {
926                     // Drop the lock to avoid poisoning it.
927                     drop(lock);
928                     // Forward the panic to the caller.
929                     panic::resume_unwind(panicked);
930                     // Unreachable
931                 }
932             };
933 
934             self.shared.state.increment_version();
935 
936             // Release the write lock.
937             //
938             // Incrementing the version counter while holding the lock ensures
939             // that receivers are able to figure out the version number of the
940             // value they are currently looking at.
941             drop(lock);
942         }
943 
944         self.shared.notify_rx.notify_waiters();
945 
946         true
947     }
948 
949     /// Sends a new value via the channel, notifying all receivers and returning
950     /// the previous value in the channel.
951     ///
952     /// This can be useful for reusing the buffers inside a watched value.
953     /// Additionally, this method permits sending values even when there are no
954     /// receivers.
955     ///
956     /// # Examples
957     ///
958     /// ```
959     /// use tokio::sync::watch;
960     ///
961     /// let (tx, _rx) = watch::channel(1);
962     /// assert_eq!(tx.send_replace(2), 1);
963     /// assert_eq!(tx.send_replace(3), 2);
964     /// ```
send_replace(&self, mut value: T) -> T965     pub fn send_replace(&self, mut value: T) -> T {
966         // swap old watched value with the new one
967         self.send_modify(|old| mem::swap(old, &mut value));
968 
969         value
970     }
971 
972     /// Returns a reference to the most recently sent value
973     ///
974     /// Outstanding borrows hold a read lock on the inner value. This means that
975     /// long-lived borrows could cause the producer half to block. It is recommended
976     /// to keep the borrow as short-lived as possible. Additionally, if you are
977     /// running in an environment that allows `!Send` futures, you must ensure that
978     /// the returned `Ref` type is never held alive across an `.await` point,
979     /// otherwise, it can lead to a deadlock.
980     ///
981     /// # Examples
982     ///
983     /// ```
984     /// use tokio::sync::watch;
985     ///
986     /// let (tx, _) = watch::channel("hello");
987     /// assert_eq!(*tx.borrow(), "hello");
988     /// ```
borrow(&self) -> Ref<'_, T>989     pub fn borrow(&self) -> Ref<'_, T> {
990         let inner = self.shared.value.read().unwrap();
991 
992         // The sender/producer always sees the current version
993         let has_changed = false;
994 
995         Ref { inner, has_changed }
996     }
997 
998     /// Checks if the channel has been closed. This happens when all receivers
999     /// have dropped.
1000     ///
1001     /// # Examples
1002     ///
1003     /// ```
1004     /// let (tx, rx) = tokio::sync::watch::channel(());
1005     /// assert!(!tx.is_closed());
1006     ///
1007     /// drop(rx);
1008     /// assert!(tx.is_closed());
1009     /// ```
is_closed(&self) -> bool1010     pub fn is_closed(&self) -> bool {
1011         self.receiver_count() == 0
1012     }
1013 
1014     /// Completes when all receivers have dropped.
1015     ///
1016     /// This allows the producer to get notified when interest in the produced
1017     /// values is canceled and immediately stop doing work.
1018     ///
1019     /// # Cancel safety
1020     ///
1021     /// This method is cancel safe. Once the channel is closed, it stays closed
1022     /// forever and all future calls to `closed` will return immediately.
1023     ///
1024     /// # Examples
1025     ///
1026     /// ```
1027     /// use tokio::sync::watch;
1028     ///
1029     /// #[tokio::main]
1030     /// async fn main() {
1031     ///     let (tx, rx) = watch::channel("hello");
1032     ///
1033     ///     tokio::spawn(async move {
1034     ///         // use `rx`
1035     ///         drop(rx);
1036     ///     });
1037     ///
1038     ///     // Waits for `rx` to drop
1039     ///     tx.closed().await;
1040     ///     println!("the `rx` handles dropped")
1041     /// }
1042     /// ```
closed(&self)1043     pub async fn closed(&self) {
1044         crate::trace::async_trace_leaf().await;
1045 
1046         while self.receiver_count() > 0 {
1047             let notified = self.shared.notify_tx.notified();
1048 
1049             if self.receiver_count() == 0 {
1050                 return;
1051             }
1052 
1053             notified.await;
1054             // The channel could have been reopened in the meantime by calling
1055             // `subscribe`, so we loop again.
1056         }
1057     }
1058 
1059     /// Creates a new [`Receiver`] connected to this `Sender`.
1060     ///
1061     /// All messages sent before this call to `subscribe` are initially marked
1062     /// as seen by the new `Receiver`.
1063     ///
1064     /// This method can be called even if there are no other receivers. In this
1065     /// case, the channel is reopened.
1066     ///
1067     /// # Examples
1068     ///
1069     /// The new channel will receive messages sent on this `Sender`.
1070     ///
1071     /// ```
1072     /// use tokio::sync::watch;
1073     ///
1074     /// #[tokio::main]
1075     /// async fn main() {
1076     ///     let (tx, _rx) = watch::channel(0u64);
1077     ///
1078     ///     tx.send(5).unwrap();
1079     ///
1080     ///     let rx = tx.subscribe();
1081     ///     assert_eq!(5, *rx.borrow());
1082     ///
1083     ///     tx.send(10).unwrap();
1084     ///     assert_eq!(10, *rx.borrow());
1085     /// }
1086     /// ```
1087     ///
1088     /// The most recent message is considered seen by the channel, so this test
1089     /// is guaranteed to pass.
1090     ///
1091     /// ```
1092     /// use tokio::sync::watch;
1093     /// use tokio::time::Duration;
1094     ///
1095     /// #[tokio::main]
1096     /// async fn main() {
1097     ///     let (tx, _rx) = watch::channel(0u64);
1098     ///     tx.send(5).unwrap();
1099     ///     let mut rx = tx.subscribe();
1100     ///
1101     ///     tokio::spawn(async move {
1102     ///         // by spawning and sleeping, the message is sent after `main`
1103     ///         // hits the call to `changed`.
1104     ///         # if false {
1105     ///         tokio::time::sleep(Duration::from_millis(10)).await;
1106     ///         # }
1107     ///         tx.send(100).unwrap();
1108     ///     });
1109     ///
1110     ///     rx.changed().await.unwrap();
1111     ///     assert_eq!(100, *rx.borrow());
1112     /// }
1113     /// ```
subscribe(&self) -> Receiver<T>1114     pub fn subscribe(&self) -> Receiver<T> {
1115         let shared = self.shared.clone();
1116         let version = shared.state.load().version();
1117 
1118         // The CLOSED bit in the state tracks only whether the sender is
1119         // dropped, so we do not need to unset it if this reopens the channel.
1120         Receiver::from_shared(version, shared)
1121     }
1122 
1123     /// Returns the number of receivers that currently exist.
1124     ///
1125     /// # Examples
1126     ///
1127     /// ```
1128     /// use tokio::sync::watch;
1129     ///
1130     /// #[tokio::main]
1131     /// async fn main() {
1132     ///     let (tx, rx1) = watch::channel("hello");
1133     ///
1134     ///     assert_eq!(1, tx.receiver_count());
1135     ///
1136     ///     let mut _rx2 = rx1.clone();
1137     ///
1138     ///     assert_eq!(2, tx.receiver_count());
1139     /// }
1140     /// ```
receiver_count(&self) -> usize1141     pub fn receiver_count(&self) -> usize {
1142         self.shared.ref_count_rx.load(Relaxed)
1143     }
1144 }
1145 
1146 impl<T> Drop for Sender<T> {
drop(&mut self)1147     fn drop(&mut self) {
1148         self.shared.state.set_closed();
1149         self.shared.notify_rx.notify_waiters();
1150     }
1151 }
1152 
1153 // ===== impl Ref =====
1154 
1155 impl<T> ops::Deref for Ref<'_, T> {
1156     type Target = T;
1157 
deref(&self) -> &T1158     fn deref(&self) -> &T {
1159         self.inner.deref()
1160     }
1161 }
1162 
#[cfg(all(test, loom))]
mod tests {
    use futures::future::FutureExt;
    use loom::thread;

    // test for https://github.com/tokio-rs/tokio/issues/3168
    #[test]
    fn watch_spurious_wakeup() {
        loom::model(|| {
            let (tx, mut rx) = crate::sync::watch::channel(0i32);

            tx.send(1).unwrap();

            // Send from another thread while this thread polls `changed` once.
            let tx_thread = thread::spawn(move || {
                tx.send(2).unwrap();
                tx
            });

            rx.changed().now_or_never();

            // Swap roles: the receiver polls on another thread while this
            // thread sends.
            let tx = tx_thread.join().unwrap();
            let rx_thread = thread::spawn(move || {
                rx.changed().now_or_never();
                rx.changed().now_or_never();
                rx
            });

            tx.send(3).unwrap();

            // One more round with the sender on its own thread.
            let mut rx = rx_thread.join().unwrap();
            let tx_thread = thread::spawn(move || {
                tx.send(2).unwrap();
            });

            rx.changed().now_or_never();

            tx_thread.join().unwrap();
        });
    }

    #[test]
    fn watch_borrow() {
        loom::model(|| {
            let (tx, mut rx) = crate::sync::watch::channel(0i32);

            // Both halves start out seeing the initial value.
            assert!(*tx.borrow() == 0);
            assert!(*rx.borrow() == 0);

            tx.send(1).unwrap();
            assert!(*tx.borrow() == 1);

            // Send from another thread while this thread polls `changed` once.
            let tx_thread = thread::spawn(move || {
                tx.send(2).unwrap();
                tx
            });

            rx.changed().now_or_never();

            // Swap roles: the receiver polls on another thread while this
            // thread sends.
            let tx = tx_thread.join().unwrap();
            let rx_thread = thread::spawn(move || {
                rx.changed().now_or_never();
                rx.changed().now_or_never();
                rx
            });

            tx.send(3).unwrap();

            // After the join both halves must observe the latest value.
            let rx = rx_thread.join().unwrap();
            assert!(*rx.borrow() == 3);
            assert!(*tx.borrow() == 3);

            tx.send(2).unwrap();

            // Borrow concurrently from the receiver (on a new thread) and the
            // sender (on this thread); the spawned thread is not joined.
            thread::spawn(move || {
                assert!(*rx.borrow() == 2);
            });
            assert!(*tx.borrow() == 2);
        });
    }
}
1243