• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2 
//! A single-producer, multi-consumer channel that only retains the *last* sent
//! value.
//!
//! This channel is useful for watching for changes to a value from multiple
//! points in the code base, for example, changes to configuration values.
//!
//! # Usage
//!
//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
//! and consumer halves of the channel. The channel is created with an initial
//! value. The **latest** value stored in the channel is accessed with
//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
//! value to be sent by the [`Sender`] half.
//!
//! # Examples
//!
//! ```
//! use tokio::sync::watch;
//!
//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
//!     let (tx, mut rx) = watch::channel("hello");
//!
//!     tokio::spawn(async move {
//!         while rx.changed().await.is_ok() {
//!             println!("received = {:?}", *rx.borrow());
//!         }
//!     });
//!
//!     tx.send("world")?;
//! # Ok(())
//! # }
//! ```
//!
//! # Closing
//!
//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
//! when all [`Receiver`] handles have been dropped. This indicates that there
//! is no further interest in the values being produced and work can be stopped.
//!
//! # Thread safety
//!
//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
//! handles may be moved to separate threads and also used concurrently.
//!
//! [`Sender`]: crate::sync::watch::Sender
//! [`Receiver`]: crate::sync::watch::Receiver
//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
//! [`channel`]: crate::sync::watch::channel
//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
//! [`Sender::closed`]: crate::sync::watch::Sender::closed
55 
56 use crate::sync::notify::Notify;
57 
58 use crate::loom::sync::atomic::AtomicUsize;
59 use crate::loom::sync::atomic::Ordering::Relaxed;
60 use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
61 use std::mem;
62 use std::ops;
63 
64 /// Receives values from the associated [`Sender`](struct@Sender).
65 ///
66 /// Instances are created by the [`channel`](fn@channel) function.
67 ///
68 /// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
69 /// wrapper.
70 ///
71 /// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
72 #[derive(Debug)]
73 pub struct Receiver<T> {
74     /// Pointer to the shared state
75     shared: Arc<Shared<T>>,
76 
77     /// Last observed version
78     version: Version,
79 }
80 
81 /// Sends values to the associated [`Receiver`](struct@Receiver).
82 ///
83 /// Instances are created by the [`channel`](fn@channel) function.
84 #[derive(Debug)]
85 pub struct Sender<T> {
86     shared: Arc<Shared<T>>,
87 }
88 
/// A reference to the value currently stored in the channel.
///
/// Outstanding borrows hold a read lock on the inner value. This means that
/// long lived borrows could cause the producer half to block. It is recommended
/// to keep the borrow as short lived as possible.
#[derive(Debug)]
pub struct Ref<'a, T> {
    /// Read guard over the channel's value; the lock is released when the
    /// `Ref` is dropped.
    inner: RwLockReadGuard<'a, T>,
}
98 
99 #[derive(Debug)]
100 struct Shared<T> {
101     /// The most recent value.
102     value: RwLock<T>,
103 
104     /// The current version.
105     ///
106     /// The lowest bit represents a "closed" state. The rest of the bits
107     /// represent the current version.
108     state: AtomicState,
109 
110     /// Tracks the number of `Receiver` instances.
111     ref_count_rx: AtomicUsize,
112 
113     /// Notifies waiting receivers that the value changed.
114     notify_rx: Notify,
115 
116     /// Notifies any task listening for `Receiver` dropped events.
117     notify_tx: Notify,
118 }
119 
pub mod error {
    //! Watch error types.

    use std::fmt;

    /// Error produced when sending a value fails.
    ///
    /// The value that could not be sent is carried in the public field.
    #[derive(Debug)]
    pub struct SendError<T>(pub T);

    // ===== impl SendError =====

    // `Display` never prints the payload, so it places no bound on `T`.
    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("channel closed")
        }
    }

    // `Error` requires `Debug`, and the derived `Debug` impl needs `T: Debug`.
    impl<T: fmt::Debug> std::error::Error for SendError<T> {}

    /// Error produced when receiving a change notification.
    #[derive(Debug)]
    pub struct RecvError(pub(super) ());

    // ===== impl RecvError =====

    impl fmt::Display for RecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("channel closed")
        }
    }

    impl std::error::Error for RecvError {}
}
153 
154 use self::state::{AtomicState, Version};
155 mod state {
156     use crate::loom::sync::atomic::AtomicUsize;
157     use crate::loom::sync::atomic::Ordering::SeqCst;
158 
159     const CLOSED: usize = 1;
160 
161     /// The version part of the state. The lowest bit is always zero.
162     #[derive(Copy, Clone, Debug, Eq, PartialEq)]
163     pub(super) struct Version(usize);
164 
165     /// Snapshot of the state. The first bit is used as the CLOSED bit.
166     /// The remaining bits are used as the version.
167     ///
168     /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
169     /// receivers does not set it.
170     #[derive(Copy, Clone, Debug)]
171     pub(super) struct StateSnapshot(usize);
172 
173     /// The state stored in an atomic integer.
174     #[derive(Debug)]
175     pub(super) struct AtomicState(AtomicUsize);
176 
177     impl Version {
178         /// Get the initial version when creating the channel.
initial() -> Self179         pub(super) fn initial() -> Self {
180             Version(0)
181         }
182     }
183 
184     impl StateSnapshot {
185         /// Extract the version from the state.
version(self) -> Version186         pub(super) fn version(self) -> Version {
187             Version(self.0 & !CLOSED)
188         }
189 
190         /// Is the closed bit set?
is_closed(self) -> bool191         pub(super) fn is_closed(self) -> bool {
192             (self.0 & CLOSED) == CLOSED
193         }
194     }
195 
196     impl AtomicState {
197         /// Create a new `AtomicState` that is not closed and which has the
198         /// version set to `Version::initial()`.
new() -> Self199         pub(super) fn new() -> Self {
200             AtomicState(AtomicUsize::new(0))
201         }
202 
203         /// Load the current value of the state.
load(&self) -> StateSnapshot204         pub(super) fn load(&self) -> StateSnapshot {
205             StateSnapshot(self.0.load(SeqCst))
206         }
207 
208         /// Increment the version counter.
increment_version(&self)209         pub(super) fn increment_version(&self) {
210             // Increment by two to avoid touching the CLOSED bit.
211             self.0.fetch_add(2, SeqCst);
212         }
213 
214         /// Set the closed bit in the state.
set_closed(&self)215         pub(super) fn set_closed(&self) {
216             self.0.fetch_or(CLOSED, SeqCst);
217         }
218     }
219 }
220 
221 /// Creates a new watch channel, returning the "send" and "receive" handles.
222 ///
223 /// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
224 /// Only the last value sent is made available to the [`Receiver`] half. All
225 /// intermediate values are dropped.
226 ///
227 /// # Examples
228 ///
229 /// ```
230 /// use tokio::sync::watch;
231 ///
232 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
233 ///     let (tx, mut rx) = watch::channel("hello");
234 ///
235 ///     tokio::spawn(async move {
236 ///         while rx.changed().await.is_ok() {
237 ///             println!("received = {:?}", *rx.borrow());
238 ///         }
239 ///     });
240 ///
241 ///     tx.send("world")?;
242 /// # Ok(())
243 /// # }
244 /// ```
245 ///
246 /// [`Sender`]: struct@Sender
247 /// [`Receiver`]: struct@Receiver
channel<T>(init: T) -> (Sender<T>, Receiver<T>)248 pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
249     let shared = Arc::new(Shared {
250         value: RwLock::new(init),
251         state: AtomicState::new(),
252         ref_count_rx: AtomicUsize::new(1),
253         notify_rx: Notify::new(),
254         notify_tx: Notify::new(),
255     });
256 
257     let tx = Sender {
258         shared: shared.clone(),
259     };
260 
261     let rx = Receiver {
262         shared,
263         version: Version::initial(),
264     };
265 
266     (tx, rx)
267 }
268 
269 impl<T> Receiver<T> {
from_shared(version: Version, shared: Arc<Shared<T>>) -> Self270     fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
271         // No synchronization necessary as this is only used as a counter and
272         // not memory access.
273         shared.ref_count_rx.fetch_add(1, Relaxed);
274 
275         Self { shared, version }
276     }
277 
278     /// Returns a reference to the most recently sent value.
279     ///
280     /// This method does not mark the returned value as seen, so future calls to
281     /// [`changed`] may return immediately even if you have already seen the
282     /// value with a call to `borrow`.
283     ///
284     /// Outstanding borrows hold a read lock. This means that long lived borrows
285     /// could cause the send half to block. It is recommended to keep the borrow
286     /// as short lived as possible.
287     ///
288     /// [`changed`]: Receiver::changed
289     ///
290     /// # Examples
291     ///
292     /// ```
293     /// use tokio::sync::watch;
294     ///
295     /// let (_, rx) = watch::channel("hello");
296     /// assert_eq!(*rx.borrow(), "hello");
297     /// ```
borrow(&self) -> Ref<'_, T>298     pub fn borrow(&self) -> Ref<'_, T> {
299         let inner = self.shared.value.read().unwrap();
300         Ref { inner }
301     }
302 
303     /// Returns a reference to the most recently sent value and mark that value
304     /// as seen.
305     ///
306     /// This method marks the value as seen, so [`changed`] will not return
307     /// immediately if the newest value is one previously returned by
308     /// `borrow_and_update`.
309     ///
310     /// Outstanding borrows hold a read lock. This means that long lived borrows
311     /// could cause the send half to block. It is recommended to keep the borrow
312     /// as short lived as possible.
313     ///
314     /// [`changed`]: Receiver::changed
borrow_and_update(&mut self) -> Ref<'_, T>315     pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
316         let inner = self.shared.value.read().unwrap();
317         self.version = self.shared.state.load().version();
318         Ref { inner }
319     }
320 
321     /// Waits for a change notification, then marks the newest value as seen.
322     ///
323     /// If the newest value in the channel has not yet been marked seen when
324     /// this method is called, the method marks that value seen and returns
325     /// immediately. If the newest value has already been marked seen, then the
326     /// method sleeps until a new message is sent by the [`Sender`] connected to
327     /// this `Receiver`, or until the [`Sender`] is dropped.
328     ///
329     /// This method returns an error if and only if the [`Sender`] is dropped.
330     ///
331     /// # Cancel safety
332     ///
333     /// This method is cancel safe. If you use it as the event in a
334     /// [`tokio::select!`](crate::select) statement and some other branch
335     /// completes first, then it is guaranteed that no values have been marked
336     /// seen by this call to `changed`.
337     ///
338     /// [`Sender`]: struct@Sender
339     ///
340     /// # Examples
341     ///
342     /// ```
343     /// use tokio::sync::watch;
344     ///
345     /// #[tokio::main]
346     /// async fn main() {
347     ///     let (tx, mut rx) = watch::channel("hello");
348     ///
349     ///     tokio::spawn(async move {
350     ///         tx.send("goodbye").unwrap();
351     ///     });
352     ///
353     ///     assert!(rx.changed().await.is_ok());
354     ///     assert_eq!(*rx.borrow(), "goodbye");
355     ///
356     ///     // The `tx` handle has been dropped
357     ///     assert!(rx.changed().await.is_err());
358     /// }
359     /// ```
changed(&mut self) -> Result<(), error::RecvError>360     pub async fn changed(&mut self) -> Result<(), error::RecvError> {
361         loop {
362             // In order to avoid a race condition, we first request a notification,
363             // **then** check the current value's version. If a new version exists,
364             // the notification request is dropped.
365             let notified = self.shared.notify_rx.notified();
366 
367             if let Some(ret) = maybe_changed(&self.shared, &mut self.version) {
368                 return ret;
369             }
370 
371             notified.await;
372             // loop around again in case the wake-up was spurious
373         }
374     }
375 
376     cfg_process_driver! {
377         pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
378             maybe_changed(&self.shared, &mut self.version)
379         }
380     }
381 }
382 
maybe_changed<T>( shared: &Shared<T>, version: &mut Version, ) -> Option<Result<(), error::RecvError>>383 fn maybe_changed<T>(
384     shared: &Shared<T>,
385     version: &mut Version,
386 ) -> Option<Result<(), error::RecvError>> {
387     // Load the version from the state
388     let state = shared.state.load();
389     let new_version = state.version();
390 
391     if *version != new_version {
392         // Observe the new version and return
393         *version = new_version;
394         return Some(Ok(()));
395     }
396 
397     if state.is_closed() {
398         // All receivers have dropped.
399         return Some(Err(error::RecvError(())));
400     }
401 
402     None
403 }
404 
405 impl<T> Clone for Receiver<T> {
clone(&self) -> Self406     fn clone(&self) -> Self {
407         let version = self.version;
408         let shared = self.shared.clone();
409 
410         Self::from_shared(version, shared)
411     }
412 }
413 
414 impl<T> Drop for Receiver<T> {
drop(&mut self)415     fn drop(&mut self) {
416         // No synchronization necessary as this is only used as a counter and
417         // not memory access.
418         if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
419             // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
420             self.shared.notify_tx.notify_waiters();
421         }
422     }
423 }
424 
425 impl<T> Sender<T> {
426     /// Sends a new value via the channel, notifying all receivers.
427     ///
428     /// This method fails if the channel has been closed, which happens when
429     /// every receiver has been dropped.
send(&self, value: T) -> Result<(), error::SendError<T>>430     pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
431         // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
432         if 0 == self.receiver_count() {
433             return Err(error::SendError(value));
434         }
435 
436         self.send_replace(value);
437         Ok(())
438     }
439 
440     /// Sends a new value via the channel, notifying all receivers and returning
441     /// the previous value in the channel.
442     ///
443     /// This can be useful for reusing the buffers inside a watched value.
444     /// Additionally, this method permits sending values even when there are no
445     /// receivers.
446     ///
447     /// # Examples
448     ///
449     /// ```
450     /// use tokio::sync::watch;
451     ///
452     /// let (tx, _rx) = watch::channel(1);
453     /// assert_eq!(tx.send_replace(2), 1);
454     /// assert_eq!(tx.send_replace(3), 2);
455     /// ```
send_replace(&self, value: T) -> T456     pub fn send_replace(&self, value: T) -> T {
457         let old = {
458             // Acquire the write lock and update the value.
459             let mut lock = self.shared.value.write().unwrap();
460             let old = mem::replace(&mut *lock, value);
461 
462             self.shared.state.increment_version();
463 
464             // Release the write lock.
465             //
466             // Incrementing the version counter while holding the lock ensures
467             // that receivers are able to figure out the version number of the
468             // value they are currently looking at.
469             drop(lock);
470 
471             old
472         };
473 
474         // Notify all watchers
475         self.shared.notify_rx.notify_waiters();
476 
477         old
478     }
479 
480     /// Returns a reference to the most recently sent value
481     ///
482     /// Outstanding borrows hold a read lock. This means that long lived borrows
483     /// could cause the send half to block. It is recommended to keep the borrow
484     /// as short lived as possible.
485     ///
486     /// # Examples
487     ///
488     /// ```
489     /// use tokio::sync::watch;
490     ///
491     /// let (tx, _) = watch::channel("hello");
492     /// assert_eq!(*tx.borrow(), "hello");
493     /// ```
borrow(&self) -> Ref<'_, T>494     pub fn borrow(&self) -> Ref<'_, T> {
495         let inner = self.shared.value.read().unwrap();
496         Ref { inner }
497     }
498 
499     /// Checks if the channel has been closed. This happens when all receivers
500     /// have dropped.
501     ///
502     /// # Examples
503     ///
504     /// ```
505     /// let (tx, rx) = tokio::sync::watch::channel(());
506     /// assert!(!tx.is_closed());
507     ///
508     /// drop(rx);
509     /// assert!(tx.is_closed());
510     /// ```
is_closed(&self) -> bool511     pub fn is_closed(&self) -> bool {
512         self.receiver_count() == 0
513     }
514 
515     /// Completes when all receivers have dropped.
516     ///
517     /// This allows the producer to get notified when interest in the produced
518     /// values is canceled and immediately stop doing work.
519     ///
520     /// # Cancel safety
521     ///
522     /// This method is cancel safe. Once the channel is closed, it stays closed
523     /// forever and all future calls to `closed` will return immediately.
524     ///
525     /// # Examples
526     ///
527     /// ```
528     /// use tokio::sync::watch;
529     ///
530     /// #[tokio::main]
531     /// async fn main() {
532     ///     let (tx, rx) = watch::channel("hello");
533     ///
534     ///     tokio::spawn(async move {
535     ///         // use `rx`
536     ///         drop(rx);
537     ///     });
538     ///
539     ///     // Waits for `rx` to drop
540     ///     tx.closed().await;
541     ///     println!("the `rx` handles dropped")
542     /// }
543     /// ```
closed(&self)544     pub async fn closed(&self) {
545         while self.receiver_count() > 0 {
546             let notified = self.shared.notify_tx.notified();
547 
548             if self.receiver_count() == 0 {
549                 return;
550             }
551 
552             notified.await;
553             // The channel could have been reopened in the meantime by calling
554             // `subscribe`, so we loop again.
555         }
556     }
557 
558     /// Creates a new [`Receiver`] connected to this `Sender`.
559     ///
560     /// All messages sent before this call to `subscribe` are initially marked
561     /// as seen by the new `Receiver`.
562     ///
563     /// This method can be called even if there are no other receivers. In this
564     /// case, the channel is reopened.
565     ///
566     /// # Examples
567     ///
568     /// The new channel will receive messages sent on this `Sender`.
569     ///
570     /// ```
571     /// use tokio::sync::watch;
572     ///
573     /// #[tokio::main]
574     /// async fn main() {
575     ///     let (tx, _rx) = watch::channel(0u64);
576     ///
577     ///     tx.send(5).unwrap();
578     ///
579     ///     let rx = tx.subscribe();
580     ///     assert_eq!(5, *rx.borrow());
581     ///
582     ///     tx.send(10).unwrap();
583     ///     assert_eq!(10, *rx.borrow());
584     /// }
585     /// ```
586     ///
587     /// The most recent message is considered seen by the channel, so this test
588     /// is guaranteed to pass.
589     ///
590     /// ```
591     /// use tokio::sync::watch;
592     /// use tokio::time::Duration;
593     ///
594     /// #[tokio::main]
595     /// async fn main() {
596     ///     let (tx, _rx) = watch::channel(0u64);
597     ///     tx.send(5).unwrap();
598     ///     let mut rx = tx.subscribe();
599     ///
600     ///     tokio::spawn(async move {
601     ///         // by spawning and sleeping, the message is sent after `main`
602     ///         // hits the call to `changed`.
603     ///         # if false {
604     ///         tokio::time::sleep(Duration::from_millis(10)).await;
605     ///         # }
606     ///         tx.send(100).unwrap();
607     ///     });
608     ///
609     ///     rx.changed().await.unwrap();
610     ///     assert_eq!(100, *rx.borrow());
611     /// }
612     /// ```
subscribe(&self) -> Receiver<T>613     pub fn subscribe(&self) -> Receiver<T> {
614         let shared = self.shared.clone();
615         let version = shared.state.load().version();
616 
617         // The CLOSED bit in the state tracks only whether the sender is
618         // dropped, so we do not need to unset it if this reopens the channel.
619         Receiver::from_shared(version, shared)
620     }
621 
622     /// Returns the number of receivers that currently exist.
623     ///
624     /// # Examples
625     ///
626     /// ```
627     /// use tokio::sync::watch;
628     ///
629     /// #[tokio::main]
630     /// async fn main() {
631     ///     let (tx, rx1) = watch::channel("hello");
632     ///
633     ///     assert_eq!(1, tx.receiver_count());
634     ///
635     ///     let mut _rx2 = rx1.clone();
636     ///
637     ///     assert_eq!(2, tx.receiver_count());
638     /// }
639     /// ```
receiver_count(&self) -> usize640     pub fn receiver_count(&self) -> usize {
641         self.shared.ref_count_rx.load(Relaxed)
642     }
643 }
644 
645 impl<T> Drop for Sender<T> {
drop(&mut self)646     fn drop(&mut self) {
647         self.shared.state.set_closed();
648         self.shared.notify_rx.notify_waiters();
649     }
650 }
651 
652 // ===== impl Ref =====
653 
654 impl<T> ops::Deref for Ref<'_, T> {
655     type Target = T;
656 
deref(&self) -> &T657     fn deref(&self) -> &T {
658         self.inner.deref()
659     }
660 }
661 
// Model-checked tests; they are only built when running under loom.
#[cfg(all(test, loom))]
mod tests {
    use futures::future::FutureExt;
    use loom::thread;

    // test for https://github.com/tokio-rs/tokio/issues/3168
    #[test]
    fn watch_spurious_wakeup() {
        loom::model(|| {
            let (send, mut recv) = crate::sync::watch::channel(0i32);

            send.send(1).unwrap();

            let send_thread = thread::spawn(move || {
                send.send(2).unwrap();
                send
            });

            // Poll `changed` once, concurrently with the send above.
            recv.changed().now_or_never();

            let send = send_thread.join().unwrap();
            let recv_thread = thread::spawn(move || {
                recv.changed().now_or_never();
                recv.changed().now_or_never();
                recv
            });

            send.send(3).unwrap();

            let mut recv = recv_thread.join().unwrap();
            let send_thread = thread::spawn(move || {
                send.send(2).unwrap();
            });

            recv.changed().now_or_never();

            send_thread.join().unwrap();
        });
    }

    // Checks that `borrow` on both halves returns the latest value while
    // sends and `changed` polls are interleaved across threads.
    #[test]
    fn watch_borrow() {
        loom::model(|| {
            let (send, mut recv) = crate::sync::watch::channel(0i32);

            assert!(send.borrow().eq(&0));
            assert!(recv.borrow().eq(&0));

            send.send(1).unwrap();
            assert!(send.borrow().eq(&1));

            let send_thread = thread::spawn(move || {
                send.send(2).unwrap();
                send
            });

            recv.changed().now_or_never();

            let send = send_thread.join().unwrap();
            let recv_thread = thread::spawn(move || {
                recv.changed().now_or_never();
                recv.changed().now_or_never();
                recv
            });

            send.send(3).unwrap();

            let recv = recv_thread.join().unwrap();
            assert!(recv.borrow().eq(&3));
            assert!(send.borrow().eq(&3));

            send.send(2).unwrap();

            thread::spawn(move || {
                assert!(recv.borrow().eq(&2));
            });
            assert!(send.borrow().eq(&2));
        });
    }
}
742