1 #![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3 //! A single-producer, multi-consumer channel that only retains the *last* sent
4 //! value.
5 //!
6 //! This channel is useful for watching for changes to a value from multiple
7 //! points in the code base, for example, changes to configuration values.
8 //!
9 //! # Usage
10 //!
//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
//! and consumer halves of the channel. The channel is created with an initial
//! value. The **latest** value stored in the channel is accessed with
//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
//! value to be sent by the [`Sender`] half.
16 //!
17 //! # Examples
18 //!
19 //! ```
20 //! use tokio::sync::watch;
21 //!
22 //! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
23 //! let (tx, mut rx) = watch::channel("hello");
24 //!
25 //! tokio::spawn(async move {
26 //! while rx.changed().await.is_ok() {
27 //! println!("received = {:?}", *rx.borrow());
28 //! }
29 //! });
30 //!
31 //! tx.send("world")?;
32 //! # Ok(())
33 //! # }
34 //! ```
35 //!
36 //! # Closing
37 //!
38 //! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
39 //! when all [`Receiver`] handles have been dropped. This indicates that there
40 //! is no further interest in the values being produced and work can be stopped.
41 //!
42 //! # Thread safety
43 //!
44 //! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
45 //! threads and can be used in a concurrent environment. Clones of [`Receiver`]
46 //! handles may be moved to separate threads and also used concurrently.
47 //!
48 //! [`Sender`]: crate::sync::watch::Sender
49 //! [`Receiver`]: crate::sync::watch::Receiver
50 //! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
51 //! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
52 //! [`channel`]: crate::sync::watch::channel
53 //! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
54 //! [`Sender::closed`]: crate::sync::watch::Sender::closed
55
56 use crate::sync::notify::Notify;
57
58 use crate::loom::sync::atomic::AtomicUsize;
59 use crate::loom::sync::atomic::Ordering::Relaxed;
60 use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
61 use std::mem;
62 use std::ops;
63
64 /// Receives values from the associated [`Sender`](struct@Sender).
65 ///
66 /// Instances are created by the [`channel`](fn@channel) function.
67 ///
68 /// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
69 /// wrapper.
70 ///
71 /// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
72 #[derive(Debug)]
73 pub struct Receiver<T> {
74 /// Pointer to the shared state
75 shared: Arc<Shared<T>>,
76
77 /// Last observed version
78 version: Version,
79 }
80
81 /// Sends values to the associated [`Receiver`](struct@Receiver).
82 ///
83 /// Instances are created by the [`channel`](fn@channel) function.
84 #[derive(Debug)]
85 pub struct Sender<T> {
86 shared: Arc<Shared<T>>,
87 }
88
/// A borrowed view of the value currently stored in the channel.
///
/// A live `Ref` holds a read lock on the stored value, so a long-lived borrow
/// can cause the sending half to block. Keep borrows as short-lived as
/// possible.
#[derive(Debug)]
pub struct Ref<'a, T> {
    /// Read guard on the shared value; the lock is held until it is dropped.
    inner: RwLockReadGuard<'a, T>,
}
98
99 #[derive(Debug)]
100 struct Shared<T> {
101 /// The most recent value.
102 value: RwLock<T>,
103
104 /// The current version.
105 ///
106 /// The lowest bit represents a "closed" state. The rest of the bits
107 /// represent the current version.
108 state: AtomicState,
109
110 /// Tracks the number of `Receiver` instances.
111 ref_count_rx: AtomicUsize,
112
113 /// Notifies waiting receivers that the value changed.
114 notify_rx: Notify,
115
116 /// Notifies any task listening for `Receiver` dropped events.
117 notify_tx: Notify,
118 }
119
pub mod error {
    //! Watch error types.

    use std::fmt;

    /// Error produced when sending a value fails.
    ///
    /// The wrapped field is the value that was passed to the failed send, so
    /// the caller can recover it.
    #[derive(Debug)]
    pub struct SendError<T>(pub T);

    // ===== impl SendError =====

    // The message never prints the payload, so `Display` places no bound on
    // `T`. This lets callers format the error even when `T` is not `Debug`.
    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            fmt.write_str("channel closed")
        }
    }

    // `Error` requires `Debug`, and the derived `Debug` impl needs `T: Debug`,
    // so the bound has to stay on this impl.
    impl<T: fmt::Debug> std::error::Error for SendError<T> {}

    /// Error produced when receiving a change notification.
    #[derive(Debug)]
    pub struct RecvError(pub(super) ());

    // ===== impl RecvError =====

    impl fmt::Display for RecvError {
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            fmt.write_str("channel closed")
        }
    }

    impl std::error::Error for RecvError {}
}
153
154 use self::state::{AtomicState, Version};
155 mod state {
156 use crate::loom::sync::atomic::AtomicUsize;
157 use crate::loom::sync::atomic::Ordering::SeqCst;
158
159 const CLOSED: usize = 1;
160
161 /// The version part of the state. The lowest bit is always zero.
162 #[derive(Copy, Clone, Debug, Eq, PartialEq)]
163 pub(super) struct Version(usize);
164
165 /// Snapshot of the state. The first bit is used as the CLOSED bit.
166 /// The remaining bits are used as the version.
167 ///
168 /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
169 /// receivers does not set it.
170 #[derive(Copy, Clone, Debug)]
171 pub(super) struct StateSnapshot(usize);
172
173 /// The state stored in an atomic integer.
174 #[derive(Debug)]
175 pub(super) struct AtomicState(AtomicUsize);
176
177 impl Version {
178 /// Get the initial version when creating the channel.
initial() -> Self179 pub(super) fn initial() -> Self {
180 Version(0)
181 }
182 }
183
184 impl StateSnapshot {
185 /// Extract the version from the state.
version(self) -> Version186 pub(super) fn version(self) -> Version {
187 Version(self.0 & !CLOSED)
188 }
189
190 /// Is the closed bit set?
is_closed(self) -> bool191 pub(super) fn is_closed(self) -> bool {
192 (self.0 & CLOSED) == CLOSED
193 }
194 }
195
196 impl AtomicState {
197 /// Create a new `AtomicState` that is not closed and which has the
198 /// version set to `Version::initial()`.
new() -> Self199 pub(super) fn new() -> Self {
200 AtomicState(AtomicUsize::new(0))
201 }
202
203 /// Load the current value of the state.
load(&self) -> StateSnapshot204 pub(super) fn load(&self) -> StateSnapshot {
205 StateSnapshot(self.0.load(SeqCst))
206 }
207
208 /// Increment the version counter.
increment_version(&self)209 pub(super) fn increment_version(&self) {
210 // Increment by two to avoid touching the CLOSED bit.
211 self.0.fetch_add(2, SeqCst);
212 }
213
214 /// Set the closed bit in the state.
set_closed(&self)215 pub(super) fn set_closed(&self) {
216 self.0.fetch_or(CLOSED, SeqCst);
217 }
218 }
219 }
220
221 /// Creates a new watch channel, returning the "send" and "receive" handles.
222 ///
223 /// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
224 /// Only the last value sent is made available to the [`Receiver`] half. All
225 /// intermediate values are dropped.
226 ///
227 /// # Examples
228 ///
229 /// ```
230 /// use tokio::sync::watch;
231 ///
232 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
233 /// let (tx, mut rx) = watch::channel("hello");
234 ///
235 /// tokio::spawn(async move {
236 /// while rx.changed().await.is_ok() {
237 /// println!("received = {:?}", *rx.borrow());
238 /// }
239 /// });
240 ///
241 /// tx.send("world")?;
242 /// # Ok(())
243 /// # }
244 /// ```
245 ///
246 /// [`Sender`]: struct@Sender
247 /// [`Receiver`]: struct@Receiver
channel<T>(init: T) -> (Sender<T>, Receiver<T>)248 pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
249 let shared = Arc::new(Shared {
250 value: RwLock::new(init),
251 state: AtomicState::new(),
252 ref_count_rx: AtomicUsize::new(1),
253 notify_rx: Notify::new(),
254 notify_tx: Notify::new(),
255 });
256
257 let tx = Sender {
258 shared: shared.clone(),
259 };
260
261 let rx = Receiver {
262 shared,
263 version: Version::initial(),
264 };
265
266 (tx, rx)
267 }
268
269 impl<T> Receiver<T> {
from_shared(version: Version, shared: Arc<Shared<T>>) -> Self270 fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
271 // No synchronization necessary as this is only used as a counter and
272 // not memory access.
273 shared.ref_count_rx.fetch_add(1, Relaxed);
274
275 Self { shared, version }
276 }
277
278 /// Returns a reference to the most recently sent value.
279 ///
280 /// This method does not mark the returned value as seen, so future calls to
281 /// [`changed`] may return immediately even if you have already seen the
282 /// value with a call to `borrow`.
283 ///
284 /// Outstanding borrows hold a read lock. This means that long lived borrows
285 /// could cause the send half to block. It is recommended to keep the borrow
286 /// as short lived as possible.
287 ///
288 /// [`changed`]: Receiver::changed
289 ///
290 /// # Examples
291 ///
292 /// ```
293 /// use tokio::sync::watch;
294 ///
295 /// let (_, rx) = watch::channel("hello");
296 /// assert_eq!(*rx.borrow(), "hello");
297 /// ```
borrow(&self) -> Ref<'_, T>298 pub fn borrow(&self) -> Ref<'_, T> {
299 let inner = self.shared.value.read().unwrap();
300 Ref { inner }
301 }
302
303 /// Returns a reference to the most recently sent value and mark that value
304 /// as seen.
305 ///
306 /// This method marks the value as seen, so [`changed`] will not return
307 /// immediately if the newest value is one previously returned by
308 /// `borrow_and_update`.
309 ///
310 /// Outstanding borrows hold a read lock. This means that long lived borrows
311 /// could cause the send half to block. It is recommended to keep the borrow
312 /// as short lived as possible.
313 ///
314 /// [`changed`]: Receiver::changed
borrow_and_update(&mut self) -> Ref<'_, T>315 pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
316 let inner = self.shared.value.read().unwrap();
317 self.version = self.shared.state.load().version();
318 Ref { inner }
319 }
320
321 /// Waits for a change notification, then marks the newest value as seen.
322 ///
323 /// If the newest value in the channel has not yet been marked seen when
324 /// this method is called, the method marks that value seen and returns
325 /// immediately. If the newest value has already been marked seen, then the
326 /// method sleeps until a new message is sent by the [`Sender`] connected to
327 /// this `Receiver`, or until the [`Sender`] is dropped.
328 ///
329 /// This method returns an error if and only if the [`Sender`] is dropped.
330 ///
331 /// # Cancel safety
332 ///
333 /// This method is cancel safe. If you use it as the event in a
334 /// [`tokio::select!`](crate::select) statement and some other branch
335 /// completes first, then it is guaranteed that no values have been marked
336 /// seen by this call to `changed`.
337 ///
338 /// [`Sender`]: struct@Sender
339 ///
340 /// # Examples
341 ///
342 /// ```
343 /// use tokio::sync::watch;
344 ///
345 /// #[tokio::main]
346 /// async fn main() {
347 /// let (tx, mut rx) = watch::channel("hello");
348 ///
349 /// tokio::spawn(async move {
350 /// tx.send("goodbye").unwrap();
351 /// });
352 ///
353 /// assert!(rx.changed().await.is_ok());
354 /// assert_eq!(*rx.borrow(), "goodbye");
355 ///
356 /// // The `tx` handle has been dropped
357 /// assert!(rx.changed().await.is_err());
358 /// }
359 /// ```
changed(&mut self) -> Result<(), error::RecvError>360 pub async fn changed(&mut self) -> Result<(), error::RecvError> {
361 loop {
362 // In order to avoid a race condition, we first request a notification,
363 // **then** check the current value's version. If a new version exists,
364 // the notification request is dropped.
365 let notified = self.shared.notify_rx.notified();
366
367 if let Some(ret) = maybe_changed(&self.shared, &mut self.version) {
368 return ret;
369 }
370
371 notified.await;
372 // loop around again in case the wake-up was spurious
373 }
374 }
375
376 cfg_process_driver! {
377 pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
378 maybe_changed(&self.shared, &mut self.version)
379 }
380 }
381 }
382
maybe_changed<T>( shared: &Shared<T>, version: &mut Version, ) -> Option<Result<(), error::RecvError>>383 fn maybe_changed<T>(
384 shared: &Shared<T>,
385 version: &mut Version,
386 ) -> Option<Result<(), error::RecvError>> {
387 // Load the version from the state
388 let state = shared.state.load();
389 let new_version = state.version();
390
391 if *version != new_version {
392 // Observe the new version and return
393 *version = new_version;
394 return Some(Ok(()));
395 }
396
397 if state.is_closed() {
398 // All receivers have dropped.
399 return Some(Err(error::RecvError(())));
400 }
401
402 None
403 }
404
405 impl<T> Clone for Receiver<T> {
clone(&self) -> Self406 fn clone(&self) -> Self {
407 let version = self.version;
408 let shared = self.shared.clone();
409
410 Self::from_shared(version, shared)
411 }
412 }
413
414 impl<T> Drop for Receiver<T> {
drop(&mut self)415 fn drop(&mut self) {
416 // No synchronization necessary as this is only used as a counter and
417 // not memory access.
418 if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
419 // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
420 self.shared.notify_tx.notify_waiters();
421 }
422 }
423 }
424
425 impl<T> Sender<T> {
426 /// Sends a new value via the channel, notifying all receivers.
427 ///
428 /// This method fails if the channel has been closed, which happens when
429 /// every receiver has been dropped.
send(&self, value: T) -> Result<(), error::SendError<T>>430 pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
431 // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
432 if 0 == self.receiver_count() {
433 return Err(error::SendError(value));
434 }
435
436 self.send_replace(value);
437 Ok(())
438 }
439
440 /// Sends a new value via the channel, notifying all receivers and returning
441 /// the previous value in the channel.
442 ///
443 /// This can be useful for reusing the buffers inside a watched value.
444 /// Additionally, this method permits sending values even when there are no
445 /// receivers.
446 ///
447 /// # Examples
448 ///
449 /// ```
450 /// use tokio::sync::watch;
451 ///
452 /// let (tx, _rx) = watch::channel(1);
453 /// assert_eq!(tx.send_replace(2), 1);
454 /// assert_eq!(tx.send_replace(3), 2);
455 /// ```
send_replace(&self, value: T) -> T456 pub fn send_replace(&self, value: T) -> T {
457 let old = {
458 // Acquire the write lock and update the value.
459 let mut lock = self.shared.value.write().unwrap();
460 let old = mem::replace(&mut *lock, value);
461
462 self.shared.state.increment_version();
463
464 // Release the write lock.
465 //
466 // Incrementing the version counter while holding the lock ensures
467 // that receivers are able to figure out the version number of the
468 // value they are currently looking at.
469 drop(lock);
470
471 old
472 };
473
474 // Notify all watchers
475 self.shared.notify_rx.notify_waiters();
476
477 old
478 }
479
480 /// Returns a reference to the most recently sent value
481 ///
482 /// Outstanding borrows hold a read lock. This means that long lived borrows
483 /// could cause the send half to block. It is recommended to keep the borrow
484 /// as short lived as possible.
485 ///
486 /// # Examples
487 ///
488 /// ```
489 /// use tokio::sync::watch;
490 ///
491 /// let (tx, _) = watch::channel("hello");
492 /// assert_eq!(*tx.borrow(), "hello");
493 /// ```
borrow(&self) -> Ref<'_, T>494 pub fn borrow(&self) -> Ref<'_, T> {
495 let inner = self.shared.value.read().unwrap();
496 Ref { inner }
497 }
498
499 /// Checks if the channel has been closed. This happens when all receivers
500 /// have dropped.
501 ///
502 /// # Examples
503 ///
504 /// ```
505 /// let (tx, rx) = tokio::sync::watch::channel(());
506 /// assert!(!tx.is_closed());
507 ///
508 /// drop(rx);
509 /// assert!(tx.is_closed());
510 /// ```
is_closed(&self) -> bool511 pub fn is_closed(&self) -> bool {
512 self.receiver_count() == 0
513 }
514
515 /// Completes when all receivers have dropped.
516 ///
517 /// This allows the producer to get notified when interest in the produced
518 /// values is canceled and immediately stop doing work.
519 ///
520 /// # Cancel safety
521 ///
522 /// This method is cancel safe. Once the channel is closed, it stays closed
523 /// forever and all future calls to `closed` will return immediately.
524 ///
525 /// # Examples
526 ///
527 /// ```
528 /// use tokio::sync::watch;
529 ///
530 /// #[tokio::main]
531 /// async fn main() {
532 /// let (tx, rx) = watch::channel("hello");
533 ///
534 /// tokio::spawn(async move {
535 /// // use `rx`
536 /// drop(rx);
537 /// });
538 ///
539 /// // Waits for `rx` to drop
540 /// tx.closed().await;
541 /// println!("the `rx` handles dropped")
542 /// }
543 /// ```
closed(&self)544 pub async fn closed(&self) {
545 while self.receiver_count() > 0 {
546 let notified = self.shared.notify_tx.notified();
547
548 if self.receiver_count() == 0 {
549 return;
550 }
551
552 notified.await;
553 // The channel could have been reopened in the meantime by calling
554 // `subscribe`, so we loop again.
555 }
556 }
557
558 /// Creates a new [`Receiver`] connected to this `Sender`.
559 ///
560 /// All messages sent before this call to `subscribe` are initially marked
561 /// as seen by the new `Receiver`.
562 ///
563 /// This method can be called even if there are no other receivers. In this
564 /// case, the channel is reopened.
565 ///
566 /// # Examples
567 ///
568 /// The new channel will receive messages sent on this `Sender`.
569 ///
570 /// ```
571 /// use tokio::sync::watch;
572 ///
573 /// #[tokio::main]
574 /// async fn main() {
575 /// let (tx, _rx) = watch::channel(0u64);
576 ///
577 /// tx.send(5).unwrap();
578 ///
579 /// let rx = tx.subscribe();
580 /// assert_eq!(5, *rx.borrow());
581 ///
582 /// tx.send(10).unwrap();
583 /// assert_eq!(10, *rx.borrow());
584 /// }
585 /// ```
586 ///
587 /// The most recent message is considered seen by the channel, so this test
588 /// is guaranteed to pass.
589 ///
590 /// ```
591 /// use tokio::sync::watch;
592 /// use tokio::time::Duration;
593 ///
594 /// #[tokio::main]
595 /// async fn main() {
596 /// let (tx, _rx) = watch::channel(0u64);
597 /// tx.send(5).unwrap();
598 /// let mut rx = tx.subscribe();
599 ///
600 /// tokio::spawn(async move {
601 /// // by spawning and sleeping, the message is sent after `main`
602 /// // hits the call to `changed`.
603 /// # if false {
604 /// tokio::time::sleep(Duration::from_millis(10)).await;
605 /// # }
606 /// tx.send(100).unwrap();
607 /// });
608 ///
609 /// rx.changed().await.unwrap();
610 /// assert_eq!(100, *rx.borrow());
611 /// }
612 /// ```
subscribe(&self) -> Receiver<T>613 pub fn subscribe(&self) -> Receiver<T> {
614 let shared = self.shared.clone();
615 let version = shared.state.load().version();
616
617 // The CLOSED bit in the state tracks only whether the sender is
618 // dropped, so we do not need to unset it if this reopens the channel.
619 Receiver::from_shared(version, shared)
620 }
621
622 /// Returns the number of receivers that currently exist.
623 ///
624 /// # Examples
625 ///
626 /// ```
627 /// use tokio::sync::watch;
628 ///
629 /// #[tokio::main]
630 /// async fn main() {
631 /// let (tx, rx1) = watch::channel("hello");
632 ///
633 /// assert_eq!(1, tx.receiver_count());
634 ///
635 /// let mut _rx2 = rx1.clone();
636 ///
637 /// assert_eq!(2, tx.receiver_count());
638 /// }
639 /// ```
receiver_count(&self) -> usize640 pub fn receiver_count(&self) -> usize {
641 self.shared.ref_count_rx.load(Relaxed)
642 }
643 }
644
645 impl<T> Drop for Sender<T> {
drop(&mut self)646 fn drop(&mut self) {
647 self.shared.state.set_closed();
648 self.shared.notify_rx.notify_waiters();
649 }
650 }
651
652 // ===== impl Ref =====
653
654 impl<T> ops::Deref for Ref<'_, T> {
655 type Target = T;
656
deref(&self) -> &T657 fn deref(&self) -> &T {
658 self.inner.deref()
659 }
660 }
661
#[cfg(all(test, loom))]
mod tests {
    use futures::future::FutureExt;
    use loom::thread;

    // Regression test for https://github.com/tokio-rs/tokio/issues/3168
    #[test]
    fn watch_spurious_wakeup() {
        loom::model(|| {
            let (tx, mut rx) = crate::sync::watch::channel(0i32);

            tx.send(1).unwrap();

            // Send from another thread while this thread polls `changed`.
            let sender_handle = thread::spawn(move || {
                tx.send(2).unwrap();
                tx
            });

            rx.changed().now_or_never();

            let tx = sender_handle.join().unwrap();

            // Now poll from another thread while this thread sends.
            let receiver_handle = thread::spawn(move || {
                rx.changed().now_or_never();
                rx.changed().now_or_never();
                rx
            });

            tx.send(3).unwrap();

            let mut rx = receiver_handle.join().unwrap();
            let sender_handle = thread::spawn(move || {
                tx.send(2).unwrap();
            });

            rx.changed().now_or_never();

            sender_handle.join().unwrap();
        });
    }

    #[test]
    fn watch_borrow() {
        loom::model(|| {
            let (tx, mut rx) = crate::sync::watch::channel(0i32);

            // Both halves start out seeing the initial value.
            assert!(tx.borrow().eq(&0));
            assert!(rx.borrow().eq(&0));

            tx.send(1).unwrap();
            assert!(tx.borrow().eq(&1));

            let sender_handle = thread::spawn(move || {
                tx.send(2).unwrap();
                tx
            });

            rx.changed().now_or_never();

            let tx = sender_handle.join().unwrap();
            let receiver_handle = thread::spawn(move || {
                rx.changed().now_or_never();
                rx.changed().now_or_never();
                rx
            });

            tx.send(3).unwrap();

            let rx = receiver_handle.join().unwrap();
            assert!(rx.borrow().eq(&3));
            assert!(tx.borrow().eq(&3));

            tx.send(2).unwrap();

            thread::spawn(move || {
                assert!(rx.borrow().eq(&2));
            });
            assert!(tx.borrow().eq(&2));
        });
    }
}
742