• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2 
3 //! A single-producer, multi-consumer channel that only retains the *last* sent
4 //! value.
5 //!
6 //! This channel is useful for watching for changes to a value from multiple
7 //! points in the code base, for example, changes to configuration values.
8 //!
9 //! # Usage
10 //!
11 //! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
12 //! and consumer halves of the channel. The channel is created with an initial
13 //! value. The **latest** value stored in the channel is accessed with
14 //! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
15 //! value to be sent by the [`Sender`] half.
16 //!
17 //! # Examples
18 //!
19 //! ```
20 //! use tokio::sync::watch;
21 //!
22 //! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
23 //! let (tx, mut rx) = watch::channel("hello");
24 //!
25 //! tokio::spawn(async move {
26 //!     while rx.changed().await.is_ok() {
27 //!         println!("received = {:?}", *rx.borrow());
28 //!     }
29 //! });
30 //!
31 //! tx.send("world")?;
32 //! # Ok(())
33 //! # }
34 //! ```
35 //!
36 //! # Closing
37 //!
38 //! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
39 //! when all [`Receiver`] handles have been dropped. This indicates that there
40 //! is no further interest in the values being produced and work can be stopped.
41 //!
42 //! # Thread safety
43 //!
44 //! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
45 //! threads and can be used in a concurrent environment. Clones of [`Receiver`]
46 //! handles may be moved to separate threads and also used concurrently.
47 //!
48 //! [`Sender`]: crate::sync::watch::Sender
49 //! [`Receiver`]: crate::sync::watch::Receiver
50 //! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
51 //! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
52 //! [`channel`]: crate::sync::watch::channel
53 //! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
54 //! [`Sender::closed`]: crate::sync::watch::Sender::closed
55 
56 use crate::sync::notify::Notify;
57 
58 use crate::loom::sync::atomic::AtomicUsize;
59 use crate::loom::sync::atomic::Ordering::Relaxed;
60 use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
61 use std::mem;
62 use std::ops;
63 use std::panic;
64 
65 /// Receives values from the associated [`Sender`](struct@Sender).
66 ///
67 /// Instances are created by the [`channel`](fn@channel) function.
68 ///
69 /// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
70 /// wrapper.
71 ///
72 /// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
73 #[derive(Debug)]
74 pub struct Receiver<T> {
75     /// Pointer to the shared state
76     shared: Arc<Shared<T>>,
77 
78     /// Last observed version
79     version: Version,
80 }
81 
82 /// Sends values to the associated [`Receiver`](struct@Receiver).
83 ///
84 /// Instances are created by the [`channel`](fn@channel) function.
85 #[derive(Debug)]
86 pub struct Sender<T> {
87     shared: Arc<Shared<T>>,
88 }
89 
90 /// Returns a reference to the inner value.
91 ///
92 /// Outstanding borrows hold a read lock on the inner value. This means that
93 /// long-lived borrows could cause the producer half to block. It is recommended
94 /// to keep the borrow as short-lived as possible. Additionally, if you are
95 /// running in an environment that allows `!Send` futures, you must ensure that
96 /// the returned `Ref` type is never held alive across an `.await` point,
97 /// otherwise, it can lead to a deadlock.
98 ///
99 /// The priority policy of the lock is dependent on the underlying lock
100 /// implementation, and this type does not guarantee that any particular policy
101 /// will be used. In particular, a producer which is waiting to acquire the lock
102 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
103 ///
104 /// <details><summary>Potential deadlock example</summary>
105 ///
106 /// ```text
107 /// // Task 1 (on thread A)    |  // Task 2 (on thread B)
108 /// let _ref1 = rx.borrow();   |
109 ///                            |  // will block
110 ///                            |  let _ = tx.send(());
111 /// // may deadlock            |
112 /// let _ref2 = rx.borrow();   |
113 /// ```
114 /// </details>
115 #[derive(Debug)]
116 pub struct Ref<'a, T> {
117     inner: RwLockReadGuard<'a, T>,
118     has_changed: bool,
119 }
120 
121 impl<'a, T> Ref<'a, T> {
122     /// Indicates if the borrowed value is considered as _changed_ since the last
123     /// time it has been marked as seen.
124     ///
125     /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
126     ///
127     /// When borrowed from the [`Sender`] this function will always return `false`.
128     ///
129     /// # Examples
130     ///
131     /// ```
132     /// use tokio::sync::watch;
133     ///
134     /// #[tokio::main]
135     /// async fn main() {
136     ///     let (tx, mut rx) = watch::channel("hello");
137     ///
138     ///     tx.send("goodbye").unwrap();
139     ///     // The sender does never consider the value as changed.
140     ///     assert!(!tx.borrow().has_changed());
141     ///
142     ///     // Drop the sender immediately, just for testing purposes.
143     ///     drop(tx);
144     ///
145     ///     // Even if the sender has already been dropped...
146     ///     assert!(rx.has_changed().is_err());
147     ///     // ...the modified value is still readable and detected as changed.
148     ///     assert_eq!(*rx.borrow(), "goodbye");
149     ///     assert!(rx.borrow().has_changed());
150     ///
151     ///     // Read the changed value and mark it as seen.
152     ///     {
153     ///         let received = rx.borrow_and_update();
154     ///         assert_eq!(*received, "goodbye");
155     ///         assert!(received.has_changed());
156     ///         // Release the read lock when leaving this scope.
157     ///     }
158     ///
159     ///     // Now the value has already been marked as seen and could
160     ///     // never be modified again (after the sender has been dropped).
161     ///     assert!(!rx.borrow().has_changed());
162     /// }
163     /// ```
has_changed(&self) -> bool164     pub fn has_changed(&self) -> bool {
165         self.has_changed
166     }
167 }
168 
169 #[derive(Debug)]
170 struct Shared<T> {
171     /// The most recent value.
172     value: RwLock<T>,
173 
174     /// The current version.
175     ///
176     /// The lowest bit represents a "closed" state. The rest of the bits
177     /// represent the current version.
178     state: AtomicState,
179 
180     /// Tracks the number of `Receiver` instances.
181     ref_count_rx: AtomicUsize,
182 
183     /// Notifies waiting receivers that the value changed.
184     notify_rx: Notify,
185 
186     /// Notifies any task listening for `Receiver` dropped events.
187     notify_tx: Notify,
188 }
189 
pub mod error {
    //! Watch error types.

    use std::fmt;

    /// Error produced when sending a value fails.
    ///
    /// The value that could not be sent is handed back in the public field.
    #[derive(Debug)]
    pub struct SendError<T>(pub T);

    // ===== impl SendError =====

    // The message never formats the payload, so `Display` places no bound on
    // `T`; this lets callers print the error even when `T` is not `Debug`.
    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            fmt.write_str("channel closed")
        }
    }

    // `std::error::Error` requires `Debug`, which the derive above only
    // provides when `T: Debug`, so the bound stays on this impl.
    impl<T: fmt::Debug> std::error::Error for SendError<T> {}

    /// Error produced when receiving a change notification.
    ///
    /// The private unit field prevents construction outside the parent module.
    #[derive(Debug, Clone)]
    pub struct RecvError(pub(super) ());

    // ===== impl RecvError =====

    impl fmt::Display for RecvError {
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            fmt.write_str("channel closed")
        }
    }

    impl std::error::Error for RecvError {}
}
223 
224 use self::state::{AtomicState, Version};
225 mod state {
226     use crate::loom::sync::atomic::AtomicUsize;
227     use crate::loom::sync::atomic::Ordering::SeqCst;
228 
229     const CLOSED: usize = 1;
230 
231     /// The version part of the state. The lowest bit is always zero.
232     #[derive(Copy, Clone, Debug, Eq, PartialEq)]
233     pub(super) struct Version(usize);
234 
235     /// Snapshot of the state. The first bit is used as the CLOSED bit.
236     /// The remaining bits are used as the version.
237     ///
238     /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
239     /// receivers does not set it.
240     #[derive(Copy, Clone, Debug)]
241     pub(super) struct StateSnapshot(usize);
242 
243     /// The state stored in an atomic integer.
244     #[derive(Debug)]
245     pub(super) struct AtomicState(AtomicUsize);
246 
247     impl Version {
248         /// Get the initial version when creating the channel.
initial() -> Self249         pub(super) fn initial() -> Self {
250             Version(0)
251         }
252     }
253 
254     impl StateSnapshot {
255         /// Extract the version from the state.
version(self) -> Version256         pub(super) fn version(self) -> Version {
257             Version(self.0 & !CLOSED)
258         }
259 
260         /// Is the closed bit set?
is_closed(self) -> bool261         pub(super) fn is_closed(self) -> bool {
262             (self.0 & CLOSED) == CLOSED
263         }
264     }
265 
266     impl AtomicState {
267         /// Create a new `AtomicState` that is not closed and which has the
268         /// version set to `Version::initial()`.
new() -> Self269         pub(super) fn new() -> Self {
270             AtomicState(AtomicUsize::new(0))
271         }
272 
273         /// Load the current value of the state.
load(&self) -> StateSnapshot274         pub(super) fn load(&self) -> StateSnapshot {
275             StateSnapshot(self.0.load(SeqCst))
276         }
277 
278         /// Increment the version counter.
increment_version(&self)279         pub(super) fn increment_version(&self) {
280             // Increment by two to avoid touching the CLOSED bit.
281             self.0.fetch_add(2, SeqCst);
282         }
283 
284         /// Set the closed bit in the state.
set_closed(&self)285         pub(super) fn set_closed(&self) {
286             self.0.fetch_or(CLOSED, SeqCst);
287         }
288     }
289 }
290 
291 /// Creates a new watch channel, returning the "send" and "receive" handles.
292 ///
293 /// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
294 /// Only the last value sent is made available to the [`Receiver`] half. All
295 /// intermediate values are dropped.
296 ///
297 /// # Examples
298 ///
299 /// ```
300 /// use tokio::sync::watch;
301 ///
302 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
303 ///     let (tx, mut rx) = watch::channel("hello");
304 ///
305 ///     tokio::spawn(async move {
306 ///         while rx.changed().await.is_ok() {
307 ///             println!("received = {:?}", *rx.borrow());
308 ///         }
309 ///     });
310 ///
311 ///     tx.send("world")?;
312 /// # Ok(())
313 /// # }
314 /// ```
315 ///
316 /// [`Sender`]: struct@Sender
317 /// [`Receiver`]: struct@Receiver
channel<T>(init: T) -> (Sender<T>, Receiver<T>)318 pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
319     let shared = Arc::new(Shared {
320         value: RwLock::new(init),
321         state: AtomicState::new(),
322         ref_count_rx: AtomicUsize::new(1),
323         notify_rx: Notify::new(),
324         notify_tx: Notify::new(),
325     });
326 
327     let tx = Sender {
328         shared: shared.clone(),
329     };
330 
331     let rx = Receiver {
332         shared,
333         version: Version::initial(),
334     };
335 
336     (tx, rx)
337 }
338 
339 impl<T> Receiver<T> {
from_shared(version: Version, shared: Arc<Shared<T>>) -> Self340     fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
341         // No synchronization necessary as this is only used as a counter and
342         // not memory access.
343         shared.ref_count_rx.fetch_add(1, Relaxed);
344 
345         Self { shared, version }
346     }
347 
348     /// Returns a reference to the most recently sent value.
349     ///
350     /// This method does not mark the returned value as seen, so future calls to
351     /// [`changed`] may return immediately even if you have already seen the
352     /// value with a call to `borrow`.
353     ///
354     /// Outstanding borrows hold a read lock on the inner value. This means that
355     /// long-lived borrows could cause the producer half to block. It is recommended
356     /// to keep the borrow as short-lived as possible. Additionally, if you are
357     /// running in an environment that allows `!Send` futures, you must ensure that
358     /// the returned `Ref` type is never held alive across an `.await` point,
359     /// otherwise, it can lead to a deadlock.
360     ///
361     /// The priority policy of the lock is dependent on the underlying lock
362     /// implementation, and this type does not guarantee that any particular policy
363     /// will be used. In particular, a producer which is waiting to acquire the lock
364     /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
365     ///
366     /// <details><summary>Potential deadlock example</summary>
367     ///
368     /// ```text
369     /// // Task 1 (on thread A)    |  // Task 2 (on thread B)
370     /// let _ref1 = rx.borrow();   |
371     ///                            |  // will block
372     ///                            |  let _ = tx.send(());
373     /// // may deadlock            |
374     /// let _ref2 = rx.borrow();   |
375     /// ```
376     /// </details>
377     ///
378     /// [`changed`]: Receiver::changed
379     ///
380     /// # Examples
381     ///
382     /// ```
383     /// use tokio::sync::watch;
384     ///
385     /// let (_, rx) = watch::channel("hello");
386     /// assert_eq!(*rx.borrow(), "hello");
387     /// ```
borrow(&self) -> Ref<'_, T>388     pub fn borrow(&self) -> Ref<'_, T> {
389         let inner = self.shared.value.read().unwrap();
390 
391         // After obtaining a read-lock no concurrent writes could occur
392         // and the loaded version matches that of the borrowed reference.
393         let new_version = self.shared.state.load().version();
394         let has_changed = self.version != new_version;
395 
396         Ref { inner, has_changed }
397     }
398 
399     /// Returns a reference to the most recently sent value and marks that value
400     /// as seen.
401     ///
402     /// This method marks the current value as seen. Subsequent calls to [`changed`]
403     /// will not return immediately until the [`Sender`] has modified the shared
404     /// value again.
405     ///
406     /// Outstanding borrows hold a read lock on the inner value. This means that
407     /// long-lived borrows could cause the producer half to block. It is recommended
408     /// to keep the borrow as short-lived as possible. Additionally, if you are
409     /// running in an environment that allows `!Send` futures, you must ensure that
410     /// the returned `Ref` type is never held alive across an `.await` point,
411     /// otherwise, it can lead to a deadlock.
412     ///
413     /// The priority policy of the lock is dependent on the underlying lock
414     /// implementation, and this type does not guarantee that any particular policy
415     /// will be used. In particular, a producer which is waiting to acquire the lock
416     /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
417     ///
418     /// <details><summary>Potential deadlock example</summary>
419     ///
420     /// ```text
421     /// // Task 1 (on thread A)                |  // Task 2 (on thread B)
422     /// let _ref1 = rx1.borrow_and_update();   |
423     ///                                        |  // will block
424     ///                                        |  let _ = tx.send(());
425     /// // may deadlock                        |
426     /// let _ref2 = rx2.borrow_and_update();   |
427     /// ```
428     /// </details>
429     ///
430     /// [`changed`]: Receiver::changed
borrow_and_update(&mut self) -> Ref<'_, T>431     pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
432         let inner = self.shared.value.read().unwrap();
433 
434         // After obtaining a read-lock no concurrent writes could occur
435         // and the loaded version matches that of the borrowed reference.
436         let new_version = self.shared.state.load().version();
437         let has_changed = self.version != new_version;
438 
439         // Mark the shared value as seen by updating the version
440         self.version = new_version;
441 
442         Ref { inner, has_changed }
443     }
444 
445     /// Checks if this channel contains a message that this receiver has not yet
446     /// seen. The new value is not marked as seen.
447     ///
448     /// Although this method is called `has_changed`, it does not check new
449     /// messages for equality, so this call will return true even if the new
450     /// message is equal to the old message.
451     ///
452     /// Returns an error if the channel has been closed.
453     /// # Examples
454     ///
455     /// ```
456     /// use tokio::sync::watch;
457     ///
458     /// #[tokio::main]
459     /// async fn main() {
460     ///     let (tx, mut rx) = watch::channel("hello");
461     ///
462     ///     tx.send("goodbye").unwrap();
463     ///
464     ///     assert!(rx.has_changed().unwrap());
465     ///     assert_eq!(*rx.borrow_and_update(), "goodbye");
466     ///
467     ///     // The value has been marked as seen
468     ///     assert!(!rx.has_changed().unwrap());
469     ///
470     ///     drop(tx);
471     ///     // The `tx` handle has been dropped
472     ///     assert!(rx.has_changed().is_err());
473     /// }
474     /// ```
has_changed(&self) -> Result<bool, error::RecvError>475     pub fn has_changed(&self) -> Result<bool, error::RecvError> {
476         // Load the version from the state
477         let state = self.shared.state.load();
478         if state.is_closed() {
479             // The sender has dropped.
480             return Err(error::RecvError(()));
481         }
482         let new_version = state.version();
483 
484         Ok(self.version != new_version)
485     }
486 
487     /// Waits for a change notification, then marks the newest value as seen.
488     ///
489     /// If the newest value in the channel has not yet been marked seen when
490     /// this method is called, the method marks that value seen and returns
491     /// immediately. If the newest value has already been marked seen, then the
492     /// method sleeps until a new message is sent by the [`Sender`] connected to
493     /// this `Receiver`, or until the [`Sender`] is dropped.
494     ///
495     /// This method returns an error if and only if the [`Sender`] is dropped.
496     ///
497     /// # Cancel safety
498     ///
499     /// This method is cancel safe. If you use it as the event in a
500     /// [`tokio::select!`](crate::select) statement and some other branch
501     /// completes first, then it is guaranteed that no values have been marked
502     /// seen by this call to `changed`.
503     ///
504     /// [`Sender`]: struct@Sender
505     ///
506     /// # Examples
507     ///
508     /// ```
509     /// use tokio::sync::watch;
510     ///
511     /// #[tokio::main]
512     /// async fn main() {
513     ///     let (tx, mut rx) = watch::channel("hello");
514     ///
515     ///     tokio::spawn(async move {
516     ///         tx.send("goodbye").unwrap();
517     ///     });
518     ///
519     ///     assert!(rx.changed().await.is_ok());
520     ///     assert_eq!(*rx.borrow(), "goodbye");
521     ///
522     ///     // The `tx` handle has been dropped
523     ///     assert!(rx.changed().await.is_err());
524     /// }
525     /// ```
changed(&mut self) -> Result<(), error::RecvError>526     pub async fn changed(&mut self) -> Result<(), error::RecvError> {
527         loop {
528             // In order to avoid a race condition, we first request a notification,
529             // **then** check the current value's version. If a new version exists,
530             // the notification request is dropped.
531             let notified = self.shared.notify_rx.notified();
532 
533             if let Some(ret) = maybe_changed(&self.shared, &mut self.version) {
534                 return ret;
535             }
536 
537             notified.await;
538             // loop around again in case the wake-up was spurious
539         }
540     }
541 
542     /// Returns `true` if receivers belong to the same channel.
543     ///
544     /// # Examples
545     ///
546     /// ```
547     /// let (tx, rx) = tokio::sync::watch::channel(true);
548     /// let rx2 = rx.clone();
549     /// assert!(rx.same_channel(&rx2));
550     ///
551     /// let (tx3, rx3) = tokio::sync::watch::channel(true);
552     /// assert!(!rx3.same_channel(&rx2));
553     /// ```
same_channel(&self, other: &Self) -> bool554     pub fn same_channel(&self, other: &Self) -> bool {
555         Arc::ptr_eq(&self.shared, &other.shared)
556     }
557 
558     cfg_process_driver! {
559         pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
560             maybe_changed(&self.shared, &mut self.version)
561         }
562     }
563 }
564 
maybe_changed<T>( shared: &Shared<T>, version: &mut Version, ) -> Option<Result<(), error::RecvError>>565 fn maybe_changed<T>(
566     shared: &Shared<T>,
567     version: &mut Version,
568 ) -> Option<Result<(), error::RecvError>> {
569     // Load the version from the state
570     let state = shared.state.load();
571     let new_version = state.version();
572 
573     if *version != new_version {
574         // Observe the new version and return
575         *version = new_version;
576         return Some(Ok(()));
577     }
578 
579     if state.is_closed() {
580         // All receivers have dropped.
581         return Some(Err(error::RecvError(())));
582     }
583 
584     None
585 }
586 
587 impl<T> Clone for Receiver<T> {
clone(&self) -> Self588     fn clone(&self) -> Self {
589         let version = self.version;
590         let shared = self.shared.clone();
591 
592         Self::from_shared(version, shared)
593     }
594 }
595 
596 impl<T> Drop for Receiver<T> {
drop(&mut self)597     fn drop(&mut self) {
598         // No synchronization necessary as this is only used as a counter and
599         // not memory access.
600         if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
601             // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
602             self.shared.notify_tx.notify_waiters();
603         }
604     }
605 }
606 
607 impl<T> Sender<T> {
608     /// Sends a new value via the channel, notifying all receivers.
609     ///
610     /// This method fails if the channel is closed, which is the case when
611     /// every receiver has been dropped. It is possible to reopen the channel
612     /// using the [`subscribe`] method. However, when `send` fails, the value
613     /// isn't made available for future receivers (but returned with the
614     /// [`SendError`]).
615     ///
616     /// To always make a new value available for future receivers, even if no
617     /// receiver currently exists, one of the other send methods
618     /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
619     /// used instead.
620     ///
621     /// [`subscribe`]: Sender::subscribe
622     /// [`SendError`]: error::SendError
623     /// [`send_if_modified`]: Sender::send_if_modified
624     /// [`send_modify`]: Sender::send_modify
625     /// [`send_replace`]: Sender::send_replace
send(&self, value: T) -> Result<(), error::SendError<T>>626     pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
627         // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
628         if 0 == self.receiver_count() {
629             return Err(error::SendError(value));
630         }
631 
632         self.send_replace(value);
633         Ok(())
634     }
635 
636     /// Modifies the watched value **unconditionally** in-place,
637     /// notifying all receivers.
638     ///
639     /// This can useful for modifying the watched value, without
640     /// having to allocate a new instance. Additionally, this
641     /// method permits sending values even when there are no receivers.
642     ///
643     /// Prefer to use the more versatile function [`Self::send_if_modified()`]
644     /// if the value is only modified conditionally during the mutable borrow
645     /// to prevent unneeded change notifications for unmodified values.
646     ///
647     /// # Panics
648     ///
649     /// This function panics when the invocation of the `modify` closure panics.
650     /// No receivers are notified when panicking. All changes of the watched
651     /// value applied by the closure before panicking will be visible in
652     /// subsequent calls to `borrow`.
653     ///
654     /// # Examples
655     ///
656     /// ```
657     /// use tokio::sync::watch;
658     ///
659     /// struct State {
660     ///     counter: usize,
661     /// }
662     /// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
663     /// state_tx.send_modify(|state| state.counter += 1);
664     /// assert_eq!(state_rx.borrow().counter, 1);
665     /// ```
send_modify<F>(&self, modify: F) where F: FnOnce(&mut T),666     pub fn send_modify<F>(&self, modify: F)
667     where
668         F: FnOnce(&mut T),
669     {
670         self.send_if_modified(|value| {
671             modify(value);
672             true
673         });
674     }
675 
676     /// Modifies the watched value **conditionally** in-place,
677     /// notifying all receivers only if modified.
678     ///
679     /// This can useful for modifying the watched value, without
680     /// having to allocate a new instance. Additionally, this
681     /// method permits sending values even when there are no receivers.
682     ///
683     /// The `modify` closure must return `true` if the value has actually
684     /// been modified during the mutable borrow. It should only return `false`
685     /// if the value is guaranteed to be unmodified despite the mutable
686     /// borrow.
687     ///
688     /// Receivers are only notified if the closure returned `true`. If the
689     /// closure has modified the value but returned `false` this results
690     /// in a *silent modification*, i.e. the modified value will be visible
691     /// in subsequent calls to `borrow`, but receivers will not receive
692     /// a change notification.
693     ///
694     /// Returns the result of the closure, i.e. `true` if the value has
695     /// been modified and `false` otherwise.
696     ///
697     /// # Panics
698     ///
699     /// This function panics when the invocation of the `modify` closure panics.
700     /// No receivers are notified when panicking. All changes of the watched
701     /// value applied by the closure before panicking will be visible in
702     /// subsequent calls to `borrow`.
703     ///
704     /// # Examples
705     ///
706     /// ```
707     /// use tokio::sync::watch;
708     ///
709     /// struct State {
710     ///     counter: usize,
711     /// }
712     /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
713     /// let inc_counter_if_odd = |state: &mut State| {
714     ///     if state.counter % 2 == 1 {
715     ///         state.counter += 1;
716     ///         return true;
717     ///     }
718     ///     false
719     /// };
720     ///
721     /// assert_eq!(state_rx.borrow().counter, 1);
722     ///
723     /// assert!(!state_rx.has_changed().unwrap());
724     /// assert!(state_tx.send_if_modified(inc_counter_if_odd));
725     /// assert!(state_rx.has_changed().unwrap());
726     /// assert_eq!(state_rx.borrow_and_update().counter, 2);
727     ///
728     /// assert!(!state_rx.has_changed().unwrap());
729     /// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
730     /// assert!(!state_rx.has_changed().unwrap());
731     /// assert_eq!(state_rx.borrow_and_update().counter, 2);
732     /// ```
send_if_modified<F>(&self, modify: F) -> bool where F: FnOnce(&mut T) -> bool,733     pub fn send_if_modified<F>(&self, modify: F) -> bool
734     where
735         F: FnOnce(&mut T) -> bool,
736     {
737         {
738             // Acquire the write lock and update the value.
739             let mut lock = self.shared.value.write().unwrap();
740 
741             // Update the value and catch possible panic inside func.
742             let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
743             match result {
744                 Ok(modified) => {
745                     if !modified {
746                         // Abort, i.e. don't notify receivers if unmodified
747                         return false;
748                     }
749                     // Continue if modified
750                 }
751                 Err(panicked) => {
752                     // Drop the lock to avoid poisoning it.
753                     drop(lock);
754                     // Forward the panic to the caller.
755                     panic::resume_unwind(panicked);
756                     // Unreachable
757                 }
758             };
759 
760             self.shared.state.increment_version();
761 
762             // Release the write lock.
763             //
764             // Incrementing the version counter while holding the lock ensures
765             // that receivers are able to figure out the version number of the
766             // value they are currently looking at.
767             drop(lock);
768         }
769 
770         self.shared.notify_rx.notify_waiters();
771 
772         true
773     }
774 
775     /// Sends a new value via the channel, notifying all receivers and returning
776     /// the previous value in the channel.
777     ///
778     /// This can be useful for reusing the buffers inside a watched value.
779     /// Additionally, this method permits sending values even when there are no
780     /// receivers.
781     ///
782     /// # Examples
783     ///
784     /// ```
785     /// use tokio::sync::watch;
786     ///
787     /// let (tx, _rx) = watch::channel(1);
788     /// assert_eq!(tx.send_replace(2), 1);
789     /// assert_eq!(tx.send_replace(3), 2);
790     /// ```
send_replace(&self, mut value: T) -> T791     pub fn send_replace(&self, mut value: T) -> T {
792         // swap old watched value with the new one
793         self.send_modify(|old| mem::swap(old, &mut value));
794 
795         value
796     }
797 
798     /// Returns a reference to the most recently sent value
799     ///
800     /// Outstanding borrows hold a read lock on the inner value. This means that
801     /// long-lived borrows could cause the producer half to block. It is recommended
802     /// to keep the borrow as short-lived as possible. Additionally, if you are
803     /// running in an environment that allows `!Send` futures, you must ensure that
804     /// the returned `Ref` type is never held alive across an `.await` point,
805     /// otherwise, it can lead to a deadlock.
806     ///
807     /// # Examples
808     ///
809     /// ```
810     /// use tokio::sync::watch;
811     ///
812     /// let (tx, _) = watch::channel("hello");
813     /// assert_eq!(*tx.borrow(), "hello");
814     /// ```
borrow(&self) -> Ref<'_, T>815     pub fn borrow(&self) -> Ref<'_, T> {
816         let inner = self.shared.value.read().unwrap();
817 
818         // The sender/producer always sees the current version
819         let has_changed = false;
820 
821         Ref { inner, has_changed }
822     }
823 
824     /// Checks if the channel has been closed. This happens when all receivers
825     /// have dropped.
826     ///
827     /// # Examples
828     ///
829     /// ```
830     /// let (tx, rx) = tokio::sync::watch::channel(());
831     /// assert!(!tx.is_closed());
832     ///
833     /// drop(rx);
834     /// assert!(tx.is_closed());
835     /// ```
is_closed(&self) -> bool836     pub fn is_closed(&self) -> bool {
837         self.receiver_count() == 0
838     }
839 
840     /// Completes when all receivers have dropped.
841     ///
842     /// This allows the producer to get notified when interest in the produced
843     /// values is canceled and immediately stop doing work.
844     ///
845     /// # Cancel safety
846     ///
847     /// This method is cancel safe. Once the channel is closed, it stays closed
848     /// forever and all future calls to `closed` will return immediately.
849     ///
850     /// # Examples
851     ///
852     /// ```
853     /// use tokio::sync::watch;
854     ///
855     /// #[tokio::main]
856     /// async fn main() {
857     ///     let (tx, rx) = watch::channel("hello");
858     ///
859     ///     tokio::spawn(async move {
860     ///         // use `rx`
861     ///         drop(rx);
862     ///     });
863     ///
864     ///     // Waits for `rx` to drop
865     ///     tx.closed().await;
866     ///     println!("the `rx` handles dropped")
867     /// }
868     /// ```
closed(&self)869     pub async fn closed(&self) {
870         while self.receiver_count() > 0 {
871             let notified = self.shared.notify_tx.notified();
872 
873             if self.receiver_count() == 0 {
874                 return;
875             }
876 
877             notified.await;
878             // The channel could have been reopened in the meantime by calling
879             // `subscribe`, so we loop again.
880         }
881     }
882 
883     /// Creates a new [`Receiver`] connected to this `Sender`.
884     ///
885     /// All messages sent before this call to `subscribe` are initially marked
886     /// as seen by the new `Receiver`.
887     ///
888     /// This method can be called even if there are no other receivers. In this
889     /// case, the channel is reopened.
890     ///
891     /// # Examples
892     ///
893     /// The new channel will receive messages sent on this `Sender`.
894     ///
895     /// ```
896     /// use tokio::sync::watch;
897     ///
898     /// #[tokio::main]
899     /// async fn main() {
900     ///     let (tx, _rx) = watch::channel(0u64);
901     ///
902     ///     tx.send(5).unwrap();
903     ///
904     ///     let rx = tx.subscribe();
905     ///     assert_eq!(5, *rx.borrow());
906     ///
907     ///     tx.send(10).unwrap();
908     ///     assert_eq!(10, *rx.borrow());
909     /// }
910     /// ```
911     ///
912     /// The most recent message is considered seen by the channel, so this test
913     /// is guaranteed to pass.
914     ///
915     /// ```
916     /// use tokio::sync::watch;
917     /// use tokio::time::Duration;
918     ///
919     /// #[tokio::main]
920     /// async fn main() {
921     ///     let (tx, _rx) = watch::channel(0u64);
922     ///     tx.send(5).unwrap();
923     ///     let mut rx = tx.subscribe();
924     ///
925     ///     tokio::spawn(async move {
926     ///         // by spawning and sleeping, the message is sent after `main`
927     ///         // hits the call to `changed`.
928     ///         # if false {
929     ///         tokio::time::sleep(Duration::from_millis(10)).await;
930     ///         # }
931     ///         tx.send(100).unwrap();
932     ///     });
933     ///
934     ///     rx.changed().await.unwrap();
935     ///     assert_eq!(100, *rx.borrow());
936     /// }
937     /// ```
subscribe(&self) -> Receiver<T>938     pub fn subscribe(&self) -> Receiver<T> {
939         let shared = self.shared.clone();
940         let version = shared.state.load().version();
941 
942         // The CLOSED bit in the state tracks only whether the sender is
943         // dropped, so we do not need to unset it if this reopens the channel.
944         Receiver::from_shared(version, shared)
945     }
946 
947     /// Returns the number of receivers that currently exist.
948     ///
949     /// # Examples
950     ///
951     /// ```
952     /// use tokio::sync::watch;
953     ///
954     /// #[tokio::main]
955     /// async fn main() {
956     ///     let (tx, rx1) = watch::channel("hello");
957     ///
958     ///     assert_eq!(1, tx.receiver_count());
959     ///
960     ///     let mut _rx2 = rx1.clone();
961     ///
962     ///     assert_eq!(2, tx.receiver_count());
963     /// }
964     /// ```
receiver_count(&self) -> usize965     pub fn receiver_count(&self) -> usize {
966         self.shared.ref_count_rx.load(Relaxed)
967     }
968 }
969 
970 impl<T> Drop for Sender<T> {
drop(&mut self)971     fn drop(&mut self) {
972         self.shared.state.set_closed();
973         self.shared.notify_rx.notify_waiters();
974     }
975 }
976 
977 // ===== impl Ref =====
978 
979 impl<T> ops::Deref for Ref<'_, T> {
980     type Target = T;
981 
deref(&self) -> &T982     fn deref(&self) -> &T {
983         self.inner.deref()
984     }
985 }
986 
#[cfg(all(test, loom))]
mod tests {
    use futures::future::FutureExt;
    use loom::thread;

    // Regression test for https://github.com/tokio-rs/tokio/issues/3168
    #[test]
    fn watch_spurious_wakeup() {
        loom::model(|| {
            let (tx, mut rx) = crate::sync::watch::channel(0i32);

            tx.send(1).unwrap();

            // Send from a second thread while this thread polls `changed`.
            let sender = thread::spawn(move || {
                tx.send(2).unwrap();
                tx
            });

            rx.changed().now_or_never();

            let tx = sender.join().unwrap();

            // Swap roles: poll from a second thread while this thread sends.
            let receiver = thread::spawn(move || {
                rx.changed().now_or_never();
                rx.changed().now_or_never();
                rx
            });

            tx.send(3).unwrap();

            let mut rx = receiver.join().unwrap();

            // One more concurrent send / poll pair.
            let sender = thread::spawn(move || {
                tx.send(2).unwrap();
            });

            rx.changed().now_or_never();

            sender.join().unwrap();
        });
    }

    // Both halves must always see the latest value through `borrow`.
    #[test]
    fn watch_borrow() {
        loom::model(|| {
            let (tx, mut rx) = crate::sync::watch::channel(0i32);

            // The initial value is visible from both halves.
            assert_eq!(*tx.borrow(), 0);
            assert_eq!(*rx.borrow(), 0);

            tx.send(1).unwrap();
            assert_eq!(*tx.borrow(), 1);

            let sender = thread::spawn(move || {
                tx.send(2).unwrap();
                tx
            });

            rx.changed().now_or_never();

            let tx = sender.join().unwrap();
            let receiver = thread::spawn(move || {
                rx.changed().now_or_never();
                rx.changed().now_or_never();
                rx
            });

            tx.send(3).unwrap();

            let rx = receiver.join().unwrap();
            assert_eq!(*rx.borrow(), 3);
            assert_eq!(*tx.borrow(), 3);

            tx.send(2).unwrap();

            // The receiver is checked on its own thread; the sender is checked
            // here.
            thread::spawn(move || {
                assert_eq!(*rx.borrow(), 2);
            });
            assert_eq!(*tx.borrow(), 2);
        });
    }
}
1067