1 #![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3 //! A single-producer, multi-consumer channel that only retains the *last* sent
4 //! value.
5 //!
6 //! This channel is useful for watching for changes to a value from multiple
7 //! points in the code base, for example, changes to configuration values.
8 //!
9 //! # Usage
10 //!
11 //! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
12 //! and consumer halves of the channel. The channel is created with an initial
13 //! value. The **latest** value stored in the channel is accessed with
14 //! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
15 //! value to be sent by the [`Sender`] half.
16 //!
17 //! # Examples
18 //!
19 //! ```
20 //! use tokio::sync::watch;
21 //!
22 //! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
23 //! let (tx, mut rx) = watch::channel("hello");
24 //!
25 //! tokio::spawn(async move {
26 //! while rx.changed().await.is_ok() {
27 //! println!("received = {:?}", *rx.borrow());
28 //! }
29 //! });
30 //!
31 //! tx.send("world")?;
32 //! # Ok(())
33 //! # }
34 //! ```
35 //!
36 //! # Closing
37 //!
38 //! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
39 //! when all [`Receiver`] handles have been dropped. This indicates that there
40 //! is no further interest in the values being produced and work can be stopped.
41 //!
42 //! The value in the channel will not be dropped until the sender and all receivers
43 //! have been dropped.
44 //!
45 //! # Thread safety
46 //!
47 //! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
48 //! threads and can be used in a concurrent environment. Clones of [`Receiver`]
49 //! handles may be moved to separate threads and also used concurrently.
50 //!
51 //! [`Sender`]: crate::sync::watch::Sender
52 //! [`Receiver`]: crate::sync::watch::Receiver
53 //! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
54 //! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
55 //! [`channel`]: crate::sync::watch::channel
56 //! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
57 //! [`Sender::closed`]: crate::sync::watch::Sender::closed
58
59 use crate::sync::notify::Notify;
60
61 use crate::loom::sync::atomic::AtomicUsize;
62 use crate::loom::sync::atomic::Ordering::Relaxed;
63 use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
64 use std::fmt;
65 use std::mem;
66 use std::ops;
67 use std::panic;
68
69 /// Receives values from the associated [`Sender`](struct@Sender).
70 ///
71 /// Instances are created by the [`channel`](fn@channel) function.
72 ///
73 /// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
74 /// wrapper.
75 ///
76 /// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
77 #[derive(Debug)]
78 pub struct Receiver<T> {
79 /// Pointer to the shared state
80 shared: Arc<Shared<T>>,
81
82 /// Last observed version
83 version: Version,
84 }
85
86 /// Sends values to the associated [`Receiver`](struct@Receiver).
87 ///
88 /// Instances are created by the [`channel`](fn@channel) function.
89 #[derive(Debug)]
90 pub struct Sender<T> {
91 shared: Arc<Shared<T>>,
92 }
93
94 /// Returns a reference to the inner value.
95 ///
96 /// Outstanding borrows hold a read lock on the inner value. This means that
97 /// long-lived borrows could cause the producer half to block. It is recommended
98 /// to keep the borrow as short-lived as possible. Additionally, if you are
99 /// running in an environment that allows `!Send` futures, you must ensure that
100 /// the returned `Ref` type is never held alive across an `.await` point,
101 /// otherwise, it can lead to a deadlock.
102 ///
103 /// The priority policy of the lock is dependent on the underlying lock
104 /// implementation, and this type does not guarantee that any particular policy
105 /// will be used. In particular, a producer which is waiting to acquire the lock
106 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
107 ///
108 /// <details><summary>Potential deadlock example</summary>
109 ///
110 /// ```text
111 /// // Task 1 (on thread A) | // Task 2 (on thread B)
112 /// let _ref1 = rx.borrow(); |
113 /// | // will block
114 /// | let _ = tx.send(());
115 /// // may deadlock |
116 /// let _ref2 = rx.borrow(); |
117 /// ```
118 /// </details>
119 #[derive(Debug)]
120 pub struct Ref<'a, T> {
121 inner: RwLockReadGuard<'a, T>,
122 has_changed: bool,
123 }
124
125 impl<'a, T> Ref<'a, T> {
126 /// Indicates if the borrowed value is considered as _changed_ since the last
127 /// time it has been marked as seen.
128 ///
129 /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
130 ///
131 /// When borrowed from the [`Sender`] this function will always return `false`.
132 ///
133 /// # Examples
134 ///
135 /// ```
136 /// use tokio::sync::watch;
137 ///
138 /// #[tokio::main]
139 /// async fn main() {
140 /// let (tx, mut rx) = watch::channel("hello");
141 ///
142 /// tx.send("goodbye").unwrap();
143 /// // The sender does never consider the value as changed.
144 /// assert!(!tx.borrow().has_changed());
145 ///
146 /// // Drop the sender immediately, just for testing purposes.
147 /// drop(tx);
148 ///
149 /// // Even if the sender has already been dropped...
150 /// assert!(rx.has_changed().is_err());
151 /// // ...the modified value is still readable and detected as changed.
152 /// assert_eq!(*rx.borrow(), "goodbye");
153 /// assert!(rx.borrow().has_changed());
154 ///
155 /// // Read the changed value and mark it as seen.
156 /// {
157 /// let received = rx.borrow_and_update();
158 /// assert_eq!(*received, "goodbye");
159 /// assert!(received.has_changed());
160 /// // Release the read lock when leaving this scope.
161 /// }
162 ///
163 /// // Now the value has already been marked as seen and could
164 /// // never be modified again (after the sender has been dropped).
165 /// assert!(!rx.borrow().has_changed());
166 /// }
167 /// ```
has_changed(&self) -> bool168 pub fn has_changed(&self) -> bool {
169 self.has_changed
170 }
171 }
172
173 struct Shared<T> {
174 /// The most recent value.
175 value: RwLock<T>,
176
177 /// The current version.
178 ///
179 /// The lowest bit represents a "closed" state. The rest of the bits
180 /// represent the current version.
181 state: AtomicState,
182
183 /// Tracks the number of `Receiver` instances.
184 ref_count_rx: AtomicUsize,
185
186 /// Notifies waiting receivers that the value changed.
187 notify_rx: big_notify::BigNotify,
188
189 /// Notifies any task listening for `Receiver` dropped events.
190 notify_tx: Notify,
191 }
192
193 impl<T: fmt::Debug> fmt::Debug for Shared<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195 let state = self.state.load();
196 f.debug_struct("Shared")
197 .field("value", &self.value)
198 .field("version", &state.version())
199 .field("is_closed", &state.is_closed())
200 .field("ref_count_rx", &self.ref_count_rx)
201 .finish()
202 }
203 }
204
205 pub mod error {
206 //! Watch error types.
207
208 use std::error::Error;
209 use std::fmt;
210
211 /// Error produced when sending a value fails.
212 #[derive(PartialEq, Eq, Clone, Copy)]
213 pub struct SendError<T>(pub T);
214
215 // ===== impl SendError =====
216
217 impl<T> fmt::Debug for SendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result218 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219 f.debug_struct("SendError").finish_non_exhaustive()
220 }
221 }
222
223 impl<T> fmt::Display for SendError<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result224 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
225 write!(fmt, "channel closed")
226 }
227 }
228
229 impl<T> Error for SendError<T> {}
230
231 /// Error produced when receiving a change notification.
232 #[derive(Debug, Clone)]
233 pub struct RecvError(pub(super) ());
234
235 // ===== impl RecvError =====
236
237 impl fmt::Display for RecvError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result238 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
239 write!(fmt, "channel closed")
240 }
241 }
242
243 impl Error for RecvError {}
244 }
245
246 mod big_notify {
247 use super::*;
248 use crate::sync::notify::Notified;
249
250 // To avoid contention on the lock inside the `Notify`, we store multiple
251 // copies of it. Then, we use either circular access or randomness to spread
252 // out threads over different `Notify` objects.
253 //
254 // Some simple benchmarks show that randomness performs slightly better than
255 // circular access (probably due to contention on `next`), so we prefer to
256 // use randomness when Tokio is compiled with a random number generator.
257 //
258 // When the random number generator is not available, we fall back to
259 // circular access.
260
261 pub(super) struct BigNotify {
262 #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
263 next: AtomicUsize,
264 inner: [Notify; 8],
265 }
266
267 impl BigNotify {
new() -> Self268 pub(super) fn new() -> Self {
269 Self {
270 #[cfg(not(all(
271 not(loom),
272 feature = "sync",
273 any(feature = "rt", feature = "macros")
274 )))]
275 next: AtomicUsize::new(0),
276 inner: Default::default(),
277 }
278 }
279
notify_waiters(&self)280 pub(super) fn notify_waiters(&self) {
281 for notify in &self.inner {
282 notify.notify_waiters();
283 }
284 }
285
286 /// This function implements the case where randomness is not available.
287 #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
notified(&self) -> Notified<'_>288 pub(super) fn notified(&self) -> Notified<'_> {
289 let i = self.next.fetch_add(1, Relaxed) % 8;
290 self.inner[i].notified()
291 }
292
293 /// This function implements the case where randomness is available.
294 #[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
notified(&self) -> Notified<'_>295 pub(super) fn notified(&self) -> Notified<'_> {
296 let i = crate::runtime::context::thread_rng_n(8) as usize;
297 self.inner[i].notified()
298 }
299 }
300 }
301
302 use self::state::{AtomicState, Version};
303 mod state {
304 use crate::loom::sync::atomic::AtomicUsize;
305 use crate::loom::sync::atomic::Ordering::SeqCst;
306
307 const CLOSED: usize = 1;
308
309 /// The version part of the state. The lowest bit is always zero.
310 #[derive(Copy, Clone, Debug, Eq, PartialEq)]
311 pub(super) struct Version(usize);
312
313 /// Snapshot of the state. The first bit is used as the CLOSED bit.
314 /// The remaining bits are used as the version.
315 ///
316 /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
317 /// receivers does not set it.
318 #[derive(Copy, Clone, Debug)]
319 pub(super) struct StateSnapshot(usize);
320
321 /// The state stored in an atomic integer.
322 #[derive(Debug)]
323 pub(super) struct AtomicState(AtomicUsize);
324
325 impl Version {
326 /// Get the initial version when creating the channel.
initial() -> Self327 pub(super) fn initial() -> Self {
328 Version(0)
329 }
330 }
331
332 impl StateSnapshot {
333 /// Extract the version from the state.
version(self) -> Version334 pub(super) fn version(self) -> Version {
335 Version(self.0 & !CLOSED)
336 }
337
338 /// Is the closed bit set?
is_closed(self) -> bool339 pub(super) fn is_closed(self) -> bool {
340 (self.0 & CLOSED) == CLOSED
341 }
342 }
343
344 impl AtomicState {
345 /// Create a new `AtomicState` that is not closed and which has the
346 /// version set to `Version::initial()`.
new() -> Self347 pub(super) fn new() -> Self {
348 AtomicState(AtomicUsize::new(0))
349 }
350
351 /// Load the current value of the state.
load(&self) -> StateSnapshot352 pub(super) fn load(&self) -> StateSnapshot {
353 StateSnapshot(self.0.load(SeqCst))
354 }
355
356 /// Increment the version counter.
increment_version(&self)357 pub(super) fn increment_version(&self) {
358 // Increment by two to avoid touching the CLOSED bit.
359 self.0.fetch_add(2, SeqCst);
360 }
361
362 /// Set the closed bit in the state.
set_closed(&self)363 pub(super) fn set_closed(&self) {
364 self.0.fetch_or(CLOSED, SeqCst);
365 }
366 }
367 }
368
369 /// Creates a new watch channel, returning the "send" and "receive" handles.
370 ///
371 /// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
372 /// Only the last value sent is made available to the [`Receiver`] half. All
373 /// intermediate values are dropped.
374 ///
375 /// # Examples
376 ///
377 /// ```
378 /// use tokio::sync::watch;
379 ///
380 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
381 /// let (tx, mut rx) = watch::channel("hello");
382 ///
383 /// tokio::spawn(async move {
384 /// while rx.changed().await.is_ok() {
385 /// println!("received = {:?}", *rx.borrow());
386 /// }
387 /// });
388 ///
389 /// tx.send("world")?;
390 /// # Ok(())
391 /// # }
392 /// ```
393 ///
394 /// [`Sender`]: struct@Sender
395 /// [`Receiver`]: struct@Receiver
channel<T>(init: T) -> (Sender<T>, Receiver<T>)396 pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
397 let shared = Arc::new(Shared {
398 value: RwLock::new(init),
399 state: AtomicState::new(),
400 ref_count_rx: AtomicUsize::new(1),
401 notify_rx: big_notify::BigNotify::new(),
402 notify_tx: Notify::new(),
403 });
404
405 let tx = Sender {
406 shared: shared.clone(),
407 };
408
409 let rx = Receiver {
410 shared,
411 version: Version::initial(),
412 };
413
414 (tx, rx)
415 }
416
417 impl<T> Receiver<T> {
from_shared(version: Version, shared: Arc<Shared<T>>) -> Self418 fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
419 // No synchronization necessary as this is only used as a counter and
420 // not memory access.
421 shared.ref_count_rx.fetch_add(1, Relaxed);
422
423 Self { shared, version }
424 }
425
426 /// Returns a reference to the most recently sent value.
427 ///
428 /// This method does not mark the returned value as seen, so future calls to
429 /// [`changed`] may return immediately even if you have already seen the
430 /// value with a call to `borrow`.
431 ///
432 /// Outstanding borrows hold a read lock on the inner value. This means that
433 /// long-lived borrows could cause the producer half to block. It is recommended
434 /// to keep the borrow as short-lived as possible. Additionally, if you are
435 /// running in an environment that allows `!Send` futures, you must ensure that
436 /// the returned `Ref` type is never held alive across an `.await` point,
437 /// otherwise, it can lead to a deadlock.
438 ///
439 /// The priority policy of the lock is dependent on the underlying lock
440 /// implementation, and this type does not guarantee that any particular policy
441 /// will be used. In particular, a producer which is waiting to acquire the lock
442 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
443 ///
444 /// <details><summary>Potential deadlock example</summary>
445 ///
446 /// ```text
447 /// // Task 1 (on thread A) | // Task 2 (on thread B)
448 /// let _ref1 = rx.borrow(); |
449 /// | // will block
450 /// | let _ = tx.send(());
451 /// // may deadlock |
452 /// let _ref2 = rx.borrow(); |
453 /// ```
454 /// </details>
455 ///
456 /// [`changed`]: Receiver::changed
457 ///
458 /// # Examples
459 ///
460 /// ```
461 /// use tokio::sync::watch;
462 ///
463 /// let (_, rx) = watch::channel("hello");
464 /// assert_eq!(*rx.borrow(), "hello");
465 /// ```
borrow(&self) -> Ref<'_, T>466 pub fn borrow(&self) -> Ref<'_, T> {
467 let inner = self.shared.value.read().unwrap();
468
469 // After obtaining a read-lock no concurrent writes could occur
470 // and the loaded version matches that of the borrowed reference.
471 let new_version = self.shared.state.load().version();
472 let has_changed = self.version != new_version;
473
474 Ref { inner, has_changed }
475 }
476
477 /// Returns a reference to the most recently sent value and marks that value
478 /// as seen.
479 ///
480 /// This method marks the current value as seen. Subsequent calls to [`changed`]
481 /// will not return immediately until the [`Sender`] has modified the shared
482 /// value again.
483 ///
484 /// Outstanding borrows hold a read lock on the inner value. This means that
485 /// long-lived borrows could cause the producer half to block. It is recommended
486 /// to keep the borrow as short-lived as possible. Additionally, if you are
487 /// running in an environment that allows `!Send` futures, you must ensure that
488 /// the returned `Ref` type is never held alive across an `.await` point,
489 /// otherwise, it can lead to a deadlock.
490 ///
491 /// The priority policy of the lock is dependent on the underlying lock
492 /// implementation, and this type does not guarantee that any particular policy
493 /// will be used. In particular, a producer which is waiting to acquire the lock
494 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
495 ///
496 /// <details><summary>Potential deadlock example</summary>
497 ///
498 /// ```text
499 /// // Task 1 (on thread A) | // Task 2 (on thread B)
500 /// let _ref1 = rx1.borrow_and_update(); |
501 /// | // will block
502 /// | let _ = tx.send(());
503 /// // may deadlock |
504 /// let _ref2 = rx2.borrow_and_update(); |
505 /// ```
506 /// </details>
507 ///
508 /// [`changed`]: Receiver::changed
borrow_and_update(&mut self) -> Ref<'_, T>509 pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
510 let inner = self.shared.value.read().unwrap();
511
512 // After obtaining a read-lock no concurrent writes could occur
513 // and the loaded version matches that of the borrowed reference.
514 let new_version = self.shared.state.load().version();
515 let has_changed = self.version != new_version;
516
517 // Mark the shared value as seen by updating the version
518 self.version = new_version;
519
520 Ref { inner, has_changed }
521 }
522
523 /// Checks if this channel contains a message that this receiver has not yet
524 /// seen. The new value is not marked as seen.
525 ///
526 /// Although this method is called `has_changed`, it does not check new
527 /// messages for equality, so this call will return true even if the new
528 /// message is equal to the old message.
529 ///
530 /// Returns an error if the channel has been closed.
531 /// # Examples
532 ///
533 /// ```
534 /// use tokio::sync::watch;
535 ///
536 /// #[tokio::main]
537 /// async fn main() {
538 /// let (tx, mut rx) = watch::channel("hello");
539 ///
540 /// tx.send("goodbye").unwrap();
541 ///
542 /// assert!(rx.has_changed().unwrap());
543 /// assert_eq!(*rx.borrow_and_update(), "goodbye");
544 ///
545 /// // The value has been marked as seen
546 /// assert!(!rx.has_changed().unwrap());
547 ///
548 /// drop(tx);
549 /// // The `tx` handle has been dropped
550 /// assert!(rx.has_changed().is_err());
551 /// }
552 /// ```
has_changed(&self) -> Result<bool, error::RecvError>553 pub fn has_changed(&self) -> Result<bool, error::RecvError> {
554 // Load the version from the state
555 let state = self.shared.state.load();
556 if state.is_closed() {
557 // The sender has dropped.
558 return Err(error::RecvError(()));
559 }
560 let new_version = state.version();
561
562 Ok(self.version != new_version)
563 }
564
565 /// Waits for a change notification, then marks the newest value as seen.
566 ///
567 /// If the newest value in the channel has not yet been marked seen when
568 /// this method is called, the method marks that value seen and returns
569 /// immediately. If the newest value has already been marked seen, then the
570 /// method sleeps until a new message is sent by the [`Sender`] connected to
571 /// this `Receiver`, or until the [`Sender`] is dropped.
572 ///
573 /// This method returns an error if and only if the [`Sender`] is dropped.
574 ///
575 /// # Cancel safety
576 ///
577 /// This method is cancel safe. If you use it as the event in a
578 /// [`tokio::select!`](crate::select) statement and some other branch
579 /// completes first, then it is guaranteed that no values have been marked
580 /// seen by this call to `changed`.
581 ///
582 /// [`Sender`]: struct@Sender
583 ///
584 /// # Examples
585 ///
586 /// ```
587 /// use tokio::sync::watch;
588 ///
589 /// #[tokio::main]
590 /// async fn main() {
591 /// let (tx, mut rx) = watch::channel("hello");
592 ///
593 /// tokio::spawn(async move {
594 /// tx.send("goodbye").unwrap();
595 /// });
596 ///
597 /// assert!(rx.changed().await.is_ok());
598 /// assert_eq!(*rx.borrow(), "goodbye");
599 ///
600 /// // The `tx` handle has been dropped
601 /// assert!(rx.changed().await.is_err());
602 /// }
603 /// ```
changed(&mut self) -> Result<(), error::RecvError>604 pub async fn changed(&mut self) -> Result<(), error::RecvError> {
605 changed_impl(&self.shared, &mut self.version).await
606 }
607
608 /// Waits for a value that satisifes the provided condition.
609 ///
610 /// This method will call the provided closure whenever something is sent on
611 /// the channel. Once the closure returns `true`, this method will return a
612 /// reference to the value that was passed to the closure.
613 ///
614 /// Before `wait_for` starts waiting for changes, it will call the closure
615 /// on the current value. If the closure returns `true` when given the
616 /// current value, then `wait_for` will immediately return a reference to
617 /// the current value. This is the case even if the current value is already
618 /// considered seen.
619 ///
620 /// The watch channel only keeps track of the most recent value, so if
621 /// several messages are sent faster than `wait_for` is able to call the
622 /// closure, then it may skip some updates. Whenever the closure is called,
623 /// it will be called with the most recent value.
624 ///
625 /// When this function returns, the value that was passed to the closure
626 /// when it returned `true` will be considered seen.
627 ///
628 /// If the channel is closed, then `wait_for` will return a `RecvError`.
629 /// Once this happens, no more messages can ever be sent on the channel.
630 /// When an error is returned, it is guaranteed that the closure has been
631 /// called on the last value, and that it returned `false` for that value.
632 /// (If the closure returned `true`, then the last value would have been
633 /// returned instead of the error.)
634 ///
635 /// Like the `borrow` method, the returned borrow holds a read lock on the
636 /// inner value. This means that long-lived borrows could cause the producer
637 /// half to block. It is recommended to keep the borrow as short-lived as
638 /// possible. See the documentation of `borrow` for more information on
639 /// this.
640 ///
641 /// [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
642 ///
643 /// # Examples
644 ///
645 /// ```
646 /// use tokio::sync::watch;
647 ///
648 /// #[tokio::main]
649 ///
650 /// async fn main() {
651 /// let (tx, _rx) = watch::channel("hello");
652 ///
653 /// tx.send("goodbye").unwrap();
654 ///
655 /// // here we subscribe to a second receiver
656 /// // now in case of using `changed` we would have
657 /// // to first check the current value and then wait
658 /// // for changes or else `changed` would hang.
659 /// let mut rx2 = tx.subscribe();
660 ///
661 /// // in place of changed we have use `wait_for`
662 /// // which would automatically check the current value
663 /// // and wait for changes until the closure returns true.
664 /// assert!(rx2.wait_for(|val| *val == "goodbye").await.is_ok());
665 /// assert_eq!(*rx2.borrow(), "goodbye");
666 /// }
667 /// ```
wait_for( &mut self, mut f: impl FnMut(&T) -> bool, ) -> Result<Ref<'_, T>, error::RecvError>668 pub async fn wait_for(
669 &mut self,
670 mut f: impl FnMut(&T) -> bool,
671 ) -> Result<Ref<'_, T>, error::RecvError> {
672 let mut closed = false;
673 loop {
674 {
675 let inner = self.shared.value.read().unwrap();
676
677 let new_version = self.shared.state.load().version();
678 let has_changed = self.version != new_version;
679 self.version = new_version;
680
681 if (!closed || has_changed) && f(&inner) {
682 return Ok(Ref { inner, has_changed });
683 }
684 }
685
686 if closed {
687 return Err(error::RecvError(()));
688 }
689
690 // Wait for the value to change.
691 closed = changed_impl(&self.shared, &mut self.version).await.is_err();
692 }
693 }
694
695 /// Returns `true` if receivers belong to the same channel.
696 ///
697 /// # Examples
698 ///
699 /// ```
700 /// let (tx, rx) = tokio::sync::watch::channel(true);
701 /// let rx2 = rx.clone();
702 /// assert!(rx.same_channel(&rx2));
703 ///
704 /// let (tx3, rx3) = tokio::sync::watch::channel(true);
705 /// assert!(!rx3.same_channel(&rx2));
706 /// ```
same_channel(&self, other: &Self) -> bool707 pub fn same_channel(&self, other: &Self) -> bool {
708 Arc::ptr_eq(&self.shared, &other.shared)
709 }
710
711 cfg_process_driver! {
712 pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
713 maybe_changed(&self.shared, &mut self.version)
714 }
715 }
716 }
717
maybe_changed<T>( shared: &Shared<T>, version: &mut Version, ) -> Option<Result<(), error::RecvError>>718 fn maybe_changed<T>(
719 shared: &Shared<T>,
720 version: &mut Version,
721 ) -> Option<Result<(), error::RecvError>> {
722 // Load the version from the state
723 let state = shared.state.load();
724 let new_version = state.version();
725
726 if *version != new_version {
727 // Observe the new version and return
728 *version = new_version;
729 return Some(Ok(()));
730 }
731
732 if state.is_closed() {
733 // All receivers have dropped.
734 return Some(Err(error::RecvError(())));
735 }
736
737 None
738 }
739
changed_impl<T>( shared: &Shared<T>, version: &mut Version, ) -> Result<(), error::RecvError>740 async fn changed_impl<T>(
741 shared: &Shared<T>,
742 version: &mut Version,
743 ) -> Result<(), error::RecvError> {
744 crate::trace::async_trace_leaf().await;
745
746 loop {
747 // In order to avoid a race condition, we first request a notification,
748 // **then** check the current value's version. If a new version exists,
749 // the notification request is dropped.
750 let notified = shared.notify_rx.notified();
751
752 if let Some(ret) = maybe_changed(shared, version) {
753 return ret;
754 }
755
756 notified.await;
757 // loop around again in case the wake-up was spurious
758 }
759 }
760
761 impl<T> Clone for Receiver<T> {
clone(&self) -> Self762 fn clone(&self) -> Self {
763 let version = self.version;
764 let shared = self.shared.clone();
765
766 Self::from_shared(version, shared)
767 }
768 }
769
770 impl<T> Drop for Receiver<T> {
drop(&mut self)771 fn drop(&mut self) {
772 // No synchronization necessary as this is only used as a counter and
773 // not memory access.
774 if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
775 // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
776 self.shared.notify_tx.notify_waiters();
777 }
778 }
779 }
780
781 impl<T> Sender<T> {
782 /// Sends a new value via the channel, notifying all receivers.
783 ///
784 /// This method fails if the channel is closed, which is the case when
785 /// every receiver has been dropped. It is possible to reopen the channel
786 /// using the [`subscribe`] method. However, when `send` fails, the value
787 /// isn't made available for future receivers (but returned with the
788 /// [`SendError`]).
789 ///
790 /// To always make a new value available for future receivers, even if no
791 /// receiver currently exists, one of the other send methods
792 /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
793 /// used instead.
794 ///
795 /// [`subscribe`]: Sender::subscribe
796 /// [`SendError`]: error::SendError
797 /// [`send_if_modified`]: Sender::send_if_modified
798 /// [`send_modify`]: Sender::send_modify
799 /// [`send_replace`]: Sender::send_replace
send(&self, value: T) -> Result<(), error::SendError<T>>800 pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
801 // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
802 if 0 == self.receiver_count() {
803 return Err(error::SendError(value));
804 }
805
806 self.send_replace(value);
807 Ok(())
808 }
809
810 /// Modifies the watched value **unconditionally** in-place,
811 /// notifying all receivers.
812 ///
813 /// This can be useful for modifying the watched value, without
814 /// having to allocate a new instance. Additionally, this
815 /// method permits sending values even when there are no receivers.
816 ///
817 /// Prefer to use the more versatile function [`Self::send_if_modified()`]
818 /// if the value is only modified conditionally during the mutable borrow
819 /// to prevent unneeded change notifications for unmodified values.
820 ///
821 /// # Panics
822 ///
823 /// This function panics when the invocation of the `modify` closure panics.
824 /// No receivers are notified when panicking. All changes of the watched
825 /// value applied by the closure before panicking will be visible in
826 /// subsequent calls to `borrow`.
827 ///
828 /// # Examples
829 ///
830 /// ```
831 /// use tokio::sync::watch;
832 ///
833 /// struct State {
834 /// counter: usize,
835 /// }
836 /// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
837 /// state_tx.send_modify(|state| state.counter += 1);
838 /// assert_eq!(state_rx.borrow().counter, 1);
839 /// ```
send_modify<F>(&self, modify: F) where F: FnOnce(&mut T),840 pub fn send_modify<F>(&self, modify: F)
841 where
842 F: FnOnce(&mut T),
843 {
844 self.send_if_modified(|value| {
845 modify(value);
846 true
847 });
848 }
849
850 /// Modifies the watched value **conditionally** in-place,
851 /// notifying all receivers only if modified.
852 ///
853 /// This can be useful for modifying the watched value, without
854 /// having to allocate a new instance. Additionally, this
855 /// method permits sending values even when there are no receivers.
856 ///
857 /// The `modify` closure must return `true` if the value has actually
858 /// been modified during the mutable borrow. It should only return `false`
859 /// if the value is guaranteed to be unmodified despite the mutable
860 /// borrow.
861 ///
862 /// Receivers are only notified if the closure returned `true`. If the
863 /// closure has modified the value but returned `false` this results
864 /// in a *silent modification*, i.e. the modified value will be visible
865 /// in subsequent calls to `borrow`, but receivers will not receive
866 /// a change notification.
867 ///
868 /// Returns the result of the closure, i.e. `true` if the value has
869 /// been modified and `false` otherwise.
870 ///
871 /// # Panics
872 ///
873 /// This function panics when the invocation of the `modify` closure panics.
874 /// No receivers are notified when panicking. All changes of the watched
875 /// value applied by the closure before panicking will be visible in
876 /// subsequent calls to `borrow`.
877 ///
878 /// # Examples
879 ///
880 /// ```
881 /// use tokio::sync::watch;
882 ///
883 /// struct State {
884 /// counter: usize,
885 /// }
886 /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
887 /// let inc_counter_if_odd = |state: &mut State| {
888 /// if state.counter % 2 == 1 {
889 /// state.counter += 1;
890 /// return true;
891 /// }
892 /// false
893 /// };
894 ///
895 /// assert_eq!(state_rx.borrow().counter, 1);
896 ///
897 /// assert!(!state_rx.has_changed().unwrap());
898 /// assert!(state_tx.send_if_modified(inc_counter_if_odd));
899 /// assert!(state_rx.has_changed().unwrap());
900 /// assert_eq!(state_rx.borrow_and_update().counter, 2);
901 ///
902 /// assert!(!state_rx.has_changed().unwrap());
903 /// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
904 /// assert!(!state_rx.has_changed().unwrap());
905 /// assert_eq!(state_rx.borrow_and_update().counter, 2);
906 /// ```
send_if_modified<F>(&self, modify: F) -> bool where F: FnOnce(&mut T) -> bool,907 pub fn send_if_modified<F>(&self, modify: F) -> bool
908 where
909 F: FnOnce(&mut T) -> bool,
910 {
911 {
912 // Acquire the write lock and update the value.
913 let mut lock = self.shared.value.write().unwrap();
914
915 // Update the value and catch possible panic inside func.
916 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
917 match result {
918 Ok(modified) => {
919 if !modified {
920 // Abort, i.e. don't notify receivers if unmodified
921 return false;
922 }
923 // Continue if modified
924 }
925 Err(panicked) => {
926 // Drop the lock to avoid poisoning it.
927 drop(lock);
928 // Forward the panic to the caller.
929 panic::resume_unwind(panicked);
930 // Unreachable
931 }
932 };
933
934 self.shared.state.increment_version();
935
936 // Release the write lock.
937 //
938 // Incrementing the version counter while holding the lock ensures
939 // that receivers are able to figure out the version number of the
940 // value they are currently looking at.
941 drop(lock);
942 }
943
944 self.shared.notify_rx.notify_waiters();
945
946 true
947 }
948
949 /// Sends a new value via the channel, notifying all receivers and returning
950 /// the previous value in the channel.
951 ///
952 /// This can be useful for reusing the buffers inside a watched value.
953 /// Additionally, this method permits sending values even when there are no
954 /// receivers.
955 ///
956 /// # Examples
957 ///
958 /// ```
959 /// use tokio::sync::watch;
960 ///
961 /// let (tx, _rx) = watch::channel(1);
962 /// assert_eq!(tx.send_replace(2), 1);
963 /// assert_eq!(tx.send_replace(3), 2);
964 /// ```
send_replace(&self, mut value: T) -> T965 pub fn send_replace(&self, mut value: T) -> T {
966 // swap old watched value with the new one
967 self.send_modify(|old| mem::swap(old, &mut value));
968
969 value
970 }
971
972 /// Returns a reference to the most recently sent value
973 ///
974 /// Outstanding borrows hold a read lock on the inner value. This means that
975 /// long-lived borrows could cause the producer half to block. It is recommended
976 /// to keep the borrow as short-lived as possible. Additionally, if you are
977 /// running in an environment that allows `!Send` futures, you must ensure that
978 /// the returned `Ref` type is never held alive across an `.await` point,
979 /// otherwise, it can lead to a deadlock.
980 ///
981 /// # Examples
982 ///
983 /// ```
984 /// use tokio::sync::watch;
985 ///
986 /// let (tx, _) = watch::channel("hello");
987 /// assert_eq!(*tx.borrow(), "hello");
988 /// ```
borrow(&self) -> Ref<'_, T>989 pub fn borrow(&self) -> Ref<'_, T> {
990 let inner = self.shared.value.read().unwrap();
991
992 // The sender/producer always sees the current version
993 let has_changed = false;
994
995 Ref { inner, has_changed }
996 }
997
998 /// Checks if the channel has been closed. This happens when all receivers
999 /// have dropped.
1000 ///
1001 /// # Examples
1002 ///
1003 /// ```
1004 /// let (tx, rx) = tokio::sync::watch::channel(());
1005 /// assert!(!tx.is_closed());
1006 ///
1007 /// drop(rx);
1008 /// assert!(tx.is_closed());
1009 /// ```
is_closed(&self) -> bool1010 pub fn is_closed(&self) -> bool {
1011 self.receiver_count() == 0
1012 }
1013
1014 /// Completes when all receivers have dropped.
1015 ///
1016 /// This allows the producer to get notified when interest in the produced
1017 /// values is canceled and immediately stop doing work.
1018 ///
1019 /// # Cancel safety
1020 ///
1021 /// This method is cancel safe. Once the channel is closed, it stays closed
1022 /// forever and all future calls to `closed` will return immediately.
1023 ///
1024 /// # Examples
1025 ///
1026 /// ```
1027 /// use tokio::sync::watch;
1028 ///
1029 /// #[tokio::main]
1030 /// async fn main() {
1031 /// let (tx, rx) = watch::channel("hello");
1032 ///
1033 /// tokio::spawn(async move {
1034 /// // use `rx`
1035 /// drop(rx);
1036 /// });
1037 ///
1038 /// // Waits for `rx` to drop
1039 /// tx.closed().await;
1040 /// println!("the `rx` handles dropped")
1041 /// }
1042 /// ```
closed(&self)1043 pub async fn closed(&self) {
1044 crate::trace::async_trace_leaf().await;
1045
1046 while self.receiver_count() > 0 {
1047 let notified = self.shared.notify_tx.notified();
1048
1049 if self.receiver_count() == 0 {
1050 return;
1051 }
1052
1053 notified.await;
1054 // The channel could have been reopened in the meantime by calling
1055 // `subscribe`, so we loop again.
1056 }
1057 }
1058
1059 /// Creates a new [`Receiver`] connected to this `Sender`.
1060 ///
1061 /// All messages sent before this call to `subscribe` are initially marked
1062 /// as seen by the new `Receiver`.
1063 ///
1064 /// This method can be called even if there are no other receivers. In this
1065 /// case, the channel is reopened.
1066 ///
1067 /// # Examples
1068 ///
1069 /// The new channel will receive messages sent on this `Sender`.
1070 ///
1071 /// ```
1072 /// use tokio::sync::watch;
1073 ///
1074 /// #[tokio::main]
1075 /// async fn main() {
1076 /// let (tx, _rx) = watch::channel(0u64);
1077 ///
1078 /// tx.send(5).unwrap();
1079 ///
1080 /// let rx = tx.subscribe();
1081 /// assert_eq!(5, *rx.borrow());
1082 ///
1083 /// tx.send(10).unwrap();
1084 /// assert_eq!(10, *rx.borrow());
1085 /// }
1086 /// ```
1087 ///
1088 /// The most recent message is considered seen by the channel, so this test
1089 /// is guaranteed to pass.
1090 ///
1091 /// ```
1092 /// use tokio::sync::watch;
1093 /// use tokio::time::Duration;
1094 ///
1095 /// #[tokio::main]
1096 /// async fn main() {
1097 /// let (tx, _rx) = watch::channel(0u64);
1098 /// tx.send(5).unwrap();
1099 /// let mut rx = tx.subscribe();
1100 ///
1101 /// tokio::spawn(async move {
1102 /// // by spawning and sleeping, the message is sent after `main`
1103 /// // hits the call to `changed`.
1104 /// # if false {
1105 /// tokio::time::sleep(Duration::from_millis(10)).await;
1106 /// # }
1107 /// tx.send(100).unwrap();
1108 /// });
1109 ///
1110 /// rx.changed().await.unwrap();
1111 /// assert_eq!(100, *rx.borrow());
1112 /// }
1113 /// ```
subscribe(&self) -> Receiver<T>1114 pub fn subscribe(&self) -> Receiver<T> {
1115 let shared = self.shared.clone();
1116 let version = shared.state.load().version();
1117
1118 // The CLOSED bit in the state tracks only whether the sender is
1119 // dropped, so we do not need to unset it if this reopens the channel.
1120 Receiver::from_shared(version, shared)
1121 }
1122
1123 /// Returns the number of receivers that currently exist.
1124 ///
1125 /// # Examples
1126 ///
1127 /// ```
1128 /// use tokio::sync::watch;
1129 ///
1130 /// #[tokio::main]
1131 /// async fn main() {
1132 /// let (tx, rx1) = watch::channel("hello");
1133 ///
1134 /// assert_eq!(1, tx.receiver_count());
1135 ///
1136 /// let mut _rx2 = rx1.clone();
1137 ///
1138 /// assert_eq!(2, tx.receiver_count());
1139 /// }
1140 /// ```
receiver_count(&self) -> usize1141 pub fn receiver_count(&self) -> usize {
1142 self.shared.ref_count_rx.load(Relaxed)
1143 }
1144 }
1145
1146 impl<T> Drop for Sender<T> {
drop(&mut self)1147 fn drop(&mut self) {
1148 self.shared.state.set_closed();
1149 self.shared.notify_rx.notify_waiters();
1150 }
1151 }
1152
1153 // ===== impl Ref =====
1154
1155 impl<T> ops::Deref for Ref<'_, T> {
1156 type Target = T;
1157
deref(&self) -> &T1158 fn deref(&self) -> &T {
1159 self.inner.deref()
1160 }
1161 }
1162
1163 #[cfg(all(test, loom))]
1164 mod tests {
1165 use futures::future::FutureExt;
1166 use loom::thread;
1167
1168 // test for https://github.com/tokio-rs/tokio/issues/3168
1169 #[test]
watch_spurious_wakeup()1170 fn watch_spurious_wakeup() {
1171 loom::model(|| {
1172 let (send, mut recv) = crate::sync::watch::channel(0i32);
1173
1174 send.send(1).unwrap();
1175
1176 let send_thread = thread::spawn(move || {
1177 send.send(2).unwrap();
1178 send
1179 });
1180
1181 recv.changed().now_or_never();
1182
1183 let send = send_thread.join().unwrap();
1184 let recv_thread = thread::spawn(move || {
1185 recv.changed().now_or_never();
1186 recv.changed().now_or_never();
1187 recv
1188 });
1189
1190 send.send(3).unwrap();
1191
1192 let mut recv = recv_thread.join().unwrap();
1193 let send_thread = thread::spawn(move || {
1194 send.send(2).unwrap();
1195 });
1196
1197 recv.changed().now_or_never();
1198
1199 send_thread.join().unwrap();
1200 });
1201 }
1202
1203 #[test]
watch_borrow()1204 fn watch_borrow() {
1205 loom::model(|| {
1206 let (send, mut recv) = crate::sync::watch::channel(0i32);
1207
1208 assert!(send.borrow().eq(&0));
1209 assert!(recv.borrow().eq(&0));
1210
1211 send.send(1).unwrap();
1212 assert!(send.borrow().eq(&1));
1213
1214 let send_thread = thread::spawn(move || {
1215 send.send(2).unwrap();
1216 send
1217 });
1218
1219 recv.changed().now_or_never();
1220
1221 let send = send_thread.join().unwrap();
1222 let recv_thread = thread::spawn(move || {
1223 recv.changed().now_or_never();
1224 recv.changed().now_or_never();
1225 recv
1226 });
1227
1228 send.send(3).unwrap();
1229
1230 let recv = recv_thread.join().unwrap();
1231 assert!(recv.borrow().eq(&3));
1232 assert!(send.borrow().eq(&3));
1233
1234 send.send(2).unwrap();
1235
1236 thread::spawn(move || {
1237 assert!(recv.borrow().eq(&2));
1238 });
1239 assert!(send.borrow().eq(&2));
1240 });
1241 }
1242 }
1243