//! A multi-producer, single-consumer queue for sending values across
//! asynchronous tasks.
//!
//! Similarly to the `std`, channel creation provides [`Receiver`] and
//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
//! read values out of the channel. If there is no message to read from the
//! channel, the current task will be notified when a new value is sent.
//! [`Sender`] implements the `Sink` trait and allows a task to send messages into
//! the channel. If the channel is at capacity, the send will be rejected and
//! the task will be notified when additional capacity is available. In other
//! words, the channel provides backpressure.
//!
//! Unbounded channels are also available using the `unbounded` constructor.
//!
//! # Disconnection
//!
//! When all [`Sender`] handles have been dropped, it is no longer
//! possible to send values into the channel. This is considered the termination
//! event of the stream. As such, [`Receiver::poll_next`]
//! will return `Poll::Ready(None)`.
//!
//! If the [`Receiver`] handle is dropped, then messages can no longer
//! be read out of the channel. In this case, all further attempts to send will
//! result in an error.
//!
//! # Clean Shutdown
//!
//! If the [`Receiver`] is simply dropped, then it is possible for
//! there to be messages still in the channel that will not be processed. As
//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
//! receiver should first call `close`, which will prevent any further messages
//! from being sent into the channel. Then, the receiver consumes the channel to
//! completion, at which point the receiver can be dropped.
//!
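//! # Examples
//!
//! A minimal end-to-end sketch. It assumes this module is re-exported at the
//! conventional `futures::channel::mpsc` path and that
//! `futures::executor::block_on` is available to drive the future:
//!
//! ```
//! use futures::channel::mpsc;
//! use futures::stream::StreamExt;
//!
//! futures::executor::block_on(async {
//!     let (mut tx, mut rx) = mpsc::channel::<u32>(8);
//!
//!     // `try_send` succeeds while the channel has spare capacity.
//!     tx.try_send(1).unwrap();
//!
//!     // Dropping the last `Sender` terminates the stream.
//!     drop(tx);
//!
//!     assert_eq!(rx.next().await, Some(1));
//!     assert_eq!(rx.next().await, None);
//! });
//! ```
//!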
//! [`Sender`]: struct.Sender.html
//! [`Receiver`]: struct.Receiver.html
//! [`Stream`]: ../../futures_core/stream/trait.Stream.html
//! [`Receiver::poll_next`]:
//! ../../futures_core/stream/trait.Stream.html#tymethod.poll_next

// At the core, the channel uses an atomic FIFO queue for message passing. This
// queue is used as the primary coordination primitive. In order to enforce
// capacity limits and handle back pressure, a secondary FIFO queue is used to
// send parked task handles.
//
// The general idea is that the channel is created with a `buffer` size of `n`.
// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
// slot to hold a message. This allows `Sender` to know for a fact that a send
// will succeed *before* starting to do the actual work of sending the value.
// Since most of this work is lock-free, once the work starts, it is impossible
// to safely revert.
//
// If the sender is unable to process a send operation, then the current
// task is parked and the handle is sent on the parked task queue.
//
// Note that the implementation guarantees that the channel capacity will never
// exceed the configured limit; however, there is no *strict* guarantee that the
// receiver will wake up a parked task *immediately* when a slot becomes
// available. However, it will almost always unpark a task when a slot becomes
// available, and it is *guaranteed* that a sender will be unparked when the
// message that caused the sender to become parked is read out of the channel.
//
// The steps for sending a message are roughly:
//
// 1) Increment the channel message count.
// 2) If the channel is at capacity, push the task handle onto the wait queue.
// 3) Push the message onto the message queue.
//
// The steps for receiving a message are roughly:
//
// 1) Pop a message from the message queue.
// 2) Pop a task handle from the wait queue.
// 3) Decrement the channel message count.
//
// It's important for the order of operations on lock-free structures to happen
// in reverse order between the sender and receiver. This makes the message
// queue the primary coordination structure and establishes the necessary
// happens-before semantics required for the acquire / release semantics used
// by the queue structure.

use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll, Waker};
use futures_core::task::__internal::AtomicWaker;
use std::fmt;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::thread;

use crate::mpsc::queue::Queue;

mod queue;
#[cfg(feature = "sink")]
mod sink_impl;

#[derive(Debug)]
struct UnboundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<UnboundedInner<T>>,
}

#[derive(Debug)]
struct BoundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<BoundedInner<T>>,

    // Handle to the task that is blocked on this sender. This handle is sent
    // to the receiver half in order to be notified when the sender becomes
    // unblocked.
    sender_task: Arc<Mutex<SenderTask>>,

    // `true` if the sender might be blocked. This is an optimization to avoid
    // having to lock the mutex most of the time.
    maybe_parked: bool,
}

// We never project `Pin<&mut SenderInner>` to `Pin<&mut T>`.
impl<T> Unpin for UnboundedSenderInner<T> {}
impl<T> Unpin for BoundedSenderInner<T> {}

/// The transmission end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Sender<T>(Option<BoundedSenderInner<T>>);

/// The transmission end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug)]
pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);

trait AssertKinds: Send + Sync + Clone {}
impl AssertKinds for UnboundedSender<u32> {}

/// The receiving end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Receiver<T> {
    inner: Option<Arc<BoundedInner<T>>>,
}

/// The receiving end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug)]
pub struct UnboundedReceiver<T> {
    inner: Option<Arc<UnboundedInner<T>>>,
}

// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`.
impl<T> Unpin for UnboundedReceiver<T> {}

/// The error type for [`Sender`s](Sender) used as `Sink`s.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SendError {
    kind: SendErrorKind,
}

/// The error type returned from [`try_send`](Sender::try_send).
#[derive(Clone, PartialEq, Eq)]
pub struct TrySendError<T> {
    err: SendError,
    val: T,
}

#[derive(Clone, Debug, PartialEq, Eq)]
enum SendErrorKind {
    Full,
    Disconnected,
}

/// The error type returned from [`try_next`](Receiver::try_next).
pub struct TryRecvError {
    _priv: (),
}

impl fmt::Display for SendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.is_full() {
            write!(f, "send failed because channel is full")
        } else {
            write!(f, "send failed because receiver is gone")
        }
    }
}

impl std::error::Error for SendError {}

impl SendError {
    /// Returns `true` if this error is a result of the channel being full.
    pub fn is_full(&self) -> bool {
        match self.kind {
            SendErrorKind::Full => true,
            _ => false,
        }
    }

    /// Returns `true` if this error is a result of the receiver being dropped.
    pub fn is_disconnected(&self) -> bool {
        match self.kind {
            SendErrorKind::Disconnected => true,
            _ => false,
        }
    }
}

impl<T> fmt::Debug for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TrySendError")
            .field("kind", &self.err.kind)
            .finish()
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.is_full() {
            write!(f, "send failed because channel is full")
        } else {
            write!(f, "send failed because receiver is gone")
        }
    }
}

impl<T: core::any::Any> std::error::Error for TrySendError<T> {}

impl<T> TrySendError<T> {
    /// Returns `true` if this error is a result of the channel being full.
    pub fn is_full(&self) -> bool {
        self.err.is_full()
    }

    /// Returns `true` if this error is a result of the receiver being dropped.
    pub fn is_disconnected(&self) -> bool {
        self.err.is_disconnected()
    }

    /// Returns the message that was attempted to be sent but failed.
    pub fn into_inner(self) -> T {
        self.val
    }

    /// Drops the message and converts into a `SendError`.
    pub fn into_send_error(self) -> SendError {
        self.err
    }
}

impl fmt::Debug for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("TryRecvError")
            .finish()
    }
}

impl fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "receiver channel is empty")
    }
}

impl std::error::Error for TryRecvError {}

#[derive(Debug)]
struct UnboundedInner<T> {
    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver.
    message_queue: Queue<T>,

    // Number of senders in existence.
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}

#[derive(Debug)]
struct BoundedInner<T> {
    // Max buffer size of the channel.
    buffer: usize,

    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver.
    message_queue: Queue<T>,

    // Atomic, FIFO queue used to send parked task handles to the receiver.
    parked_queue: Queue<Arc<Mutex<SenderTask>>>,

    // Number of senders in existence.
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}

// Struct representation of `Inner::state`.
#[derive(Debug, Clone, Copy)]
struct State {
    // `true` when the channel is open.
    is_open: bool,

    // Number of messages in the channel.
    num_messages: usize,
}

// The `is_open` flag is stored in the left-most bit of `Inner::state`.
const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1);

// When a new channel is created, it is created in the open state with no
// pending messages.
const INIT_STATE: usize = OPEN_MASK;

// The maximum number of messages that a channel can track is
// `usize::max_value() >> 1`.
const MAX_CAPACITY: usize = !(OPEN_MASK);

// The maximum requested buffer size must be less than the maximum capacity of
// a channel. This is because each sender gets a guaranteed slot.
const MAX_BUFFER: usize = MAX_CAPACITY >> 1;

// Sent to the consumer to wake up blocked producers.
#[derive(Debug)]
struct SenderTask {
    task: Option<Waker>,
    is_parked: bool,
}

impl SenderTask {
    fn new() -> Self {
        Self {
            task: None,
            is_parked: false,
        }
    }

    fn notify(&mut self) {
        self.is_parked = false;

        if let Some(task) = self.task.take() {
            task.wake();
        }
    }
}

/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
///
/// Being bounded, this channel provides backpressure to ensure that the sender
/// outpaces the receiver by only a limited amount. The channel's capacity is
/// equal to `buffer + num-senders`. In other words, each sender gets a
/// guaranteed slot in the channel capacity, and on top of that there are
/// `buffer` "first come, first serve" slots available to all senders.
///
/// The [`Receiver`](Receiver) returned implements the
/// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements
/// `Sink`.
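///
/// # Examples
///
/// A small sketch of bounded sending. The paths assume the usual `futures`
/// facade re-export of this module as `futures::channel::mpsc`:
///
/// ```
/// use futures::channel::mpsc;
/// use futures::stream::StreamExt;
///
/// futures::executor::block_on(async {
///     // Capacity is `buffer + num_senders`: with `buffer = 1` and a single
///     // sender, two messages fit before the channel reports itself full.
///     let (mut tx, mut rx) = mpsc::channel::<u32>(1);
///
///     tx.try_send(1).unwrap();
///     tx.try_send(2).unwrap();
///     assert!(tx.try_send(3).unwrap_err().is_full());
///
///     assert_eq!(rx.next().await, Some(1));
/// });
/// ```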
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
    // Check that the requested buffer size does not exceed the maximum buffer
    // size permitted by the system.
    assert!(buffer < MAX_BUFFER, "requested buffer size too large");

    let inner = Arc::new(BoundedInner {
        buffer,
        state: AtomicUsize::new(INIT_STATE),
        message_queue: Queue::new(),
        parked_queue: Queue::new(),
        num_senders: AtomicUsize::new(1),
        recv_task: AtomicWaker::new(),
    });

    let tx = BoundedSenderInner {
        inner: inner.clone(),
        sender_task: Arc::new(Mutex::new(SenderTask::new())),
        maybe_parked: false,
    };

    let rx = Receiver {
        inner: Some(inner),
    };

    (Sender(Some(tx)), rx)
}

/// Creates an unbounded mpsc channel for communicating between asynchronous
/// tasks.
///
/// A `send` on this channel will always succeed as long as the receive half has
/// not been closed. If the receiver falls behind, messages will be arbitrarily
/// buffered.
///
/// **Note** that the amount of available system memory is an implicit bound to
/// the channel. Using an `unbounded` channel can cause the process to run out
/// of memory. In this case, the process will be aborted.
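///
/// # Examples
///
/// A minimal sketch (again assuming the `futures::channel::mpsc` facade path):
///
/// ```
/// use futures::channel::mpsc;
/// use futures::stream::StreamExt;
///
/// futures::executor::block_on(async {
///     let (tx, mut rx) = mpsc::unbounded::<i32>();
///
///     // `unbounded_send` never blocks; it only fails once the receiver
///     // has been dropped or closed.
///     tx.unbounded_send(7).unwrap();
///     drop(tx);
///
///     assert_eq!(rx.next().await, Some(7));
///     assert_eq!(rx.next().await, None);
/// });
/// ```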
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
    let inner = Arc::new(UnboundedInner {
        state: AtomicUsize::new(INIT_STATE),
        message_queue: Queue::new(),
        num_senders: AtomicUsize::new(1),
        recv_task: AtomicWaker::new(),
    });

    let tx = UnboundedSenderInner {
        inner: inner.clone(),
    };

    let rx = UnboundedReceiver {
        inner: Some(inner),
    };

    (UnboundedSender(Some(tx)), rx)
}

/*
 *
 * ===== impl Sender =====
 *
 */

impl<T> UnboundedSenderInner<T> {
    fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
        let state = decode_state(self.inner.state.load(SeqCst));
        if state.is_open {
            Poll::Ready(Ok(()))
        } else {
            Poll::Ready(Err(SendError {
                kind: SendErrorKind::Disconnected,
            }))
        }
    }

    // Push message to the queue and signal to the receiver
    fn queue_push_and_signal(&self, msg: T) {
        // Push the message onto the message queue
        self.inner.message_queue.push(msg);

        // Signal to the receiver that a message has been enqueued. If the
        // receiver is parked, this will unpark the task.
        self.inner.recv_task.wake();
    }

    // Increment the number of queued messages. Returns the resulting number.
    fn inc_num_messages(&self) -> Option<usize> {
        let mut curr = self.inner.state.load(SeqCst);

        loop {
            let mut state = decode_state(curr);

            // The receiver end closed the channel.
            if !state.is_open {
                return None;
            }

            // This probably is never hit? Odds are the process will run out of
            // memory first. It may be worth returning something else in this
            // case?
            assert!(state.num_messages < MAX_CAPACITY, "buffer space \
                exhausted; sending this message would overflow the state");

            state.num_messages += 1;

            let next = encode_state(&state);
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => return Some(state.num_messages),
                Err(actual) => curr = actual,
            }
        }
    }

    /// Returns whether the senders send to the same receiver.
    fn same_receiver(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }

    /// Returns whether this sender sends to the given receiver.
    fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
        Arc::ptr_eq(&self.inner, inner)
    }

    /// Returns a pointer to the `Arc` containing the shared channel state.
    ///
    /// The returned pointer is never dereferenced and should only be used for hashing!
    fn ptr(&self) -> *const UnboundedInner<T> {
        &*self.inner
    }

    /// Returns whether this channel is closed without needing a context.
    fn is_closed(&self) -> bool {
        !decode_state(self.inner.state.load(SeqCst)).is_open
    }

    /// Closes this channel from the sender side, preventing any new messages.
    fn close_channel(&self) {
        // There's no need to park this sender, it's dropping, and we don't
        // want to check for capacity, so skip that stuff from `do_send`.

        self.inner.set_closed();
        self.inner.recv_task.wake();
    }
}

impl<T> BoundedSenderInner<T> {
    /// Attempts to send a message on this `Sender`, returning the message
    /// if there was an error.
    fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
        // If the sender is currently blocked, reject the message
        if !self.poll_unparked(None).is_ready() {
            return Err(TrySendError {
                err: SendError {
                    kind: SendErrorKind::Full,
                },
                val: msg,
            });
        }

        // The channel has capacity to accept the message, so send it
        self.do_send_b(msg)
    }

    // Do the send without failing.
    // Can be called only by bounded sender.
    #[allow(clippy::debug_assert_with_mut_call)]
    fn do_send_b(&mut self, msg: T)
        -> Result<(), TrySendError<T>>
    {
        // Anyone calling `do_send` *should* make sure there is room first,
        // but assert here for tests as a sanity check.
        debug_assert!(self.poll_unparked(None).is_ready());

        // First, increment the number of messages contained by the channel.
        // This operation will also atomically determine if the sender task
        // should be parked.
        //
        // `None` is returned in the case that the channel has been closed by the
        // receiver. This happens when `Receiver::close` is called or the
        // receiver is dropped.
        let park_self = match self.inc_num_messages() {
            Some(num_messages) => {
                // Block if the current number of pending messages has exceeded
                // the configured buffer size
                num_messages > self.inner.buffer
            }
            None => return Err(TrySendError {
                err: SendError {
                    kind: SendErrorKind::Disconnected,
                },
                val: msg,
            }),
        };

        // If the channel has reached capacity, then the sender task needs to
        // be parked. This will send the task handle on the parked task queue.
        //
        // However, when `do_send` is called while dropping the `Sender`, no
        // task context is available to take a waker from. In that case, in
        // order to maintain internal consistency, a handle with no waker is
        // pushed onto the parked task queue.
        if park_self {
            self.park();
        }

        self.queue_push_and_signal(msg);

        Ok(())
    }

    // Push message to the queue and signal to the receiver
    fn queue_push_and_signal(&self, msg: T) {
        // Push the message onto the message queue
        self.inner.message_queue.push(msg);

        // Signal to the receiver that a message has been enqueued. If the
        // receiver is parked, this will unpark the task.
        self.inner.recv_task.wake();
    }

    // Increment the number of queued messages. Returns the resulting number.
    fn inc_num_messages(&self) -> Option<usize> {
        let mut curr = self.inner.state.load(SeqCst);

        loop {
            let mut state = decode_state(curr);

            // The receiver end closed the channel.
            if !state.is_open {
                return None;
            }

            // This probably is never hit? Odds are the process will run out of
            // memory first. It may be worth returning something else in this
            // case?
            assert!(state.num_messages < MAX_CAPACITY, "buffer space \
                exhausted; sending this message would overflow the state");

            state.num_messages += 1;

            let next = encode_state(&state);
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => return Some(state.num_messages),
                Err(actual) => curr = actual,
            }
        }
    }

    fn park(&mut self) {
        {
            let mut sender = self.sender_task.lock().unwrap();
            sender.task = None;
            sender.is_parked = true;
        }

        // Send handle over queue
        let t = self.sender_task.clone();
        self.inner.parked_queue.push(t);

        // Check to make sure we weren't closed after we sent our task on the
        // queue
        let state = decode_state(self.inner.state.load(SeqCst));
        self.maybe_parked = state.is_open;
    }

    /// Polls the channel to determine if there is guaranteed capacity to send
    /// at least one item without waiting.
    ///
    /// # Return value
    ///
    /// This method returns:
    ///
    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
    /// - `Poll::Pending` if the channel may not have
    ///   capacity, in which case the current task is queued to be notified once
    ///   capacity is available;
    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), SendError>> {
        let state = decode_state(self.inner.state.load(SeqCst));
        if !state.is_open {
            return Poll::Ready(Err(SendError {
                kind: SendErrorKind::Disconnected,
            }));
        }

        self.poll_unparked(Some(cx)).map(Ok)
    }

    /// Returns whether the senders send to the same receiver.
    fn same_receiver(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }

    /// Returns whether this sender sends to the given receiver.
    fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
        Arc::ptr_eq(&self.inner, receiver)
    }

    /// Returns a pointer to the `Arc` containing the shared channel state.
    ///
    /// The returned pointer is never dereferenced and should only be used for hashing!
    fn ptr(&self) -> *const BoundedInner<T> {
        &*self.inner
    }

    /// Returns whether this channel is closed without needing a context.
    fn is_closed(&self) -> bool {
        !decode_state(self.inner.state.load(SeqCst)).is_open
    }

    /// Closes this channel from the sender side, preventing any new messages.
    fn close_channel(&self) {
        // There's no need to park this sender, it's dropping, and we don't
        // want to check for capacity, so skip that stuff from `do_send`.

        self.inner.set_closed();
        self.inner.recv_task.wake();
    }

    fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
        // First check the `maybe_parked` variable. This avoids acquiring the
        // lock in most cases
        if self.maybe_parked {
            // Get a lock on the task handle
            let mut task = self.sender_task.lock().unwrap();

            if !task.is_parked {
                self.maybe_parked = false;
                return Poll::Ready(());
            }

            // At this point, an unpark request is pending, so there will be an
            // unpark sometime in the future. We just need to make sure that
            // the correct task will be notified.
            //
            // Update the task in case the `Sender` has been moved to another
            // task
            task.task = cx.map(|cx| cx.waker().clone());

            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

impl<T> Sender<T> {
    /// Attempts to send a message on this `Sender`, returning the message
    /// if there was an error.
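    ///
    /// # Examples
    ///
    /// A short sketch of the failure modes (assuming the
    /// `futures::channel::mpsc` facade path):
    ///
    /// ```
    /// use futures::channel::mpsc;
    ///
    /// let (mut tx, rx) = mpsc::channel::<u8>(0);
    ///
    /// // One guaranteed slot per sender, so the first send succeeds...
    /// tx.try_send(1).unwrap();
    /// // ...and the second is rejected while the channel is full.
    /// assert!(tx.try_send(2).unwrap_err().is_full());
    ///
    /// // Once the receiver is gone, sends fail with a disconnect error.
    /// drop(rx);
    /// assert!(tx.try_send(3).unwrap_err().is_disconnected());
    /// ```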
    pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
        if let Some(inner) = &mut self.0 {
            inner.try_send(msg)
        } else {
            Err(TrySendError {
                err: SendError {
                    kind: SendErrorKind::Disconnected,
                },
                val: msg,
            })
        }
    }

    /// Send a message on the channel.
    ///
    /// This function should only be called after
    /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
    /// ready to receive a message.
    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
        self.try_send(msg)
            .map_err(|e| e.err)
    }

    /// Polls the channel to determine if there is guaranteed capacity to send
    /// at least one item without waiting.
    ///
    /// # Return value
    ///
    /// This method returns:
    ///
    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
    /// - `Poll::Pending` if the channel may not have
    ///   capacity, in which case the current task is queued to be notified once
    ///   capacity is available;
    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
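    ///
    /// # Examples
    ///
    /// A sketch of polling readiness by hand. It assumes the
    /// `futures::channel::mpsc` facade path and the `futures::task::noop_waker`
    /// helper:
    ///
    /// ```
    /// use futures::channel::mpsc;
    /// use futures::task::{noop_waker, Context, Poll};
    ///
    /// let (mut tx, rx) = mpsc::channel::<u8>(1);
    /// let waker = noop_waker();
    /// let mut cx = Context::from_waker(&waker);
    ///
    /// // Capacity is available, so the channel reports itself ready.
    /// assert!(matches!(tx.poll_ready(&mut cx), Poll::Ready(Ok(()))));
    ///
    /// // Once the receiver is dropped, readiness becomes an error.
    /// drop(rx);
    /// assert!(matches!(tx.poll_ready(&mut cx), Poll::Ready(Err(_))));
    /// ```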
    pub fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), SendError>> {
        let inner = self.0.as_mut().ok_or(SendError {
            kind: SendErrorKind::Disconnected,
        })?;
        inner.poll_ready(cx)
    }

    /// Returns whether this channel is closed without needing a context.
    pub fn is_closed(&self) -> bool {
        self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
    }

    /// Closes this channel from the sender side, preventing any new messages.
    pub fn close_channel(&mut self) {
        if let Some(inner) = &mut self.0 {
            inner.close_channel();
        }
    }

    /// Disconnects this sender from the channel, closing it if there are no more senders left.
    pub fn disconnect(&mut self) {
        self.0 = None;
    }

    /// Returns whether the senders send to the same receiver.
    pub fn same_receiver(&self, other: &Self) -> bool {
        match (&self.0, &other.0) {
            (Some(inner), Some(other)) => inner.same_receiver(other),
            _ => false,
        }
    }

    /// Returns whether this sender sends to the given receiver.
    pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
        match (&self.0, &receiver.inner) {
            (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
            _ => false,
        }
    }

    /// Hashes the receiver into the provided hasher.
    pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
        use std::hash::Hash;

        let ptr = self.0.as_ref().map(|inner| inner.ptr());
        ptr.hash(hasher);
    }
}

impl<T> UnboundedSender<T> {
    /// Check if the channel is ready to receive a message.
    pub fn poll_ready(
        &self,
        _: &mut Context<'_>,
    ) -> Poll<Result<(), SendError>> {
        let inner = self.0.as_ref().ok_or(SendError {
            kind: SendErrorKind::Disconnected,
        })?;
        inner.poll_ready_nb()
    }

    /// Returns whether this channel is closed without needing a context.
    pub fn is_closed(&self) -> bool {
        self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
    }

    /// Closes this channel from the sender side, preventing any new messages.
    pub fn close_channel(&self) {
        if let Some(inner) = &self.0 {
            inner.close_channel();
        }
    }

    /// Disconnects this sender from the channel, closing it if there are no more senders left.
    pub fn disconnect(&mut self) {
        self.0 = None;
    }

    // Do the send without parking current task.
    fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
        if let Some(inner) = &self.0 {
            if inner.inc_num_messages().is_some() {
                inner.queue_push_and_signal(msg);
                return Ok(());
            }
        }

        Err(TrySendError {
            err: SendError {
                kind: SendErrorKind::Disconnected,
            },
            val: msg,
        })
    }

    /// Send a message on the channel.
    ///
    /// This method should only be called after `poll_ready` has been used to
    /// verify that the channel is ready to receive a message.
    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
        self.do_send_nb(msg)
            .map_err(|e| e.err)
    }

    /// Sends a message along this channel.
    ///
    /// This is an unbounded sender, so this function differs from `Sink::send`
    /// by ensuring the return type reflects that the channel is always ready to
    /// receive messages.
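    ///
    /// # Examples
    ///
    /// A brief sketch (assuming the `futures::channel::mpsc` facade path):
    ///
    /// ```
    /// use futures::channel::mpsc;
    ///
    /// let (tx, rx) = mpsc::unbounded::<&str>();
    ///
    /// // Succeeds as long as the receiver is alive; no capacity checks.
    /// tx.unbounded_send("hello").unwrap();
    ///
    /// // After the receiver is dropped, the message is handed back.
    /// drop(rx);
    /// let err = tx.unbounded_send("late").unwrap_err();
    /// assert!(err.is_disconnected());
    /// assert_eq!(err.into_inner(), "late");
    /// ```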
    pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.do_send_nb(msg)
    }

    /// Returns whether the senders send to the same receiver.
    pub fn same_receiver(&self, other: &Self) -> bool {
        match (&self.0, &other.0) {
            (Some(inner), Some(other)) => inner.same_receiver(other),
            _ => false,
        }
    }

    /// Returns whether this sender sends to the given receiver.
    pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
        match (&self.0, &receiver.inner) {
            (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
            _ => false,
        }
    }

    /// Hashes the receiver into the provided hasher.
    pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
        use std::hash::Hash;

        let ptr = self.0.as_ref().map(|inner| inner.ptr());
        ptr.hash(hasher);
    }
}

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> Clone for UnboundedSender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> Clone for UnboundedSenderInner<T> {
    fn clone(&self) -> Self {
        // This atomic op isn't actually guarding any memory, and we don't
        // care about any orderings besides the ordering on this single atomic
        // variable, so a relaxed ordering would suffice; `SeqCst` is used here
        // for uniformity with the rest of the module.
        let mut curr = self.inner.num_senders.load(SeqCst);

        loop {
            // If the maximum number of senders has been reached, then fail
            if curr == MAX_BUFFER {
                panic!("cannot clone `Sender` -- too many outstanding senders");
            }

            debug_assert!(curr < MAX_BUFFER);

            let next = curr + 1;
            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => {
                    // The ABA problem doesn't matter here. We only care that the
                    // number of senders never exceeds the maximum.
                    return Self {
                        inner: self.inner.clone(),
                    };
                }
                Err(actual) => curr = actual,
            }
        }
    }
}

impl<T> Clone for BoundedSenderInner<T> {
    fn clone(&self) -> Self {
        // This atomic op isn't actually guarding any memory, and we don't
        // care about any orderings besides the ordering on this single atomic
        // variable, so a relaxed ordering would suffice; `SeqCst` is used here
        // for uniformity with the rest of the module.
        let mut curr = self.inner.num_senders.load(SeqCst);

        loop {
            // If the maximum number of senders has been reached, then fail
            if curr == self.inner.max_senders() {
                panic!("cannot clone `Sender` -- too many outstanding senders");
            }

            debug_assert!(curr < self.inner.max_senders());

            let next = curr + 1;
            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => {
                    // The ABA problem doesn't matter here. We only care that the
                    // number of senders never exceeds the maximum.
                    return Self {
                        inner: self.inner.clone(),
                        sender_task: Arc::new(Mutex::new(SenderTask::new())),
                        maybe_parked: false,
                    };
                }
                Err(actual) => curr = actual,
            }
        }
    }
}

impl<T> Drop for UnboundedSenderInner<T> {
    fn drop(&mut self) {
        // Ordering between variables doesn't matter here
        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);

        if prev == 1 {
            self.close_channel();
        }
    }
}

impl<T> Drop for BoundedSenderInner<T> {
    fn drop(&mut self) {
        // Ordering between variables doesn't matter here
        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);

        if prev == 1 {
            self.close_channel();
        }
    }
}

/*
 *
 * ===== impl Receiver =====
 *
 */

impl<T> Receiver<T> {
    /// Closes the receiving half of a channel, without dropping it.
    ///
    /// This prevents any further messages from being sent on the channel while
    /// still enabling the receiver to drain messages that are buffered.
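    ///
    /// # Examples
    ///
    /// A sketch of the "clean shutdown" pattern from the module docs
    /// (assuming the `futures::channel::mpsc` facade path):
    ///
    /// ```
    /// use futures::channel::mpsc;
    /// use futures::stream::StreamExt;
    ///
    /// futures::executor::block_on(async {
    ///     let (mut tx, mut rx) = mpsc::channel::<u8>(4);
    ///     tx.try_send(1).unwrap();
    ///
    ///     // Stop accepting new messages, then drain what is buffered.
    ///     rx.close();
    ///     assert!(tx.try_send(2).unwrap_err().is_disconnected());
    ///
    ///     assert_eq!(rx.next().await, Some(1));
    ///     assert_eq!(rx.next().await, None);
    /// });
    /// ```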
    pub fn close(&mut self) {
        if let Some(inner) = &mut self.inner {
            inner.set_closed();

            // Wake up any parked senders: they'll see that we've closed the
            // channel and will continue on their merry way.
            while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
                task.lock().unwrap().notify();
            }
        }
    }

    /// Tries to receive the next message without notifying a context if empty.
    ///
    /// It is not recommended to call this function from inside of a future,
    /// only when you've otherwise arranged to be notified when the channel is
    /// no longer empty.
    ///
    /// This function will panic if called after `try_next` or `poll_next` has
    /// returned `None`.
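    ///
    /// # Examples
    ///
    /// A small sketch of the three possible outcomes (assuming the
    /// `futures::channel::mpsc` facade path):
    ///
    /// ```
    /// use futures::channel::mpsc;
    ///
    /// let (mut tx, mut rx) = mpsc::channel::<u8>(4);
    ///
    /// // Empty but still open: `Err(TryRecvError)`.
    /// assert!(rx.try_next().is_err());
    ///
    /// // A buffered message: `Ok(Some(..))`.
    /// tx.try_send(1).unwrap();
    /// assert_eq!(rx.try_next().unwrap(), Some(1));
    ///
    /// // All senders gone and the buffer drained: `Ok(None)`.
    /// drop(tx);
    /// assert_eq!(rx.try_next().unwrap(), None);
    /// ```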
    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
        match self.next_message() {
            Poll::Ready(msg) => Ok(msg),
            Poll::Pending => Err(TryRecvError { _priv: () }),
        }
    }

    fn next_message(&mut self) -> Poll<Option<T>> {
        let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
        // Pop off a message
        match unsafe { inner.message_queue.pop_spin() } {
            Some(msg) => {
                // If there are any parked task handles in the parked queue,
                // pop one and unpark it.
                self.unpark_one();

                // Decrement number of messages
                self.dec_num_messages();

                Poll::Ready(Some(msg))
            }
            None => {
                let state = decode_state(inner.state.load(SeqCst));
                if state.is_closed() {
                    // If the closed flag is set AND there are no pending
                    // messages, it means end of stream.
                    self.inner = None;
                    Poll::Ready(None)
                } else {
                    // If the queue is open, we need to return Pending to be
                    // woken up when new messages arrive. If the queue is
                    // closed but `num_messages` is non-zero, it means that a
                    // sender updated the state but hasn't put its message on
                    // the queue yet, so we need to park until the sender
                    // unparks the task after queueing the message.
                    Poll::Pending
                }
            }
        }
    }

    // Unpark a single task handle if there is one pending in the parked queue
    fn unpark_one(&mut self) {
        if let Some(inner) = &mut self.inner {
            if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
                task.lock().unwrap().notify();
            }
        }
    }

    fn dec_num_messages(&self) {
        if let Some(inner) = &self.inner {
            // OPEN_MASK is the highest bit, so it's unaffected by subtraction
            // unless there's underflow, and we know there's no underflow
            // because the number of messages at this point is always > 0.
            inner.state.fetch_sub(1, SeqCst);
        }
    }
}

// The receiver does not ever take a Pin to the inner T
impl<T> Unpin for Receiver<T> {}

impl<T> FusedStream for Receiver<T> {
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}

impl<T> Stream for Receiver<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<T>> {
        // Try to read a message off of the message queue.
        match self.next_message() {
            Poll::Ready(msg) => {
                if msg.is_none() {
                    self.inner = None;
                }
                Poll::Ready(msg)
            },
            Poll::Pending => {
                // There are no messages to read; in this case, park.
                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
                // Check the queue again after parking to prevent a race
                // condition: a message could have been added to the queue
                // after the previous `next_message` call but before the
                // `register` call.
                self.next_message()
            }
        }
    }
}

impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        // Drain the channel of all pending messages
        self.close();
        if self.inner.is_some() {
            loop {
                match self.next_message() {
                    Poll::Ready(Some(_)) => {}
                    Poll::Ready(None) => break,
                    Poll::Pending => {
                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));

                        // If the channel is closed, then there is no need to park.
                        if state.is_closed() {
                            break;
                        }

                        // TODO: Spinning isn't ideal, it might be worth
                        // investigating using a condvar or some other strategy
                        // here. That said, if this case is hit, then another thread
                        // is about to push the value into the queue and this isn't
                        // the only spinlock in the impl right now.
                        thread::yield_now();
                    }
                }
            }
        }
    }
}

impl<T> UnboundedReceiver<T> {
    /// Closes the receiving half of a channel, without dropping it.
    ///
    /// This prevents any further messages from being sent on the channel while
    /// still enabling the receiver to drain messages that are buffered.
    pub fn close(&mut self) {
        if let Some(inner) = &mut self.inner {
            inner.set_closed();
        }
    }

    /// Tries to receive the next message without notifying a context if empty.
    ///
    /// It is not recommended to call this function from inside of a future,
    /// only when you've otherwise arranged to be notified when the channel is
    /// no longer empty.
    ///
    /// This function will panic if called after `try_next` or `poll_next` has
    /// returned `None`.
    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
        match self.next_message() {
            Poll::Ready(msg) => Ok(msg),
            Poll::Pending => Err(TryRecvError { _priv: () }),
        }
    }

    fn next_message(&mut self) -> Poll<Option<T>> {
        let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
        // Pop off a message
        match unsafe { inner.message_queue.pop_spin() } {
            Some(msg) => {
                // Decrement number of messages
                self.dec_num_messages();

                Poll::Ready(Some(msg))
            }
            None => {
                let state = decode_state(inner.state.load(SeqCst));
                if state.is_closed() {
                    // If the closed flag is set AND there are no pending
                    // messages, it means end of stream.
                    self.inner = None;
                    Poll::Ready(None)
                } else {
                    // If the queue is open, we need to return Pending to be
                    // woken up when new messages arrive. If the queue is
                    // closed but `num_messages` is non-zero, it means that a
                    // sender updated the state but hasn't put its message on
                    // the queue yet, so we need to park until the sender
                    // unparks the task after queueing the message.
                    Poll::Pending
                }
            }
        }
    }

    fn dec_num_messages(&self) {
        if let Some(inner) = &self.inner {
            // OPEN_MASK is the highest bit, so it's unaffected by subtraction
            // unless there's underflow, and we know there's no underflow
            // because the number of messages at this point is always > 0.
            inner.state.fetch_sub(1, SeqCst);
        }
    }
}

impl<T> FusedStream for UnboundedReceiver<T> {
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}

impl<T> Stream for UnboundedReceiver<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<T>> {
        // Try to read a message off of the message queue.
        match self.next_message() {
            Poll::Ready(msg) => {
                if msg.is_none() {
                    self.inner = None;
                }
                Poll::Ready(msg)
            },
            Poll::Pending => {
                // There are no messages to read; in this case, park.
                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
                // Check the queue again after parking to prevent a race
                // condition: a message could have been added to the queue
                // after the previous `next_message` call but before the
                // `register` call.
                self.next_message()
            }
        }
    }
}

impl<T> Drop for UnboundedReceiver<T> {
    fn drop(&mut self) {
        // Drain the channel of all pending messages
        self.close();
        if self.inner.is_some() {
            loop {
                match self.next_message() {
                    Poll::Ready(Some(_)) => {}
                    Poll::Ready(None) => break,
                    Poll::Pending => {
                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));

                        // If the channel is closed, then there is no need to park.
                        if state.is_closed() {
                            break;
                        }

                        // TODO: Spinning isn't ideal, it might be worth
                        // investigating using a condvar or some other strategy
                        // here. That said, if this case is hit, then another thread
                        // is about to push the value into the queue and this isn't
                        // the only spinlock in the impl right now.
                        thread::yield_now();
                    }
                }
            }
        }
    }
}

/*
 *
 * ===== impl Inner =====
 *
 */

impl<T> UnboundedInner<T> {
    // Clear `open` flag in the state, keep `num_messages` intact.
    fn set_closed(&self) {
        let curr = self.state.load(SeqCst);
        if !decode_state(curr).is_open {
            return;
        }

        self.state.fetch_and(!OPEN_MASK, SeqCst);
    }
}

impl<T> BoundedInner<T> {
    // The return value is such that the total number of messages that can be
    // enqueued into the channel will never exceed MAX_CAPACITY
    fn max_senders(&self) -> usize {
        MAX_CAPACITY - self.buffer
    }

    // Clear `open` flag in the state, keep `num_messages` intact.
    fn set_closed(&self) {
        let curr = self.state.load(SeqCst);
        if !decode_state(curr).is_open {
            return;
        }

        self.state.fetch_and(!OPEN_MASK, SeqCst);
    }
}

unsafe impl<T: Send> Send for UnboundedInner<T> {}
unsafe impl<T: Send> Sync for UnboundedInner<T> {}

unsafe impl<T: Send> Send for BoundedInner<T> {}
unsafe impl<T: Send> Sync for BoundedInner<T> {}

impl State {
    fn is_closed(&self) -> bool {
        !self.is_open && self.num_messages == 0
    }
}

/*
 *
 * ===== Helpers =====
 *
 */

fn decode_state(num: usize) -> State {
    State {
        is_open: num & OPEN_MASK == OPEN_MASK,
        num_messages: num & MAX_CAPACITY,
    }
}

fn encode_state(state: &State) -> usize {
    let mut num = state.num_messages;

    if state.is_open {
        num |= OPEN_MASK;
    }

    num
}
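
// A small sanity-check sketch of the state encoding above: `num_messages`
// lives in the low bits, the `is_open` flag in the top bit, so encoding and
// decoding should round-trip, and clearing `OPEN_MASK` (what `set_closed`
// does) should leave the message count intact.
#[cfg(test)]
mod state_encoding_tests {
    use super::{decode_state, encode_state, State, INIT_STATE, OPEN_MASK};

    #[test]
    fn round_trip() {
        // A freshly created channel is open with zero messages.
        let init = decode_state(INIT_STATE);
        assert!(init.is_open);
        assert_eq!(init.num_messages, 0);

        // Encoding and then decoding preserves both fields.
        let state = State { is_open: true, num_messages: 42 };
        let decoded = decode_state(encode_state(&state));
        assert!(decoded.is_open);
        assert_eq!(decoded.num_messages, 42);

        // Clearing the open flag does not disturb the count.
        let closed = decode_state(encode_state(&state) & !OPEN_MASK);
        assert!(!closed.is_open);
        assert_eq!(closed.num_messages, 42);
    }
}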