1 use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
2 use crate::sync::mpsc::chan;
3 use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
4
5 cfg_time! {
6 use crate::sync::mpsc::error::SendTimeoutError;
7 use crate::time::Duration;
8 }
9
10 use std::fmt;
11 use std::task::{Context, Poll};
12
13 /// Sends values to the associated `Receiver`.
14 ///
15 /// Instances are created by the [`channel`](channel) function.
16 ///
17 /// To convert the `Sender` into a `Sink` or use it in a poll function, you can
18 /// use the [`PollSender`] utility.
19 ///
20 /// [`PollSender`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSender.html
21 pub struct Sender<T> {
22 chan: chan::Tx<T, Semaphore>,
23 }
24
25 /// Permits to send one value into the channel.
26 ///
27 /// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
28 /// and are used to guarantee channel capacity before generating a message to send.
29 ///
30 /// [`Sender::reserve()`]: Sender::reserve
31 /// [`Sender::try_reserve()`]: Sender::try_reserve
32 pub struct Permit<'a, T> {
33 chan: &'a chan::Tx<T, Semaphore>,
34 }
35
36 /// Owned permit to send one value into the channel.
37 ///
38 /// This is identical to the [`Permit`] type, except that it moves the sender
39 /// rather than borrowing it.
40 ///
41 /// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
42 /// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
43 /// before generating a message to send.
44 ///
45 /// [`Permit`]: Permit
46 /// [`Sender::reserve_owned()`]: Sender::reserve_owned
47 /// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
48 pub struct OwnedPermit<T> {
49 chan: Option<chan::Tx<T, Semaphore>>,
50 }
51
52 /// Receives values from the associated `Sender`.
53 ///
54 /// Instances are created by the [`channel`](channel) function.
55 ///
56 /// This receiver can be turned into a `Stream` using [`ReceiverStream`].
57 ///
58 /// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
59 pub struct Receiver<T> {
60 /// The channel receiver.
61 chan: chan::Rx<T, Semaphore>,
62 }
63
64 /// Creates a bounded mpsc channel for communicating between asynchronous tasks
65 /// with backpressure.
66 ///
67 /// The channel will buffer up to the provided number of messages. Once the
68 /// buffer is full, attempts to send new messages will wait until a message is
69 /// received from the channel. The provided buffer capacity must be at least 1.
70 ///
71 /// All data sent on `Sender` will become available on `Receiver` in the same
72 /// order as it was sent.
73 ///
74 /// The `Sender` can be cloned to `send` to the same channel from multiple code
75 /// locations. Only one `Receiver` is supported.
76 ///
77 /// If the `Receiver` is disconnected while trying to `send`, the `send` method
78 /// will return a `SendError`. Similarly, if `Sender` is disconnected while
79 /// trying to `recv`, the `recv` method will return `None`.
80 ///
81 /// # Panics
82 ///
83 /// Panics if the buffer capacity is 0.
84 ///
85 /// # Examples
86 ///
87 /// ```rust
88 /// use tokio::sync::mpsc;
89 ///
90 /// #[tokio::main]
91 /// async fn main() {
92 /// let (tx, mut rx) = mpsc::channel(100);
93 ///
94 /// tokio::spawn(async move {
95 /// for i in 0..10 {
96 /// if let Err(_) = tx.send(i).await {
97 /// println!("receiver dropped");
98 /// return;
99 /// }
100 /// }
101 /// });
102 ///
103 /// while let Some(i) = rx.recv().await {
104 /// println!("got = {}", i);
105 /// }
106 /// }
107 /// ```
channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>)108 pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
109 assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
110 let semaphore = (semaphore::Semaphore::new(buffer), buffer);
111 let (tx, rx) = chan::channel(semaphore);
112
113 let tx = Sender::new(tx);
114 let rx = Receiver::new(rx);
115
116 (tx, rx)
117 }
118
119 /// Channel semaphore is a tuple of the semaphore implementation and a `usize`
120 /// representing the channel bound.
121 type Semaphore = (semaphore::Semaphore, usize);
122
123 impl<T> Receiver<T> {
new(chan: chan::Rx<T, Semaphore>) -> Receiver<T>124 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
125 Receiver { chan }
126 }
127
128 /// Receives the next value for this receiver.
129 ///
130 /// This method returns `None` if the channel has been closed and there are
131 /// no remaining messages in the channel's buffer. This indicates that no
132 /// further values can ever be received from this `Receiver`. The channel is
133 /// closed when all senders have been dropped, or when [`close`] is called.
134 ///
135 /// If there are no messages in the channel's buffer, but the channel has
136 /// not yet been closed, this method will sleep until a message is sent or
137 /// the channel is closed. Note that if [`close`] is called, but there are
138 /// still outstanding [`Permits`] from before it was closed, the channel is
139 /// not considered closed by `recv` until the permits are released.
140 ///
141 /// # Cancel safety
142 ///
143 /// This method is cancel safe. If `recv` is used as the event in a
144 /// [`tokio::select!`](crate::select) statement and some other branch
145 /// completes first, it is guaranteed that no messages were received on this
146 /// channel.
147 ///
148 /// [`close`]: Self::close
149 /// [`Permits`]: struct@crate::sync::mpsc::Permit
150 ///
151 /// # Examples
152 ///
153 /// ```
154 /// use tokio::sync::mpsc;
155 ///
156 /// #[tokio::main]
157 /// async fn main() {
158 /// let (tx, mut rx) = mpsc::channel(100);
159 ///
160 /// tokio::spawn(async move {
161 /// tx.send("hello").await.unwrap();
162 /// });
163 ///
164 /// assert_eq!(Some("hello"), rx.recv().await);
165 /// assert_eq!(None, rx.recv().await);
166 /// }
167 /// ```
168 ///
169 /// Values are buffered:
170 ///
171 /// ```
172 /// use tokio::sync::mpsc;
173 ///
174 /// #[tokio::main]
175 /// async fn main() {
176 /// let (tx, mut rx) = mpsc::channel(100);
177 ///
178 /// tx.send("hello").await.unwrap();
179 /// tx.send("world").await.unwrap();
180 ///
181 /// assert_eq!(Some("hello"), rx.recv().await);
182 /// assert_eq!(Some("world"), rx.recv().await);
183 /// }
184 /// ```
recv(&mut self) -> Option<T>185 pub async fn recv(&mut self) -> Option<T> {
186 use crate::future::poll_fn;
187 poll_fn(|cx| self.chan.recv(cx)).await
188 }
189
190 /// Tries to receive the next value for this receiver.
191 ///
192 /// This method returns the [`Empty`] error if the channel is currently
193 /// empty, but there are still outstanding [senders] or [permits].
194 ///
195 /// This method returns the [`Disconnected`] error if the channel is
196 /// currently empty, and there are no outstanding [senders] or [permits].
197 ///
198 /// Unlike the [`poll_recv`] method, this method will never return an
199 /// [`Empty`] error spuriously.
200 ///
201 /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
202 /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
203 /// [`poll_recv`]: Self::poll_recv
204 /// [senders]: crate::sync::mpsc::Sender
205 /// [permits]: crate::sync::mpsc::Permit
206 ///
207 /// # Examples
208 ///
209 /// ```
210 /// use tokio::sync::mpsc;
211 /// use tokio::sync::mpsc::error::TryRecvError;
212 ///
213 /// #[tokio::main]
214 /// async fn main() {
215 /// let (tx, mut rx) = mpsc::channel(100);
216 ///
217 /// tx.send("hello").await.unwrap();
218 ///
219 /// assert_eq!(Ok("hello"), rx.try_recv());
220 /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
221 ///
222 /// tx.send("hello").await.unwrap();
223 /// // Drop the last sender, closing the channel.
224 /// drop(tx);
225 ///
226 /// assert_eq!(Ok("hello"), rx.try_recv());
227 /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
228 /// }
229 /// ```
try_recv(&mut self) -> Result<T, TryRecvError>230 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
231 self.chan.try_recv()
232 }
233
234 /// Blocking receive to call outside of asynchronous contexts.
235 ///
236 /// This method returns `None` if the channel has been closed and there are
237 /// no remaining messages in the channel's buffer. This indicates that no
238 /// further values can ever be received from this `Receiver`. The channel is
239 /// closed when all senders have been dropped, or when [`close`] is called.
240 ///
241 /// If there are no messages in the channel's buffer, but the channel has
242 /// not yet been closed, this method will block until a message is sent or
243 /// the channel is closed.
244 ///
245 /// This method is intended for use cases where you are sending from
246 /// asynchronous code to synchronous code, and will work even if the sender
247 /// is not using [`blocking_send`] to send the message.
248 ///
249 /// Note that if [`close`] is called, but there are still outstanding
250 /// [`Permits`] from before it was closed, the channel is not considered
251 /// closed by `blocking_recv` until the permits are released.
252 ///
253 /// [`close`]: Self::close
254 /// [`Permits`]: struct@crate::sync::mpsc::Permit
255 /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
256 ///
257 /// # Panics
258 ///
259 /// This function panics if called within an asynchronous execution
260 /// context.
261 ///
262 /// # Examples
263 ///
264 /// ```
265 /// use std::thread;
266 /// use tokio::runtime::Runtime;
267 /// use tokio::sync::mpsc;
268 ///
269 /// fn main() {
270 /// let (tx, mut rx) = mpsc::channel::<u8>(10);
271 ///
272 /// let sync_code = thread::spawn(move || {
273 /// assert_eq!(Some(10), rx.blocking_recv());
274 /// });
275 ///
276 /// Runtime::new()
277 /// .unwrap()
278 /// .block_on(async move {
279 /// let _ = tx.send(10).await;
280 /// });
281 /// sync_code.join().unwrap()
282 /// }
283 /// ```
284 #[cfg(feature = "sync")]
blocking_recv(&mut self) -> Option<T>285 pub fn blocking_recv(&mut self) -> Option<T> {
286 crate::future::block_on(self.recv())
287 }
288
289 /// Closes the receiving half of a channel without dropping it.
290 ///
291 /// This prevents any further messages from being sent on the channel while
292 /// still enabling the receiver to drain messages that are buffered. Any
293 /// outstanding [`Permit`] values will still be able to send messages.
294 ///
295 /// To guarantee that no messages are dropped, after calling `close()`,
296 /// `recv()` must be called until `None` is returned. If there are
297 /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
298 /// not return `None` until those are released.
299 ///
300 /// [`Permit`]: Permit
301 /// [`OwnedPermit`]: OwnedPermit
302 ///
303 /// # Examples
304 ///
305 /// ```
306 /// use tokio::sync::mpsc;
307 ///
308 /// #[tokio::main]
309 /// async fn main() {
310 /// let (tx, mut rx) = mpsc::channel(20);
311 ///
312 /// tokio::spawn(async move {
313 /// let mut i = 0;
314 /// while let Ok(permit) = tx.reserve().await {
315 /// permit.send(i);
316 /// i += 1;
317 /// }
318 /// });
319 ///
320 /// rx.close();
321 ///
322 /// while let Some(msg) = rx.recv().await {
323 /// println!("got {}", msg);
324 /// }
325 ///
326 /// // Channel closed and no messages are lost.
327 /// }
328 /// ```
close(&mut self)329 pub fn close(&mut self) {
330 self.chan.close();
331 }
332
333 /// Polls to receive the next message on this channel.
334 ///
335 /// This method returns:
336 ///
337 /// * `Poll::Pending` if no messages are available but the channel is not
338 /// closed, or if a spurious failure happens.
339 /// * `Poll::Ready(Some(message))` if a message is available.
340 /// * `Poll::Ready(None)` if the channel has been closed and all messages
341 /// sent before it was closed have been received.
342 ///
343 /// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// sender, or when the channel is closed. Note that on multiple calls to
346 /// `poll_recv`, only the `Waker` from the `Context` passed to the most
347 /// recent call is scheduled to receive a wakeup.
348 ///
349 /// If this method returns `Poll::Pending` due to a spurious failure, then
350 /// the `Waker` will be notified when the situation causing the spurious
351 /// failure has been resolved. Note that receiving such a wakeup does not
352 /// guarantee that the next call will succeed — it could fail with another
353 /// spurious failure.
poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>354 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
355 self.chan.recv(cx)
356 }
357 }
358
359 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result360 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
361 fmt.debug_struct("Receiver")
362 .field("chan", &self.chan)
363 .finish()
364 }
365 }
366
367 impl<T> Unpin for Receiver<T> {}
368
369 impl<T> Sender<T> {
new(chan: chan::Tx<T, Semaphore>) -> Sender<T>370 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
371 Sender { chan }
372 }
373
374 /// Sends a value, waiting until there is capacity.
375 ///
376 /// A successful send occurs when it is determined that the other end of the
377 /// channel has not hung up already. An unsuccessful send would be one where
378 /// the corresponding receiver has already been closed. Note that a return
379 /// value of `Err` means that the data will never be received, but a return
380 /// value of `Ok` does not mean that the data will be received. It is
381 /// possible for the corresponding receiver to hang up immediately after
382 /// this function returns `Ok`.
383 ///
384 /// # Errors
385 ///
386 /// If the receive half of the channel is closed, either due to [`close`]
387 /// being called or the [`Receiver`] handle dropping, the function returns
388 /// an error. The error includes the value passed to `send`.
389 ///
390 /// [`close`]: Receiver::close
391 /// [`Receiver`]: Receiver
392 ///
393 /// # Cancel safety
394 ///
395 /// If `send` is used as the event in a [`tokio::select!`](crate::select)
396 /// statement and some other branch completes first, then it is guaranteed
397 /// that the message was not sent.
398 ///
399 /// This channel uses a queue to ensure that calls to `send` and `reserve`
400 /// complete in the order they were requested. Cancelling a call to
401 /// `send` makes you lose your place in the queue.
402 ///
403 /// # Examples
404 ///
405 /// In the following example, each call to `send` will block until the
406 /// previously sent value was received.
407 ///
408 /// ```rust
409 /// use tokio::sync::mpsc;
410 ///
411 /// #[tokio::main]
412 /// async fn main() {
413 /// let (tx, mut rx) = mpsc::channel(1);
414 ///
415 /// tokio::spawn(async move {
416 /// for i in 0..10 {
417 /// if let Err(_) = tx.send(i).await {
418 /// println!("receiver dropped");
419 /// return;
420 /// }
421 /// }
422 /// });
423 ///
424 /// while let Some(i) = rx.recv().await {
425 /// println!("got = {}", i);
426 /// }
427 /// }
428 /// ```
send(&self, value: T) -> Result<(), SendError<T>>429 pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
430 match self.reserve().await {
431 Ok(permit) => {
432 permit.send(value);
433 Ok(())
434 }
435 Err(_) => Err(SendError(value)),
436 }
437 }
438
439 /// Completes when the receiver has dropped.
440 ///
441 /// This allows the producers to get notified when interest in the produced
442 /// values is canceled and immediately stop doing work.
443 ///
444 /// # Cancel safety
445 ///
446 /// This method is cancel safe. Once the channel is closed, it stays closed
447 /// forever and all future calls to `closed` will return immediately.
448 ///
449 /// # Examples
450 ///
451 /// ```
452 /// use tokio::sync::mpsc;
453 ///
454 /// #[tokio::main]
455 /// async fn main() {
456 /// let (tx1, rx) = mpsc::channel::<()>(1);
457 /// let tx2 = tx1.clone();
458 /// let tx3 = tx1.clone();
459 /// let tx4 = tx1.clone();
460 /// let tx5 = tx1.clone();
461 /// tokio::spawn(async move {
462 /// drop(rx);
463 /// });
464 ///
465 /// futures::join!(
466 /// tx1.closed(),
467 /// tx2.closed(),
468 /// tx3.closed(),
469 /// tx4.closed(),
470 /// tx5.closed()
471 /// );
472 /// println!("Receiver dropped");
473 /// }
474 /// ```
closed(&self)475 pub async fn closed(&self) {
476 self.chan.closed().await
477 }
478
/// Attempts to immediately send a message on this `Sender`.
480 ///
/// This method differs from [`send`] by returning immediately instead of
/// waiting when the channel's buffer is full. Compared with [`send`], this
/// function has two failure cases instead of one (one for disconnection, one
/// for a full buffer).
485 ///
486 /// # Errors
487 ///
488 /// If the channel capacity has been reached, i.e., the channel has `n`
489 /// buffered values where `n` is the argument passed to [`channel`], then an
490 /// error is returned.
491 ///
492 /// If the receive half of the channel is closed, either due to [`close`]
493 /// being called or the [`Receiver`] handle dropping, the function returns
/// an error. The error includes the value passed to `try_send`.
495 ///
496 /// [`send`]: Sender::send
497 /// [`channel`]: channel
498 /// [`close`]: Receiver::close
499 ///
500 /// # Examples
501 ///
502 /// ```
503 /// use tokio::sync::mpsc;
504 ///
505 /// #[tokio::main]
506 /// async fn main() {
507 /// // Create a channel with buffer size 1
508 /// let (tx1, mut rx) = mpsc::channel(1);
509 /// let tx2 = tx1.clone();
510 ///
511 /// tokio::spawn(async move {
512 /// tx1.send(1).await.unwrap();
513 /// tx1.send(2).await.unwrap();
514 /// // task waits until the receiver receives a value.
515 /// });
516 ///
517 /// tokio::spawn(async move {
518 /// // This will return an error and send
519 /// // no message if the buffer is full
520 /// let _ = tx2.try_send(3);
521 /// });
522 ///
523 /// let mut msg;
524 /// msg = rx.recv().await.unwrap();
525 /// println!("message {} received", msg);
526 ///
527 /// msg = rx.recv().await.unwrap();
528 /// println!("message {} received", msg);
529 ///
530 /// // Third message may have never been sent
531 /// match rx.recv().await {
532 /// Some(msg) => println!("message {} received", msg),
533 /// None => println!("the third message was never sent"),
534 /// }
535 /// }
536 /// ```
try_send(&self, message: T) -> Result<(), TrySendError<T>>537 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
538 match self.chan.semaphore().0.try_acquire(1) {
539 Ok(_) => {}
540 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
541 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
542 }
543
544 // Send the message
545 self.chan.send(message);
546 Ok(())
547 }
548
549 /// Sends a value, waiting until there is capacity, but only for a limited time.
550 ///
551 /// Shares the same success and error conditions as [`send`], adding one more
552 /// condition for an unsuccessful send, which is when the provided timeout has
553 /// elapsed, and there is no capacity available.
554 ///
555 /// [`send`]: Sender::send
556 ///
557 /// # Errors
558 ///
559 /// If the receive half of the channel is closed, either due to [`close`]
560 /// being called or the [`Receiver`] having been dropped,
/// the function returns an error. The error includes the value passed to `send_timeout`.
562 ///
563 /// [`close`]: Receiver::close
564 /// [`Receiver`]: Receiver
565 ///
566 /// # Examples
567 ///
568 /// In the following example, each call to `send_timeout` will block until the
569 /// previously sent value was received, unless the timeout has elapsed.
570 ///
571 /// ```rust
572 /// use tokio::sync::mpsc;
573 /// use tokio::time::{sleep, Duration};
574 ///
575 /// #[tokio::main]
576 /// async fn main() {
577 /// let (tx, mut rx) = mpsc::channel(1);
578 ///
579 /// tokio::spawn(async move {
580 /// for i in 0..10 {
581 /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
582 /// println!("send error: #{:?}", e);
583 /// return;
584 /// }
585 /// }
586 /// });
587 ///
588 /// while let Some(i) = rx.recv().await {
589 /// println!("got = {}", i);
590 /// sleep(Duration::from_millis(200)).await;
591 /// }
592 /// }
593 /// ```
594 #[cfg(feature = "time")]
595 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
send_timeout( &self, value: T, timeout: Duration, ) -> Result<(), SendTimeoutError<T>>596 pub async fn send_timeout(
597 &self,
598 value: T,
599 timeout: Duration,
600 ) -> Result<(), SendTimeoutError<T>> {
601 let permit = match crate::time::timeout(timeout, self.reserve()).await {
602 Err(_) => {
603 return Err(SendTimeoutError::Timeout(value));
604 }
605 Ok(Err(_)) => {
606 return Err(SendTimeoutError::Closed(value));
607 }
608 Ok(Ok(permit)) => permit,
609 };
610
611 permit.send(value);
612 Ok(())
613 }
614
615 /// Blocking send to call outside of asynchronous contexts.
616 ///
617 /// This method is intended for use cases where you are sending from
618 /// synchronous code to asynchronous code, and will work even if the
619 /// receiver is not using [`blocking_recv`] to receive the message.
620 ///
621 /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
622 ///
623 /// # Panics
624 ///
625 /// This function panics if called within an asynchronous execution
626 /// context.
627 ///
628 /// # Examples
629 ///
630 /// ```
631 /// use std::thread;
632 /// use tokio::runtime::Runtime;
633 /// use tokio::sync::mpsc;
634 ///
635 /// fn main() {
636 /// let (tx, mut rx) = mpsc::channel::<u8>(1);
637 ///
638 /// let sync_code = thread::spawn(move || {
639 /// tx.blocking_send(10).unwrap();
640 /// });
641 ///
642 /// Runtime::new().unwrap().block_on(async move {
643 /// assert_eq!(Some(10), rx.recv().await);
644 /// });
645 /// sync_code.join().unwrap()
646 /// }
647 /// ```
648 #[cfg(feature = "sync")]
blocking_send(&self, value: T) -> Result<(), SendError<T>>649 pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
650 crate::future::block_on(self.send(value))
651 }
652
653 /// Checks if the channel has been closed. This happens when the
654 /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
655 /// called.
656 ///
657 /// [`Receiver`]: crate::sync::mpsc::Receiver
658 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
659 ///
660 /// ```
661 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
662 /// assert!(!tx.is_closed());
663 ///
664 /// let tx2 = tx.clone();
665 /// assert!(!tx2.is_closed());
666 ///
667 /// drop(rx);
668 /// assert!(tx.is_closed());
669 /// assert!(tx2.is_closed());
670 /// ```
is_closed(&self) -> bool671 pub fn is_closed(&self) -> bool {
672 self.chan.is_closed()
673 }
674
675 /// Waits for channel capacity. Once capacity to send one message is
676 /// available, it is reserved for the caller.
677 ///
678 /// If the channel is full, the function waits for the number of unreceived
679 /// messages to become less than the channel capacity. Capacity to send one
680 /// message is reserved for the caller. A [`Permit`] is returned to track
681 /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
682 /// reserved capacity.
683 ///
684 /// Dropping [`Permit`] without sending a message releases the capacity back
685 /// to the channel.
686 ///
687 /// [`Permit`]: Permit
688 /// [`send`]: Permit::send
689 ///
690 /// # Cancel safety
691 ///
692 /// This channel uses a queue to ensure that calls to `send` and `reserve`
693 /// complete in the order they were requested. Cancelling a call to
694 /// `reserve` makes you lose your place in the queue.
695 ///
696 /// # Examples
697 ///
698 /// ```
699 /// use tokio::sync::mpsc;
700 ///
701 /// #[tokio::main]
702 /// async fn main() {
703 /// let (tx, mut rx) = mpsc::channel(1);
704 ///
705 /// // Reserve capacity
706 /// let permit = tx.reserve().await.unwrap();
707 ///
708 /// // Trying to send directly on the `tx` will fail due to no
709 /// // available capacity.
710 /// assert!(tx.try_send(123).is_err());
711 ///
712 /// // Sending on the permit succeeds
713 /// permit.send(456);
714 ///
715 /// // The value sent on the permit is received
716 /// assert_eq!(rx.recv().await.unwrap(), 456);
717 /// }
718 /// ```
reserve(&self) -> Result<Permit<'_, T>, SendError<()>>719 pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
720 self.reserve_inner().await?;
721 Ok(Permit { chan: &self.chan })
722 }
723
724 /// Waits for channel capacity, moving the `Sender` and returning an owned
725 /// permit. Once capacity to send one message is available, it is reserved
726 /// for the caller.
727 ///
728 /// This moves the sender _by value_, and returns an owned permit that can
729 /// be used to send a message into the channel. Unlike [`Sender::reserve`],
730 /// this method may be used in cases where the permit must be valid for the
731 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
732 /// essentially a reference count increment, comparable to [`Arc::clone`]),
733 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
734 /// moved, it can be cloned prior to calling `reserve_owned`.
735 ///
736 /// If the channel is full, the function waits for the number of unreceived
737 /// messages to become less than the channel capacity. Capacity to send one
738 /// message is reserved for the caller. An [`OwnedPermit`] is returned to
739 /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
740 /// consumes the reserved capacity.
741 ///
742 /// Dropping the [`OwnedPermit`] without sending a message releases the
743 /// capacity back to the channel.
744 ///
745 /// # Cancel safety
746 ///
747 /// This channel uses a queue to ensure that calls to `send` and `reserve`
748 /// complete in the order they were requested. Cancelling a call to
749 /// `reserve_owned` makes you lose your place in the queue.
750 ///
751 /// # Examples
752 /// Sending a message using an [`OwnedPermit`]:
753 /// ```
754 /// use tokio::sync::mpsc;
755 ///
756 /// #[tokio::main]
757 /// async fn main() {
758 /// let (tx, mut rx) = mpsc::channel(1);
759 ///
760 /// // Reserve capacity, moving the sender.
761 /// let permit = tx.reserve_owned().await.unwrap();
762 ///
763 /// // Send a message, consuming the permit and returning
764 /// // the moved sender.
765 /// let tx = permit.send(123);
766 ///
767 /// // The value sent on the permit is received.
768 /// assert_eq!(rx.recv().await.unwrap(), 123);
769 ///
770 /// // The sender can now be used again.
771 /// tx.send(456).await.unwrap();
772 /// }
773 /// ```
774 ///
775 /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
776 /// by value, it can be inexpensively cloned before calling `reserve_owned`:
777 ///
778 /// ```
779 /// use tokio::sync::mpsc;
780 ///
781 /// #[tokio::main]
782 /// async fn main() {
783 /// let (tx, mut rx) = mpsc::channel(1);
784 ///
785 /// // Clone the sender and reserve capacity.
786 /// let permit = tx.clone().reserve_owned().await.unwrap();
787 ///
788 /// // Trying to send directly on the `tx` will fail due to no
789 /// // available capacity.
790 /// assert!(tx.try_send(123).is_err());
791 ///
792 /// // Sending on the permit succeeds.
793 /// permit.send(456);
794 ///
795 /// // The value sent on the permit is received
796 /// assert_eq!(rx.recv().await.unwrap(), 456);
797 /// }
798 /// ```
799 ///
800 /// [`Sender::reserve`]: Sender::reserve
801 /// [`OwnedPermit`]: OwnedPermit
802 /// [`send`]: OwnedPermit::send
803 /// [`Arc::clone`]: std::sync::Arc::clone
reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>>804 pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
805 self.reserve_inner().await?;
806 Ok(OwnedPermit {
807 chan: Some(self.chan),
808 })
809 }
810
reserve_inner(&self) -> Result<(), SendError<()>>811 async fn reserve_inner(&self) -> Result<(), SendError<()>> {
812 match self.chan.semaphore().0.acquire(1).await {
813 Ok(_) => Ok(()),
814 Err(_) => Err(SendError(())),
815 }
816 }
817
818 /// Tries to acquire a slot in the channel without waiting for the slot to become
819 /// available.
820 ///
821 /// If the channel is full this function will return [`TrySendError`], otherwise
822 /// if there is a slot available it will return a [`Permit`] that will then allow you
823 /// to [`send`] on the channel with a guaranteed slot. This function is similar to
824 /// [`reserve`] except it does not await for the slot to become available.
825 ///
826 /// Dropping [`Permit`] without sending a message releases the capacity back
827 /// to the channel.
828 ///
829 /// [`Permit`]: Permit
830 /// [`send`]: Permit::send
831 /// [`reserve`]: Sender::reserve
832 ///
833 /// # Examples
834 ///
835 /// ```
836 /// use tokio::sync::mpsc;
837 ///
838 /// #[tokio::main]
839 /// async fn main() {
840 /// let (tx, mut rx) = mpsc::channel(1);
841 ///
842 /// // Reserve capacity
843 /// let permit = tx.try_reserve().unwrap();
844 ///
845 /// // Trying to send directly on the `tx` will fail due to no
846 /// // available capacity.
847 /// assert!(tx.try_send(123).is_err());
848 ///
849 /// // Trying to reserve an additional slot on the `tx` will
850 /// // fail because there is no capacity.
851 /// assert!(tx.try_reserve().is_err());
852 ///
853 /// // Sending on the permit succeeds
854 /// permit.send(456);
855 ///
856 /// // The value sent on the permit is received
857 /// assert_eq!(rx.recv().await.unwrap(), 456);
858 ///
859 /// }
860 /// ```
try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>>861 pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
862 match self.chan.semaphore().0.try_acquire(1) {
863 Ok(_) => {}
864 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
865 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
866 }
867
868 Ok(Permit { chan: &self.chan })
869 }
870
871 /// Tries to acquire a slot in the channel without waiting for the slot to become
872 /// available, returning an owned permit.
873 ///
874 /// This moves the sender _by value_, and returns an owned permit that can
875 /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
876 /// this method may be used in cases where the permit must be valid for the
877 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
878 /// essentially a reference count increment, comparable to [`Arc::clone`]),
879 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
880 /// moved, it can be cloned prior to calling `try_reserve_owned`.
881 ///
882 /// If the channel is full this function will return a [`TrySendError`].
883 /// Since the sender is taken by value, the `TrySendError` returned in this
884 /// case contains the sender, so that it may be used again. Otherwise, if
885 /// there is a slot available, this method will return an [`OwnedPermit`]
886 /// that can then be used to [`send`] on the channel with a guaranteed slot.
887 /// This function is similar to [`reserve_owned`] except it does not await
888 /// for the slot to become available.
889 ///
890 /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
891 /// to the channel.
892 ///
893 /// [`OwnedPermit`]: OwnedPermit
894 /// [`send`]: OwnedPermit::send
895 /// [`reserve_owned`]: Sender::reserve_owned
896 /// [`Arc::clone`]: std::sync::Arc::clone
897 ///
898 /// # Examples
899 ///
900 /// ```
901 /// use tokio::sync::mpsc;
902 ///
903 /// #[tokio::main]
904 /// async fn main() {
905 /// let (tx, mut rx) = mpsc::channel(1);
906 ///
907 /// // Reserve capacity
908 /// let permit = tx.clone().try_reserve_owned().unwrap();
909 ///
910 /// // Trying to send directly on the `tx` will fail due to no
911 /// // available capacity.
912 /// assert!(tx.try_send(123).is_err());
913 ///
914 /// // Trying to reserve an additional slot on the `tx` will
915 /// // fail because there is no capacity.
916 /// assert!(tx.try_reserve().is_err());
917 ///
918 /// // Sending on the permit succeeds
919 /// permit.send(456);
920 ///
921 /// // The value sent on the permit is received
922 /// assert_eq!(rx.recv().await.unwrap(), 456);
923 ///
924 /// }
925 /// ```
try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>>926 pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
927 match self.chan.semaphore().0.try_acquire(1) {
928 Ok(_) => {}
929 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
930 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
931 }
932
933 Ok(OwnedPermit {
934 chan: Some(self.chan),
935 })
936 }
937
938 /// Returns `true` if senders belong to the same channel.
939 ///
940 /// # Examples
941 ///
942 /// ```
943 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
944 /// let tx2 = tx.clone();
945 /// assert!(tx.same_channel(&tx2));
946 ///
947 /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
948 /// assert!(!tx3.same_channel(&tx2));
949 /// ```
same_channel(&self, other: &Self) -> bool950 pub fn same_channel(&self, other: &Self) -> bool {
951 self.chan.same_channel(&other.chan)
952 }
953
954 /// Returns the current capacity of the channel.
955 ///
956 /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
957 /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
958 ///
959 /// # Examples
960 ///
961 /// ```
962 /// use tokio::sync::mpsc;
963 ///
964 /// #[tokio::main]
965 /// async fn main() {
966 /// let (tx, mut rx) = mpsc::channel::<()>(5);
967 ///
968 /// assert_eq!(tx.capacity(), 5);
969 ///
970 /// // Making a reservation drops the capacity by one.
971 /// let permit = tx.reserve().await.unwrap();
972 /// assert_eq!(tx.capacity(), 4);
973 ///
974 /// // Sending and receiving a value increases the capacity by one.
975 /// permit.send(());
976 /// rx.recv().await.unwrap();
977 /// assert_eq!(tx.capacity(), 5);
978 /// }
979 /// ```
980 ///
981 /// [`send`]: Sender::send
982 /// [`reserve`]: Sender::reserve
capacity(&self) -> usize983 pub fn capacity(&self) -> usize {
984 self.chan.semaphore().0.available_permits()
985 }
986 }
987
988 impl<T> Clone for Sender<T> {
clone(&self) -> Self989 fn clone(&self) -> Self {
990 Sender {
991 chan: self.chan.clone(),
992 }
993 }
994 }
995
996 impl<T> fmt::Debug for Sender<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result997 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
998 fmt.debug_struct("Sender")
999 .field("chan", &self.chan)
1000 .finish()
1001 }
1002 }
1003
1004 // ===== impl Permit =====
1005
1006 impl<T> Permit<'_, T> {
1007 /// Sends a value using the reserved capacity.
1008 ///
1009 /// Capacity for the message has already been reserved. The message is sent
1010 /// to the receiver and the permit is consumed. The operation will succeed
1011 /// even if the receiver half has been closed. See [`Receiver::close`] for
1012 /// more details on performing a clean shutdown.
1013 ///
1014 /// [`Receiver::close`]: Receiver::close
1015 ///
1016 /// # Examples
1017 ///
1018 /// ```
1019 /// use tokio::sync::mpsc;
1020 ///
1021 /// #[tokio::main]
1022 /// async fn main() {
1023 /// let (tx, mut rx) = mpsc::channel(1);
1024 ///
1025 /// // Reserve capacity
1026 /// let permit = tx.reserve().await.unwrap();
1027 ///
1028 /// // Trying to send directly on the `tx` will fail due to no
1029 /// // available capacity.
1030 /// assert!(tx.try_send(123).is_err());
1031 ///
1032 /// // Send a message on the permit
1033 /// permit.send(456);
1034 ///
1035 /// // The value sent on the permit is received
1036 /// assert_eq!(rx.recv().await.unwrap(), 456);
1037 /// }
1038 /// ```
send(self, value: T)1039 pub fn send(self, value: T) {
1040 use std::mem;
1041
1042 self.chan.send(value);
1043
1044 // Avoid the drop logic
1045 mem::forget(self);
1046 }
1047 }
1048
1049 impl<T> Drop for Permit<'_, T> {
drop(&mut self)1050 fn drop(&mut self) {
1051 use chan::Semaphore;
1052
1053 let semaphore = self.chan.semaphore();
1054
1055 // Add the permit back to the semaphore
1056 semaphore.add_permit();
1057
1058 // If this is the last sender for this channel, wake the receiver so
1059 // that it can be notified that the channel is closed.
1060 if semaphore.is_closed() && semaphore.is_idle() {
1061 self.chan.wake_rx();
1062 }
1063 }
1064 }
1065
1066 impl<T> fmt::Debug for Permit<'_, T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1067 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1068 fmt.debug_struct("Permit")
1069 .field("chan", &self.chan)
1070 .finish()
1071 }
1072 }
1073
// ===== impl OwnedPermit =====
1075
1076 impl<T> OwnedPermit<T> {
1077 /// Sends a value using the reserved capacity.
1078 ///
1079 /// Capacity for the message has already been reserved. The message is sent
1080 /// to the receiver and the permit is consumed. The operation will succeed
1081 /// even if the receiver half has been closed. See [`Receiver::close`] for
1082 /// more details on performing a clean shutdown.
1083 ///
1084 /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1085 /// the `OwnedPermit` was reserved.
1086 ///
1087 /// [`Receiver::close`]: Receiver::close
1088 ///
1089 /// # Examples
1090 ///
1091 /// ```
1092 /// use tokio::sync::mpsc;
1093 ///
1094 /// #[tokio::main]
1095 /// async fn main() {
1096 /// let (tx, mut rx) = mpsc::channel(1);
1097 ///
1098 /// // Reserve capacity
1099 /// let permit = tx.reserve_owned().await.unwrap();
1100 ///
1101 /// // Send a message on the permit, returning the sender.
1102 /// let tx = permit.send(456);
1103 ///
1104 /// // The value sent on the permit is received
1105 /// assert_eq!(rx.recv().await.unwrap(), 456);
1106 ///
1107 /// // We may now reuse `tx` to send another message.
1108 /// tx.send(789).await.unwrap();
1109 /// }
1110 /// ```
send(mut self, value: T) -> Sender<T>1111 pub fn send(mut self, value: T) -> Sender<T> {
1112 let chan = self.chan.take().unwrap_or_else(|| {
1113 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1114 });
1115 chan.send(value);
1116
1117 Sender { chan }
1118 }
1119
1120 /// Releases the reserved capacity *without* sending a message, returning the
1121 /// [`Sender`].
1122 ///
1123 /// # Examples
1124 ///
1125 /// ```
1126 /// use tokio::sync::mpsc;
1127 ///
1128 /// #[tokio::main]
1129 /// async fn main() {
1130 /// let (tx, rx) = mpsc::channel(1);
1131 ///
1132 /// // Clone the sender and reserve capacity
1133 /// let permit = tx.clone().reserve_owned().await.unwrap();
1134 ///
1135 /// // Trying to send on the original `tx` will fail, since the `permit`
1136 /// // has reserved all the available capacity.
1137 /// assert!(tx.try_send(123).is_err());
1138 ///
1139 /// // Release the permit without sending a message, returning the clone
1140 /// // of the sender.
1141 /// let tx2 = permit.release();
1142 ///
1143 /// // We may now reuse `tx` to send another message.
1144 /// tx.send(789).await.unwrap();
1145 /// # drop(rx); drop(tx2);
1146 /// }
1147 /// ```
1148 ///
1149 /// [`Sender`]: Sender
release(mut self) -> Sender<T>1150 pub fn release(mut self) -> Sender<T> {
1151 use chan::Semaphore;
1152
1153 let chan = self.chan.take().unwrap_or_else(|| {
1154 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1155 });
1156
1157 // Add the permit back to the semaphore
1158 chan.semaphore().add_permit();
1159 Sender { chan }
1160 }
1161 }
1162
1163 impl<T> Drop for OwnedPermit<T> {
drop(&mut self)1164 fn drop(&mut self) {
1165 use chan::Semaphore;
1166
1167 // Are we still holding onto the sender?
1168 if let Some(chan) = self.chan.take() {
1169 let semaphore = chan.semaphore();
1170
1171 // Add the permit back to the semaphore
1172 semaphore.add_permit();
1173
1174 // If this `OwnedPermit` is holding the last sender for this
1175 // channel, wake the receiver so that it can be notified that the
1176 // channel is closed.
1177 if semaphore.is_closed() && semaphore.is_idle() {
1178 chan.wake_rx();
1179 }
1180 }
1181
1182 // Otherwise, do nothing.
1183 }
1184 }
1185
1186 impl<T> fmt::Debug for OwnedPermit<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1187 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1188 fmt.debug_struct("OwnedPermit")
1189 .field("chan", &self.chan)
1190 .finish()
1191 }
1192 }
1193