• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! A multi-producer, single-consumer queue for sending values across
2 //! asynchronous tasks.
3 //!
4 //! Similarly to the `std`, channel creation provides [`Receiver`] and
5 //! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6 //! read values out of the channel. If there is no message to read from the
7 //! channel, the current task will be notified when a new value is sent.
8 //! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9 //! the channel. If the channel is at capacity, the send will be rejected and
10 //! the task will be notified when additional capacity is available. In other
11 //! words, the channel provides backpressure.
12 //!
13 //! Unbounded channels are also available using the `unbounded` constructor.
14 //!
15 //! # Disconnection
16 //!
17 //! When all [`Sender`] handles have been dropped, it is no longer
18 //! possible to send values into the channel. This is considered the termination
19 //! event of the stream. As such, [`Receiver::poll_next`]
20 //! will return `Ok(Ready(None))`.
21 //!
22 //! If the [`Receiver`] handle is dropped, then messages can no longer
23 //! be read out of the channel. In this case, all further attempts to send will
24 //! result in an error.
25 //!
26 //! # Clean Shutdown
27 //!
28 //! If the [`Receiver`] is simply dropped, then it is possible for
29 //! there to be messages still in the channel that will not be processed. As
30 //! such, it is usually desirable to perform a "clean" shutdown. To do this, the
31 //! receiver will first call `close`, which will prevent any further messages to
32 //! be sent into the channel. Then, the receiver consumes the channel to
33 //! completion, at which point the receiver can be dropped.
34 //!
35 //! [`Sender`]: struct.Sender.html
36 //! [`Receiver`]: struct.Receiver.html
37 //! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38 //! [`Receiver::poll_next`]:
39 //!     ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40 
41 // At the core, the channel uses an atomic FIFO queue for message passing. This
42 // queue is used as the primary coordination primitive. In order to enforce
43 // capacity limits and handle back pressure, a secondary FIFO queue is used to
44 // send parked task handles.
45 //
46 // The general idea is that the channel is created with a `buffer` size of `n`.
47 // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48 // slot to hold a message. This allows `Sender` to know for a fact that a send
49 // will succeed *before* starting to do the actual work of sending the value.
50 // Since most of this work is lock-free, once the work starts, it is impossible
51 // to safely revert.
52 //
53 // If the sender is unable to process a send operation, then the current
54 // task is parked and the handle is sent on the parked task queue.
55 //
56 // Note that the implementation guarantees that the channel capacity will never
57 // exceed the configured limit, however there is no *strict* guarantee that the
58 // receiver will wake up a parked task *immediately* when a slot becomes
59 // available. However, it will almost always unpark a task when a slot becomes
60 // available and it is *guaranteed* that a sender will be unparked when the
61 // message that caused the sender to become parked is read out of the channel.
62 //
63 // The steps for sending a message are roughly:
64 //
65 // 1) Increment the channel message count
66 // 2) If the channel is at capacity, push the task handle onto the wait queue
67 // 3) Push the message onto the message queue.
68 //
69 // The steps for receiving a message are roughly:
70 //
71 // 1) Pop a message from the message queue
72 // 2) Pop a task handle from the wait queue
73 // 3) Decrement the channel message count.
74 //
75 // It's important for the order of operations on lock-free structures to happen
76 // in reverse order between the sender and receiver. This makes the message
77 // queue the primary coordination structure and establishes the necessary
78 // happens-before semantics required for the acquire / release semantics used
79 // by the queue structure.
80 
81 use futures_core::stream::{FusedStream, Stream};
82 use futures_core::task::__internal::AtomicWaker;
83 use futures_core::task::{Context, Poll, Waker};
84 use std::fmt;
85 use std::pin::Pin;
86 use std::sync::atomic::AtomicUsize;
87 use std::sync::atomic::Ordering::SeqCst;
88 use std::sync::{Arc, Mutex};
89 use std::thread;
90 
91 use crate::mpsc::queue::Queue;
92 
93 mod queue;
94 #[cfg(feature = "sink")]
95 mod sink_impl;
96 
97 struct UnboundedSenderInner<T> {
98     // Channel state shared between the sender and receiver.
99     inner: Arc<UnboundedInner<T>>,
100 }
101 
102 struct BoundedSenderInner<T> {
103     // Channel state shared between the sender and receiver.
104     inner: Arc<BoundedInner<T>>,
105 
106     // Handle to the task that is blocked on this sender. This handle is sent
107     // to the receiver half in order to be notified when the sender becomes
108     // unblocked.
109     sender_task: Arc<Mutex<SenderTask>>,
110 
111     // `true` if the sender might be blocked. This is an optimization to avoid
112     // having to lock the mutex most of the time.
113     maybe_parked: bool,
114 }
115 
116 // We never project Pin<&mut SenderInner> to `Pin<&mut T>`
117 impl<T> Unpin for UnboundedSenderInner<T> {}
118 impl<T> Unpin for BoundedSenderInner<T> {}
119 
120 /// The transmission end of a bounded mpsc channel.
121 ///
122 /// This value is created by the [`channel`] function.
123 pub struct Sender<T>(Option<BoundedSenderInner<T>>);
124 
125 /// The transmission end of an unbounded mpsc channel.
126 ///
127 /// This value is created by the [`unbounded`] function.
128 pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
129 
130 trait AssertKinds: Send + Sync + Clone {}
131 impl AssertKinds for UnboundedSender<u32> {}
132 
133 /// The receiving end of a bounded mpsc channel.
134 ///
135 /// This value is created by the [`channel`] function.
136 pub struct Receiver<T> {
137     inner: Option<Arc<BoundedInner<T>>>,
138 }
139 
140 /// The receiving end of an unbounded mpsc channel.
141 ///
142 /// This value is created by the [`unbounded`] function.
143 pub struct UnboundedReceiver<T> {
144     inner: Option<Arc<UnboundedInner<T>>>,
145 }
146 
147 // `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
148 impl<T> Unpin for UnboundedReceiver<T> {}
149 
150 /// The error type for [`Sender`s](Sender) used as `Sink`s.
151 #[derive(Clone, Debug, PartialEq, Eq)]
152 pub struct SendError {
153     kind: SendErrorKind,
154 }
155 
156 /// The error type returned from [`try_send`](Sender::try_send).
157 #[derive(Clone, PartialEq, Eq)]
158 pub struct TrySendError<T> {
159     err: SendError,
160     val: T,
161 }
162 
163 #[derive(Clone, Debug, PartialEq, Eq)]
164 enum SendErrorKind {
165     Full,
166     Disconnected,
167 }
168 
169 /// The error type returned from [`try_next`](Receiver::try_next).
170 pub struct TryRecvError {
171     _priv: (),
172 }
173 
174 impl fmt::Display for SendError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result175     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176         if self.is_full() {
177             write!(f, "send failed because channel is full")
178         } else {
179             write!(f, "send failed because receiver is gone")
180         }
181     }
182 }
183 
184 impl std::error::Error for SendError {}
185 
186 impl SendError {
187     /// Returns `true` if this error is a result of the channel being full.
is_full(&self) -> bool188     pub fn is_full(&self) -> bool {
189         match self.kind {
190             SendErrorKind::Full => true,
191             _ => false,
192         }
193     }
194 
195     /// Returns `true` if this error is a result of the receiver being dropped.
is_disconnected(&self) -> bool196     pub fn is_disconnected(&self) -> bool {
197         match self.kind {
198             SendErrorKind::Disconnected => true,
199             _ => false,
200         }
201     }
202 }
203 
204 impl<T> fmt::Debug for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result205     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206         f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
207     }
208 }
209 
210 impl<T> fmt::Display for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result211     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212         if self.is_full() {
213             write!(f, "send failed because channel is full")
214         } else {
215             write!(f, "send failed because receiver is gone")
216         }
217     }
218 }
219 
220 impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
221 
222 impl<T> TrySendError<T> {
223     /// Returns `true` if this error is a result of the channel being full.
is_full(&self) -> bool224     pub fn is_full(&self) -> bool {
225         self.err.is_full()
226     }
227 
228     /// Returns `true` if this error is a result of the receiver being dropped.
is_disconnected(&self) -> bool229     pub fn is_disconnected(&self) -> bool {
230         self.err.is_disconnected()
231     }
232 
233     /// Returns the message that was attempted to be sent but failed.
into_inner(self) -> T234     pub fn into_inner(self) -> T {
235         self.val
236     }
237 
238     /// Drops the message and converts into a `SendError`.
into_send_error(self) -> SendError239     pub fn into_send_error(self) -> SendError {
240         self.err
241     }
242 }
243 
244 impl fmt::Debug for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result245     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246         f.debug_tuple("TryRecvError").finish()
247     }
248 }
249 
250 impl fmt::Display for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result251     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252         write!(f, "receiver channel is empty")
253     }
254 }
255 
256 impl std::error::Error for TryRecvError {}
257 
258 struct UnboundedInner<T> {
259     // Internal channel state. Consists of the number of messages stored in the
260     // channel as well as a flag signalling that the channel is closed.
261     state: AtomicUsize,
262 
263     // Atomic, FIFO queue used to send messages to the receiver
264     message_queue: Queue<T>,
265 
266     // Number of senders in existence
267     num_senders: AtomicUsize,
268 
269     // Handle to the receiver's task.
270     recv_task: AtomicWaker,
271 }
272 
273 struct BoundedInner<T> {
274     // Max buffer size of the channel. If `None` then the channel is unbounded.
275     buffer: usize,
276 
277     // Internal channel state. Consists of the number of messages stored in the
278     // channel as well as a flag signalling that the channel is closed.
279     state: AtomicUsize,
280 
281     // Atomic, FIFO queue used to send messages to the receiver
282     message_queue: Queue<T>,
283 
284     // Atomic, FIFO queue used to send parked task handles to the receiver.
285     parked_queue: Queue<Arc<Mutex<SenderTask>>>,
286 
287     // Number of senders in existence
288     num_senders: AtomicUsize,
289 
290     // Handle to the receiver's task.
291     recv_task: AtomicWaker,
292 }
293 
294 // Struct representation of `Inner::state`.
295 #[derive(Clone, Copy)]
296 struct State {
297     // `true` when the channel is open
298     is_open: bool,
299 
300     // Number of messages in the channel
301     num_messages: usize,
302 }
303 
304 // The `is_open` flag is stored in the left-most bit of `Inner::state`
305 const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1);
306 
307 // When a new channel is created, it is created in the open state with no
308 // pending messages.
309 const INIT_STATE: usize = OPEN_MASK;
310 
311 // The maximum number of messages that a channel can track is `usize::max_value() >> 1`
312 const MAX_CAPACITY: usize = !(OPEN_MASK);
313 
314 // The maximum requested buffer size must be less than the maximum capacity of
315 // a channel. This is because each sender gets a guaranteed slot.
316 const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
317 
318 // Sent to the consumer to wake up blocked producers
319 struct SenderTask {
320     task: Option<Waker>,
321     is_parked: bool,
322 }
323 
324 impl SenderTask {
new() -> Self325     fn new() -> Self {
326         Self { task: None, is_parked: false }
327     }
328 
notify(&mut self)329     fn notify(&mut self) {
330         self.is_parked = false;
331 
332         if let Some(task) = self.task.take() {
333             task.wake();
334         }
335     }
336 }
337 
338 /// Creates a bounded mpsc channel for communicating between asynchronous tasks.
339 ///
340 /// Being bounded, this channel provides backpressure to ensure that the sender
341 /// outpaces the receiver by only a limited amount. The channel's capacity is
342 /// equal to `buffer + num-senders`. In other words, each sender gets a
343 /// guaranteed slot in the channel capacity, and on top of that there are
344 /// `buffer` "first come, first serve" slots available to all senders.
345 ///
346 /// The [`Receiver`] returned implements the [`Stream`] trait, while [`Sender`]
347 /// implements `Sink`.
channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>)348 pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
349     // Check that the requested buffer size does not exceed the maximum buffer
350     // size permitted by the system.
351     assert!(buffer < MAX_BUFFER, "requested buffer size too large");
352 
353     let inner = Arc::new(BoundedInner {
354         buffer,
355         state: AtomicUsize::new(INIT_STATE),
356         message_queue: Queue::new(),
357         parked_queue: Queue::new(),
358         num_senders: AtomicUsize::new(1),
359         recv_task: AtomicWaker::new(),
360     });
361 
362     let tx = BoundedSenderInner {
363         inner: inner.clone(),
364         sender_task: Arc::new(Mutex::new(SenderTask::new())),
365         maybe_parked: false,
366     };
367 
368     let rx = Receiver { inner: Some(inner) };
369 
370     (Sender(Some(tx)), rx)
371 }
372 
373 /// Creates an unbounded mpsc channel for communicating between asynchronous
374 /// tasks.
375 ///
376 /// A `send` on this channel will always succeed as long as the receive half has
377 /// not been closed. If the receiver falls behind, messages will be arbitrarily
378 /// buffered.
379 ///
380 /// **Note** that the amount of available system memory is an implicit bound to
381 /// the channel. Using an `unbounded` channel has the ability of causing the
382 /// process to run out of memory. In this case, the process will be aborted.
unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)383 pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
384     let inner = Arc::new(UnboundedInner {
385         state: AtomicUsize::new(INIT_STATE),
386         message_queue: Queue::new(),
387         num_senders: AtomicUsize::new(1),
388         recv_task: AtomicWaker::new(),
389     });
390 
391     let tx = UnboundedSenderInner { inner: inner.clone() };
392 
393     let rx = UnboundedReceiver { inner: Some(inner) };
394 
395     (UnboundedSender(Some(tx)), rx)
396 }
397 
398 /*
399  *
400  * ===== impl Sender =====
401  *
402  */
403 
404 impl<T> UnboundedSenderInner<T> {
poll_ready_nb(&self) -> Poll<Result<(), SendError>>405     fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
406         let state = decode_state(self.inner.state.load(SeqCst));
407         if state.is_open {
408             Poll::Ready(Ok(()))
409         } else {
410             Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
411         }
412     }
413 
414     // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: T)415     fn queue_push_and_signal(&self, msg: T) {
416         // Push the message onto the message queue
417         self.inner.message_queue.push(msg);
418 
419         // Signal to the receiver that a message has been enqueued. If the
420         // receiver is parked, this will unpark the task.
421         self.inner.recv_task.wake();
422     }
423 
424     // Increment the number of queued messages. Returns the resulting number.
inc_num_messages(&self) -> Option<usize>425     fn inc_num_messages(&self) -> Option<usize> {
426         let mut curr = self.inner.state.load(SeqCst);
427 
428         loop {
429             let mut state = decode_state(curr);
430 
431             // The receiver end closed the channel.
432             if !state.is_open {
433                 return None;
434             }
435 
436             // This probably is never hit? Odds are the process will run out of
437             // memory first. It may be worth to return something else in this
438             // case?
439             assert!(
440                 state.num_messages < MAX_CAPACITY,
441                 "buffer space \
442                     exhausted; sending this messages would overflow the state"
443             );
444 
445             state.num_messages += 1;
446 
447             let next = encode_state(&state);
448             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
449                 Ok(_) => return Some(state.num_messages),
450                 Err(actual) => curr = actual,
451             }
452         }
453     }
454 
455     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool456     fn same_receiver(&self, other: &Self) -> bool {
457         Arc::ptr_eq(&self.inner, &other.inner)
458     }
459 
460     /// Returns whether the sender send to this receiver.
is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool461     fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
462         Arc::ptr_eq(&self.inner, inner)
463     }
464 
465     /// Returns pointer to the Arc containing sender
466     ///
467     /// The returned pointer is not referenced and should be only used for hashing!
ptr(&self) -> *const UnboundedInner<T>468     fn ptr(&self) -> *const UnboundedInner<T> {
469         &*self.inner
470     }
471 
472     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool473     fn is_closed(&self) -> bool {
474         !decode_state(self.inner.state.load(SeqCst)).is_open
475     }
476 
477     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)478     fn close_channel(&self) {
479         // There's no need to park this sender, its dropping,
480         // and we don't want to check for capacity, so skip
481         // that stuff from `do_send`.
482 
483         self.inner.set_closed();
484         self.inner.recv_task.wake();
485     }
486 }
487 
488 impl<T> BoundedSenderInner<T> {
489     /// Attempts to send a message on this `Sender`, returning the message
490     /// if there was an error.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>491     fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
492         // If the sender is currently blocked, reject the message
493         if !self.poll_unparked(None).is_ready() {
494             return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
495         }
496 
497         // The channel has capacity to accept the message, so send it
498         self.do_send_b(msg)
499     }
500 
501     // Do the send without failing.
502     // Can be called only by bounded sender.
do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>>503     fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
504         // Anyone calling do_send *should* make sure there is room first,
505         // but assert here for tests as a sanity check.
506         debug_assert!(self.poll_unparked(None).is_ready());
507 
508         // First, increment the number of messages contained by the channel.
509         // This operation will also atomically determine if the sender task
510         // should be parked.
511         //
512         // `None` is returned in the case that the channel has been closed by the
513         // receiver. This happens when `Receiver::close` is called or the
514         // receiver is dropped.
515         let park_self = match self.inc_num_messages() {
516             Some(num_messages) => {
517                 // Block if the current number of pending messages has exceeded
518                 // the configured buffer size
519                 num_messages > self.inner.buffer
520             }
521             None => {
522                 return Err(TrySendError {
523                     err: SendError { kind: SendErrorKind::Disconnected },
524                     val: msg,
525                 })
526             }
527         };
528 
529         // If the channel has reached capacity, then the sender task needs to
530         // be parked. This will send the task handle on the parked task queue.
531         //
532         // However, when `do_send` is called while dropping the `Sender`,
533         // `task::current()` can't be called safely. In this case, in order to
534         // maintain internal consistency, a blank message is pushed onto the
535         // parked task queue.
536         if park_self {
537             self.park();
538         }
539 
540         self.queue_push_and_signal(msg);
541 
542         Ok(())
543     }
544 
545     // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: T)546     fn queue_push_and_signal(&self, msg: T) {
547         // Push the message onto the message queue
548         self.inner.message_queue.push(msg);
549 
550         // Signal to the receiver that a message has been enqueued. If the
551         // receiver is parked, this will unpark the task.
552         self.inner.recv_task.wake();
553     }
554 
555     // Increment the number of queued messages. Returns the resulting number.
inc_num_messages(&self) -> Option<usize>556     fn inc_num_messages(&self) -> Option<usize> {
557         let mut curr = self.inner.state.load(SeqCst);
558 
559         loop {
560             let mut state = decode_state(curr);
561 
562             // The receiver end closed the channel.
563             if !state.is_open {
564                 return None;
565             }
566 
567             // This probably is never hit? Odds are the process will run out of
568             // memory first. It may be worth to return something else in this
569             // case?
570             assert!(
571                 state.num_messages < MAX_CAPACITY,
572                 "buffer space \
573                     exhausted; sending this messages would overflow the state"
574             );
575 
576             state.num_messages += 1;
577 
578             let next = encode_state(&state);
579             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
580                 Ok(_) => return Some(state.num_messages),
581                 Err(actual) => curr = actual,
582             }
583         }
584     }
585 
park(&mut self)586     fn park(&mut self) {
587         {
588             let mut sender = self.sender_task.lock().unwrap();
589             sender.task = None;
590             sender.is_parked = true;
591         }
592 
593         // Send handle over queue
594         let t = self.sender_task.clone();
595         self.inner.parked_queue.push(t);
596 
597         // Check to make sure we weren't closed after we sent our task on the
598         // queue
599         let state = decode_state(self.inner.state.load(SeqCst));
600         self.maybe_parked = state.is_open;
601     }
602 
603     /// Polls the channel to determine if there is guaranteed capacity to send
604     /// at least one item without waiting.
605     ///
606     /// # Return value
607     ///
608     /// This method returns:
609     ///
610     /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
611     /// - `Poll::Pending` if the channel may not have
612     ///   capacity, in which case the current task is queued to be notified once
613     ///   capacity is available;
614     /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>>615     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
616         let state = decode_state(self.inner.state.load(SeqCst));
617         if !state.is_open {
618             return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
619         }
620 
621         self.poll_unparked(Some(cx)).map(Ok)
622     }
623 
624     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool625     fn same_receiver(&self, other: &Self) -> bool {
626         Arc::ptr_eq(&self.inner, &other.inner)
627     }
628 
629     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool630     fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
631         Arc::ptr_eq(&self.inner, receiver)
632     }
633 
634     /// Returns pointer to the Arc containing sender
635     ///
636     /// The returned pointer is not referenced and should be only used for hashing!
ptr(&self) -> *const BoundedInner<T>637     fn ptr(&self) -> *const BoundedInner<T> {
638         &*self.inner
639     }
640 
641     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool642     fn is_closed(&self) -> bool {
643         !decode_state(self.inner.state.load(SeqCst)).is_open
644     }
645 
646     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)647     fn close_channel(&self) {
648         // There's no need to park this sender, its dropping,
649         // and we don't want to check for capacity, so skip
650         // that stuff from `do_send`.
651 
652         self.inner.set_closed();
653         self.inner.recv_task.wake();
654     }
655 
poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()>656     fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
657         // First check the `maybe_parked` variable. This avoids acquiring the
658         // lock in most cases
659         if self.maybe_parked {
660             // Get a lock on the task handle
661             let mut task = self.sender_task.lock().unwrap();
662 
663             if !task.is_parked {
664                 self.maybe_parked = false;
665                 return Poll::Ready(());
666             }
667 
668             // At this point, an unpark request is pending, so there will be an
669             // unpark sometime in the future. We just need to make sure that
670             // the correct task will be notified.
671             //
672             // Update the task in case the `Sender` has been moved to another
673             // task
674             task.task = cx.map(|cx| cx.waker().clone());
675 
676             Poll::Pending
677         } else {
678             Poll::Ready(())
679         }
680     }
681 }
682 
683 impl<T> Sender<T> {
684     /// Attempts to send a message on this `Sender`, returning the message
685     /// if there was an error.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>686     pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
687         if let Some(inner) = &mut self.0 {
688             inner.try_send(msg)
689         } else {
690             Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
691         }
692     }
693 
694     /// Send a message on the channel.
695     ///
696     /// This function should only be called after
697     /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
698     /// ready to receive a message.
start_send(&mut self, msg: T) -> Result<(), SendError>699     pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
700         self.try_send(msg).map_err(|e| e.err)
701     }
702 
703     /// Polls the channel to determine if there is guaranteed capacity to send
704     /// at least one item without waiting.
705     ///
706     /// # Return value
707     ///
708     /// This method returns:
709     ///
710     /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
711     /// - `Poll::Pending` if the channel may not have
712     ///   capacity, in which case the current task is queued to be notified once
713     ///   capacity is available;
714     /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>>715     pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
716         let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
717         inner.poll_ready(cx)
718     }
719 
720     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool721     pub fn is_closed(&self) -> bool {
722         self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
723     }
724 
725     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&mut self)726     pub fn close_channel(&mut self) {
727         if let Some(inner) = &mut self.0 {
728             inner.close_channel();
729         }
730     }
731 
732     /// Disconnects this sender from the channel, closing it if there are no more senders left.
disconnect(&mut self)733     pub fn disconnect(&mut self) {
734         self.0 = None;
735     }
736 
737     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool738     pub fn same_receiver(&self, other: &Self) -> bool {
739         match (&self.0, &other.0) {
740             (Some(inner), Some(other)) => inner.same_receiver(other),
741             _ => false,
742         }
743     }
744 
745     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &Receiver<T>) -> bool746     pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
747         match (&self.0, &receiver.inner) {
748             (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
749             _ => false,
750         }
751     }
752 
753     /// Hashes the receiver into the provided hasher
hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher,754     pub fn hash_receiver<H>(&self, hasher: &mut H)
755     where
756         H: std::hash::Hasher,
757     {
758         use std::hash::Hash;
759 
760         let ptr = self.0.as_ref().map(|inner| inner.ptr());
761         ptr.hash(hasher);
762     }
763 }
764 
765 impl<T> UnboundedSender<T> {
766     /// Check if the channel is ready to receive a message.
poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>>767     pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
768         let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
769         inner.poll_ready_nb()
770     }
771 
772     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool773     pub fn is_closed(&self) -> bool {
774         self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
775     }
776 
777     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)778     pub fn close_channel(&self) {
779         if let Some(inner) = &self.0 {
780             inner.close_channel();
781         }
782     }
783 
784     /// Disconnects this sender from the channel, closing it if there are no more senders left.
disconnect(&mut self)785     pub fn disconnect(&mut self) {
786         self.0 = None;
787     }
788 
789     // Do the send without parking current task.
do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>>790     fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
791         if let Some(inner) = &self.0 {
792             if inner.inc_num_messages().is_some() {
793                 inner.queue_push_and_signal(msg);
794                 return Ok(());
795             }
796         }
797 
798         Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
799     }
800 
801     /// Send a message on the channel.
802     ///
803     /// This method should only be called after `poll_ready` has been used to
804     /// verify that the channel is ready to receive a message.
start_send(&mut self, msg: T) -> Result<(), SendError>805     pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
806         self.do_send_nb(msg).map_err(|e| e.err)
807     }
808 
809     /// Sends a message along this channel.
810     ///
811     /// This is an unbounded sender, so this function differs from `Sink::send`
812     /// by ensuring the return type reflects that the channel is always ready to
813     /// receive messages.
unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>>814     pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
815         self.do_send_nb(msg)
816     }
817 
818     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool819     pub fn same_receiver(&self, other: &Self) -> bool {
820         match (&self.0, &other.0) {
821             (Some(inner), Some(other)) => inner.same_receiver(other),
822             _ => false,
823         }
824     }
825 
826     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool827     pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
828         match (&self.0, &receiver.inner) {
829             (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
830             _ => false,
831         }
832     }
833 
834     /// Hashes the receiver into the provided hasher
hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher,835     pub fn hash_receiver<H>(&self, hasher: &mut H)
836     where
837         H: std::hash::Hasher,
838     {
839         use std::hash::Hash;
840 
841         let ptr = self.0.as_ref().map(|inner| inner.ptr());
842         ptr.hash(hasher);
843     }
844 
845     /// Return the number of messages in the queue or 0 if channel is disconnected.
len(&self) -> usize846     pub fn len(&self) -> usize {
847         if let Some(sender) = &self.0 {
848             decode_state(sender.inner.state.load(SeqCst)).num_messages
849         } else {
850             0
851         }
852     }
853 
854     /// Return false is channel has no queued messages, true otherwise.
is_empty(&self) -> bool855     pub fn is_empty(&self) -> bool {
856         self.len() == 0
857     }
858 }
859 
860 impl<T> Clone for Sender<T> {
clone(&self) -> Self861     fn clone(&self) -> Self {
862         Self(self.0.clone())
863     }
864 }
865 
866 impl<T> Clone for UnboundedSender<T> {
clone(&self) -> Self867     fn clone(&self) -> Self {
868         Self(self.0.clone())
869     }
870 }
871 
872 impl<T> Clone for UnboundedSenderInner<T> {
clone(&self) -> Self873     fn clone(&self) -> Self {
874         // Since this atomic op isn't actually guarding any memory and we don't
875         // care about any orderings besides the ordering on the single atomic
876         // variable, a relaxed ordering is acceptable.
877         let mut curr = self.inner.num_senders.load(SeqCst);
878 
879         loop {
880             // If the maximum number of senders has been reached, then fail
881             if curr == MAX_BUFFER {
882                 panic!("cannot clone `Sender` -- too many outstanding senders");
883             }
884 
885             debug_assert!(curr < MAX_BUFFER);
886 
887             let next = curr + 1;
888             match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
889                 Ok(_) => {
890                     // The ABA problem doesn't matter here. We only care that the
891                     // number of senders never exceeds the maximum.
892                     return Self { inner: self.inner.clone() };
893                 }
894                 Err(actual) => curr = actual,
895             }
896         }
897     }
898 }
899 
900 impl<T> Clone for BoundedSenderInner<T> {
clone(&self) -> Self901     fn clone(&self) -> Self {
902         // Since this atomic op isn't actually guarding any memory and we don't
903         // care about any orderings besides the ordering on the single atomic
904         // variable, a relaxed ordering is acceptable.
905         let mut curr = self.inner.num_senders.load(SeqCst);
906 
907         loop {
908             // If the maximum number of senders has been reached, then fail
909             if curr == self.inner.max_senders() {
910                 panic!("cannot clone `Sender` -- too many outstanding senders");
911             }
912 
913             debug_assert!(curr < self.inner.max_senders());
914 
915             let next = curr + 1;
916             match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
917                 Ok(_) => {
918                     // The ABA problem doesn't matter here. We only care that the
919                     // number of senders never exceeds the maximum.
920                     return Self {
921                         inner: self.inner.clone(),
922                         sender_task: Arc::new(Mutex::new(SenderTask::new())),
923                         maybe_parked: false,
924                     };
925                 }
926                 Err(actual) => curr = actual,
927             }
928         }
929     }
930 }
931 
932 impl<T> Drop for UnboundedSenderInner<T> {
drop(&mut self)933     fn drop(&mut self) {
934         // Ordering between variables don't matter here
935         let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
936 
937         if prev == 1 {
938             self.close_channel();
939         }
940     }
941 }
942 
943 impl<T> Drop for BoundedSenderInner<T> {
drop(&mut self)944     fn drop(&mut self) {
945         // Ordering between variables don't matter here
946         let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
947 
948         if prev == 1 {
949             self.close_channel();
950         }
951     }
952 }
953 
954 impl<T> fmt::Debug for Sender<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result955     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
956         f.debug_struct("Sender").field("closed", &self.is_closed()).finish()
957     }
958 }
959 
960 impl<T> fmt::Debug for UnboundedSender<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result961     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
962         f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish()
963     }
964 }
965 
966 /*
967  *
968  * ===== impl Receiver =====
969  *
970  */
971 
972 impl<T> Receiver<T> {
973     /// Closes the receiving half of a channel, without dropping it.
974     ///
975     /// This prevents any further messages from being sent on the channel while
976     /// still enabling the receiver to drain messages that are buffered.
close(&mut self)977     pub fn close(&mut self) {
978         if let Some(inner) = &mut self.inner {
979             inner.set_closed();
980 
981             // Wake up any threads waiting as they'll see that we've closed the
982             // channel and will continue on their merry way.
983             while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
984                 task.lock().unwrap().notify();
985             }
986         }
987     }
988 
989     /// Tries to receive the next message without notifying a context if empty.
990     ///
991     /// It is not recommended to call this function from inside of a future,
992     /// only when you've otherwise arranged to be notified when the channel is
993     /// no longer empty.
994     ///
995     /// This function returns:
996     /// * `Ok(Some(t))` when message is fetched
997     /// * `Ok(None)` when channel is closed and no messages left in the queue
998     /// * `Err(e)` when there are no messages available, but channel is not yet closed
try_next(&mut self) -> Result<Option<T>, TryRecvError>999     pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1000         match self.next_message() {
1001             Poll::Ready(msg) => Ok(msg),
1002             Poll::Pending => Err(TryRecvError { _priv: () }),
1003         }
1004     }
1005 
next_message(&mut self) -> Poll<Option<T>>1006     fn next_message(&mut self) -> Poll<Option<T>> {
1007         let inner = match self.inner.as_mut() {
1008             None => return Poll::Ready(None),
1009             Some(inner) => inner,
1010         };
1011         // Pop off a message
1012         match unsafe { inner.message_queue.pop_spin() } {
1013             Some(msg) => {
1014                 // If there are any parked task handles in the parked queue,
1015                 // pop one and unpark it.
1016                 self.unpark_one();
1017 
1018                 // Decrement number of messages
1019                 self.dec_num_messages();
1020 
1021                 Poll::Ready(Some(msg))
1022             }
1023             None => {
1024                 let state = decode_state(inner.state.load(SeqCst));
1025                 if state.is_closed() {
1026                     // If closed flag is set AND there are no pending messages
1027                     // it means end of stream
1028                     self.inner = None;
1029                     Poll::Ready(None)
1030                 } else {
1031                     // If queue is open, we need to return Pending
1032                     // to be woken up when new messages arrive.
1033                     // If queue is closed but num_messages is non-zero,
1034                     // it means that senders updated the state,
1035                     // but didn't put message to queue yet,
1036                     // so we need to park until sender unparks the task
1037                     // after queueing the message.
1038                     Poll::Pending
1039                 }
1040             }
1041         }
1042     }
1043 
1044     // Unpark a single task handle if there is one pending in the parked queue
unpark_one(&mut self)1045     fn unpark_one(&mut self) {
1046         if let Some(inner) = &mut self.inner {
1047             if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1048                 task.lock().unwrap().notify();
1049             }
1050         }
1051     }
1052 
dec_num_messages(&self)1053     fn dec_num_messages(&self) {
1054         if let Some(inner) = &self.inner {
1055             // OPEN_MASK is highest bit, so it's unaffected by subtraction
1056             // unless there's underflow, and we know there's no underflow
1057             // because number of messages at this point is always > 0.
1058             inner.state.fetch_sub(1, SeqCst);
1059         }
1060     }
1061 }
1062 
1063 // The receiver does not ever take a Pin to the inner T
1064 impl<T> Unpin for Receiver<T> {}
1065 
1066 impl<T> FusedStream for Receiver<T> {
is_terminated(&self) -> bool1067     fn is_terminated(&self) -> bool {
1068         self.inner.is_none()
1069     }
1070 }
1071 
1072 impl<T> Stream for Receiver<T> {
1073     type Item = T;
1074 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>1075     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1076         // Try to read a message off of the message queue.
1077         match self.next_message() {
1078             Poll::Ready(msg) => {
1079                 if msg.is_none() {
1080                     self.inner = None;
1081                 }
1082                 Poll::Ready(msg)
1083             }
1084             Poll::Pending => {
1085                 // There are no messages to read, in this case, park.
1086                 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1087                 // Check queue again after parking to prevent race condition:
1088                 // a message could be added to the queue after previous `next_message`
1089                 // before `register` call.
1090                 self.next_message()
1091             }
1092         }
1093     }
1094 
size_hint(&self) -> (usize, Option<usize>)1095     fn size_hint(&self) -> (usize, Option<usize>) {
1096         if let Some(inner) = &self.inner {
1097             decode_state(inner.state.load(SeqCst)).size_hint()
1098         } else {
1099             (0, Some(0))
1100         }
1101     }
1102 }
1103 
1104 impl<T> Drop for Receiver<T> {
drop(&mut self)1105     fn drop(&mut self) {
1106         // Drain the channel of all pending messages
1107         self.close();
1108         if self.inner.is_some() {
1109             loop {
1110                 match self.next_message() {
1111                     Poll::Ready(Some(_)) => {}
1112                     Poll::Ready(None) => break,
1113                     Poll::Pending => {
1114                         let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1115 
1116                         // If the channel is closed, then there is no need to park.
1117                         if state.is_closed() {
1118                             break;
1119                         }
1120 
1121                         // TODO: Spinning isn't ideal, it might be worth
1122                         // investigating using a condvar or some other strategy
1123                         // here. That said, if this case is hit, then another thread
1124                         // is about to push the value into the queue and this isn't
1125                         // the only spinlock in the impl right now.
1126                         thread::yield_now();
1127                     }
1128                 }
1129             }
1130         }
1131     }
1132 }
1133 
1134 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1135     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1136         let closed = if let Some(ref inner) = self.inner {
1137             decode_state(inner.state.load(SeqCst)).is_closed()
1138         } else {
1139             false
1140         };
1141 
1142         f.debug_struct("Receiver").field("closed", &closed).finish()
1143     }
1144 }
1145 
1146 impl<T> UnboundedReceiver<T> {
1147     /// Closes the receiving half of a channel, without dropping it.
1148     ///
1149     /// This prevents any further messages from being sent on the channel while
1150     /// still enabling the receiver to drain messages that are buffered.
close(&mut self)1151     pub fn close(&mut self) {
1152         if let Some(inner) = &mut self.inner {
1153             inner.set_closed();
1154         }
1155     }
1156 
1157     /// Tries to receive the next message without notifying a context if empty.
1158     ///
1159     /// It is not recommended to call this function from inside of a future,
1160     /// only when you've otherwise arranged to be notified when the channel is
1161     /// no longer empty.
1162     ///
1163     /// This function returns:
1164     /// * `Ok(Some(t))` when message is fetched
1165     /// * `Ok(None)` when channel is closed and no messages left in the queue
1166     /// * `Err(e)` when there are no messages available, but channel is not yet closed
try_next(&mut self) -> Result<Option<T>, TryRecvError>1167     pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1168         match self.next_message() {
1169             Poll::Ready(msg) => Ok(msg),
1170             Poll::Pending => Err(TryRecvError { _priv: () }),
1171         }
1172     }
1173 
next_message(&mut self) -> Poll<Option<T>>1174     fn next_message(&mut self) -> Poll<Option<T>> {
1175         let inner = match self.inner.as_mut() {
1176             None => return Poll::Ready(None),
1177             Some(inner) => inner,
1178         };
1179         // Pop off a message
1180         match unsafe { inner.message_queue.pop_spin() } {
1181             Some(msg) => {
1182                 // Decrement number of messages
1183                 self.dec_num_messages();
1184 
1185                 Poll::Ready(Some(msg))
1186             }
1187             None => {
1188                 let state = decode_state(inner.state.load(SeqCst));
1189                 if state.is_closed() {
1190                     // If closed flag is set AND there are no pending messages
1191                     // it means end of stream
1192                     self.inner = None;
1193                     Poll::Ready(None)
1194                 } else {
1195                     // If queue is open, we need to return Pending
1196                     // to be woken up when new messages arrive.
1197                     // If queue is closed but num_messages is non-zero,
1198                     // it means that senders updated the state,
1199                     // but didn't put message to queue yet,
1200                     // so we need to park until sender unparks the task
1201                     // after queueing the message.
1202                     Poll::Pending
1203                 }
1204             }
1205         }
1206     }
1207 
dec_num_messages(&self)1208     fn dec_num_messages(&self) {
1209         if let Some(inner) = &self.inner {
1210             // OPEN_MASK is highest bit, so it's unaffected by subtraction
1211             // unless there's underflow, and we know there's no underflow
1212             // because number of messages at this point is always > 0.
1213             inner.state.fetch_sub(1, SeqCst);
1214         }
1215     }
1216 }
1217 
1218 impl<T> FusedStream for UnboundedReceiver<T> {
is_terminated(&self) -> bool1219     fn is_terminated(&self) -> bool {
1220         self.inner.is_none()
1221     }
1222 }
1223 
1224 impl<T> Stream for UnboundedReceiver<T> {
1225     type Item = T;
1226 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>1227     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1228         // Try to read a message off of the message queue.
1229         match self.next_message() {
1230             Poll::Ready(msg) => {
1231                 if msg.is_none() {
1232                     self.inner = None;
1233                 }
1234                 Poll::Ready(msg)
1235             }
1236             Poll::Pending => {
1237                 // There are no messages to read, in this case, park.
1238                 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1239                 // Check queue again after parking to prevent race condition:
1240                 // a message could be added to the queue after previous `next_message`
1241                 // before `register` call.
1242                 self.next_message()
1243             }
1244         }
1245     }
1246 
size_hint(&self) -> (usize, Option<usize>)1247     fn size_hint(&self) -> (usize, Option<usize>) {
1248         if let Some(inner) = &self.inner {
1249             decode_state(inner.state.load(SeqCst)).size_hint()
1250         } else {
1251             (0, Some(0))
1252         }
1253     }
1254 }
1255 
1256 impl<T> Drop for UnboundedReceiver<T> {
drop(&mut self)1257     fn drop(&mut self) {
1258         // Drain the channel of all pending messages
1259         self.close();
1260         if self.inner.is_some() {
1261             loop {
1262                 match self.next_message() {
1263                     Poll::Ready(Some(_)) => {}
1264                     Poll::Ready(None) => break,
1265                     Poll::Pending => {
1266                         let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1267 
1268                         // If the channel is closed, then there is no need to park.
1269                         if state.is_closed() {
1270                             break;
1271                         }
1272 
1273                         // TODO: Spinning isn't ideal, it might be worth
1274                         // investigating using a condvar or some other strategy
1275                         // here. That said, if this case is hit, then another thread
1276                         // is about to push the value into the queue and this isn't
1277                         // the only spinlock in the impl right now.
1278                         thread::yield_now();
1279                     }
1280                 }
1281             }
1282         }
1283     }
1284 }
1285 
1286 impl<T> fmt::Debug for UnboundedReceiver<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1287     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1288         let closed = if let Some(ref inner) = self.inner {
1289             decode_state(inner.state.load(SeqCst)).is_closed()
1290         } else {
1291             false
1292         };
1293 
1294         f.debug_struct("Receiver").field("closed", &closed).finish()
1295     }
1296 }
1297 
1298 /*
1299  *
1300  * ===== impl Inner =====
1301  *
1302  */
1303 
1304 impl<T> UnboundedInner<T> {
1305     // Clear `open` flag in the state, keep `num_messages` intact.
set_closed(&self)1306     fn set_closed(&self) {
1307         let curr = self.state.load(SeqCst);
1308         if !decode_state(curr).is_open {
1309             return;
1310         }
1311 
1312         self.state.fetch_and(!OPEN_MASK, SeqCst);
1313     }
1314 }
1315 
1316 impl<T> BoundedInner<T> {
1317     // The return value is such that the total number of messages that can be
1318     // enqueued into the channel will never exceed MAX_CAPACITY
max_senders(&self) -> usize1319     fn max_senders(&self) -> usize {
1320         MAX_CAPACITY - self.buffer
1321     }
1322 
1323     // Clear `open` flag in the state, keep `num_messages` intact.
set_closed(&self)1324     fn set_closed(&self) {
1325         let curr = self.state.load(SeqCst);
1326         if !decode_state(curr).is_open {
1327             return;
1328         }
1329 
1330         self.state.fetch_and(!OPEN_MASK, SeqCst);
1331     }
1332 }
1333 
1334 unsafe impl<T: Send> Send for UnboundedInner<T> {}
1335 unsafe impl<T: Send> Sync for UnboundedInner<T> {}
1336 
1337 unsafe impl<T: Send> Send for BoundedInner<T> {}
1338 unsafe impl<T: Send> Sync for BoundedInner<T> {}
1339 
1340 impl State {
is_closed(&self) -> bool1341     fn is_closed(&self) -> bool {
1342         !self.is_open && self.num_messages == 0
1343     }
1344 
size_hint(&self) -> (usize, Option<usize>)1345     fn size_hint(&self) -> (usize, Option<usize>) {
1346         if self.is_open {
1347             (self.num_messages, None)
1348         } else {
1349             (self.num_messages, Some(self.num_messages))
1350         }
1351     }
1352 }
1353 
1354 /*
1355  *
1356  * ===== Helpers =====
1357  *
1358  */
1359 
decode_state(num: usize) -> State1360 fn decode_state(num: usize) -> State {
1361     State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
1362 }
1363 
encode_state(state: &State) -> usize1364 fn encode_state(state: &State) -> usize {
1365     let mut num = state.num_messages;
1366 
1367     if state.is_open {
1368         num |= OPEN_MASK;
1369     }
1370 
1371     num
1372 }
1373