1 //! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2 //! all consumers.
3 //!
4 //! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5 //! values. [`Sender`] handles are clone-able, allowing concurrent send and
6 //! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7 //! long as `T` is `Send`.
8 //!
9 //! When a value is sent, **all** [`Receiver`] handles are notified and will
10 //! receive the value. The value is stored once inside the channel and cloned on
11 //! demand for each receiver. Once all receivers have received a clone of the
12 //! value, the value is released from the channel.
13 //!
14 //! A channel is created by calling [`channel`], specifying the maximum number
15 //! of messages the channel can retain at any given time.
16 //!
17 //! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18 //! returned [`Receiver`] will receive values sent **after** the call to
19 //! `subscribe`.
20 //!
21 //! This channel is also suitable for the single-producer multi-consumer
22 //! use-case, where a single sender broadcasts values to many receivers.
23 //!
24 //! ## Lagging
25 //!
26 //! As sent messages must be retained until **all** [`Receiver`] handles receive
27 //! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28 //! In this case, all but one receiver are able to receive values at the rate
29 //! they are sent. Because one receiver is stalled, the channel starts to fill
30 //! up.
31 //!
32 //! This broadcast channel implementation handles this case by setting a hard
33 //! upper bound on the number of values the channel may retain at any given
34 //! time. This upper bound is passed to the [`channel`] function as an argument.
35 //!
36 //! If a value is sent when the channel is at capacity, the oldest value
37 //! currently held by the channel is released. This frees up space for the new
38 //! value. Any receiver that has not yet seen the released value will return
39 //! [`RecvError::Lagged`] the next time [`recv`] is called.
40 //!
41 //! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42 //! updated to the oldest value contained by the channel. The next call to
43 //! [`recv`] will return this value.
44 //!
45 //! This behavior enables a receiver to detect when it has lagged so far behind
46 //! that data has been dropped. The caller may decide how to respond to this:
47 //! either by aborting its task or by tolerating lost messages and resuming
48 //! consumption of the channel.
49 //!
50 //! ## Closing
51 //!
52 //! When **all** [`Sender`] handles have been dropped, no new values may be
53 //! sent. At this point, the channel is "closed". Once a receiver has received
54 //! all values retained by the channel, the next call to [`recv`] will return
55 //! with [`RecvError::Closed`].
56 //!
57 //! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58 //! will be marked as read. If this receiver was the only one not to have read
59 //! that message, the message will be dropped at this point.
60 //!
61 //! [`Sender`]: crate::sync::broadcast::Sender
62 //! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63 //! [`Receiver`]: crate::sync::broadcast::Receiver
64 //! [`channel`]: crate::sync::broadcast::channel
65 //! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66 //! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67 //! [`recv`]: crate::sync::broadcast::Receiver::recv
68 //!
69 //! # Examples
70 //!
71 //! Basic usage
72 //!
73 //! ```
74 //! use tokio::sync::broadcast;
75 //!
76 //! #[tokio::main]
77 //! async fn main() {
78 //! let (tx, mut rx1) = broadcast::channel(16);
79 //! let mut rx2 = tx.subscribe();
80 //!
81 //! tokio::spawn(async move {
82 //! assert_eq!(rx1.recv().await.unwrap(), 10);
83 //! assert_eq!(rx1.recv().await.unwrap(), 20);
84 //! });
85 //!
86 //! tokio::spawn(async move {
87 //! assert_eq!(rx2.recv().await.unwrap(), 10);
88 //! assert_eq!(rx2.recv().await.unwrap(), 20);
89 //! });
90 //!
91 //! tx.send(10).unwrap();
92 //! tx.send(20).unwrap();
93 //! }
94 //! ```
95 //!
96 //! Handling lag
97 //!
98 //! ```
99 //! use tokio::sync::broadcast;
100 //!
101 //! #[tokio::main]
102 //! async fn main() {
103 //! let (tx, mut rx) = broadcast::channel(2);
104 //!
105 //! tx.send(10).unwrap();
106 //! tx.send(20).unwrap();
107 //! tx.send(30).unwrap();
108 //!
109 //! // The receiver lagged behind
110 //! assert!(rx.recv().await.is_err());
111 //!
112 //! // At this point, we can abort or continue with lost messages
113 //!
114 //! assert_eq!(20, rx.recv().await.unwrap());
115 //! assert_eq!(30, rx.recv().await.unwrap());
116 //! }
117 //! ```
118
119 use crate::loom::cell::UnsafeCell;
120 use crate::loom::sync::atomic::AtomicUsize;
121 use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
122 use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
123 use crate::util::WakeList;
124
125 use std::fmt;
126 use std::future::Future;
127 use std::marker::PhantomPinned;
128 use std::pin::Pin;
129 use std::ptr::NonNull;
130 use std::sync::atomic::Ordering::SeqCst;
131 use std::task::{Context, Poll, Waker};
132 use std::usize;
133
134 /// Sending-half of the [`broadcast`] channel.
135 ///
136 /// May be used from many threads. Messages can be sent with
137 /// [`send`][Sender::send].
138 ///
139 /// # Examples
140 ///
141 /// ```
142 /// use tokio::sync::broadcast;
143 ///
144 /// #[tokio::main]
145 /// async fn main() {
146 /// let (tx, mut rx1) = broadcast::channel(16);
147 /// let mut rx2 = tx.subscribe();
148 ///
149 /// tokio::spawn(async move {
150 /// assert_eq!(rx1.recv().await.unwrap(), 10);
151 /// assert_eq!(rx1.recv().await.unwrap(), 20);
152 /// });
153 ///
154 /// tokio::spawn(async move {
155 /// assert_eq!(rx2.recv().await.unwrap(), 10);
156 /// assert_eq!(rx2.recv().await.unwrap(), 20);
157 /// });
158 ///
159 /// tx.send(10).unwrap();
160 /// tx.send(20).unwrap();
161 /// }
162 /// ```
163 ///
164 /// [`broadcast`]: crate::sync::broadcast
pub struct Sender<T> {
    /// State shared with every other sender and receiver of this channel.
    shared: Arc<Shared<T>>,
}
168
169 /// Receiving-half of the [`broadcast`] channel.
170 ///
171 /// Must not be used concurrently. Messages may be retrieved using
172 /// [`recv`][Receiver::recv].
173 ///
174 /// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
175 /// wrapper.
176 ///
177 /// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
178 ///
179 /// # Examples
180 ///
181 /// ```
182 /// use tokio::sync::broadcast;
183 ///
184 /// #[tokio::main]
185 /// async fn main() {
186 /// let (tx, mut rx1) = broadcast::channel(16);
187 /// let mut rx2 = tx.subscribe();
188 ///
189 /// tokio::spawn(async move {
190 /// assert_eq!(rx1.recv().await.unwrap(), 10);
191 /// assert_eq!(rx1.recv().await.unwrap(), 20);
192 /// });
193 ///
194 /// tokio::spawn(async move {
195 /// assert_eq!(rx2.recv().await.unwrap(), 10);
196 /// assert_eq!(rx2.recv().await.unwrap(), 20);
197 /// });
198 ///
199 /// tx.send(10).unwrap();
200 /// tx.send(20).unwrap();
201 /// }
202 /// ```
203 ///
204 /// [`broadcast`]: crate::sync::broadcast
pub struct Receiver<T> {
    /// State shared with all receivers and senders.
    shared: Arc<Shared<T>>,

    /// Position of the next value this receiver will read.
    ///
    /// `recv_ref` compares it against the `pos` stored in the slot to tell
    /// whether the slot currently holds the value this receiver expects.
    next: u64,
}
212
pub mod error {
    //! Error types produced by the broadcast channel.

    use std::fmt;

    /// Error returned by the [`send`] function on a [`Sender`].
    ///
    /// Sending only fails when the channel has no active receivers, which
    /// means nobody could ever observe the message. The rejected message is
    /// carried inside the error so the caller can recover it.
    ///
    /// [`send`]: crate::sync::broadcast::Sender::send
    /// [`Sender`]: crate::sync::broadcast::Sender
    #[derive(Debug)]
    pub struct SendError<T>(pub T);

    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("channel closed")
        }
    }

    impl<T: fmt::Debug> std::error::Error for SendError<T> {}

    /// Error returned by the [`recv`] function on a [`Receiver`].
    ///
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum RecvError {
        /// Every sender is gone, so no further message will ever be sent.
        Closed,

        /// The receiver fell too far behind. The next receive attempt yields
        /// the oldest message the channel still retains.
        ///
        /// Carries the number of messages that were skipped.
        Lagged(u64),
    }

    impl fmt::Display for RecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match *self {
                Self::Closed => f.write_str("channel closed"),
                Self::Lagged(skipped) => write!(f, "channel lagged by {}", skipped),
            }
        }
    }

    impl std::error::Error for RecvError {}

    /// Error returned by the [`try_recv`] function on a [`Receiver`].
    ///
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum TryRecvError {
        /// Nothing is available right now, but at least one [`Sender`] is
        /// still alive, so a value may arrive later.
        ///
        /// [`Sender`]: crate::sync::broadcast::Sender
        Empty,

        /// Every sender is gone, so no further message will ever be sent.
        Closed,

        /// The receiver fell too far behind. The next receive attempt yields
        /// the oldest message the channel still retains.
        ///
        /// Carries the number of messages that were skipped.
        Lagged(u64),
    }

    impl fmt::Display for TryRecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match *self {
                Self::Empty => f.write_str("channel empty"),
                Self::Closed => f.write_str("channel closed"),
                Self::Lagged(skipped) => write!(f, "channel lagged by {}", skipped),
            }
        }
    }

    impl std::error::Error for TryRecvError {}
}
301
302 use self::error::*;
303
/// Data shared between senders and receivers.
///
/// One instance lives behind an `Arc` and is referenced by every `Sender`
/// and `Receiver` handle of the channel.
struct Shared<T> {
    /// Slots in the channel.
    ///
    /// The length is the requested capacity rounded up to a power of two, so
    /// a position can be mapped to an index with `mask`.
    buffer: Box<[RwLock<Slot<T>>]>,

    /// Mask a position -> index (`buffer.len() - 1`).
    mask: usize,

    /// Tail of the queue. Includes the rx wait list.
    tail: Mutex<Tail>,

    /// Number of outstanding Sender handles.
    ///
    /// Incremented when a `Sender` is cloned and decremented when one is
    /// dropped; the channel is closed when the last one goes away.
    num_tx: AtomicUsize,
}
318
/// Write-side state of the queue, protected by the `Shared::tail` mutex.
struct Tail {
    /// Next position to write to. Advanced (with wrapping) by every `send`.
    pos: u64,

    /// Number of active receivers.
    ///
    /// `send` copies this into the slot's `rem` counter and fails when it is
    /// zero.
    rx_cnt: usize,

    /// True if the channel is closed, i.e. the last `Sender` was dropped.
    closed: bool,

    /// Receivers waiting for a value. Drained and woken by `notify_rx`.
    waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
}
333
/// Slot in the buffer.
struct Slot<T> {
    /// Remaining number of receivers that are expected to see this value.
    ///
    /// When this goes to zero, the value is released.
    ///
    /// An atomic is used as it is mutated concurrently with the slot read lock
    /// acquired.
    rem: AtomicUsize,

    /// Uniquely identifies the `send` stored in the slot.
    ///
    /// Freshly created slots are given a position `capacity` behind their
    /// index (wrapping) — presumably so an unwritten slot never matches a
    /// receiver's `next`; confirm against `recv_ref`.
    pos: u64,

    /// The value being broadcast.
    ///
    /// The value is set by `send` when the write lock is held. When a reader
    /// drops, `rem` is decremented. When it hits zero, the value is dropped.
    val: UnsafeCell<Option<T>>,
}
353
/// An entry in the wait queue.
struct Waiter {
    /// True if queued.
    ///
    /// `notify_rx` asserts this is set when it pops the waiter and clears it
    /// before waking the task.
    queued: bool,

    /// Task waiting on the broadcast channel. Taken by `notify_rx` when the
    /// waiter is woken.
    waker: Option<Waker>,

    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
368
369 impl Waiter {
new() -> Self370 fn new() -> Self {
371 Self {
372 queued: false,
373 waker: None,
374 pointers: linked_list::Pointers::new(),
375 _p: PhantomPinned,
376 }
377 }
378 }
379
// Accessor for the intrusive list pointers of a `Waiter` given only a raw
// `NonNull<Waiter>`.
// NOTE(review): presumably the macro expands this into a raw-pointer field
// projection that avoids creating an intermediate `&Waiter` — confirm against
// the definition of `generate_addr_of_methods!`.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
387
/// Holds the read lock on a buffer slot for as long as the guard is alive.
struct RecvGuard<'a, T> {
    /// Read guard on the slot being received from.
    slot: RwLockReadGuard<'a, Slot<T>>,
}
391
/// Receive a value future.
struct Recv<'a, T> {
    /// Receiver being waited on.
    receiver: &'a mut Receiver<T>,

    /// Entry in the waiter `LinkedList`.
    ///
    /// Wrapped in an `UnsafeCell` because the node is reached through raw
    /// pointers stored in the intrusive list (see `notify_rx`).
    waiter: UnsafeCell<Waiter>,
}

// NOTE(review): `Waiter` is not automatically `Send`/`Sync`, so these impls
// are asserted manually. Presumably sound because the waiter is only touched
// while the `Shared::tail` lock is held — confirm against the `Future` and
// `Drop` impls of `Recv`.
unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
403
/// Max number of receivers. Reserve space to lock.
///
/// `new_receiver` panics with "max receivers" once the receiver count has
/// reached this value.
const MAX_RECEIVERS: usize = usize::MAX >> 2;
406
407 /// Create a bounded, multi-producer, multi-consumer channel where each sent
408 /// value is broadcasted to all active receivers.
409 ///
410 /// All data sent on [`Sender`] will become available on every active
411 /// [`Receiver`] in the same order as it was sent.
412 ///
413 /// The `Sender` can be cloned to `send` to the same channel from multiple
414 /// points in the process or it can be used concurrently from an `Arc`. New
415 /// `Receiver` handles are created by calling [`Sender::subscribe`].
416 ///
417 /// If all [`Receiver`] handles are dropped, the `send` method will return a
418 /// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
419 /// method will return a [`RecvError`].
420 ///
421 /// [`Sender`]: crate::sync::broadcast::Sender
422 /// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
423 /// [`Receiver`]: crate::sync::broadcast::Receiver
424 /// [`recv`]: crate::sync::broadcast::Receiver::recv
425 /// [`SendError`]: crate::sync::broadcast::error::SendError
426 /// [`RecvError`]: crate::sync::broadcast::error::RecvError
427 ///
428 /// # Examples
429 ///
430 /// ```
431 /// use tokio::sync::broadcast;
432 ///
433 /// #[tokio::main]
434 /// async fn main() {
435 /// let (tx, mut rx1) = broadcast::channel(16);
436 /// let mut rx2 = tx.subscribe();
437 ///
438 /// tokio::spawn(async move {
439 /// assert_eq!(rx1.recv().await.unwrap(), 10);
440 /// assert_eq!(rx1.recv().await.unwrap(), 20);
441 /// });
442 ///
443 /// tokio::spawn(async move {
444 /// assert_eq!(rx2.recv().await.unwrap(), 10);
445 /// assert_eq!(rx2.recv().await.unwrap(), 20);
446 /// });
447 ///
448 /// tx.send(10).unwrap();
449 /// tx.send(20).unwrap();
450 /// }
451 /// ```
452 ///
453 /// # Panics
454 ///
455 /// This will panic if `capacity` is equal to `0` or larger
456 /// than `usize::MAX / 2`.
457 #[track_caller]
channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>)458 pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
459 // SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
460 let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
461 let rx = Receiver {
462 shared: tx.shared.clone(),
463 next: 0,
464 };
465 (tx, rx)
466 }
467
// Both handles are `Send` and `Sync` whenever `T` is `Send`, as stated in the
// module documentation.
// NOTE(review): presumably sound because values are only handed out as clones
// and all shared state sits behind locks or atomics — confirm against the
// receive path.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
473
474 impl<T> Sender<T> {
/// Creates the sending-half of the [`broadcast`] channel.
///
/// The channel starts without any receiver; use [`subscribe`] to create
/// them.
///
/// See the documentation of [`broadcast::channel`] for the conditions under
/// which this function panics.
///
/// [`broadcast`]: crate::sync::broadcast
/// [`broadcast::channel`]: crate::sync::broadcast::channel
/// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
#[track_caller]
pub fn new(capacity: usize) -> Self {
    // SAFETY: We don't create extra receivers, so there are 0.
    unsafe { Self::new_with_receiver_count(0, capacity) }
}
486
487 /// Creates the sending-half of the [`broadcast`](self) channel, and provide the receiver
488 /// count.
489 ///
490 /// See the documentation of [`broadcast::channel`](self::channel) for more errors when
491 /// calling this function.
492 ///
493 /// # Safety:
494 ///
495 /// The caller must ensure that the amount of receivers for this Sender is correct before
496 /// the channel functionalities are used, the count is zero by default, as this function
497 /// does not create any receivers by itself.
498 #[track_caller]
new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self499 unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
500 assert!(capacity > 0, "broadcast channel capacity cannot be zero");
501 assert!(
502 capacity <= usize::MAX >> 1,
503 "broadcast channel capacity exceeded `usize::MAX / 2`"
504 );
505
506 // Round to a power of two
507 capacity = capacity.next_power_of_two();
508
509 let mut buffer = Vec::with_capacity(capacity);
510
511 for i in 0..capacity {
512 buffer.push(RwLock::new(Slot {
513 rem: AtomicUsize::new(0),
514 pos: (i as u64).wrapping_sub(capacity as u64),
515 val: UnsafeCell::new(None),
516 }));
517 }
518
519 let shared = Arc::new(Shared {
520 buffer: buffer.into_boxed_slice(),
521 mask: capacity - 1,
522 tail: Mutex::new(Tail {
523 pos: 0,
524 rx_cnt: receiver_count,
525 closed: false,
526 waiters: LinkedList::new(),
527 }),
528 num_tx: AtomicUsize::new(1),
529 });
530
531 Sender { shared }
532 }
533
534 /// Attempts to send a value to all active [`Receiver`] handles, returning
535 /// it back if it could not be sent.
536 ///
537 /// A successful send occurs when there is at least one active [`Receiver`]
538 /// handle. An unsuccessful send would be one where all associated
539 /// [`Receiver`] handles have already been dropped.
540 ///
541 /// # Return
542 ///
543 /// On success, the number of subscribed [`Receiver`] handles is returned.
544 /// This does not mean that this number of receivers will see the message as
545 /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
546 /// the message.
547 ///
548 /// # Note
549 ///
550 /// A return value of `Ok` **does not** mean that the sent value will be
551 /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
552 /// handles may be dropped before receiving the sent message.
553 ///
554 /// A return value of `Err` **does not** mean that future calls to `send`
555 /// will fail. New [`Receiver`] handles may be created by calling
556 /// [`subscribe`].
557 ///
558 /// [`Receiver`]: crate::sync::broadcast::Receiver
559 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
560 ///
561 /// # Examples
562 ///
563 /// ```
564 /// use tokio::sync::broadcast;
565 ///
566 /// #[tokio::main]
567 /// async fn main() {
568 /// let (tx, mut rx1) = broadcast::channel(16);
569 /// let mut rx2 = tx.subscribe();
570 ///
571 /// tokio::spawn(async move {
572 /// assert_eq!(rx1.recv().await.unwrap(), 10);
573 /// assert_eq!(rx1.recv().await.unwrap(), 20);
574 /// });
575 ///
576 /// tokio::spawn(async move {
577 /// assert_eq!(rx2.recv().await.unwrap(), 10);
578 /// assert_eq!(rx2.recv().await.unwrap(), 20);
579 /// });
580 ///
581 /// tx.send(10).unwrap();
582 /// tx.send(20).unwrap();
583 /// }
584 /// ```
pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
    // The tail lock is taken first and held across the whole write; the
    // slot lock is only ever taken while the tail lock is held.
    let mut tail = self.shared.tail.lock();

    // Without receivers nobody could observe the value: hand it back.
    if tail.rx_cnt == 0 {
        return Err(SendError(value));
    }

    // Position to write into, the number of receivers expected to read the
    // value, and the buffer index the position maps to.
    let pos = tail.pos;
    let rem = tail.rx_cnt;
    let idx = (pos & self.shared.mask as u64) as usize;

    // Update the tail position
    tail.pos = tail.pos.wrapping_add(1);

    // Get the slot
    let mut slot = self.shared.buffer[idx].write().unwrap();

    // Track the position
    slot.pos = pos;

    // Set remaining receivers
    slot.rem.with_mut(|v| *v = rem);

    // Write the value. Assigning a fresh cell replaces, and thereby drops,
    // whatever the slot held before.
    slot.val = UnsafeCell::new(Some(value));

    // Release the slot lock before notifying the receivers.
    drop(slot);

    // Notify and release the mutex. This must happen after the slot lock is
    // released, otherwise the writer lock bit could be cleared while another
    // thread is in the critical section.
    self.shared.notify_rx(tail);

    // Report how many receivers were subscribed at the time of the send.
    Ok(rem)
}
622
623 /// Creates a new [`Receiver`] handle that will receive values sent **after**
624 /// this call to `subscribe`.
625 ///
626 /// # Examples
627 ///
628 /// ```
629 /// use tokio::sync::broadcast;
630 ///
631 /// #[tokio::main]
632 /// async fn main() {
633 /// let (tx, _rx) = broadcast::channel(16);
634 ///
635 /// // Will not be seen
636 /// tx.send(10).unwrap();
637 ///
638 /// let mut rx = tx.subscribe();
639 ///
640 /// tx.send(20).unwrap();
641 ///
642 /// let value = rx.recv().await.unwrap();
643 /// assert_eq!(20, value);
644 /// }
645 /// ```
subscribe(&self) -> Receiver<T>646 pub fn subscribe(&self) -> Receiver<T> {
647 let shared = self.shared.clone();
648 new_receiver(shared)
649 }
650
651 /// Returns the number of queued values.
652 ///
653 /// A value is queued until it has either been seen by all receivers that were alive at the time
654 /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
655 /// queue's capacity.
656 ///
657 /// # Note
658 ///
659 /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
660 /// have been evicted from the queue before being seen by all receivers.
661 ///
662 /// # Examples
663 ///
664 /// ```
665 /// use tokio::sync::broadcast;
666 ///
667 /// #[tokio::main]
668 /// async fn main() {
669 /// let (tx, mut rx1) = broadcast::channel(16);
670 /// let mut rx2 = tx.subscribe();
671 ///
672 /// tx.send(10).unwrap();
673 /// tx.send(20).unwrap();
674 /// tx.send(30).unwrap();
675 ///
676 /// assert_eq!(tx.len(), 3);
677 ///
678 /// rx1.recv().await.unwrap();
679 ///
680 /// // The len is still 3 since rx2 hasn't seen the first value yet.
681 /// assert_eq!(tx.len(), 3);
682 ///
683 /// rx2.recv().await.unwrap();
684 ///
685 /// assert_eq!(tx.len(), 2);
686 /// }
687 /// ```
len(&self) -> usize688 pub fn len(&self) -> usize {
689 let tail = self.shared.tail.lock();
690
691 let base_idx = (tail.pos & self.shared.mask as u64) as usize;
692 let mut low = 0;
693 let mut high = self.shared.buffer.len();
694 while low < high {
695 let mid = low + (high - low) / 2;
696 let idx = base_idx.wrapping_add(mid) & self.shared.mask;
697 if self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 {
698 low = mid + 1;
699 } else {
700 high = mid;
701 }
702 }
703
704 self.shared.buffer.len() - low
705 }
706
707 /// Returns true if there are no queued values.
708 ///
709 /// # Examples
710 ///
711 /// ```
712 /// use tokio::sync::broadcast;
713 ///
714 /// #[tokio::main]
715 /// async fn main() {
716 /// let (tx, mut rx1) = broadcast::channel(16);
717 /// let mut rx2 = tx.subscribe();
718 ///
719 /// assert!(tx.is_empty());
720 ///
721 /// tx.send(10).unwrap();
722 ///
723 /// assert!(!tx.is_empty());
724 ///
725 /// rx1.recv().await.unwrap();
726 ///
727 /// // The queue is still not empty since rx2 hasn't seen the value.
728 /// assert!(!tx.is_empty());
729 ///
730 /// rx2.recv().await.unwrap();
731 ///
732 /// assert!(tx.is_empty());
733 /// }
734 /// ```
is_empty(&self) -> bool735 pub fn is_empty(&self) -> bool {
736 let tail = self.shared.tail.lock();
737
738 let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
739 self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0
740 }
741
742 /// Returns the number of active receivers
743 ///
744 /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
745 /// [`subscribe`]. These are the handles that will receive values sent on
746 /// this [`Sender`].
747 ///
748 /// # Note
749 ///
750 /// It is not guaranteed that a sent message will reach this number of
751 /// receivers. Active receivers may never call [`recv`] again before
752 /// dropping.
753 ///
754 /// [`recv`]: crate::sync::broadcast::Receiver::recv
755 /// [`Receiver`]: crate::sync::broadcast::Receiver
756 /// [`Sender`]: crate::sync::broadcast::Sender
757 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
758 /// [`channel`]: crate::sync::broadcast::channel
759 ///
760 /// # Examples
761 ///
762 /// ```
763 /// use tokio::sync::broadcast;
764 ///
765 /// #[tokio::main]
766 /// async fn main() {
767 /// let (tx, _rx1) = broadcast::channel(16);
768 ///
769 /// assert_eq!(1, tx.receiver_count());
770 ///
771 /// let mut _rx2 = tx.subscribe();
772 ///
773 /// assert_eq!(2, tx.receiver_count());
774 ///
775 /// tx.send(10).unwrap();
776 /// }
777 /// ```
receiver_count(&self) -> usize778 pub fn receiver_count(&self) -> usize {
779 let tail = self.shared.tail.lock();
780 tail.rx_cnt
781 }
782
783 /// Returns `true` if senders belong to the same channel.
784 ///
785 /// # Examples
786 ///
787 /// ```
788 /// use tokio::sync::broadcast;
789 ///
790 /// #[tokio::main]
791 /// async fn main() {
792 /// let (tx, _rx) = broadcast::channel::<()>(16);
793 /// let tx2 = tx.clone();
794 ///
795 /// assert!(tx.same_channel(&tx2));
796 ///
797 /// let (tx3, _rx3) = broadcast::channel::<()>(16);
798 ///
799 /// assert!(!tx3.same_channel(&tx2));
800 /// }
801 /// ```
same_channel(&self, other: &Self) -> bool802 pub fn same_channel(&self, other: &Self) -> bool {
803 Arc::ptr_eq(&self.shared, &other.shared)
804 }
805
close_channel(&self)806 fn close_channel(&self) {
807 let mut tail = self.shared.tail.lock();
808 tail.closed = true;
809
810 self.shared.notify_rx(tail);
811 }
812 }
813
814 /// Create a new `Receiver` which reads starting from the tail.
new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T>815 fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
816 let mut tail = shared.tail.lock();
817
818 if tail.rx_cnt == MAX_RECEIVERS {
819 panic!("max receivers");
820 }
821
822 tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
823
824 let next = tail.pos;
825
826 drop(tail);
827
828 Receiver { shared, next }
829 }
830
/// List used in `Shared::notify_rx`. It wraps a guarded linked list
/// and gates the access to it on the `Shared.tail` mutex. It also empties
/// the list on drop.
struct WaitersList<'a, T> {
    /// The guarded list holding the waiters moved out of `Tail::waiters`.
    list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
    /// Set once a pop has returned `None`, so `drop` can skip taking the lock.
    is_empty: bool,
    /// Channel state whose `tail` mutex protects `list`.
    shared: &'a Shared<T>,
}
839
840 impl<'a, T> Drop for WaitersList<'a, T> {
drop(&mut self)841 fn drop(&mut self) {
842 // If the list is not empty, we unlink all waiters from it.
843 // We do not wake the waiters to avoid double panics.
844 if !self.is_empty {
845 let _lock_guard = self.shared.tail.lock();
846 while self.list.pop_back().is_some() {}
847 }
848 }
849 }
850
851 impl<'a, T> WaitersList<'a, T> {
new( unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>, guard: Pin<&'a Waiter>, shared: &'a Shared<T>, ) -> Self852 fn new(
853 unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
854 guard: Pin<&'a Waiter>,
855 shared: &'a Shared<T>,
856 ) -> Self {
857 let guard_ptr = NonNull::from(guard.get_ref());
858 let list = unguarded_list.into_guarded(guard_ptr);
859 WaitersList {
860 list,
861 is_empty: false,
862 shared,
863 }
864 }
865
866 /// Removes the last element from the guarded list. Modifying this list
867 /// requires an exclusive access to the main list in `Notify`.
pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>>868 fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
869 let result = self.list.pop_back();
870 if result.is_none() {
871 // Save information about emptiness to avoid waiting for lock
872 // in the destructor.
873 self.is_empty = true;
874 }
875 result
876 }
877 }
878
impl<T> Shared<T> {
    /// Wakes every receiver currently queued in `tail.waiters`.
    ///
    /// Takes ownership of the tail guard: the lock is released before any
    /// waker is invoked and is no longer held when the function returns.
    fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
        // It is critical for `GuardedLinkedList` safety that the guard node is
        // pinned in memory and is not dropped until the guarded list is dropped.
        let guard = Waiter::new();
        pin!(guard);

        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
        // underneath to allow every waiter to safely remove itself from it.
        //
        // * This list will be still guarded by the `waiters` lock.
        //   `WaitersList` wrapper makes sure we hold the lock to modify it.
        // * This wrapper will empty the list on drop. It is critical for safety
        //   that we will not leave any list entry with a pointer to the local
        //   guard node after this function returns / panics.
        let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);

        // Wakers are collected in batches so they can be invoked without
        // holding the lock.
        let mut wakers = WakeList::new();
        'outer: loop {
            // Fill the current batch while the lock is held.
            while wakers.can_push() {
                match list.pop_back_locked(&mut tail) {
                    Some(mut waiter) => {
                        // Safety: `tail` lock is still held.
                        let waiter = unsafe { waiter.as_mut() };

                        assert!(waiter.queued);
                        waiter.queued = false;

                        if let Some(waker) = waiter.waker.take() {
                            wakers.push(waker);
                        }
                    }
                    None => {
                        // No waiter left; the final batch is woken below.
                        break 'outer;
                    }
                }
            }

            // Release the lock before waking.
            drop(tail);

            // Before we acquire the lock again all sorts of things can happen:
            // some waiters may remove themselves from the list and new waiters
            // may be added. This is fine since at worst we will unnecessarily
            // wake up waiters which will then queue themselves again.

            wakers.wake_all();

            // Acquire the lock again.
            tail = self.tail.lock();
        }

        // Release the lock before waking.
        drop(tail);

        wakers.wake_all();
    }
}
937
938 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>939 fn clone(&self) -> Sender<T> {
940 let shared = self.shared.clone();
941 shared.num_tx.fetch_add(1, SeqCst);
942
943 Sender { shared }
944 }
945 }
946
947 impl<T> Drop for Sender<T> {
drop(&mut self)948 fn drop(&mut self) {
949 if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
950 self.close_channel();
951 }
952 }
953 }
954
955 impl<T> Receiver<T> {
956 /// Returns the number of messages that were sent into the channel and that
957 /// this [`Receiver`] has yet to receive.
958 ///
959 /// If the returned value from `len` is larger than the next largest power of 2
960 /// of the capacity of the channel any call to [`recv`] will return an
961 /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
962 /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
963 /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
964 /// values larger than 16.
965 ///
966 /// [`Receiver`]: crate::sync::broadcast::Receiver
967 /// [`recv`]: crate::sync::broadcast::Receiver::recv
968 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
969 ///
970 /// # Examples
971 ///
972 /// ```
973 /// use tokio::sync::broadcast;
974 ///
975 /// #[tokio::main]
976 /// async fn main() {
977 /// let (tx, mut rx1) = broadcast::channel(16);
978 ///
979 /// tx.send(10).unwrap();
980 /// tx.send(20).unwrap();
981 ///
982 /// assert_eq!(rx1.len(), 2);
983 /// assert_eq!(rx1.recv().await.unwrap(), 10);
984 /// assert_eq!(rx1.len(), 1);
985 /// assert_eq!(rx1.recv().await.unwrap(), 20);
986 /// assert_eq!(rx1.len(), 0);
987 /// }
988 /// ```
len(&self) -> usize989 pub fn len(&self) -> usize {
990 let next_send_pos = self.shared.tail.lock().pos;
991 (next_send_pos - self.next) as usize
992 }
993
/// Returns true if there aren't any messages in the channel that the [`Receiver`]
/// has yet to receive.
///
/// [`Receiver`]: crate::sync::broadcast::Receiver
///
/// # Examples
///
/// ```
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, mut rx1) = broadcast::channel(16);
///
///     assert!(rx1.is_empty());
///
///     tx.send(10).unwrap();
///     tx.send(20).unwrap();
///
///     assert!(!rx1.is_empty());
///     assert_eq!(rx1.recv().await.unwrap(), 10);
///     assert_eq!(rx1.recv().await.unwrap(), 20);
///     assert!(rx1.is_empty());
/// }
/// ```
pub fn is_empty(&self) -> bool {
    // Empty exactly when no sent message is still waiting for this receiver.
    self.len() == 0
}
1022
1023 /// Returns `true` if receivers belong to the same channel.
1024 ///
1025 /// # Examples
1026 ///
1027 /// ```
1028 /// use tokio::sync::broadcast;
1029 ///
1030 /// #[tokio::main]
1031 /// async fn main() {
1032 /// let (tx, rx) = broadcast::channel::<()>(16);
1033 /// let rx2 = tx.subscribe();
1034 ///
1035 /// assert!(rx.same_channel(&rx2));
1036 ///
1037 /// let (_tx3, rx3) = broadcast::channel::<()>(16);
1038 ///
1039 /// assert!(!rx3.same_channel(&rx2));
1040 /// }
1041 /// ```
same_channel(&self, other: &Self) -> bool1042 pub fn same_channel(&self, other: &Self) -> bool {
1043 Arc::ptr_eq(&self.shared, &other.shared)
1044 }
1045
1046 /// Locks the next value if there is one.
recv_ref( &mut self, waiter: Option<(&UnsafeCell<Waiter>, &Waker)>, ) -> Result<RecvGuard<'_, T>, TryRecvError>1047 fn recv_ref(
1048 &mut self,
1049 waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
1050 ) -> Result<RecvGuard<'_, T>, TryRecvError> {
1051 let idx = (self.next & self.shared.mask as u64) as usize;
1052
1053 // The slot holding the next value to read
1054 let mut slot = self.shared.buffer[idx].read().unwrap();
1055
1056 if slot.pos != self.next {
1057 // Release the `slot` lock before attempting to acquire the `tail`
1058 // lock. This is required because `send2` acquires the tail lock
1059 // first followed by the slot lock. Acquiring the locks in reverse
1060 // order here would result in a potential deadlock: `recv_ref`
1061 // acquires the `slot` lock and attempts to acquire the `tail` lock
1062 // while `send2` acquired the `tail` lock and attempts to acquire
1063 // the slot lock.
1064 drop(slot);
1065
1066 let mut old_waker = None;
1067
1068 let mut tail = self.shared.tail.lock();
1069
1070 // Acquire slot lock again
1071 slot = self.shared.buffer[idx].read().unwrap();
1072
1073 // Make sure the position did not change. This could happen in the
1074 // unlikely event that the buffer is wrapped between dropping the
1075 // read lock and acquiring the tail lock.
1076 if slot.pos != self.next {
1077 let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
1078
1079 if next_pos == self.next {
1080 // At this point the channel is empty for *this* receiver. If
1081 // it's been closed, then that's what we return, otherwise we
1082 // set a waker and return empty.
1083 if tail.closed {
1084 return Err(TryRecvError::Closed);
1085 }
1086
1087 // Store the waker
1088 if let Some((waiter, waker)) = waiter {
1089 // Safety: called while locked.
1090 unsafe {
1091 // Only queue if not already queued
1092 waiter.with_mut(|ptr| {
1093 // If there is no waker **or** if the currently
1094 // stored waker references a **different** task,
1095 // track the tasks' waker to be notified on
1096 // receipt of a new value.
1097 match (*ptr).waker {
1098 Some(ref w) if w.will_wake(waker) => {}
1099 _ => {
1100 old_waker = std::mem::replace(
1101 &mut (*ptr).waker,
1102 Some(waker.clone()),
1103 );
1104 }
1105 }
1106
1107 if !(*ptr).queued {
1108 (*ptr).queued = true;
1109 tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
1110 }
1111 });
1112 }
1113 }
1114
1115 // Drop the old waker after releasing the locks.
1116 drop(slot);
1117 drop(tail);
1118 drop(old_waker);
1119
1120 return Err(TryRecvError::Empty);
1121 }
1122
1123 // At this point, the receiver has lagged behind the sender by
1124 // more than the channel capacity. The receiver will attempt to
1125 // catch up by skipping dropped messages and setting the
1126 // internal cursor to the **oldest** message stored by the
1127 // channel.
1128 let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
1129
1130 let missed = next.wrapping_sub(self.next);
1131
1132 drop(tail);
1133
1134 // The receiver is slow but no values have been missed
1135 if missed == 0 {
1136 self.next = self.next.wrapping_add(1);
1137
1138 return Ok(RecvGuard { slot });
1139 }
1140
1141 self.next = next;
1142
1143 return Err(TryRecvError::Lagged(missed));
1144 }
1145 }
1146
1147 self.next = self.next.wrapping_add(1);
1148
1149 Ok(RecvGuard { slot })
1150 }
1151 }
1152
1153 impl<T: Clone> Receiver<T> {
1154 /// Re-subscribes to the channel starting from the current tail element.
1155 ///
1156 /// This [`Receiver`] handle will receive a clone of all values sent
1157 /// **after** it has resubscribed. This will not include elements that are
1158 /// in the queue of the current receiver. Consider the following example.
1159 ///
1160 /// # Examples
1161 ///
1162 /// ```
1163 /// use tokio::sync::broadcast;
1164 ///
1165 /// #[tokio::main]
1166 /// async fn main() {
1167 /// let (tx, mut rx) = broadcast::channel(2);
1168 ///
1169 /// tx.send(1).unwrap();
1170 /// let mut rx2 = rx.resubscribe();
1171 /// tx.send(2).unwrap();
1172 ///
1173 /// assert_eq!(rx2.recv().await.unwrap(), 2);
1174 /// assert_eq!(rx.recv().await.unwrap(), 1);
1175 /// }
1176 /// ```
resubscribe(&self) -> Self1177 pub fn resubscribe(&self) -> Self {
1178 let shared = self.shared.clone();
1179 new_receiver(shared)
1180 }
1181 /// Receives the next value for this receiver.
1182 ///
1183 /// Each [`Receiver`] handle will receive a clone of all values sent
1184 /// **after** it has subscribed.
1185 ///
1186 /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1187 /// dropped, indicating that no further values can be sent on the channel.
1188 ///
1189 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1190 /// sent values will overwrite old values. At this point, a call to [`recv`]
1191 /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1192 /// internal cursor is updated to point to the oldest value still held by
1193 /// the channel. A subsequent call to [`recv`] will return this value
1194 /// **unless** it has been since overwritten.
1195 ///
1196 /// # Cancel safety
1197 ///
1198 /// This method is cancel safe. If `recv` is used as the event in a
1199 /// [`tokio::select!`](crate::select) statement and some other branch
1200 /// completes first, it is guaranteed that no messages were received on this
1201 /// channel.
1202 ///
1203 /// [`Receiver`]: crate::sync::broadcast::Receiver
1204 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1205 ///
1206 /// # Examples
1207 ///
1208 /// ```
1209 /// use tokio::sync::broadcast;
1210 ///
1211 /// #[tokio::main]
1212 /// async fn main() {
1213 /// let (tx, mut rx1) = broadcast::channel(16);
1214 /// let mut rx2 = tx.subscribe();
1215 ///
1216 /// tokio::spawn(async move {
1217 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1218 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1219 /// });
1220 ///
1221 /// tokio::spawn(async move {
1222 /// assert_eq!(rx2.recv().await.unwrap(), 10);
1223 /// assert_eq!(rx2.recv().await.unwrap(), 20);
1224 /// });
1225 ///
1226 /// tx.send(10).unwrap();
1227 /// tx.send(20).unwrap();
1228 /// }
1229 /// ```
1230 ///
1231 /// Handling lag
1232 ///
1233 /// ```
1234 /// use tokio::sync::broadcast;
1235 ///
1236 /// #[tokio::main]
1237 /// async fn main() {
1238 /// let (tx, mut rx) = broadcast::channel(2);
1239 ///
1240 /// tx.send(10).unwrap();
1241 /// tx.send(20).unwrap();
1242 /// tx.send(30).unwrap();
1243 ///
1244 /// // The receiver lagged behind
1245 /// assert!(rx.recv().await.is_err());
1246 ///
1247 /// // At this point, we can abort or continue with lost messages
1248 ///
1249 /// assert_eq!(20, rx.recv().await.unwrap());
1250 /// assert_eq!(30, rx.recv().await.unwrap());
1251 /// }
1252 /// ```
recv(&mut self) -> Result<T, RecvError>1253 pub async fn recv(&mut self) -> Result<T, RecvError> {
1254 let fut = Recv::new(self);
1255 fut.await
1256 }
1257
1258 /// Attempts to return a pending value on this receiver without awaiting.
1259 ///
1260 /// This is useful for a flavor of "optimistic check" before deciding to
1261 /// await on a receiver.
1262 ///
1263 /// Compared with [`recv`], this function has three failure cases instead of two
1264 /// (one for closed, one for an empty buffer, one for a lagging receiver).
1265 ///
1266 /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1267 /// dropped, indicating that no further values can be sent on the channel.
1268 ///
1269 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1270 /// sent values will overwrite old values. At this point, a call to [`recv`]
1271 /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1272 /// internal cursor is updated to point to the oldest value still held by
1273 /// the channel. A subsequent call to [`try_recv`] will return this value
1274 /// **unless** it has been since overwritten. If there are no values to
1275 /// receive, `Err(TryRecvError::Empty)` is returned.
1276 ///
1277 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1278 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1279 /// [`Receiver`]: crate::sync::broadcast::Receiver
1280 ///
1281 /// # Examples
1282 ///
1283 /// ```
1284 /// use tokio::sync::broadcast;
1285 ///
1286 /// #[tokio::main]
1287 /// async fn main() {
1288 /// let (tx, mut rx) = broadcast::channel(16);
1289 ///
1290 /// assert!(rx.try_recv().is_err());
1291 ///
1292 /// tx.send(10).unwrap();
1293 ///
1294 /// let value = rx.try_recv().unwrap();
1295 /// assert_eq!(10, value);
1296 /// }
1297 /// ```
try_recv(&mut self) -> Result<T, TryRecvError>1298 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1299 let guard = self.recv_ref(None)?;
1300 guard.clone_value().ok_or(TryRecvError::Closed)
1301 }
1302
1303 /// Blocking receive to call outside of asynchronous contexts.
1304 ///
1305 /// # Panics
1306 ///
1307 /// This function panics if called within an asynchronous execution
1308 /// context.
1309 ///
1310 /// # Examples
1311 /// ```
1312 /// use std::thread;
1313 /// use tokio::sync::broadcast;
1314 ///
1315 /// #[tokio::main]
1316 /// async fn main() {
1317 /// let (tx, mut rx) = broadcast::channel(16);
1318 ///
1319 /// let sync_code = thread::spawn(move || {
1320 /// assert_eq!(rx.blocking_recv(), Ok(10));
1321 /// });
1322 ///
1323 /// let _ = tx.send(10);
1324 /// sync_code.join().unwrap();
1325 /// }
blocking_recv(&mut self) -> Result<T, RecvError>1326 pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1327 crate::future::block_on(self.recv())
1328 }
1329 }
1330
1331 impl<T> Drop for Receiver<T> {
drop(&mut self)1332 fn drop(&mut self) {
1333 let mut tail = self.shared.tail.lock();
1334
1335 tail.rx_cnt -= 1;
1336 let until = tail.pos;
1337
1338 drop(tail);
1339
1340 while self.next < until {
1341 match self.recv_ref(None) {
1342 Ok(_) => {}
1343 // The channel is closed
1344 Err(TryRecvError::Closed) => break,
1345 // Ignore lagging, we will catch up
1346 Err(TryRecvError::Lagged(..)) => {}
1347 // Can't be empty
1348 Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1349 }
1350 }
1351 }
1352 }
1353
1354 impl<'a, T> Recv<'a, T> {
new(receiver: &'a mut Receiver<T>) -> Recv<'a, T>1355 fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1356 Recv {
1357 receiver,
1358 waiter: UnsafeCell::new(Waiter {
1359 queued: false,
1360 waker: None,
1361 pointers: linked_list::Pointers::new(),
1362 _p: PhantomPinned,
1363 }),
1364 }
1365 }
1366
1367 /// A custom `project` implementation is used in place of `pin-project-lite`
1368 /// as a custom drop implementation is needed.
project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>)1369 fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1370 unsafe {
1371 // Safety: Receiver is Unpin
1372 is_unpin::<&mut Receiver<T>>();
1373
1374 let me = self.get_unchecked_mut();
1375 (me.receiver, &me.waiter)
1376 }
1377 }
1378 }
1379
1380 impl<'a, T> Future for Recv<'a, T>
1381 where
1382 T: Clone,
1383 {
1384 type Output = Result<T, RecvError>;
1385
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>1386 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1387 ready!(crate::trace::trace_leaf(cx));
1388
1389 let (receiver, waiter) = self.project();
1390
1391 let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1392 Ok(value) => value,
1393 Err(TryRecvError::Empty) => return Poll::Pending,
1394 Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1395 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1396 };
1397
1398 Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1399 }
1400 }
1401
1402 impl<'a, T> Drop for Recv<'a, T> {
drop(&mut self)1403 fn drop(&mut self) {
1404 // Acquire the tail lock. This is required for safety before accessing
1405 // the waiter node.
1406 let mut tail = self.receiver.shared.tail.lock();
1407
1408 // safety: tail lock is held
1409 let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued });
1410
1411 if queued {
1412 // Remove the node
1413 //
1414 // safety: tail lock is held and the wait node is verified to be in
1415 // the list.
1416 unsafe {
1417 self.waiter.with_mut(|ptr| {
1418 tail.waiters.remove((&mut *ptr).into());
1419 });
1420 }
1421 }
1422 }
1423 }
1424
1425 /// # Safety
1426 ///
1427 /// `Waiter` is forced to be !Unpin.
1428 unsafe impl linked_list::Link for Waiter {
1429 type Handle = NonNull<Waiter>;
1430 type Target = Waiter;
1431
as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter>1432 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1433 *handle
1434 }
1435
from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter>1436 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1437 ptr
1438 }
1439
pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>>1440 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1441 Waiter::addr_of_pointers(target)
1442 }
1443 }
1444
1445 impl<T> fmt::Debug for Sender<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1446 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1447 write!(fmt, "broadcast::Sender")
1448 }
1449 }
1450
1451 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1452 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1453 write!(fmt, "broadcast::Receiver")
1454 }
1455 }
1456
1457 impl<'a, T> RecvGuard<'a, T> {
clone_value(&self) -> Option<T> where T: Clone,1458 fn clone_value(&self) -> Option<T>
1459 where
1460 T: Clone,
1461 {
1462 self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
1463 }
1464 }
1465
1466 impl<'a, T> Drop for RecvGuard<'a, T> {
drop(&mut self)1467 fn drop(&mut self) {
1468 // Decrement the remaining counter
1469 if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1470 // Safety: Last receiver, drop the value
1471 self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
1472 }
1473 }
1474 }
1475
/// Compile-time assertion that `T` implements `Unpin`; does nothing at runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1477
#[cfg(not(loom))]
#[cfg(test)]
mod tests {
    use super::*;

    /// A sender built with `Sender::new` starts with no receivers; the count
    /// follows every `subscribe`, `resubscribe` and receiver drop.
    #[test]
    fn receiver_count_on_sender_constructor() {
        let sender = Sender::<i32>::new(16);
        assert_eq!(sender.receiver_count(), 0);

        let rx_1 = sender.subscribe();
        assert_eq!(sender.receiver_count(), 1);

        let rx_2 = rx_1.resubscribe();
        assert_eq!(sender.receiver_count(), 2);

        let rx_3 = sender.subscribe();
        assert_eq!(sender.receiver_count(), 3);

        drop(rx_3);
        drop(rx_1);
        assert_eq!(sender.receiver_count(), 1);

        drop(rx_2);
        assert_eq!(sender.receiver_count(), 0);
    }

    /// `channel` hands back one receiver up front, so the count starts at one.
    // The enclosing module is already gated on `not(loom)`, so no per-test
    // `cfg` attribute is needed here.
    #[test]
    fn receiver_count_on_channel_constructor() {
        let (sender, rx) = channel::<i32>(16);
        assert_eq!(sender.receiver_count(), 1);

        let _rx_2 = rx.resubscribe();
        assert_eq!(sender.receiver_count(), 2);
    }
}
1515