• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::loom::sync::{atomic::AtomicUsize, Arc};
2 use crate::sync::mpsc::chan;
3 use crate::sync::mpsc::error::{SendError, TryRecvError};
4 
5 use std::fmt;
6 use std::task::{Context, Poll};
7 
8 /// Send values to the associated `UnboundedReceiver`.
9 ///
10 /// Instances are created by the
11 /// [`unbounded_channel`](unbounded_channel) function.
12 pub struct UnboundedSender<T> {
13     chan: chan::Tx<T, Semaphore>,
14 }
15 
16 /// An unbounded sender that does not prevent the channel from being closed.
17 ///
18 /// If all [`UnboundedSender`] instances of a channel were dropped and only
19 /// `WeakUnboundedSender` instances remain, the channel is closed.
20 ///
21 /// In order to send messages, the `WeakUnboundedSender` needs to be upgraded using
22 /// [`WeakUnboundedSender::upgrade`], which returns `Option<UnboundedSender>`. It returns `None`
23 /// if all `UnboundedSender`s have been dropped, and otherwise it returns an `UnboundedSender`.
24 ///
25 /// [`UnboundedSender`]: UnboundedSender
26 /// [`WeakUnboundedSender::upgrade`]: WeakUnboundedSender::upgrade
27 ///
28 /// #Examples
29 ///
30 /// ```
31 /// use tokio::sync::mpsc::unbounded_channel;
32 ///
33 /// #[tokio::main]
34 /// async fn main() {
35 ///     let (tx, _rx) = unbounded_channel::<i32>();
36 ///     let tx_weak = tx.downgrade();
37 ///
38 ///     // Upgrading will succeed because `tx` still exists.
39 ///     assert!(tx_weak.upgrade().is_some());
40 ///
41 ///     // If we drop `tx`, then it will fail.
42 ///     drop(tx);
43 ///     assert!(tx_weak.clone().upgrade().is_none());
44 /// }
45 /// ```
46 pub struct WeakUnboundedSender<T> {
47     chan: Arc<chan::Chan<T, Semaphore>>,
48 }
49 
50 impl<T> Clone for UnboundedSender<T> {
clone(&self) -> Self51     fn clone(&self) -> Self {
52         UnboundedSender {
53             chan: self.chan.clone(),
54         }
55     }
56 }
57 
58 impl<T> fmt::Debug for UnboundedSender<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result59     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
60         fmt.debug_struct("UnboundedSender")
61             .field("chan", &self.chan)
62             .finish()
63     }
64 }
65 
66 /// Receive values from the associated `UnboundedSender`.
67 ///
68 /// Instances are created by the
69 /// [`unbounded_channel`](unbounded_channel) function.
70 ///
71 /// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`].
72 ///
73 /// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html
74 pub struct UnboundedReceiver<T> {
75     /// The channel receiver
76     chan: chan::Rx<T, Semaphore>,
77 }
78 
79 impl<T> fmt::Debug for UnboundedReceiver<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result80     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
81         fmt.debug_struct("UnboundedReceiver")
82             .field("chan", &self.chan)
83             .finish()
84     }
85 }
86 
87 /// Creates an unbounded mpsc channel for communicating between asynchronous
88 /// tasks without backpressure.
89 ///
90 /// A `send` on this channel will always succeed as long as the receive half has
91 /// not been closed. If the receiver falls behind, messages will be arbitrarily
92 /// buffered.
93 ///
94 /// **Note** that the amount of available system memory is an implicit bound to
95 /// the channel. Using an `unbounded` channel has the ability of causing the
96 /// process to run out of memory. In this case, the process will be aborted.
unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)97 pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
98     let (tx, rx) = chan::channel(Semaphore(AtomicUsize::new(0)));
99 
100     let tx = UnboundedSender::new(tx);
101     let rx = UnboundedReceiver::new(rx);
102 
103     (tx, rx)
104 }
105 
106 /// No capacity
107 #[derive(Debug)]
108 pub(crate) struct Semaphore(pub(crate) AtomicUsize);
109 
110 impl<T> UnboundedReceiver<T> {
new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T>111     pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
112         UnboundedReceiver { chan }
113     }
114 
115     /// Receives the next value for this receiver.
116     ///
117     /// This method returns `None` if the channel has been closed and there are
118     /// no remaining messages in the channel's buffer. This indicates that no
119     /// further values can ever be received from this `Receiver`. The channel is
120     /// closed when all senders have been dropped, or when [`close`] is called.
121     ///
122     /// If there are no messages in the channel's buffer, but the channel has
123     /// not yet been closed, this method will sleep until a message is sent or
124     /// the channel is closed.
125     ///
126     /// # Cancel safety
127     ///
128     /// This method is cancel safe. If `recv` is used as the event in a
129     /// [`tokio::select!`](crate::select) statement and some other branch
130     /// completes first, it is guaranteed that no messages were received on this
131     /// channel.
132     ///
133     /// [`close`]: Self::close
134     ///
135     /// # Examples
136     ///
137     /// ```
138     /// use tokio::sync::mpsc;
139     ///
140     /// #[tokio::main]
141     /// async fn main() {
142     ///     let (tx, mut rx) = mpsc::unbounded_channel();
143     ///
144     ///     tokio::spawn(async move {
145     ///         tx.send("hello").unwrap();
146     ///     });
147     ///
148     ///     assert_eq!(Some("hello"), rx.recv().await);
149     ///     assert_eq!(None, rx.recv().await);
150     /// }
151     /// ```
152     ///
153     /// Values are buffered:
154     ///
155     /// ```
156     /// use tokio::sync::mpsc;
157     ///
158     /// #[tokio::main]
159     /// async fn main() {
160     ///     let (tx, mut rx) = mpsc::unbounded_channel();
161     ///
162     ///     tx.send("hello").unwrap();
163     ///     tx.send("world").unwrap();
164     ///
165     ///     assert_eq!(Some("hello"), rx.recv().await);
166     ///     assert_eq!(Some("world"), rx.recv().await);
167     /// }
168     /// ```
recv(&mut self) -> Option<T>169     pub async fn recv(&mut self) -> Option<T> {
170         use crate::future::poll_fn;
171 
172         poll_fn(|cx| self.poll_recv(cx)).await
173     }
174 
175     /// Tries to receive the next value for this receiver.
176     ///
177     /// This method returns the [`Empty`] error if the channel is currently
178     /// empty, but there are still outstanding [senders] or [permits].
179     ///
180     /// This method returns the [`Disconnected`] error if the channel is
181     /// currently empty, and there are no outstanding [senders] or [permits].
182     ///
183     /// Unlike the [`poll_recv`] method, this method will never return an
184     /// [`Empty`] error spuriously.
185     ///
186     /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
187     /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
188     /// [`poll_recv`]: Self::poll_recv
189     /// [senders]: crate::sync::mpsc::Sender
190     /// [permits]: crate::sync::mpsc::Permit
191     ///
192     /// # Examples
193     ///
194     /// ```
195     /// use tokio::sync::mpsc;
196     /// use tokio::sync::mpsc::error::TryRecvError;
197     ///
198     /// #[tokio::main]
199     /// async fn main() {
200     ///     let (tx, mut rx) = mpsc::unbounded_channel();
201     ///
202     ///     tx.send("hello").unwrap();
203     ///
204     ///     assert_eq!(Ok("hello"), rx.try_recv());
205     ///     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
206     ///
207     ///     tx.send("hello").unwrap();
208     ///     // Drop the last sender, closing the channel.
209     ///     drop(tx);
210     ///
211     ///     assert_eq!(Ok("hello"), rx.try_recv());
212     ///     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
213     /// }
214     /// ```
try_recv(&mut self) -> Result<T, TryRecvError>215     pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
216         self.chan.try_recv()
217     }
218 
219     /// Blocking receive to call outside of asynchronous contexts.
220     ///
221     /// # Panics
222     ///
223     /// This function panics if called within an asynchronous execution
224     /// context.
225     ///
226     /// # Examples
227     ///
228     /// ```
229     /// use std::thread;
230     /// use tokio::sync::mpsc;
231     ///
232     /// #[tokio::main]
233     /// async fn main() {
234     ///     let (tx, mut rx) = mpsc::unbounded_channel::<u8>();
235     ///
236     ///     let sync_code = thread::spawn(move || {
237     ///         assert_eq!(Some(10), rx.blocking_recv());
238     ///     });
239     ///
240     ///     let _ = tx.send(10);
241     ///     sync_code.join().unwrap();
242     /// }
243     /// ```
244     #[track_caller]
245     #[cfg(feature = "sync")]
246     #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
blocking_recv(&mut self) -> Option<T>247     pub fn blocking_recv(&mut self) -> Option<T> {
248         crate::future::block_on(self.recv())
249     }
250 
251     /// Closes the receiving half of a channel, without dropping it.
252     ///
253     /// This prevents any further messages from being sent on the channel while
254     /// still enabling the receiver to drain messages that are buffered.
255     ///
256     /// To guarantee that no messages are dropped, after calling `close()`,
257     /// `recv()` must be called until `None` is returned.
close(&mut self)258     pub fn close(&mut self) {
259         self.chan.close();
260     }
261 
262     /// Polls to receive the next message on this channel.
263     ///
264     /// This method returns:
265     ///
266     ///  * `Poll::Pending` if no messages are available but the channel is not
267     ///    closed, or if a spurious failure happens.
268     ///  * `Poll::Ready(Some(message))` if a message is available.
269     ///  * `Poll::Ready(None)` if the channel has been closed and all messages
270     ///    sent before it was closed have been received.
271     ///
272     /// When the method returns `Poll::Pending`, the `Waker` in the provided
273     /// `Context` is scheduled to receive a wakeup when a message is sent on any
274     /// receiver, or when the channel is closed.  Note that on multiple calls to
275     /// `poll_recv`, only the `Waker` from the `Context` passed to the most
276     /// recent call is scheduled to receive a wakeup.
277     ///
278     /// If this method returns `Poll::Pending` due to a spurious failure, then
279     /// the `Waker` will be notified when the situation causing the spurious
280     /// failure has been resolved. Note that receiving such a wakeup does not
281     /// guarantee that the next call will succeed — it could fail with another
282     /// spurious failure.
poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>283     pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
284         self.chan.recv(cx)
285     }
286 }
287 
288 impl<T> UnboundedSender<T> {
new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T>289     pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
290         UnboundedSender { chan }
291     }
292 
293     /// Attempts to send a message on this `UnboundedSender` without blocking.
294     ///
295     /// This method is not marked async because sending a message to an unbounded channel
296     /// never requires any form of waiting. Because of this, the `send` method can be
297     /// used in both synchronous and asynchronous code without problems.
298     ///
299     /// If the receive half of the channel is closed, either due to [`close`]
300     /// being called or the [`UnboundedReceiver`] having been dropped, this
301     /// function returns an error. The error includes the value passed to `send`.
302     ///
303     /// [`close`]: UnboundedReceiver::close
304     /// [`UnboundedReceiver`]: UnboundedReceiver
send(&self, message: T) -> Result<(), SendError<T>>305     pub fn send(&self, message: T) -> Result<(), SendError<T>> {
306         if !self.inc_num_messages() {
307             return Err(SendError(message));
308         }
309 
310         self.chan.send(message);
311         Ok(())
312     }
313 
inc_num_messages(&self) -> bool314     fn inc_num_messages(&self) -> bool {
315         use std::process;
316         use std::sync::atomic::Ordering::{AcqRel, Acquire};
317 
318         let mut curr = self.chan.semaphore().0.load(Acquire);
319 
320         loop {
321             if curr & 1 == 1 {
322                 return false;
323             }
324 
325             if curr == usize::MAX ^ 1 {
326                 // Overflowed the ref count. There is no safe way to recover, so
327                 // abort the process. In practice, this should never happen.
328                 process::abort()
329             }
330 
331             match self
332                 .chan
333                 .semaphore()
334                 .0
335                 .compare_exchange(curr, curr + 2, AcqRel, Acquire)
336             {
337                 Ok(_) => return true,
338                 Err(actual) => {
339                     curr = actual;
340                 }
341             }
342         }
343     }
344 
345     /// Completes when the receiver has dropped.
346     ///
347     /// This allows the producers to get notified when interest in the produced
348     /// values is canceled and immediately stop doing work.
349     ///
350     /// # Cancel safety
351     ///
352     /// This method is cancel safe. Once the channel is closed, it stays closed
353     /// forever and all future calls to `closed` will return immediately.
354     ///
355     /// # Examples
356     ///
357     /// ```
358     /// use tokio::sync::mpsc;
359     ///
360     /// #[tokio::main]
361     /// async fn main() {
362     ///     let (tx1, rx) = mpsc::unbounded_channel::<()>();
363     ///     let tx2 = tx1.clone();
364     ///     let tx3 = tx1.clone();
365     ///     let tx4 = tx1.clone();
366     ///     let tx5 = tx1.clone();
367     ///     tokio::spawn(async move {
368     ///         drop(rx);
369     ///     });
370     ///
371     ///     futures::join!(
372     ///         tx1.closed(),
373     ///         tx2.closed(),
374     ///         tx3.closed(),
375     ///         tx4.closed(),
376     ///         tx5.closed()
377     ///     );
378     ////     println!("Receiver dropped");
379     /// }
380     /// ```
closed(&self)381     pub async fn closed(&self) {
382         self.chan.closed().await
383     }
384 
385     /// Checks if the channel has been closed. This happens when the
386     /// [`UnboundedReceiver`] is dropped, or when the
387     /// [`UnboundedReceiver::close`] method is called.
388     ///
389     /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver
390     /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
391     ///
392     /// ```
393     /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
394     /// assert!(!tx.is_closed());
395     ///
396     /// let tx2 = tx.clone();
397     /// assert!(!tx2.is_closed());
398     ///
399     /// drop(rx);
400     /// assert!(tx.is_closed());
401     /// assert!(tx2.is_closed());
402     /// ```
is_closed(&self) -> bool403     pub fn is_closed(&self) -> bool {
404         self.chan.is_closed()
405     }
406 
407     /// Returns `true` if senders belong to the same channel.
408     ///
409     /// # Examples
410     ///
411     /// ```
412     /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
413     /// let  tx2 = tx.clone();
414     /// assert!(tx.same_channel(&tx2));
415     ///
416     /// let (tx3, rx3) = tokio::sync::mpsc::unbounded_channel::<()>();
417     /// assert!(!tx3.same_channel(&tx2));
418     /// ```
same_channel(&self, other: &Self) -> bool419     pub fn same_channel(&self, other: &Self) -> bool {
420         self.chan.same_channel(&other.chan)
421     }
422 
423     /// Converts the `UnboundedSender` to a [`WeakUnboundedSender`] that does not count
424     /// towards RAII semantics, i.e. if all `UnboundedSender` instances of the
425     /// channel were dropped and only `WeakUnboundedSender` instances remain,
426     /// the channel is closed.
downgrade(&self) -> WeakUnboundedSender<T>427     pub fn downgrade(&self) -> WeakUnboundedSender<T> {
428         WeakUnboundedSender {
429             chan: self.chan.downgrade(),
430         }
431     }
432 }
433 
434 impl<T> Clone for WeakUnboundedSender<T> {
clone(&self) -> Self435     fn clone(&self) -> Self {
436         WeakUnboundedSender {
437             chan: self.chan.clone(),
438         }
439     }
440 }
441 
442 impl<T> WeakUnboundedSender<T> {
443     /// Tries to convert a WeakUnboundedSender into an [`UnboundedSender`].
444     /// This will return `Some` if there are other `Sender` instances alive and
445     /// the channel wasn't previously dropped, otherwise `None` is returned.
upgrade(&self) -> Option<UnboundedSender<T>>446     pub fn upgrade(&self) -> Option<UnboundedSender<T>> {
447         chan::Tx::upgrade(self.chan.clone()).map(UnboundedSender::new)
448     }
449 }
450 
451 impl<T> fmt::Debug for WeakUnboundedSender<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result452     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
453         fmt.debug_struct("WeakUnboundedSender").finish()
454     }
455 }
456