• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! A multi-producer, single-consumer queue for sending values across
2 //! asynchronous tasks.
3 //!
4 //! Similarly to the `std`, channel creation provides [`Receiver`] and
5 //! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6 //! read values out of the channel. If there is no message to read from the
7 //! channel, the current task will be notified when a new value is sent.
8 //! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9 //! the channel. If the channel is at capacity, the send will be rejected and
10 //! the task will be notified when additional capacity is available. In other
11 //! words, the channel provides backpressure.
12 //!
13 //! Unbounded channels are also available using the `unbounded` constructor.
14 //!
15 //! # Disconnection
16 //!
17 //! When all [`Sender`] handles have been dropped, it is no longer
18 //! possible to send values into the channel. This is considered the termination
19 //! event of the stream. As such, [`Receiver::poll_next`]
20 //! will return `Ok(Ready(None))`.
21 //!
22 //! If the [`Receiver`] handle is dropped, then messages can no longer
23 //! be read out of the channel. In this case, all further attempts to send will
24 //! result in an error.
25 //!
26 //! # Clean Shutdown
27 //!
28 //! If the [`Receiver`] is simply dropped, then it is possible for
29 //! there to be messages still in the channel that will not be processed. As
30 //! such, it is usually desirable to perform a "clean" shutdown. To do this, the
31 //! receiver will first call `close`, which will prevent any further messages to
32 //! be sent into the channel. Then, the receiver consumes the channel to
33 //! completion, at which point the receiver can be dropped.
34 //!
35 //! [`Sender`]: struct.Sender.html
36 //! [`Receiver`]: struct.Receiver.html
37 //! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38 //! [`Receiver::poll_next`]:
39 //!     ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40 
41 // At the core, the channel uses an atomic FIFO queue for message passing. This
42 // queue is used as the primary coordination primitive. In order to enforce
43 // capacity limits and handle back pressure, a secondary FIFO queue is used to
44 // send parked task handles.
45 //
46 // The general idea is that the channel is created with a `buffer` size of `n`.
47 // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48 // slot to hold a message. This allows `Sender` to know for a fact that a send
49 // will succeed *before* starting to do the actual work of sending the value.
50 // Since most of this work is lock-free, once the work starts, it is impossible
51 // to safely revert.
52 //
53 // If the sender is unable to process a send operation, then the current
54 // task is parked and the handle is sent on the parked task queue.
55 //
56 // Note that the implementation guarantees that the channel capacity will never
57 // exceed the configured limit, however there is no *strict* guarantee that the
58 // receiver will wake up a parked task *immediately* when a slot becomes
59 // available. However, it will almost always unpark a task when a slot becomes
60 // available and it is *guaranteed* that a sender will be unparked when the
61 // message that caused the sender to become parked is read out of the channel.
62 //
63 // The steps for sending a message are roughly:
64 //
65 // 1) Increment the channel message count
66 // 2) If the channel is at capacity, push the task handle onto the wait queue
67 // 3) Push the message onto the message queue.
68 //
69 // The steps for receiving a message are roughly:
70 //
71 // 1) Pop a message from the message queue
72 // 2) Pop a task handle from the wait queue
73 // 3) Decrement the channel message count.
74 //
75 // It's important for the order of operations on lock-free structures to happen
76 // in reverse order between the sender and receiver. This makes the message
77 // queue the primary coordination structure and establishes the necessary
78 // happens-before semantics required for the acquire / release semantics used
79 // by the queue structure.
80 
81 use futures_core::stream::{FusedStream, Stream};
82 use futures_core::task::__internal::AtomicWaker;
83 use futures_core::task::{Context, Poll, Waker};
84 use std::fmt;
85 use std::pin::Pin;
86 use std::sync::atomic::AtomicUsize;
87 use std::sync::atomic::Ordering::SeqCst;
88 use std::sync::{Arc, Mutex};
89 use std::thread;
90 
91 use crate::mpsc::queue::Queue;
92 
93 mod queue;
94 #[cfg(feature = "sink")]
95 mod sink_impl;
96 
97 struct UnboundedSenderInner<T> {
98     // Channel state shared between the sender and receiver.
99     inner: Arc<UnboundedInner<T>>,
100 }
101 
102 struct BoundedSenderInner<T> {
103     // Channel state shared between the sender and receiver.
104     inner: Arc<BoundedInner<T>>,
105 
106     // Handle to the task that is blocked on this sender. This handle is sent
107     // to the receiver half in order to be notified when the sender becomes
108     // unblocked.
109     sender_task: Arc<Mutex<SenderTask>>,
110 
111     // `true` if the sender might be blocked. This is an optimization to avoid
112     // having to lock the mutex most of the time.
113     maybe_parked: bool,
114 }
115 
116 // We never project Pin<&mut SenderInner> to `Pin<&mut T>`
117 impl<T> Unpin for UnboundedSenderInner<T> {}
118 impl<T> Unpin for BoundedSenderInner<T> {}
119 
120 /// The transmission end of a bounded mpsc channel.
121 ///
122 /// This value is created by the [`channel`](channel) function.
123 pub struct Sender<T>(Option<BoundedSenderInner<T>>);
124 
125 /// The transmission end of an unbounded mpsc channel.
126 ///
127 /// This value is created by the [`unbounded`](unbounded) function.
128 pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
129 
130 trait AssertKinds: Send + Sync + Clone {}
131 impl AssertKinds for UnboundedSender<u32> {}
132 
133 /// The receiving end of a bounded mpsc channel.
134 ///
135 /// This value is created by the [`channel`](channel) function.
136 pub struct Receiver<T> {
137     inner: Option<Arc<BoundedInner<T>>>,
138 }
139 
140 /// The receiving end of an unbounded mpsc channel.
141 ///
142 /// This value is created by the [`unbounded`](unbounded) function.
143 pub struct UnboundedReceiver<T> {
144     inner: Option<Arc<UnboundedInner<T>>>,
145 }
146 
147 // `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
148 impl<T> Unpin for UnboundedReceiver<T> {}
149 
150 /// The error type for [`Sender`s](Sender) used as `Sink`s.
151 #[derive(Clone, Debug, PartialEq, Eq)]
152 pub struct SendError {
153     kind: SendErrorKind,
154 }
155 
156 /// The error type returned from [`try_send`](Sender::try_send).
157 #[derive(Clone, PartialEq, Eq)]
158 pub struct TrySendError<T> {
159     err: SendError,
160     val: T,
161 }
162 
163 #[derive(Clone, Debug, PartialEq, Eq)]
164 enum SendErrorKind {
165     Full,
166     Disconnected,
167 }
168 
169 /// The error type returned from [`try_next`](Receiver::try_next).
170 pub struct TryRecvError {
171     _priv: (),
172 }
173 
174 impl fmt::Display for SendError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result175     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176         if self.is_full() {
177             write!(f, "send failed because channel is full")
178         } else {
179             write!(f, "send failed because receiver is gone")
180         }
181     }
182 }
183 
184 impl std::error::Error for SendError {}
185 
186 impl SendError {
187     /// Returns `true` if this error is a result of the channel being full.
is_full(&self) -> bool188     pub fn is_full(&self) -> bool {
189         match self.kind {
190             SendErrorKind::Full => true,
191             _ => false,
192         }
193     }
194 
195     /// Returns `true` if this error is a result of the receiver being dropped.
is_disconnected(&self) -> bool196     pub fn is_disconnected(&self) -> bool {
197         match self.kind {
198             SendErrorKind::Disconnected => true,
199             _ => false,
200         }
201     }
202 }
203 
204 impl<T> fmt::Debug for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result205     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206         f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
207     }
208 }
209 
210 impl<T> fmt::Display for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result211     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212         if self.is_full() {
213             write!(f, "send failed because channel is full")
214         } else {
215             write!(f, "send failed because receiver is gone")
216         }
217     }
218 }
219 
220 impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
221 
222 impl<T> TrySendError<T> {
223     /// Returns `true` if this error is a result of the channel being full.
is_full(&self) -> bool224     pub fn is_full(&self) -> bool {
225         self.err.is_full()
226     }
227 
228     /// Returns `true` if this error is a result of the receiver being dropped.
is_disconnected(&self) -> bool229     pub fn is_disconnected(&self) -> bool {
230         self.err.is_disconnected()
231     }
232 
233     /// Returns the message that was attempted to be sent but failed.
into_inner(self) -> T234     pub fn into_inner(self) -> T {
235         self.val
236     }
237 
238     /// Drops the message and converts into a `SendError`.
into_send_error(self) -> SendError239     pub fn into_send_error(self) -> SendError {
240         self.err
241     }
242 }
243 
244 impl fmt::Debug for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result245     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246         f.debug_tuple("TryRecvError").finish()
247     }
248 }
249 
250 impl fmt::Display for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result251     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252         write!(f, "receiver channel is empty")
253     }
254 }
255 
256 impl std::error::Error for TryRecvError {}
257 
258 struct UnboundedInner<T> {
259     // Internal channel state. Consists of the number of messages stored in the
260     // channel as well as a flag signalling that the channel is closed.
261     state: AtomicUsize,
262 
263     // Atomic, FIFO queue used to send messages to the receiver
264     message_queue: Queue<T>,
265 
266     // Number of senders in existence
267     num_senders: AtomicUsize,
268 
269     // Handle to the receiver's task.
270     recv_task: AtomicWaker,
271 }
272 
273 struct BoundedInner<T> {
274     // Max buffer size of the channel. If `None` then the channel is unbounded.
275     buffer: usize,
276 
277     // Internal channel state. Consists of the number of messages stored in the
278     // channel as well as a flag signalling that the channel is closed.
279     state: AtomicUsize,
280 
281     // Atomic, FIFO queue used to send messages to the receiver
282     message_queue: Queue<T>,
283 
284     // Atomic, FIFO queue used to send parked task handles to the receiver.
285     parked_queue: Queue<Arc<Mutex<SenderTask>>>,
286 
287     // Number of senders in existence
288     num_senders: AtomicUsize,
289 
290     // Handle to the receiver's task.
291     recv_task: AtomicWaker,
292 }
293 
294 // Struct representation of `Inner::state`.
295 #[derive(Clone, Copy)]
296 struct State {
297     // `true` when the channel is open
298     is_open: bool,
299 
300     // Number of messages in the channel
301     num_messages: usize,
302 }
303 
304 // The `is_open` flag is stored in the left-most bit of `Inner::state`
305 const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1);
306 
307 // When a new channel is created, it is created in the open state with no
308 // pending messages.
309 const INIT_STATE: usize = OPEN_MASK;
310 
311 // The maximum number of messages that a channel can track is `usize::max_value() >> 1`
312 const MAX_CAPACITY: usize = !(OPEN_MASK);
313 
314 // The maximum requested buffer size must be less than the maximum capacity of
315 // a channel. This is because each sender gets a guaranteed slot.
316 const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
317 
318 // Sent to the consumer to wake up blocked producers
319 struct SenderTask {
320     task: Option<Waker>,
321     is_parked: bool,
322 }
323 
324 impl SenderTask {
new() -> Self325     fn new() -> Self {
326         Self { task: None, is_parked: false }
327     }
328 
notify(&mut self)329     fn notify(&mut self) {
330         self.is_parked = false;
331 
332         if let Some(task) = self.task.take() {
333             task.wake();
334         }
335     }
336 }
337 
338 /// Creates a bounded mpsc channel for communicating between asynchronous tasks.
339 ///
340 /// Being bounded, this channel provides backpressure to ensure that the sender
341 /// outpaces the receiver by only a limited amount. The channel's capacity is
342 /// equal to `buffer + num-senders`. In other words, each sender gets a
343 /// guaranteed slot in the channel capacity, and on top of that there are
344 /// `buffer` "first come, first serve" slots available to all senders.
345 ///
346 /// The [`Receiver`](Receiver) returned implements the
347 /// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements
348 /// `Sink`.
channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>)349 pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
350     // Check that the requested buffer size does not exceed the maximum buffer
351     // size permitted by the system.
352     assert!(buffer < MAX_BUFFER, "requested buffer size too large");
353 
354     let inner = Arc::new(BoundedInner {
355         buffer,
356         state: AtomicUsize::new(INIT_STATE),
357         message_queue: Queue::new(),
358         parked_queue: Queue::new(),
359         num_senders: AtomicUsize::new(1),
360         recv_task: AtomicWaker::new(),
361     });
362 
363     let tx = BoundedSenderInner {
364         inner: inner.clone(),
365         sender_task: Arc::new(Mutex::new(SenderTask::new())),
366         maybe_parked: false,
367     };
368 
369     let rx = Receiver { inner: Some(inner) };
370 
371     (Sender(Some(tx)), rx)
372 }
373 
374 /// Creates an unbounded mpsc channel for communicating between asynchronous
375 /// tasks.
376 ///
377 /// A `send` on this channel will always succeed as long as the receive half has
378 /// not been closed. If the receiver falls behind, messages will be arbitrarily
379 /// buffered.
380 ///
381 /// **Note** that the amount of available system memory is an implicit bound to
382 /// the channel. Using an `unbounded` channel has the ability of causing the
383 /// process to run out of memory. In this case, the process will be aborted.
unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)384 pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
385     let inner = Arc::new(UnboundedInner {
386         state: AtomicUsize::new(INIT_STATE),
387         message_queue: Queue::new(),
388         num_senders: AtomicUsize::new(1),
389         recv_task: AtomicWaker::new(),
390     });
391 
392     let tx = UnboundedSenderInner { inner: inner.clone() };
393 
394     let rx = UnboundedReceiver { inner: Some(inner) };
395 
396     (UnboundedSender(Some(tx)), rx)
397 }
398 
399 /*
400  *
401  * ===== impl Sender =====
402  *
403  */
404 
405 impl<T> UnboundedSenderInner<T> {
poll_ready_nb(&self) -> Poll<Result<(), SendError>>406     fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
407         let state = decode_state(self.inner.state.load(SeqCst));
408         if state.is_open {
409             Poll::Ready(Ok(()))
410         } else {
411             Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
412         }
413     }
414 
415     // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: T)416     fn queue_push_and_signal(&self, msg: T) {
417         // Push the message onto the message queue
418         self.inner.message_queue.push(msg);
419 
420         // Signal to the receiver that a message has been enqueued. If the
421         // receiver is parked, this will unpark the task.
422         self.inner.recv_task.wake();
423     }
424 
425     // Increment the number of queued messages. Returns the resulting number.
inc_num_messages(&self) -> Option<usize>426     fn inc_num_messages(&self) -> Option<usize> {
427         let mut curr = self.inner.state.load(SeqCst);
428 
429         loop {
430             let mut state = decode_state(curr);
431 
432             // The receiver end closed the channel.
433             if !state.is_open {
434                 return None;
435             }
436 
437             // This probably is never hit? Odds are the process will run out of
438             // memory first. It may be worth to return something else in this
439             // case?
440             assert!(
441                 state.num_messages < MAX_CAPACITY,
442                 "buffer space \
443                     exhausted; sending this messages would overflow the state"
444             );
445 
446             state.num_messages += 1;
447 
448             let next = encode_state(&state);
449             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
450                 Ok(_) => return Some(state.num_messages),
451                 Err(actual) => curr = actual,
452             }
453         }
454     }
455 
456     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool457     fn same_receiver(&self, other: &Self) -> bool {
458         Arc::ptr_eq(&self.inner, &other.inner)
459     }
460 
461     /// Returns whether the sender send to this receiver.
is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool462     fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
463         Arc::ptr_eq(&self.inner, inner)
464     }
465 
466     /// Returns pointer to the Arc containing sender
467     ///
468     /// The returned pointer is not referenced and should be only used for hashing!
ptr(&self) -> *const UnboundedInner<T>469     fn ptr(&self) -> *const UnboundedInner<T> {
470         &*self.inner
471     }
472 
473     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool474     fn is_closed(&self) -> bool {
475         !decode_state(self.inner.state.load(SeqCst)).is_open
476     }
477 
478     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)479     fn close_channel(&self) {
480         // There's no need to park this sender, its dropping,
481         // and we don't want to check for capacity, so skip
482         // that stuff from `do_send`.
483 
484         self.inner.set_closed();
485         self.inner.recv_task.wake();
486     }
487 }
488 
489 impl<T> BoundedSenderInner<T> {
490     /// Attempts to send a message on this `Sender`, returning the message
491     /// if there was an error.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>492     fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
493         // If the sender is currently blocked, reject the message
494         if !self.poll_unparked(None).is_ready() {
495             return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
496         }
497 
498         // The channel has capacity to accept the message, so send it
499         self.do_send_b(msg)
500     }
501 
502     // Do the send without failing.
503     // Can be called only by bounded sender.
do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>>504     fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
505         // Anyone calling do_send *should* make sure there is room first,
506         // but assert here for tests as a sanity check.
507         debug_assert!(self.poll_unparked(None).is_ready());
508 
509         // First, increment the number of messages contained by the channel.
510         // This operation will also atomically determine if the sender task
511         // should be parked.
512         //
513         // `None` is returned in the case that the channel has been closed by the
514         // receiver. This happens when `Receiver::close` is called or the
515         // receiver is dropped.
516         let park_self = match self.inc_num_messages() {
517             Some(num_messages) => {
518                 // Block if the current number of pending messages has exceeded
519                 // the configured buffer size
520                 num_messages > self.inner.buffer
521             }
522             None => {
523                 return Err(TrySendError {
524                     err: SendError { kind: SendErrorKind::Disconnected },
525                     val: msg,
526                 })
527             }
528         };
529 
530         // If the channel has reached capacity, then the sender task needs to
531         // be parked. This will send the task handle on the parked task queue.
532         //
533         // However, when `do_send` is called while dropping the `Sender`,
534         // `task::current()` can't be called safely. In this case, in order to
535         // maintain internal consistency, a blank message is pushed onto the
536         // parked task queue.
537         if park_self {
538             self.park();
539         }
540 
541         self.queue_push_and_signal(msg);
542 
543         Ok(())
544     }
545 
546     // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: T)547     fn queue_push_and_signal(&self, msg: T) {
548         // Push the message onto the message queue
549         self.inner.message_queue.push(msg);
550 
551         // Signal to the receiver that a message has been enqueued. If the
552         // receiver is parked, this will unpark the task.
553         self.inner.recv_task.wake();
554     }
555 
556     // Increment the number of queued messages. Returns the resulting number.
inc_num_messages(&self) -> Option<usize>557     fn inc_num_messages(&self) -> Option<usize> {
558         let mut curr = self.inner.state.load(SeqCst);
559 
560         loop {
561             let mut state = decode_state(curr);
562 
563             // The receiver end closed the channel.
564             if !state.is_open {
565                 return None;
566             }
567 
568             // This probably is never hit? Odds are the process will run out of
569             // memory first. It may be worth to return something else in this
570             // case?
571             assert!(
572                 state.num_messages < MAX_CAPACITY,
573                 "buffer space \
574                     exhausted; sending this messages would overflow the state"
575             );
576 
577             state.num_messages += 1;
578 
579             let next = encode_state(&state);
580             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
581                 Ok(_) => return Some(state.num_messages),
582                 Err(actual) => curr = actual,
583             }
584         }
585     }
586 
park(&mut self)587     fn park(&mut self) {
588         {
589             let mut sender = self.sender_task.lock().unwrap();
590             sender.task = None;
591             sender.is_parked = true;
592         }
593 
594         // Send handle over queue
595         let t = self.sender_task.clone();
596         self.inner.parked_queue.push(t);
597 
598         // Check to make sure we weren't closed after we sent our task on the
599         // queue
600         let state = decode_state(self.inner.state.load(SeqCst));
601         self.maybe_parked = state.is_open;
602     }
603 
604     /// Polls the channel to determine if there is guaranteed capacity to send
605     /// at least one item without waiting.
606     ///
607     /// # Return value
608     ///
609     /// This method returns:
610     ///
611     /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
612     /// - `Poll::Pending` if the channel may not have
613     ///   capacity, in which case the current task is queued to be notified once
614     ///   capacity is available;
615     /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>>616     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
617         let state = decode_state(self.inner.state.load(SeqCst));
618         if !state.is_open {
619             return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
620         }
621 
622         self.poll_unparked(Some(cx)).map(Ok)
623     }
624 
625     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool626     fn same_receiver(&self, other: &Self) -> bool {
627         Arc::ptr_eq(&self.inner, &other.inner)
628     }
629 
630     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool631     fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
632         Arc::ptr_eq(&self.inner, receiver)
633     }
634 
635     /// Returns pointer to the Arc containing sender
636     ///
637     /// The returned pointer is not referenced and should be only used for hashing!
ptr(&self) -> *const BoundedInner<T>638     fn ptr(&self) -> *const BoundedInner<T> {
639         &*self.inner
640     }
641 
642     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool643     fn is_closed(&self) -> bool {
644         !decode_state(self.inner.state.load(SeqCst)).is_open
645     }
646 
647     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)648     fn close_channel(&self) {
649         // There's no need to park this sender, its dropping,
650         // and we don't want to check for capacity, so skip
651         // that stuff from `do_send`.
652 
653         self.inner.set_closed();
654         self.inner.recv_task.wake();
655     }
656 
poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()>657     fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
658         // First check the `maybe_parked` variable. This avoids acquiring the
659         // lock in most cases
660         if self.maybe_parked {
661             // Get a lock on the task handle
662             let mut task = self.sender_task.lock().unwrap();
663 
664             if !task.is_parked {
665                 self.maybe_parked = false;
666                 return Poll::Ready(());
667             }
668 
669             // At this point, an unpark request is pending, so there will be an
670             // unpark sometime in the future. We just need to make sure that
671             // the correct task will be notified.
672             //
673             // Update the task in case the `Sender` has been moved to another
674             // task
675             task.task = cx.map(|cx| cx.waker().clone());
676 
677             Poll::Pending
678         } else {
679             Poll::Ready(())
680         }
681     }
682 }
683 
684 impl<T> Sender<T> {
685     /// Attempts to send a message on this `Sender`, returning the message
686     /// if there was an error.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>687     pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
688         if let Some(inner) = &mut self.0 {
689             inner.try_send(msg)
690         } else {
691             Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
692         }
693     }
694 
695     /// Send a message on the channel.
696     ///
697     /// This function should only be called after
698     /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
699     /// ready to receive a message.
start_send(&mut self, msg: T) -> Result<(), SendError>700     pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
701         self.try_send(msg).map_err(|e| e.err)
702     }
703 
704     /// Polls the channel to determine if there is guaranteed capacity to send
705     /// at least one item without waiting.
706     ///
707     /// # Return value
708     ///
709     /// This method returns:
710     ///
711     /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
712     /// - `Poll::Pending` if the channel may not have
713     ///   capacity, in which case the current task is queued to be notified once
714     ///   capacity is available;
715     /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>>716     pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
717         let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
718         inner.poll_ready(cx)
719     }
720 
721     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool722     pub fn is_closed(&self) -> bool {
723         self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
724     }
725 
726     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&mut self)727     pub fn close_channel(&mut self) {
728         if let Some(inner) = &mut self.0 {
729             inner.close_channel();
730         }
731     }
732 
733     /// Disconnects this sender from the channel, closing it if there are no more senders left.
disconnect(&mut self)734     pub fn disconnect(&mut self) {
735         self.0 = None;
736     }
737 
738     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool739     pub fn same_receiver(&self, other: &Self) -> bool {
740         match (&self.0, &other.0) {
741             (Some(inner), Some(other)) => inner.same_receiver(other),
742             _ => false,
743         }
744     }
745 
746     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &Receiver<T>) -> bool747     pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
748         match (&self.0, &receiver.inner) {
749             (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
750             _ => false,
751         }
752     }
753 
754     /// Hashes the receiver into the provided hasher
hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher,755     pub fn hash_receiver<H>(&self, hasher: &mut H)
756     where
757         H: std::hash::Hasher,
758     {
759         use std::hash::Hash;
760 
761         let ptr = self.0.as_ref().map(|inner| inner.ptr());
762         ptr.hash(hasher);
763     }
764 }
765 
766 impl<T> UnboundedSender<T> {
767     /// Check if the channel is ready to receive a message.
poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>>768     pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
769         let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
770         inner.poll_ready_nb()
771     }
772 
773     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool774     pub fn is_closed(&self) -> bool {
775         self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
776     }
777 
778     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)779     pub fn close_channel(&self) {
780         if let Some(inner) = &self.0 {
781             inner.close_channel();
782         }
783     }
784 
785     /// Disconnects this sender from the channel, closing it if there are no more senders left.
disconnect(&mut self)786     pub fn disconnect(&mut self) {
787         self.0 = None;
788     }
789 
790     // Do the send without parking current task.
do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>>791     fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
792         if let Some(inner) = &self.0 {
793             if inner.inc_num_messages().is_some() {
794                 inner.queue_push_and_signal(msg);
795                 return Ok(());
796             }
797         }
798 
799         Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
800     }
801 
802     /// Send a message on the channel.
803     ///
804     /// This method should only be called after `poll_ready` has been used to
805     /// verify that the channel is ready to receive a message.
start_send(&mut self, msg: T) -> Result<(), SendError>806     pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
807         self.do_send_nb(msg).map_err(|e| e.err)
808     }
809 
810     /// Sends a message along this channel.
811     ///
812     /// This is an unbounded sender, so this function differs from `Sink::send`
813     /// by ensuring the return type reflects that the channel is always ready to
814     /// receive messages.
unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>>815     pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
816         self.do_send_nb(msg)
817     }
818 
819     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool820     pub fn same_receiver(&self, other: &Self) -> bool {
821         match (&self.0, &other.0) {
822             (Some(inner), Some(other)) => inner.same_receiver(other),
823             _ => false,
824         }
825     }
826 
827     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool828     pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
829         match (&self.0, &receiver.inner) {
830             (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
831             _ => false,
832         }
833     }
834 
835     /// Hashes the receiver into the provided hasher
hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher,836     pub fn hash_receiver<H>(&self, hasher: &mut H)
837     where
838         H: std::hash::Hasher,
839     {
840         use std::hash::Hash;
841 
842         let ptr = self.0.as_ref().map(|inner| inner.ptr());
843         ptr.hash(hasher);
844     }
845 }
846 
847 impl<T> Clone for Sender<T> {
clone(&self) -> Self848     fn clone(&self) -> Self {
849         Self(self.0.clone())
850     }
851 }
852 
853 impl<T> Clone for UnboundedSender<T> {
clone(&self) -> Self854     fn clone(&self) -> Self {
855         Self(self.0.clone())
856     }
857 }
858 
859 impl<T> Clone for UnboundedSenderInner<T> {
clone(&self) -> Self860     fn clone(&self) -> Self {
861         // Since this atomic op isn't actually guarding any memory and we don't
862         // care about any orderings besides the ordering on the single atomic
863         // variable, a relaxed ordering is acceptable.
864         let mut curr = self.inner.num_senders.load(SeqCst);
865 
866         loop {
867             // If the maximum number of senders has been reached, then fail
868             if curr == MAX_BUFFER {
869                 panic!("cannot clone `Sender` -- too many outstanding senders");
870             }
871 
872             debug_assert!(curr < MAX_BUFFER);
873 
874             let next = curr + 1;
875             match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
876                 Ok(_) => {
877                     // The ABA problem doesn't matter here. We only care that the
878                     // number of senders never exceeds the maximum.
879                     return Self { inner: self.inner.clone() };
880                 }
881                 Err(actual) => curr = actual,
882             }
883         }
884     }
885 }
886 
887 impl<T> Clone for BoundedSenderInner<T> {
clone(&self) -> Self888     fn clone(&self) -> Self {
889         // Since this atomic op isn't actually guarding any memory and we don't
890         // care about any orderings besides the ordering on the single atomic
891         // variable, a relaxed ordering is acceptable.
892         let mut curr = self.inner.num_senders.load(SeqCst);
893 
894         loop {
895             // If the maximum number of senders has been reached, then fail
896             if curr == self.inner.max_senders() {
897                 panic!("cannot clone `Sender` -- too many outstanding senders");
898             }
899 
900             debug_assert!(curr < self.inner.max_senders());
901 
902             let next = curr + 1;
903             match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
904                 Ok(_) => {
905                     // The ABA problem doesn't matter here. We only care that the
906                     // number of senders never exceeds the maximum.
907                     return Self {
908                         inner: self.inner.clone(),
909                         sender_task: Arc::new(Mutex::new(SenderTask::new())),
910                         maybe_parked: false,
911                     };
912                 }
913                 Err(actual) => curr = actual,
914             }
915         }
916     }
917 }
918 
919 impl<T> Drop for UnboundedSenderInner<T> {
drop(&mut self)920     fn drop(&mut self) {
921         // Ordering between variables don't matter here
922         let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
923 
924         if prev == 1 {
925             self.close_channel();
926         }
927     }
928 }
929 
930 impl<T> Drop for BoundedSenderInner<T> {
drop(&mut self)931     fn drop(&mut self) {
932         // Ordering between variables don't matter here
933         let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
934 
935         if prev == 1 {
936             self.close_channel();
937         }
938     }
939 }
940 
941 impl<T> fmt::Debug for Sender<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result942     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
943         f.debug_struct("Sender").field("closed", &self.is_closed()).finish()
944     }
945 }
946 
947 impl<T> fmt::Debug for UnboundedSender<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result948     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
949         f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish()
950     }
951 }
952 
953 /*
954  *
955  * ===== impl Receiver =====
956  *
957  */
958 
959 impl<T> Receiver<T> {
960     /// Closes the receiving half of a channel, without dropping it.
961     ///
962     /// This prevents any further messages from being sent on the channel while
963     /// still enabling the receiver to drain messages that are buffered.
close(&mut self)964     pub fn close(&mut self) {
965         if let Some(inner) = &mut self.inner {
966             inner.set_closed();
967 
968             // Wake up any threads waiting as they'll see that we've closed the
969             // channel and will continue on their merry way.
970             while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
971                 task.lock().unwrap().notify();
972             }
973         }
974     }
975 
976     /// Tries to receive the next message without notifying a context if empty.
977     ///
978     /// It is not recommended to call this function from inside of a future,
979     /// only when you've otherwise arranged to be notified when the channel is
980     /// no longer empty.
981     ///
982     /// This function returns:
983     /// * `Ok(Some(t))` when message is fetched
984     /// * `Ok(None)` when channel is closed and no messages left in the queue
985     /// * `Err(e)` when there are no messages available, but channel is not yet closed
try_next(&mut self) -> Result<Option<T>, TryRecvError>986     pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
987         match self.next_message() {
988             Poll::Ready(msg) => Ok(msg),
989             Poll::Pending => Err(TryRecvError { _priv: () }),
990         }
991     }
992 
next_message(&mut self) -> Poll<Option<T>>993     fn next_message(&mut self) -> Poll<Option<T>> {
994         let inner = match self.inner.as_mut() {
995             None => return Poll::Ready(None),
996             Some(inner) => inner,
997         };
998         // Pop off a message
999         match unsafe { inner.message_queue.pop_spin() } {
1000             Some(msg) => {
1001                 // If there are any parked task handles in the parked queue,
1002                 // pop one and unpark it.
1003                 self.unpark_one();
1004 
1005                 // Decrement number of messages
1006                 self.dec_num_messages();
1007 
1008                 Poll::Ready(Some(msg))
1009             }
1010             None => {
1011                 let state = decode_state(inner.state.load(SeqCst));
1012                 if state.is_closed() {
1013                     // If closed flag is set AND there are no pending messages
1014                     // it means end of stream
1015                     self.inner = None;
1016                     Poll::Ready(None)
1017                 } else {
1018                     // If queue is open, we need to return Pending
1019                     // to be woken up when new messages arrive.
1020                     // If queue is closed but num_messages is non-zero,
1021                     // it means that senders updated the state,
1022                     // but didn't put message to queue yet,
1023                     // so we need to park until sender unparks the task
1024                     // after queueing the message.
1025                     Poll::Pending
1026                 }
1027             }
1028         }
1029     }
1030 
1031     // Unpark a single task handle if there is one pending in the parked queue
unpark_one(&mut self)1032     fn unpark_one(&mut self) {
1033         if let Some(inner) = &mut self.inner {
1034             if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1035                 task.lock().unwrap().notify();
1036             }
1037         }
1038     }
1039 
dec_num_messages(&self)1040     fn dec_num_messages(&self) {
1041         if let Some(inner) = &self.inner {
1042             // OPEN_MASK is highest bit, so it's unaffected by subtraction
1043             // unless there's underflow, and we know there's no underflow
1044             // because number of messages at this point is always > 0.
1045             inner.state.fetch_sub(1, SeqCst);
1046         }
1047     }
1048 }
1049 
1050 // The receiver does not ever take a Pin to the inner T
1051 impl<T> Unpin for Receiver<T> {}
1052 
1053 impl<T> FusedStream for Receiver<T> {
is_terminated(&self) -> bool1054     fn is_terminated(&self) -> bool {
1055         self.inner.is_none()
1056     }
1057 }
1058 
1059 impl<T> Stream for Receiver<T> {
1060     type Item = T;
1061 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>1062     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1063         // Try to read a message off of the message queue.
1064         match self.next_message() {
1065             Poll::Ready(msg) => {
1066                 if msg.is_none() {
1067                     self.inner = None;
1068                 }
1069                 Poll::Ready(msg)
1070             }
1071             Poll::Pending => {
1072                 // There are no messages to read, in this case, park.
1073                 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1074                 // Check queue again after parking to prevent race condition:
1075                 // a message could be added to the queue after previous `next_message`
1076                 // before `register` call.
1077                 self.next_message()
1078             }
1079         }
1080     }
1081 
size_hint(&self) -> (usize, Option<usize>)1082     fn size_hint(&self) -> (usize, Option<usize>) {
1083         if let Some(inner) = &self.inner {
1084             decode_state(inner.state.load(SeqCst)).size_hint()
1085         } else {
1086             (0, Some(0))
1087         }
1088     }
1089 }
1090 
1091 impl<T> Drop for Receiver<T> {
drop(&mut self)1092     fn drop(&mut self) {
1093         // Drain the channel of all pending messages
1094         self.close();
1095         if self.inner.is_some() {
1096             loop {
1097                 match self.next_message() {
1098                     Poll::Ready(Some(_)) => {}
1099                     Poll::Ready(None) => break,
1100                     Poll::Pending => {
1101                         let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1102 
1103                         // If the channel is closed, then there is no need to park.
1104                         if state.is_closed() {
1105                             break;
1106                         }
1107 
1108                         // TODO: Spinning isn't ideal, it might be worth
1109                         // investigating using a condvar or some other strategy
1110                         // here. That said, if this case is hit, then another thread
1111                         // is about to push the value into the queue and this isn't
1112                         // the only spinlock in the impl right now.
1113                         thread::yield_now();
1114                     }
1115                 }
1116             }
1117         }
1118     }
1119 }
1120 
1121 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1122     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1123         let closed = if let Some(ref inner) = self.inner {
1124             decode_state(inner.state.load(SeqCst)).is_closed()
1125         } else {
1126             false
1127         };
1128 
1129         f.debug_struct("Receiver").field("closed", &closed).finish()
1130     }
1131 }
1132 
1133 impl<T> UnboundedReceiver<T> {
1134     /// Closes the receiving half of a channel, without dropping it.
1135     ///
1136     /// This prevents any further messages from being sent on the channel while
1137     /// still enabling the receiver to drain messages that are buffered.
close(&mut self)1138     pub fn close(&mut self) {
1139         if let Some(inner) = &mut self.inner {
1140             inner.set_closed();
1141         }
1142     }
1143 
1144     /// Tries to receive the next message without notifying a context if empty.
1145     ///
1146     /// It is not recommended to call this function from inside of a future,
1147     /// only when you've otherwise arranged to be notified when the channel is
1148     /// no longer empty.
1149     ///
1150     /// This function returns:
1151     /// * `Ok(Some(t))` when message is fetched
1152     /// * `Ok(None)` when channel is closed and no messages left in the queue
1153     /// * `Err(e)` when there are no messages available, but channel is not yet closed
try_next(&mut self) -> Result<Option<T>, TryRecvError>1154     pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1155         match self.next_message() {
1156             Poll::Ready(msg) => Ok(msg),
1157             Poll::Pending => Err(TryRecvError { _priv: () }),
1158         }
1159     }
1160 
next_message(&mut self) -> Poll<Option<T>>1161     fn next_message(&mut self) -> Poll<Option<T>> {
1162         let inner = match self.inner.as_mut() {
1163             None => return Poll::Ready(None),
1164             Some(inner) => inner,
1165         };
1166         // Pop off a message
1167         match unsafe { inner.message_queue.pop_spin() } {
1168             Some(msg) => {
1169                 // Decrement number of messages
1170                 self.dec_num_messages();
1171 
1172                 Poll::Ready(Some(msg))
1173             }
1174             None => {
1175                 let state = decode_state(inner.state.load(SeqCst));
1176                 if state.is_closed() {
1177                     // If closed flag is set AND there are no pending messages
1178                     // it means end of stream
1179                     self.inner = None;
1180                     Poll::Ready(None)
1181                 } else {
1182                     // If queue is open, we need to return Pending
1183                     // to be woken up when new messages arrive.
1184                     // If queue is closed but num_messages is non-zero,
1185                     // it means that senders updated the state,
1186                     // but didn't put message to queue yet,
1187                     // so we need to park until sender unparks the task
1188                     // after queueing the message.
1189                     Poll::Pending
1190                 }
1191             }
1192         }
1193     }
1194 
dec_num_messages(&self)1195     fn dec_num_messages(&self) {
1196         if let Some(inner) = &self.inner {
1197             // OPEN_MASK is highest bit, so it's unaffected by subtraction
1198             // unless there's underflow, and we know there's no underflow
1199             // because number of messages at this point is always > 0.
1200             inner.state.fetch_sub(1, SeqCst);
1201         }
1202     }
1203 }
1204 
1205 impl<T> FusedStream for UnboundedReceiver<T> {
is_terminated(&self) -> bool1206     fn is_terminated(&self) -> bool {
1207         self.inner.is_none()
1208     }
1209 }
1210 
1211 impl<T> Stream for UnboundedReceiver<T> {
1212     type Item = T;
1213 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>1214     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1215         // Try to read a message off of the message queue.
1216         match self.next_message() {
1217             Poll::Ready(msg) => {
1218                 if msg.is_none() {
1219                     self.inner = None;
1220                 }
1221                 Poll::Ready(msg)
1222             }
1223             Poll::Pending => {
1224                 // There are no messages to read, in this case, park.
1225                 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1226                 // Check queue again after parking to prevent race condition:
1227                 // a message could be added to the queue after previous `next_message`
1228                 // before `register` call.
1229                 self.next_message()
1230             }
1231         }
1232     }
1233 
size_hint(&self) -> (usize, Option<usize>)1234     fn size_hint(&self) -> (usize, Option<usize>) {
1235         if let Some(inner) = &self.inner {
1236             decode_state(inner.state.load(SeqCst)).size_hint()
1237         } else {
1238             (0, Some(0))
1239         }
1240     }
1241 }
1242 
1243 impl<T> Drop for UnboundedReceiver<T> {
drop(&mut self)1244     fn drop(&mut self) {
1245         // Drain the channel of all pending messages
1246         self.close();
1247         if self.inner.is_some() {
1248             loop {
1249                 match self.next_message() {
1250                     Poll::Ready(Some(_)) => {}
1251                     Poll::Ready(None) => break,
1252                     Poll::Pending => {
1253                         let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1254 
1255                         // If the channel is closed, then there is no need to park.
1256                         if state.is_closed() {
1257                             break;
1258                         }
1259 
1260                         // TODO: Spinning isn't ideal, it might be worth
1261                         // investigating using a condvar or some other strategy
1262                         // here. That said, if this case is hit, then another thread
1263                         // is about to push the value into the queue and this isn't
1264                         // the only spinlock in the impl right now.
1265                         thread::yield_now();
1266                     }
1267                 }
1268             }
1269         }
1270     }
1271 }
1272 
1273 impl<T> fmt::Debug for UnboundedReceiver<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1274     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1275         let closed = if let Some(ref inner) = self.inner {
1276             decode_state(inner.state.load(SeqCst)).is_closed()
1277         } else {
1278             false
1279         };
1280 
1281         f.debug_struct("Receiver").field("closed", &closed).finish()
1282     }
1283 }
1284 
1285 /*
1286  *
1287  * ===== impl Inner =====
1288  *
1289  */
1290 
1291 impl<T> UnboundedInner<T> {
1292     // Clear `open` flag in the state, keep `num_messages` intact.
set_closed(&self)1293     fn set_closed(&self) {
1294         let curr = self.state.load(SeqCst);
1295         if !decode_state(curr).is_open {
1296             return;
1297         }
1298 
1299         self.state.fetch_and(!OPEN_MASK, SeqCst);
1300     }
1301 }
1302 
1303 impl<T> BoundedInner<T> {
1304     // The return value is such that the total number of messages that can be
1305     // enqueued into the channel will never exceed MAX_CAPACITY
max_senders(&self) -> usize1306     fn max_senders(&self) -> usize {
1307         MAX_CAPACITY - self.buffer
1308     }
1309 
1310     // Clear `open` flag in the state, keep `num_messages` intact.
set_closed(&self)1311     fn set_closed(&self) {
1312         let curr = self.state.load(SeqCst);
1313         if !decode_state(curr).is_open {
1314             return;
1315         }
1316 
1317         self.state.fetch_and(!OPEN_MASK, SeqCst);
1318     }
1319 }
1320 
1321 unsafe impl<T: Send> Send for UnboundedInner<T> {}
1322 unsafe impl<T: Send> Sync for UnboundedInner<T> {}
1323 
1324 unsafe impl<T: Send> Send for BoundedInner<T> {}
1325 unsafe impl<T: Send> Sync for BoundedInner<T> {}
1326 
1327 impl State {
is_closed(&self) -> bool1328     fn is_closed(&self) -> bool {
1329         !self.is_open && self.num_messages == 0
1330     }
1331 
size_hint(&self) -> (usize, Option<usize>)1332     fn size_hint(&self) -> (usize, Option<usize>) {
1333         if self.is_open {
1334             (self.num_messages, None)
1335         } else {
1336             (self.num_messages, Some(self.num_messages))
1337         }
1338     }
1339 }
1340 
1341 /*
1342  *
1343  * ===== Helpers =====
1344  *
1345  */
1346 
decode_state(num: usize) -> State1347 fn decode_state(num: usize) -> State {
1348     State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
1349 }
1350 
encode_state(state: &State) -> usize1351 fn encode_state(state: &State) -> usize {
1352     let mut num = state.num_messages;
1353 
1354     if state.is_open {
1355         num |= OPEN_MASK;
1356     }
1357 
1358     num
1359 }
1360