1 #![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3 //! A single-producer, multi-consumer channel that only retains the *last* sent
4 //! value.
5 //!
6 //! This channel is useful for watching for changes to a value from multiple
7 //! points in the code base, for example, changes to configuration values.
8 //!
9 //! # Usage
10 //!
11 //! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
12 //! and consumer halves of the channel. The channel is created with an initial
13 //! value. The **latest** value stored in the channel is accessed with
14 //! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
15 //! value to be sent by the [`Sender`] half.
16 //!
17 //! # Examples
18 //!
19 //! ```
20 //! use tokio::sync::watch;
21 //!
22 //! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
23 //! let (tx, mut rx) = watch::channel("hello");
24 //!
25 //! tokio::spawn(async move {
26 //! while rx.changed().await.is_ok() {
27 //! println!("received = {:?}", *rx.borrow());
28 //! }
29 //! });
30 //!
31 //! tx.send("world")?;
32 //! # Ok(())
33 //! # }
34 //! ```
35 //!
36 //! # Closing
37 //!
38 //! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
39 //! when all [`Receiver`] handles have been dropped. This indicates that there
40 //! is no further interest in the values being produced and work can be stopped.
41 //!
42 //! # Thread safety
43 //!
44 //! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
45 //! threads and can be used in a concurrent environment. Clones of [`Receiver`]
46 //! handles may be moved to separate threads and also used concurrently.
47 //!
48 //! [`Sender`]: crate::sync::watch::Sender
49 //! [`Receiver`]: crate::sync::watch::Receiver
50 //! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
51 //! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
52 //! [`channel`]: crate::sync::watch::channel
53 //! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
54 //! [`Sender::closed`]: crate::sync::watch::Sender::closed
55
56 use crate::sync::notify::Notify;
57
58 use crate::loom::sync::atomic::AtomicUsize;
59 use crate::loom::sync::atomic::Ordering::Relaxed;
60 use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
61 use std::mem;
62 use std::ops;
63 use std::panic;
64
65 /// Receives values from the associated [`Sender`](struct@Sender).
66 ///
67 /// Instances are created by the [`channel`](fn@channel) function.
68 ///
69 /// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
70 /// wrapper.
71 ///
72 /// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
73 #[derive(Debug)]
74 pub struct Receiver<T> {
75 /// Pointer to the shared state
76 shared: Arc<Shared<T>>,
77
78 /// Last observed version
79 version: Version,
80 }
81
82 /// Sends values to the associated [`Receiver`](struct@Receiver).
83 ///
84 /// Instances are created by the [`channel`](fn@channel) function.
85 #[derive(Debug)]
86 pub struct Sender<T> {
87 shared: Arc<Shared<T>>,
88 }
89
90 /// Returns a reference to the inner value.
91 ///
92 /// Outstanding borrows hold a read lock on the inner value. This means that
93 /// long-lived borrows could cause the producer half to block. It is recommended
94 /// to keep the borrow as short-lived as possible. Additionally, if you are
95 /// running in an environment that allows `!Send` futures, you must ensure that
96 /// the returned `Ref` type is never held alive across an `.await` point,
97 /// otherwise, it can lead to a deadlock.
98 ///
99 /// The priority policy of the lock is dependent on the underlying lock
100 /// implementation, and this type does not guarantee that any particular policy
101 /// will be used. In particular, a producer which is waiting to acquire the lock
102 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
103 ///
104 /// <details><summary>Potential deadlock example</summary>
105 ///
106 /// ```text
107 /// // Task 1 (on thread A) | // Task 2 (on thread B)
108 /// let _ref1 = rx.borrow(); |
109 /// | // will block
110 /// | let _ = tx.send(());
111 /// // may deadlock |
112 /// let _ref2 = rx.borrow(); |
113 /// ```
114 /// </details>
115 #[derive(Debug)]
116 pub struct Ref<'a, T> {
117 inner: RwLockReadGuard<'a, T>,
118 has_changed: bool,
119 }
120
121 impl<'a, T> Ref<'a, T> {
122 /// Indicates if the borrowed value is considered as _changed_ since the last
123 /// time it has been marked as seen.
124 ///
125 /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
126 ///
127 /// When borrowed from the [`Sender`] this function will always return `false`.
128 ///
129 /// # Examples
130 ///
131 /// ```
132 /// use tokio::sync::watch;
133 ///
134 /// #[tokio::main]
135 /// async fn main() {
136 /// let (tx, mut rx) = watch::channel("hello");
137 ///
138 /// tx.send("goodbye").unwrap();
139 /// // The sender does never consider the value as changed.
140 /// assert!(!tx.borrow().has_changed());
141 ///
142 /// // Drop the sender immediately, just for testing purposes.
143 /// drop(tx);
144 ///
145 /// // Even if the sender has already been dropped...
146 /// assert!(rx.has_changed().is_err());
147 /// // ...the modified value is still readable and detected as changed.
148 /// assert_eq!(*rx.borrow(), "goodbye");
149 /// assert!(rx.borrow().has_changed());
150 ///
151 /// // Read the changed value and mark it as seen.
152 /// {
153 /// let received = rx.borrow_and_update();
154 /// assert_eq!(*received, "goodbye");
155 /// assert!(received.has_changed());
156 /// // Release the read lock when leaving this scope.
157 /// }
158 ///
159 /// // Now the value has already been marked as seen and could
160 /// // never be modified again (after the sender has been dropped).
161 /// assert!(!rx.borrow().has_changed());
162 /// }
163 /// ```
has_changed(&self) -> bool164 pub fn has_changed(&self) -> bool {
165 self.has_changed
166 }
167 }
168
169 #[derive(Debug)]
170 struct Shared<T> {
171 /// The most recent value.
172 value: RwLock<T>,
173
174 /// The current version.
175 ///
176 /// The lowest bit represents a "closed" state. The rest of the bits
177 /// represent the current version.
178 state: AtomicState,
179
180 /// Tracks the number of `Receiver` instances.
181 ref_count_rx: AtomicUsize,
182
183 /// Notifies waiting receivers that the value changed.
184 notify_rx: Notify,
185
186 /// Notifies any task listening for `Receiver` dropped events.
187 notify_tx: Notify,
188 }
189
190 pub mod error {
191 //! Watch error types.
192
193 use std::fmt;
194
195 /// Error produced when sending a value fails.
196 #[derive(Debug)]
197 pub struct SendError<T>(pub T);
198
199 // ===== impl SendError =====
200
201 impl<T: fmt::Debug> fmt::Display for SendError<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result202 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
203 write!(fmt, "channel closed")
204 }
205 }
206
207 impl<T: fmt::Debug> std::error::Error for SendError<T> {}
208
209 /// Error produced when receiving a change notification.
210 #[derive(Debug, Clone)]
211 pub struct RecvError(pub(super) ());
212
213 // ===== impl RecvError =====
214
215 impl fmt::Display for RecvError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result216 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
217 write!(fmt, "channel closed")
218 }
219 }
220
221 impl std::error::Error for RecvError {}
222 }
223
224 use self::state::{AtomicState, Version};
225 mod state {
226 use crate::loom::sync::atomic::AtomicUsize;
227 use crate::loom::sync::atomic::Ordering::SeqCst;
228
229 const CLOSED: usize = 1;
230
231 /// The version part of the state. The lowest bit is always zero.
232 #[derive(Copy, Clone, Debug, Eq, PartialEq)]
233 pub(super) struct Version(usize);
234
235 /// Snapshot of the state. The first bit is used as the CLOSED bit.
236 /// The remaining bits are used as the version.
237 ///
238 /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
239 /// receivers does not set it.
240 #[derive(Copy, Clone, Debug)]
241 pub(super) struct StateSnapshot(usize);
242
243 /// The state stored in an atomic integer.
244 #[derive(Debug)]
245 pub(super) struct AtomicState(AtomicUsize);
246
247 impl Version {
248 /// Get the initial version when creating the channel.
initial() -> Self249 pub(super) fn initial() -> Self {
250 Version(0)
251 }
252 }
253
254 impl StateSnapshot {
255 /// Extract the version from the state.
version(self) -> Version256 pub(super) fn version(self) -> Version {
257 Version(self.0 & !CLOSED)
258 }
259
260 /// Is the closed bit set?
is_closed(self) -> bool261 pub(super) fn is_closed(self) -> bool {
262 (self.0 & CLOSED) == CLOSED
263 }
264 }
265
266 impl AtomicState {
267 /// Create a new `AtomicState` that is not closed and which has the
268 /// version set to `Version::initial()`.
new() -> Self269 pub(super) fn new() -> Self {
270 AtomicState(AtomicUsize::new(0))
271 }
272
273 /// Load the current value of the state.
load(&self) -> StateSnapshot274 pub(super) fn load(&self) -> StateSnapshot {
275 StateSnapshot(self.0.load(SeqCst))
276 }
277
278 /// Increment the version counter.
increment_version(&self)279 pub(super) fn increment_version(&self) {
280 // Increment by two to avoid touching the CLOSED bit.
281 self.0.fetch_add(2, SeqCst);
282 }
283
284 /// Set the closed bit in the state.
set_closed(&self)285 pub(super) fn set_closed(&self) {
286 self.0.fetch_or(CLOSED, SeqCst);
287 }
288 }
289 }
290
291 /// Creates a new watch channel, returning the "send" and "receive" handles.
292 ///
293 /// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
294 /// Only the last value sent is made available to the [`Receiver`] half. All
295 /// intermediate values are dropped.
296 ///
297 /// # Examples
298 ///
299 /// ```
300 /// use tokio::sync::watch;
301 ///
302 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
303 /// let (tx, mut rx) = watch::channel("hello");
304 ///
305 /// tokio::spawn(async move {
306 /// while rx.changed().await.is_ok() {
307 /// println!("received = {:?}", *rx.borrow());
308 /// }
309 /// });
310 ///
311 /// tx.send("world")?;
312 /// # Ok(())
313 /// # }
314 /// ```
315 ///
316 /// [`Sender`]: struct@Sender
317 /// [`Receiver`]: struct@Receiver
channel<T>(init: T) -> (Sender<T>, Receiver<T>)318 pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
319 let shared = Arc::new(Shared {
320 value: RwLock::new(init),
321 state: AtomicState::new(),
322 ref_count_rx: AtomicUsize::new(1),
323 notify_rx: Notify::new(),
324 notify_tx: Notify::new(),
325 });
326
327 let tx = Sender {
328 shared: shared.clone(),
329 };
330
331 let rx = Receiver {
332 shared,
333 version: Version::initial(),
334 };
335
336 (tx, rx)
337 }
338
339 impl<T> Receiver<T> {
from_shared(version: Version, shared: Arc<Shared<T>>) -> Self340 fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
341 // No synchronization necessary as this is only used as a counter and
342 // not memory access.
343 shared.ref_count_rx.fetch_add(1, Relaxed);
344
345 Self { shared, version }
346 }
347
348 /// Returns a reference to the most recently sent value.
349 ///
350 /// This method does not mark the returned value as seen, so future calls to
351 /// [`changed`] may return immediately even if you have already seen the
352 /// value with a call to `borrow`.
353 ///
354 /// Outstanding borrows hold a read lock on the inner value. This means that
355 /// long-lived borrows could cause the producer half to block. It is recommended
356 /// to keep the borrow as short-lived as possible. Additionally, if you are
357 /// running in an environment that allows `!Send` futures, you must ensure that
358 /// the returned `Ref` type is never held alive across an `.await` point,
359 /// otherwise, it can lead to a deadlock.
360 ///
361 /// The priority policy of the lock is dependent on the underlying lock
362 /// implementation, and this type does not guarantee that any particular policy
363 /// will be used. In particular, a producer which is waiting to acquire the lock
364 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
365 ///
366 /// <details><summary>Potential deadlock example</summary>
367 ///
368 /// ```text
369 /// // Task 1 (on thread A) | // Task 2 (on thread B)
370 /// let _ref1 = rx.borrow(); |
371 /// | // will block
372 /// | let _ = tx.send(());
373 /// // may deadlock |
374 /// let _ref2 = rx.borrow(); |
375 /// ```
376 /// </details>
377 ///
378 /// [`changed`]: Receiver::changed
379 ///
380 /// # Examples
381 ///
382 /// ```
383 /// use tokio::sync::watch;
384 ///
385 /// let (_, rx) = watch::channel("hello");
386 /// assert_eq!(*rx.borrow(), "hello");
387 /// ```
borrow(&self) -> Ref<'_, T>388 pub fn borrow(&self) -> Ref<'_, T> {
389 let inner = self.shared.value.read().unwrap();
390
391 // After obtaining a read-lock no concurrent writes could occur
392 // and the loaded version matches that of the borrowed reference.
393 let new_version = self.shared.state.load().version();
394 let has_changed = self.version != new_version;
395
396 Ref { inner, has_changed }
397 }
398
399 /// Returns a reference to the most recently sent value and marks that value
400 /// as seen.
401 ///
402 /// This method marks the current value as seen. Subsequent calls to [`changed`]
403 /// will not return immediately until the [`Sender`] has modified the shared
404 /// value again.
405 ///
406 /// Outstanding borrows hold a read lock on the inner value. This means that
407 /// long-lived borrows could cause the producer half to block. It is recommended
408 /// to keep the borrow as short-lived as possible. Additionally, if you are
409 /// running in an environment that allows `!Send` futures, you must ensure that
410 /// the returned `Ref` type is never held alive across an `.await` point,
411 /// otherwise, it can lead to a deadlock.
412 ///
413 /// The priority policy of the lock is dependent on the underlying lock
414 /// implementation, and this type does not guarantee that any particular policy
415 /// will be used. In particular, a producer which is waiting to acquire the lock
416 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
417 ///
418 /// <details><summary>Potential deadlock example</summary>
419 ///
420 /// ```text
421 /// // Task 1 (on thread A) | // Task 2 (on thread B)
422 /// let _ref1 = rx1.borrow_and_update(); |
423 /// | // will block
424 /// | let _ = tx.send(());
425 /// // may deadlock |
426 /// let _ref2 = rx2.borrow_and_update(); |
427 /// ```
428 /// </details>
429 ///
430 /// [`changed`]: Receiver::changed
borrow_and_update(&mut self) -> Ref<'_, T>431 pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
432 let inner = self.shared.value.read().unwrap();
433
434 // After obtaining a read-lock no concurrent writes could occur
435 // and the loaded version matches that of the borrowed reference.
436 let new_version = self.shared.state.load().version();
437 let has_changed = self.version != new_version;
438
439 // Mark the shared value as seen by updating the version
440 self.version = new_version;
441
442 Ref { inner, has_changed }
443 }
444
445 /// Checks if this channel contains a message that this receiver has not yet
446 /// seen. The new value is not marked as seen.
447 ///
448 /// Although this method is called `has_changed`, it does not check new
449 /// messages for equality, so this call will return true even if the new
450 /// message is equal to the old message.
451 ///
452 /// Returns an error if the channel has been closed.
453 /// # Examples
454 ///
455 /// ```
456 /// use tokio::sync::watch;
457 ///
458 /// #[tokio::main]
459 /// async fn main() {
460 /// let (tx, mut rx) = watch::channel("hello");
461 ///
462 /// tx.send("goodbye").unwrap();
463 ///
464 /// assert!(rx.has_changed().unwrap());
465 /// assert_eq!(*rx.borrow_and_update(), "goodbye");
466 ///
467 /// // The value has been marked as seen
468 /// assert!(!rx.has_changed().unwrap());
469 ///
470 /// drop(tx);
471 /// // The `tx` handle has been dropped
472 /// assert!(rx.has_changed().is_err());
473 /// }
474 /// ```
has_changed(&self) -> Result<bool, error::RecvError>475 pub fn has_changed(&self) -> Result<bool, error::RecvError> {
476 // Load the version from the state
477 let state = self.shared.state.load();
478 if state.is_closed() {
479 // The sender has dropped.
480 return Err(error::RecvError(()));
481 }
482 let new_version = state.version();
483
484 Ok(self.version != new_version)
485 }
486
487 /// Waits for a change notification, then marks the newest value as seen.
488 ///
489 /// If the newest value in the channel has not yet been marked seen when
490 /// this method is called, the method marks that value seen and returns
491 /// immediately. If the newest value has already been marked seen, then the
492 /// method sleeps until a new message is sent by the [`Sender`] connected to
493 /// this `Receiver`, or until the [`Sender`] is dropped.
494 ///
495 /// This method returns an error if and only if the [`Sender`] is dropped.
496 ///
497 /// # Cancel safety
498 ///
499 /// This method is cancel safe. If you use it as the event in a
500 /// [`tokio::select!`](crate::select) statement and some other branch
501 /// completes first, then it is guaranteed that no values have been marked
502 /// seen by this call to `changed`.
503 ///
504 /// [`Sender`]: struct@Sender
505 ///
506 /// # Examples
507 ///
508 /// ```
509 /// use tokio::sync::watch;
510 ///
511 /// #[tokio::main]
512 /// async fn main() {
513 /// let (tx, mut rx) = watch::channel("hello");
514 ///
515 /// tokio::spawn(async move {
516 /// tx.send("goodbye").unwrap();
517 /// });
518 ///
519 /// assert!(rx.changed().await.is_ok());
520 /// assert_eq!(*rx.borrow(), "goodbye");
521 ///
522 /// // The `tx` handle has been dropped
523 /// assert!(rx.changed().await.is_err());
524 /// }
525 /// ```
changed(&mut self) -> Result<(), error::RecvError>526 pub async fn changed(&mut self) -> Result<(), error::RecvError> {
527 loop {
528 // In order to avoid a race condition, we first request a notification,
529 // **then** check the current value's version. If a new version exists,
530 // the notification request is dropped.
531 let notified = self.shared.notify_rx.notified();
532
533 if let Some(ret) = maybe_changed(&self.shared, &mut self.version) {
534 return ret;
535 }
536
537 notified.await;
538 // loop around again in case the wake-up was spurious
539 }
540 }
541
542 /// Returns `true` if receivers belong to the same channel.
543 ///
544 /// # Examples
545 ///
546 /// ```
547 /// let (tx, rx) = tokio::sync::watch::channel(true);
548 /// let rx2 = rx.clone();
549 /// assert!(rx.same_channel(&rx2));
550 ///
551 /// let (tx3, rx3) = tokio::sync::watch::channel(true);
552 /// assert!(!rx3.same_channel(&rx2));
553 /// ```
same_channel(&self, other: &Self) -> bool554 pub fn same_channel(&self, other: &Self) -> bool {
555 Arc::ptr_eq(&self.shared, &other.shared)
556 }
557
558 cfg_process_driver! {
559 pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
560 maybe_changed(&self.shared, &mut self.version)
561 }
562 }
563 }
564
maybe_changed<T>( shared: &Shared<T>, version: &mut Version, ) -> Option<Result<(), error::RecvError>>565 fn maybe_changed<T>(
566 shared: &Shared<T>,
567 version: &mut Version,
568 ) -> Option<Result<(), error::RecvError>> {
569 // Load the version from the state
570 let state = shared.state.load();
571 let new_version = state.version();
572
573 if *version != new_version {
574 // Observe the new version and return
575 *version = new_version;
576 return Some(Ok(()));
577 }
578
579 if state.is_closed() {
580 // All receivers have dropped.
581 return Some(Err(error::RecvError(())));
582 }
583
584 None
585 }
586
587 impl<T> Clone for Receiver<T> {
clone(&self) -> Self588 fn clone(&self) -> Self {
589 let version = self.version;
590 let shared = self.shared.clone();
591
592 Self::from_shared(version, shared)
593 }
594 }
595
596 impl<T> Drop for Receiver<T> {
drop(&mut self)597 fn drop(&mut self) {
598 // No synchronization necessary as this is only used as a counter and
599 // not memory access.
600 if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
601 // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
602 self.shared.notify_tx.notify_waiters();
603 }
604 }
605 }
606
607 impl<T> Sender<T> {
608 /// Sends a new value via the channel, notifying all receivers.
609 ///
610 /// This method fails if the channel is closed, which is the case when
611 /// every receiver has been dropped. It is possible to reopen the channel
612 /// using the [`subscribe`] method. However, when `send` fails, the value
613 /// isn't made available for future receivers (but returned with the
614 /// [`SendError`]).
615 ///
616 /// To always make a new value available for future receivers, even if no
617 /// receiver currently exists, one of the other send methods
618 /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
619 /// used instead.
620 ///
621 /// [`subscribe`]: Sender::subscribe
622 /// [`SendError`]: error::SendError
623 /// [`send_if_modified`]: Sender::send_if_modified
624 /// [`send_modify`]: Sender::send_modify
625 /// [`send_replace`]: Sender::send_replace
send(&self, value: T) -> Result<(), error::SendError<T>>626 pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
627 // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
628 if 0 == self.receiver_count() {
629 return Err(error::SendError(value));
630 }
631
632 self.send_replace(value);
633 Ok(())
634 }
635
636 /// Modifies the watched value **unconditionally** in-place,
637 /// notifying all receivers.
638 ///
639 /// This can useful for modifying the watched value, without
640 /// having to allocate a new instance. Additionally, this
641 /// method permits sending values even when there are no receivers.
642 ///
643 /// Prefer to use the more versatile function [`Self::send_if_modified()`]
644 /// if the value is only modified conditionally during the mutable borrow
645 /// to prevent unneeded change notifications for unmodified values.
646 ///
647 /// # Panics
648 ///
649 /// This function panics when the invocation of the `modify` closure panics.
650 /// No receivers are notified when panicking. All changes of the watched
651 /// value applied by the closure before panicking will be visible in
652 /// subsequent calls to `borrow`.
653 ///
654 /// # Examples
655 ///
656 /// ```
657 /// use tokio::sync::watch;
658 ///
659 /// struct State {
660 /// counter: usize,
661 /// }
662 /// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
663 /// state_tx.send_modify(|state| state.counter += 1);
664 /// assert_eq!(state_rx.borrow().counter, 1);
665 /// ```
send_modify<F>(&self, modify: F) where F: FnOnce(&mut T),666 pub fn send_modify<F>(&self, modify: F)
667 where
668 F: FnOnce(&mut T),
669 {
670 self.send_if_modified(|value| {
671 modify(value);
672 true
673 });
674 }
675
676 /// Modifies the watched value **conditionally** in-place,
677 /// notifying all receivers only if modified.
678 ///
679 /// This can useful for modifying the watched value, without
680 /// having to allocate a new instance. Additionally, this
681 /// method permits sending values even when there are no receivers.
682 ///
683 /// The `modify` closure must return `true` if the value has actually
684 /// been modified during the mutable borrow. It should only return `false`
685 /// if the value is guaranteed to be unmodified despite the mutable
686 /// borrow.
687 ///
688 /// Receivers are only notified if the closure returned `true`. If the
689 /// closure has modified the value but returned `false` this results
690 /// in a *silent modification*, i.e. the modified value will be visible
691 /// in subsequent calls to `borrow`, but receivers will not receive
692 /// a change notification.
693 ///
694 /// Returns the result of the closure, i.e. `true` if the value has
695 /// been modified and `false` otherwise.
696 ///
697 /// # Panics
698 ///
699 /// This function panics when the invocation of the `modify` closure panics.
700 /// No receivers are notified when panicking. All changes of the watched
701 /// value applied by the closure before panicking will be visible in
702 /// subsequent calls to `borrow`.
703 ///
704 /// # Examples
705 ///
706 /// ```
707 /// use tokio::sync::watch;
708 ///
709 /// struct State {
710 /// counter: usize,
711 /// }
712 /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
713 /// let inc_counter_if_odd = |state: &mut State| {
714 /// if state.counter % 2 == 1 {
715 /// state.counter += 1;
716 /// return true;
717 /// }
718 /// false
719 /// };
720 ///
721 /// assert_eq!(state_rx.borrow().counter, 1);
722 ///
723 /// assert!(!state_rx.has_changed().unwrap());
724 /// assert!(state_tx.send_if_modified(inc_counter_if_odd));
725 /// assert!(state_rx.has_changed().unwrap());
726 /// assert_eq!(state_rx.borrow_and_update().counter, 2);
727 ///
728 /// assert!(!state_rx.has_changed().unwrap());
729 /// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
730 /// assert!(!state_rx.has_changed().unwrap());
731 /// assert_eq!(state_rx.borrow_and_update().counter, 2);
732 /// ```
send_if_modified<F>(&self, modify: F) -> bool where F: FnOnce(&mut T) -> bool,733 pub fn send_if_modified<F>(&self, modify: F) -> bool
734 where
735 F: FnOnce(&mut T) -> bool,
736 {
737 {
738 // Acquire the write lock and update the value.
739 let mut lock = self.shared.value.write().unwrap();
740
741 // Update the value and catch possible panic inside func.
742 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
743 match result {
744 Ok(modified) => {
745 if !modified {
746 // Abort, i.e. don't notify receivers if unmodified
747 return false;
748 }
749 // Continue if modified
750 }
751 Err(panicked) => {
752 // Drop the lock to avoid poisoning it.
753 drop(lock);
754 // Forward the panic to the caller.
755 panic::resume_unwind(panicked);
756 // Unreachable
757 }
758 };
759
760 self.shared.state.increment_version();
761
762 // Release the write lock.
763 //
764 // Incrementing the version counter while holding the lock ensures
765 // that receivers are able to figure out the version number of the
766 // value they are currently looking at.
767 drop(lock);
768 }
769
770 self.shared.notify_rx.notify_waiters();
771
772 true
773 }
774
775 /// Sends a new value via the channel, notifying all receivers and returning
776 /// the previous value in the channel.
777 ///
778 /// This can be useful for reusing the buffers inside a watched value.
779 /// Additionally, this method permits sending values even when there are no
780 /// receivers.
781 ///
782 /// # Examples
783 ///
784 /// ```
785 /// use tokio::sync::watch;
786 ///
787 /// let (tx, _rx) = watch::channel(1);
788 /// assert_eq!(tx.send_replace(2), 1);
789 /// assert_eq!(tx.send_replace(3), 2);
790 /// ```
send_replace(&self, mut value: T) -> T791 pub fn send_replace(&self, mut value: T) -> T {
792 // swap old watched value with the new one
793 self.send_modify(|old| mem::swap(old, &mut value));
794
795 value
796 }
797
798 /// Returns a reference to the most recently sent value
799 ///
800 /// Outstanding borrows hold a read lock on the inner value. This means that
801 /// long-lived borrows could cause the producer half to block. It is recommended
802 /// to keep the borrow as short-lived as possible. Additionally, if you are
803 /// running in an environment that allows `!Send` futures, you must ensure that
804 /// the returned `Ref` type is never held alive across an `.await` point,
805 /// otherwise, it can lead to a deadlock.
806 ///
807 /// # Examples
808 ///
809 /// ```
810 /// use tokio::sync::watch;
811 ///
812 /// let (tx, _) = watch::channel("hello");
813 /// assert_eq!(*tx.borrow(), "hello");
814 /// ```
borrow(&self) -> Ref<'_, T>815 pub fn borrow(&self) -> Ref<'_, T> {
816 let inner = self.shared.value.read().unwrap();
817
818 // The sender/producer always sees the current version
819 let has_changed = false;
820
821 Ref { inner, has_changed }
822 }
823
824 /// Checks if the channel has been closed. This happens when all receivers
825 /// have dropped.
826 ///
827 /// # Examples
828 ///
829 /// ```
830 /// let (tx, rx) = tokio::sync::watch::channel(());
831 /// assert!(!tx.is_closed());
832 ///
833 /// drop(rx);
834 /// assert!(tx.is_closed());
835 /// ```
is_closed(&self) -> bool836 pub fn is_closed(&self) -> bool {
837 self.receiver_count() == 0
838 }
839
840 /// Completes when all receivers have dropped.
841 ///
842 /// This allows the producer to get notified when interest in the produced
843 /// values is canceled and immediately stop doing work.
844 ///
845 /// # Cancel safety
846 ///
847 /// This method is cancel safe. Once the channel is closed, it stays closed
848 /// forever and all future calls to `closed` will return immediately.
849 ///
850 /// # Examples
851 ///
852 /// ```
853 /// use tokio::sync::watch;
854 ///
855 /// #[tokio::main]
856 /// async fn main() {
857 /// let (tx, rx) = watch::channel("hello");
858 ///
859 /// tokio::spawn(async move {
860 /// // use `rx`
861 /// drop(rx);
862 /// });
863 ///
864 /// // Waits for `rx` to drop
865 /// tx.closed().await;
866 /// println!("the `rx` handles dropped")
867 /// }
868 /// ```
closed(&self)869 pub async fn closed(&self) {
870 while self.receiver_count() > 0 {
871 let notified = self.shared.notify_tx.notified();
872
873 if self.receiver_count() == 0 {
874 return;
875 }
876
877 notified.await;
878 // The channel could have been reopened in the meantime by calling
879 // `subscribe`, so we loop again.
880 }
881 }
882
883 /// Creates a new [`Receiver`] connected to this `Sender`.
884 ///
885 /// All messages sent before this call to `subscribe` are initially marked
886 /// as seen by the new `Receiver`.
887 ///
888 /// This method can be called even if there are no other receivers. In this
889 /// case, the channel is reopened.
890 ///
891 /// # Examples
892 ///
893 /// The new channel will receive messages sent on this `Sender`.
894 ///
895 /// ```
896 /// use tokio::sync::watch;
897 ///
898 /// #[tokio::main]
899 /// async fn main() {
900 /// let (tx, _rx) = watch::channel(0u64);
901 ///
902 /// tx.send(5).unwrap();
903 ///
904 /// let rx = tx.subscribe();
905 /// assert_eq!(5, *rx.borrow());
906 ///
907 /// tx.send(10).unwrap();
908 /// assert_eq!(10, *rx.borrow());
909 /// }
910 /// ```
911 ///
912 /// The most recent message is considered seen by the channel, so this test
913 /// is guaranteed to pass.
914 ///
915 /// ```
916 /// use tokio::sync::watch;
917 /// use tokio::time::Duration;
918 ///
919 /// #[tokio::main]
920 /// async fn main() {
921 /// let (tx, _rx) = watch::channel(0u64);
922 /// tx.send(5).unwrap();
923 /// let mut rx = tx.subscribe();
924 ///
925 /// tokio::spawn(async move {
926 /// // by spawning and sleeping, the message is sent after `main`
927 /// // hits the call to `changed`.
928 /// # if false {
929 /// tokio::time::sleep(Duration::from_millis(10)).await;
930 /// # }
931 /// tx.send(100).unwrap();
932 /// });
933 ///
934 /// rx.changed().await.unwrap();
935 /// assert_eq!(100, *rx.borrow());
936 /// }
937 /// ```
subscribe(&self) -> Receiver<T>938 pub fn subscribe(&self) -> Receiver<T> {
939 let shared = self.shared.clone();
940 let version = shared.state.load().version();
941
942 // The CLOSED bit in the state tracks only whether the sender is
943 // dropped, so we do not need to unset it if this reopens the channel.
944 Receiver::from_shared(version, shared)
945 }
946
947 /// Returns the number of receivers that currently exist.
948 ///
949 /// # Examples
950 ///
951 /// ```
952 /// use tokio::sync::watch;
953 ///
954 /// #[tokio::main]
955 /// async fn main() {
956 /// let (tx, rx1) = watch::channel("hello");
957 ///
958 /// assert_eq!(1, tx.receiver_count());
959 ///
960 /// let mut _rx2 = rx1.clone();
961 ///
962 /// assert_eq!(2, tx.receiver_count());
963 /// }
964 /// ```
receiver_count(&self) -> usize965 pub fn receiver_count(&self) -> usize {
966 self.shared.ref_count_rx.load(Relaxed)
967 }
968 }
969
970 impl<T> Drop for Sender<T> {
drop(&mut self)971 fn drop(&mut self) {
972 self.shared.state.set_closed();
973 self.shared.notify_rx.notify_waiters();
974 }
975 }
976
977 // ===== impl Ref =====
978
979 impl<T> ops::Deref for Ref<'_, T> {
980 type Target = T;
981
deref(&self) -> &T982 fn deref(&self) -> &T {
983 self.inner.deref()
984 }
985 }
986
987 #[cfg(all(test, loom))]
988 mod tests {
989 use futures::future::FutureExt;
990 use loom::thread;
991
992 // test for https://github.com/tokio-rs/tokio/issues/3168
993 #[test]
watch_spurious_wakeup()994 fn watch_spurious_wakeup() {
995 loom::model(|| {
996 let (send, mut recv) = crate::sync::watch::channel(0i32);
997
998 send.send(1).unwrap();
999
1000 let send_thread = thread::spawn(move || {
1001 send.send(2).unwrap();
1002 send
1003 });
1004
1005 recv.changed().now_or_never();
1006
1007 let send = send_thread.join().unwrap();
1008 let recv_thread = thread::spawn(move || {
1009 recv.changed().now_or_never();
1010 recv.changed().now_or_never();
1011 recv
1012 });
1013
1014 send.send(3).unwrap();
1015
1016 let mut recv = recv_thread.join().unwrap();
1017 let send_thread = thread::spawn(move || {
1018 send.send(2).unwrap();
1019 });
1020
1021 recv.changed().now_or_never();
1022
1023 send_thread.join().unwrap();
1024 });
1025 }
1026
1027 #[test]
watch_borrow()1028 fn watch_borrow() {
1029 loom::model(|| {
1030 let (send, mut recv) = crate::sync::watch::channel(0i32);
1031
1032 assert!(send.borrow().eq(&0));
1033 assert!(recv.borrow().eq(&0));
1034
1035 send.send(1).unwrap();
1036 assert!(send.borrow().eq(&1));
1037
1038 let send_thread = thread::spawn(move || {
1039 send.send(2).unwrap();
1040 send
1041 });
1042
1043 recv.changed().now_or_never();
1044
1045 let send = send_thread.join().unwrap();
1046 let recv_thread = thread::spawn(move || {
1047 recv.changed().now_or_never();
1048 recv.changed().now_or_never();
1049 recv
1050 });
1051
1052 send.send(3).unwrap();
1053
1054 let recv = recv_thread.join().unwrap();
1055 assert!(recv.borrow().eq(&3));
1056 assert!(send.borrow().eq(&3));
1057
1058 send.send(2).unwrap();
1059
1060 thread::spawn(move || {
1061 assert!(recv.borrow().eq(&2));
1062 });
1063 assert!(send.borrow().eq(&2));
1064 });
1065 }
1066 }
1067