• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Oneshot spsc (single producer, single consumer) channel. Meaning each channel instance
2 //! can only transport a single message. This has a few nice outcomes. One thing is that
3 //! the implementation can be very efficient, utilizing the knowledge that there will
4 //! only be one message. But more importantly, it allows the API to be expressed in such
5 //! a way that certain edge cases that you don't want to care about when only sending a
6 //! single message on a channel does not exist. For example: The sender can't be copied
7 //! or cloned, and the send method takes ownership and consumes the sender.
8 //! So you are guaranteed, at the type level, that there can only be one message sent.
9 //!
10 //! The sender's send method is non-blocking, and potentially lock- and wait-free.
11 //! See documentation on [Sender::send] for situations where it might not be fully wait-free.
12 //! The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time
13 //! limited thread blocking receive operations. The receiver also implements `Future` and
14 //! supports asynchronously awaiting the message.
15 //!
16 //!
17 //! # Examples
18 //!
19 //! This example sets up a background worker that processes requests coming in on a standard
20 //! mpsc channel and replies on a oneshot channel provided with each request. The worker can
21 //! be interacted with both from sync and async contexts since the oneshot receiver
22 //! can receive both blocking and async.
23 //!
24 //! ```rust
25 //! use std::sync::mpsc;
26 //! use std::thread;
27 //! use std::time::Duration;
28 //!
29 //! type Request = String;
30 //!
31 //! // Starts a background thread performing some computation on requests sent to it.
32 //! // Delivers the response back over a oneshot channel.
33 //! fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender<usize>)> {
34 //!     let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender<usize>)>();
35 //!     thread::spawn(move || {
36 //!         for (request_data, response_sender) in request_receiver.iter() {
37 //!             let compute_operation = || request_data.len();
38 //!             let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel
39 //!         }
40 //!     });
41 //!     request_sender
42 //! }
43 //!
44 //! let processor = spawn_processing_thread();
45 //!
46 //! // If compiled with `std` the library can receive messages with timeout on regular threads
47 //! #[cfg(feature = "std")] {
48 //!     let (response_sender, response_receiver) = oneshot::channel();
49 //!     let request = Request::from("data from sync thread");
50 //!
51 //!     processor.send((request, response_sender)).expect("Processor down");
52 //!     match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel
53 //!         Ok(result) => println!("Processor returned {}", result),
54 //!         Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"),
55 //!         Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"),
56 //!     }
57 //! }
58 //!
59 //! // If compiled with the `async` feature, the `Receiver` can be awaited in an async context
60 //! #[cfg(feature = "async")] {
61 //!     tokio::runtime::Runtime::new()
62 //!         .unwrap()
63 //!         .block_on(async move {
64 //!             let (response_sender, response_receiver) = oneshot::channel();
65 //!             let request = Request::from("data from sync thread");
66 //!
67 //!             processor.send((request, response_sender)).expect("Processor down");
68 //!             match response_receiver.await { // <- Receive on the oneshot channel asynchronously
69 //!                 Ok(result) => println!("Processor returned {}", result),
70 //!                 Err(_e) => panic!("Processor exited"),
71 //!             }
72 //!         });
73 //! }
74 //! ```
75 //!
76 //! # Sync vs async
77 //!
78 //! The main motivation for writing this library was that there were no (known to me) channel
79 //! implementations allowing you to seamlessly send messages between a normal thread and an async
80 //! task, or the other way around. If message passing is the way you are communicating, of course
81 //! that should work smoothly between the sync and async parts of the program!
82 //!
83 //! This library achieves that by having a fast and cheap send operation that can
84 //! be used in both sync threads and async tasks. The receiver has both thread blocking
85 //! receive methods for synchronous usage, and implements `Future` for asynchronous usage.
86 //!
87 //! The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on
88 //! in an asynchronous task. This implementation is completely executor/runtime agnostic. It should
89 //! be possible to use this library with any executor.
90 //!
91 
92 // # Implementation description
93 //
94 // When a channel is created via the channel function, it creates a single heap allocation
95 // containing:
96 // * A one byte atomic integer that represents the current channel state,
97 // * Uninitialized memory to fit the message,
98 // * Uninitialized memory to fit the waker that can wake the receiving task or thread up.
99 //
100 // The size of the waker depends on which features are activated, it ranges from 0 to 24 bytes[1].
101 // So with all features enabled (the default) each channel allocates 25 bytes plus the size of the
102 // message, plus any padding needed to get correct memory alignment.
103 //
104 // The Sender and Receiver only holds a raw pointer to the heap channel object. The last endpoint
105 // to be consumed or dropped is responsible for freeing the heap memory. The first endpoint to
106 // be consumed or dropped signal via the state that it is gone. And the second one see this and
107 // frees the memory.
108 //
109 // ## Footnotes
110 //
111 // [1]: Mind that the waker only takes zero bytes when all features are disabled, making it
112 //      impossible to *wait* for the message. `try_recv` the only available method in this scenario.
113 
114 #![deny(rust_2018_idioms)]
115 #![cfg_attr(not(feature = "std"), no_std)]
116 
117 #[cfg(not(loom))]
118 extern crate alloc;
119 
120 use core::{
121     marker::PhantomData,
122     mem::{self, MaybeUninit},
123     ptr::{self, NonNull},
124 };
125 
126 #[cfg(not(loom))]
127 use core::{
128     cell::UnsafeCell,
129     sync::atomic::{fence, AtomicU8, Ordering::*},
130 };
131 #[cfg(loom)]
132 use loom::{
133     cell::UnsafeCell,
134     sync::atomic::{fence, AtomicU8, Ordering::*},
135 };
136 
137 #[cfg(all(feature = "async", not(loom)))]
138 use core::hint;
139 #[cfg(all(feature = "async", loom))]
140 use loom::hint;
141 
142 #[cfg(feature = "async")]
143 use core::{
144     pin::Pin,
145     task::{self, Poll},
146 };
147 #[cfg(feature = "std")]
148 use std::time::{Duration, Instant};
149 
150 #[cfg(feature = "std")]
151 mod thread {
152     #[cfg(not(loom))]
153     pub use std::thread::{current, park, park_timeout, yield_now, Thread};
154 
155     #[cfg(loom)]
156     pub use loom::thread::{current, park, yield_now, Thread};
157 
158     // loom does not support parking with a timeout. So we just
159     // yield. This means that the "park" will "spuriously" wake up
160     // way too early. But the code should properly handle this.
161     // One thing to note is that very short timeouts are needed
162     // when using loom, since otherwise the looping will cause
163     // an overflow in loom.
164     #[cfg(loom)]
park_timeout(_timeout: std::time::Duration)165     pub fn park_timeout(_timeout: std::time::Duration) {
166         loom::thread::yield_now()
167     }
168 }
169 
170 #[cfg(loom)]
171 mod loombox;
172 #[cfg(not(loom))]
173 use alloc::boxed::Box;
174 #[cfg(loom)]
175 use loombox::Box;
176 
177 mod errors;
178 pub use errors::{RecvError, RecvTimeoutError, SendError, TryRecvError};
179 
180 /// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`].
channel<T>() -> (Sender<T>, Receiver<T>)181 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
182     // Allocate the channel on the heap and get the pointer.
183     // The last endpoint of the channel to be alive is responsible for freeing the channel
184     // and dropping any object that might have been written to it.
185 
186     let channel_ptr = Box::into_raw(Box::new(Channel::new()));
187 
188     // SAFETY: `channel_ptr` came from a Box and thus is not null
189     let channel_ptr = unsafe { NonNull::new_unchecked(channel_ptr) };
190 
191     (
192         Sender {
193             channel_ptr,
194             _invariant: PhantomData,
195         },
196         Receiver { channel_ptr },
197     )
198 }
199 
200 #[derive(Debug)]
201 pub struct Sender<T> {
202     channel_ptr: NonNull<Channel<T>>,
203     // In reality we want contravariance, however we can't obtain that.
204     //
205     // Consider the following scenario:
206     // ```
207     // let (mut tx, rx) = channel::<&'short u8>();
208     // let (tx2, rx2) = channel::<&'long u8>();
209     //
210     // tx = tx2;
211     //
212     // // Pretend short_ref is some &'short u8
213     // tx.send(short_ref).unwrap();
214     // let long_ref = rx2.recv().unwrap();
215     // ```
216     //
217     // If this type were covariant then we could safely extend lifetimes, which is not okay.
218     // Hence, we enforce invariance.
219     _invariant: PhantomData<fn(T) -> T>,
220 }
221 
222 #[derive(Debug)]
223 pub struct Receiver<T> {
224     // Covariance is the right choice here. Consider the example presented in Sender, and you'll
225     // see that if we replaced `rx` instead then we would get the expected behavior
226     channel_ptr: NonNull<Channel<T>>,
227 }
228 
229 unsafe impl<T: Send> Send for Sender<T> {}
230 unsafe impl<T: Send> Send for Receiver<T> {}
231 impl<T> Unpin for Receiver<T> {}
232 
233 impl<T> Sender<T> {
234     /// Sends `message` over the channel to the corresponding [`Receiver`].
235     ///
236     /// Returns an error if the receiver has already been dropped. The message can
237     /// be extracted from the error.
238     ///
239     /// This method is lock-free and wait-free when sending on a channel that the
240     /// receiver is currently not receiving on. If the receiver is receiving during the send
241     /// operation this method includes waking up the thread/task. Unparking a thread involves
242     /// a mutex in Rust's standard library at the time of writing this.
243     /// How lock-free waking up an async task is
244     /// depends on your executor. If this method returns a `SendError`, please mind that dropping
245     /// the error involves running any drop implementation on the message type, and freeing the
246     /// channel's heap allocation, which might or might not be lock-free.
send(self, message: T) -> Result<(), SendError<T>>247     pub fn send(self, message: T) -> Result<(), SendError<T>> {
248         let channel_ptr = self.channel_ptr;
249 
250         // Don't run our Drop implementation if send was called, any cleanup now happens here
251         mem::forget(self);
252 
253         // SAFETY: The channel exists on the heap for the entire duration of this method and we
254         // only ever acquire shared references to it. Note that if the receiver disconnects it
255         // does not free the channel.
256         let channel = unsafe { channel_ptr.as_ref() };
257 
258         // Write the message into the channel on the heap.
259         // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE
260         // state, and since we're responsible for setting that state, we can guarantee that we have
261         // exclusive access to this memory location to perform this write.
262         unsafe { channel.write_message(message) };
263 
264         // Set the state to signal there is a message on the channel.
265         // ORDERING: we use release ordering to ensure the write of the message is visible to the
266         // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state,
267         // and thus we do not need acquire orderng. The RECEIVING branch manages synchronization
268         // independent of this operation.
269         //
270         // EMPTY + 1 = MESSAGE
271         // RECEIVING + 1 = UNPARKING
272         // DISCONNECTED + 1 = invalid, however this state is never observed
273         match channel.state.fetch_add(1, Release) {
274             // The receiver is alive and has not started waiting. Send done.
275             EMPTY => Ok(()),
276             // The receiver is waiting. Wake it up so it can return the message.
277             RECEIVING => {
278                 // ORDERING: Synchronizes with the write of the waker to memory, and prevents the
279                 // taking of the waker from being ordered before this operation.
280                 fence(Acquire);
281 
282                 // Take the waker, but critically do not unpark it. If we unparked now, then the
283                 // receiving thread could still observe the UNPARKING state and re-park, meaning
284                 // that after we change to the MESSAGE state, it would remain parked indefinitely
285                 // or until a spurious wakeup.
286                 // SAFETY: at this point we are in the UNPARKING state, and the receiving thread
287                 // does not access the waker while in this state, nor does it free the channel
288                 // allocation in this state.
289                 let waker = unsafe { channel.take_waker() };
290 
291                 // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load
292                 // in the receiving thread, ensuring that both our read of the waker and write of
293                 // the message happen-before the taking of the message and freeing of the channel.
294                 // Furthermore, we need acquire ordering to ensure the unparking of the receiver
295                 // happens after the channel state is updated.
296                 channel.state.swap(MESSAGE, AcqRel);
297 
298                 // Note: it is possible that between the store above and this statement that
299                 // the receiving thread is spuriously unparked, takes the message, and frees
300                 // the channel allocation. However, we took ownership of the channel out of
301                 // that allocation, and freeing the channel does not drop the waker since the
302                 // waker is wrapped in MaybeUninit. Therefore this data is valid regardless of
303                 // whether or not the receive has completed by this point.
304                 waker.unpark();
305 
306                 Ok(())
307             }
308             // The receiver was already dropped. The error is responsible for freeing the channel.
309             // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so
310             // we can transfer exclusive ownership of the channel's resources to the error.
311             // Moreover, since we just placed the message in the channel, the channel contains a
312             // valid message.
313             DISCONNECTED => Err(unsafe { SendError::new(channel_ptr) }),
314             _ => unreachable!(),
315         }
316     }
317 
318     /// Consumes the Sender, returning a raw pointer to the channel on the heap.
319     ///
320     /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing
321     /// to do with the returned pointer is to later reconstruct the Sender with [Sender::from_raw].
322     /// Memory will leak if the Sender is never reconstructed.
into_raw(self) -> *mut ()323     pub fn into_raw(self) -> *mut () {
324         let raw = self.channel_ptr.as_ptr() as *mut ();
325         mem::forget(self);
326         raw
327     }
328 
329     /// Consumes a raw pointer from [Sender::into_raw], recreating the Sender.
330     ///
331     /// # Safety
332     ///
333     /// This pointer must have come from [`Sender<T>::into_raw`] with the same message type, `T`.
334     /// At most one Sender must exist for a channel at any point in time.
335     /// Constructing multiple Senders from the same raw pointer leads to undefined behavior.
from_raw(raw: *mut ()) -> Self336     pub unsafe fn from_raw(raw: *mut ()) -> Self {
337         Self {
338             channel_ptr: NonNull::new_unchecked(raw as *mut Channel<T>),
339             _invariant: PhantomData,
340         }
341     }
342 }
343 
344 impl<T> Drop for Sender<T> {
drop(&mut self)345     fn drop(&mut self) {
346         // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or
347         // DISCONNECTED states. If we are in the MESSAGE state, then we called
348         // mem::forget(self), so we should not be in this function call. If we are in the
349         // DISCONNECTED state, then the receiver either received a MESSAGE so this statement is
350         // unreachable, or was dropped and observed that our side was still alive, and thus didn't
351         // free the channel.
352         let channel = unsafe { self.channel_ptr.as_ref() };
353 
354         // Set the channel state to disconnected and read what state the receiver was in
355         // ORDERING: we don't need release ordering here since there are no modifications we
356         // need to make visible to other thread, and the Err(RECEIVING) branch handles
357         // synchronization independent of this cmpxchg
358         //
359         // EMPTY ^ 001 = DISCONNECTED
360         // RECEIVING ^ 001 = UNPARKING
361         // DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed
362         match channel.state.fetch_xor(0b001, Relaxed) {
363             // The receiver has not started waiting, nor is it dropped.
364             EMPTY => (),
365             // The receiver is waiting. Wake it up so it can detect that the channel disconnected.
366             RECEIVING => {
367                 // See comments in Sender::send
368 
369                 fence(Acquire);
370 
371                 let waker = unsafe { channel.take_waker() };
372 
373                 // We still need release ordering here to make sure our read of the waker happens
374                 // before this, and acquire ordering to ensure the unparking of the receiver
375                 // happens after this.
376                 channel.state.swap(DISCONNECTED, AcqRel);
377 
378                 // The Acquire ordering above ensures that the write of the DISCONNECTED state
379                 // happens-before unparking the receiver.
380                 waker.unpark();
381             }
382             // The receiver was already dropped. We are responsible for freeing the channel.
383             DISCONNECTED => {
384                 // SAFETY: when the receiver switches the state to DISCONNECTED they have received
385                 // the message or will no longer be trying to receive the message, and have
386                 // observed that the sender is still alive, meaning that we're responsible for
387                 // freeing the channel allocation.
388                 unsafe { dealloc(self.channel_ptr) };
389             }
390             _ => unreachable!(),
391         }
392     }
393 }
394 
395 impl<T> Receiver<T> {
396     /// Checks if there is a message in the channel without blocking. Returns:
397     ///  * `Ok(message)` if there was a message in the channel.
398     ///  * `Err(Empty)` if the [`Sender`] is alive, but has not yet sent a message.
399     ///  * `Err(Disconnected)` if the [`Sender`] was dropped before sending anything or if the
400     ///    message has already been extracted by a previous receive call.
401     ///
402     /// If a message is returned, the channel is disconnected and any subsequent receive operation
403     /// using this receiver will return an error.
404     ///
405     /// This method is completely lock-free and wait-free. The only thing it does is an atomic
406     /// integer load of the channel state. And if there is a message in the channel it additionally
407     /// performs one atomic integer store and copies the message from the heap to the stack for
408     /// returning it.
try_recv(&self) -> Result<T, TryRecvError>409     pub fn try_recv(&self) -> Result<T, TryRecvError> {
410         // SAFETY: The channel will not be freed while this method is still running.
411         let channel = unsafe { self.channel_ptr.as_ref() };
412 
413         // ORDERING: we use acquire ordering to synchronize with the store of the message.
414         match channel.state.load(Acquire) {
415             MESSAGE => {
416                 // It's okay to break up the load and store since once we're in the message state
417                 // the sender no longer modifies the state
418                 // ORDERING: at this point the sender has done its job and is no longer active, so
419                 // we don't need to make any side effects visible to it
420                 channel.state.store(DISCONNECTED, Relaxed);
421 
422                 // SAFETY: we are in the MESSAGE state so the message is present
423                 Ok(unsafe { channel.take_message() })
424             }
425             EMPTY => Err(TryRecvError::Empty),
426             DISCONNECTED => Err(TryRecvError::Disconnected),
427             #[cfg(feature = "async")]
428             RECEIVING | UNPARKING => Err(TryRecvError::Empty),
429             _ => unreachable!(),
430         }
431     }
432 
433     /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
434     /// disconnected.
435     ///
436     /// This method will always block the current thread if there is no data available and it is
437     /// still possible for the message to be sent. Once the message is sent to the corresponding
438     /// [`Sender`], then this receiver will wake up and return that message.
439     ///
440     /// If the corresponding [`Sender`] has disconnected (been dropped), or it disconnects while
441     /// this call is blocking, this call will wake up and return `Err` to indicate that the message
442     /// can never be received on this channel.
443     ///
444     /// If a sent message has already been extracted from this channel this method will return an
445     /// error.
446     ///
447     /// # Panics
448     ///
449     /// Panics if called after this receiver has been polled asynchronously.
450     #[cfg(feature = "std")]
recv(self) -> Result<T, RecvError>451     pub fn recv(self) -> Result<T, RecvError> {
452         // Note that we don't need to worry about changing the state to disconnected or setting the
453         // state to an invalid value at any point in this function because we take ownership of
454         // self, and this function does not exit until the message has been received or both side
455         // of the channel are inactive and cleaned up.
456 
457         let channel_ptr = self.channel_ptr;
458 
459         // Don't run our Drop implementation if we are receiving consuming ourselves.
460         mem::forget(self);
461 
462         // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
463         // is still alive, meaning that even if the sender was dropped then it would have observed
464         // the fact that we're still alive and left the responsibility of deallocating the
465         // channel to us, so channel_ptr is valid
466         let channel = unsafe { channel_ptr.as_ref() };
467 
468         // ORDERING: we use acquire ordering to synchronize with the write of the message in the
469         // case that it's available
470         match channel.state.load(Acquire) {
471             // The sender is alive but has not sent anything yet. We prepare to park.
472             EMPTY => {
473                 // Conditionally add a delay here to help the tests trigger the edge cases where
474                 // the sender manages to be dropped or send something before we are able to store
475                 // our waker object in the channel.
476                 #[cfg(oneshot_test_delay)]
477                 std::thread::sleep(std::time::Duration::from_millis(10));
478 
479                 // Write our waker instance to the channel.
480                 // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
481                 // try to access the waker until it sees the state set to RECEIVING below
482                 unsafe { channel.write_waker(ReceiverWaker::current_thread()) };
483 
484                 // Switch the state to RECEIVING. We need to do this in one atomic step in case the
485                 // sender disconnected or sent the message while we wrote the waker to memory. We
486                 // don't need to do a compare exchange here however because if the original state
487                 // was not EMPTY, then the sender has either finished sending the message or is
488                 // being dropped, so the RECEIVING state will never be observed after we return.
489                 // ORDERING: we use release ordering so the sender can synchronize with our writing
490                 // of the waker to memory. The individual branches handle any additional
491                 // synchronizaton
492                 match channel.state.swap(RECEIVING, Release) {
493                     // We stored our waker, now we park until the sender has changed the state
494                     EMPTY => loop {
495                         thread::park();
496 
497                         // ORDERING: synchronize with the write of the message
498                         match channel.state.load(Acquire) {
499                             // The sender sent the message while we were parked.
500                             MESSAGE => {
501                                 // SAFETY: we are in the message state so the message is valid
502                                 let message = unsafe { channel.take_message() };
503 
504                                 // SAFETY: the Sender delegates the responsibility of deallocating
505                                 // the channel to us upon sending the message
506                                 unsafe { dealloc(channel_ptr) };
507 
508                                 break Ok(message);
509                             }
510                             // The sender was dropped while we were parked.
511                             DISCONNECTED => {
512                                 // SAFETY: the Sender doesn't deallocate the channel allocation in
513                                 // its drop implementation if we're receiving
514                                 unsafe { dealloc(channel_ptr) };
515 
516                                 break Err(RecvError);
517                             }
518                             // State did not change, spurious wakeup, park again.
519                             RECEIVING | UNPARKING => (),
520                             _ => unreachable!(),
521                         }
522                     },
523                     // The sender sent the message while we prepared to park.
524                     MESSAGE => {
525                         // ORDERING: Synchronize with the write of the message. This branch is
526                         // unlikely to be taken, so it's likely more efficient to use a fence here
527                         // instead of AcqRel ordering on the RMW operation
528                         fence(Acquire);
529 
530                         // SAFETY: we started in the empty state and the sender switched us to the
531                         // message state. This means that it did not take the waker, so we're
532                         // responsible for dropping it.
533                         unsafe { channel.drop_waker() };
534 
535                         // SAFETY: we are in the message state so the message is valid
536                         let message = unsafe { channel.take_message() };
537 
538                         // SAFETY: the Sender delegates the responsibility of deallocating the
539                         // channel to us upon sending the message
540                         unsafe { dealloc(channel_ptr) };
541 
542                         Ok(message)
543                     }
544                     // The sender was dropped before sending anything while we prepared to park.
545                     DISCONNECTED => {
546                         // SAFETY: we started in the empty state and the sender switched us to the
547                         // disconnected state. It does not take the waker when it does this so we
548                         // need to drop it.
549                         unsafe { channel.drop_waker() };
550 
551                         // SAFETY: the sender does not deallocate the channel if it switches from
552                         // empty to disconnected so we need to free the allocation
553                         unsafe { dealloc(channel_ptr) };
554 
555                         Err(RecvError)
556                     }
557                     _ => unreachable!(),
558                 }
559             }
560             // The sender already sent the message.
561             MESSAGE => {
562                 // SAFETY: we are in the message state so the message is valid
563                 let message = unsafe { channel.take_message() };
564 
565                 // SAFETY: we are already in the message state so the sender has been forgotten
566                 // and it's our job to clean up resources
567                 unsafe { dealloc(channel_ptr) };
568 
569                 Ok(message)
570             }
571             // The sender was dropped before sending anything, or we already received the message.
572             DISCONNECTED => {
573                 // SAFETY: the sender does not deallocate the channel if it switches from empty to
574                 // disconnected so we need to free the allocation
575                 unsafe { dealloc(channel_ptr) };
576 
577                 Err(RecvError)
578             }
579             // The receiver must have been `Future::poll`ed prior to this call.
580             #[cfg(feature = "async")]
581             RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
582             _ => unreachable!(),
583         }
584     }
585 
586     /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
587     /// disconnected. This is a non consuming version of [`Receiver::recv`], but with a bit
588     /// worse performance. Prefer `[`Receiver::recv`]` if your code allows consuming the receiver.
589     ///
590     /// If a message is returned, the channel is disconnected and any subsequent receive operation
591     /// using this receiver will return an error.
592     ///
593     /// # Panics
594     ///
595     /// Panics if called after this receiver has been polled asynchronously.
596     #[cfg(feature = "std")]
recv_ref(&self) -> Result<T, RecvError>597     pub fn recv_ref(&self) -> Result<T, RecvError> {
598         self.start_recv_ref(RecvError, |channel| {
599             loop {
600                 thread::park();
601 
602                 // ORDERING: we use acquire ordering to synchronize with the write of the message
603                 match channel.state.load(Acquire) {
604                     // The sender sent the message while we were parked.
605                     // We take the message and mark the channel disconnected.
606                     MESSAGE => {
607                         // ORDERING: the sender is inactive at this point so we don't need to make
608                         // any reads or writes visible to the sending thread
609                         channel.state.store(DISCONNECTED, Relaxed);
610 
611                         // SAFETY: we were just in the message state so the message is valid
612                         break Ok(unsafe { channel.take_message() });
613                     }
614                     // The sender was dropped while we were parked.
615                     DISCONNECTED => break Err(RecvError),
616                     // State did not change, spurious wakeup, park again.
617                     RECEIVING | UNPARKING => (),
618                     _ => unreachable!(),
619                 }
620             }
621         })
622     }
623 
624     /// Like [`Receiver::recv`], but will not block longer than `timeout`. Returns:
625     ///  * `Ok(message)` if there was a message in the channel before the timeout was reached.
626     ///  * `Err(Timeout)` if no message arrived on the channel before the timeout was reached.
627     ///  * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
628     ///    has already been extracted by a previous receive call.
629     ///
630     /// If a message is returned, the channel is disconnected and any subsequent receive operation
631     /// using this receiver will return an error.
632     ///
633     /// If the supplied `timeout` is so large that Rust's `Instant` type can't represent this point
634     /// in the future this falls back to an indefinitely blocking receive operation.
635     ///
636     /// # Panics
637     ///
638     /// Panics if called after this receiver has been polled asynchronously.
639     #[cfg(feature = "std")]
recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>640     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
641         match Instant::now().checked_add(timeout) {
642             Some(deadline) => self.recv_deadline(deadline),
643             None => self.recv_ref().map_err(|_| RecvTimeoutError::Disconnected),
644         }
645     }
646 
647     /// Like [`Receiver::recv`], but will not block longer than until `deadline`. Returns:
648     ///  * `Ok(message)` if there was a message in the channel before the deadline was reached.
649     ///  * `Err(Timeout)` if no message arrived on the channel before the deadline was reached.
650     ///  * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
651     ///    has already been extracted by a previous receive call.
652     ///
653     /// If a message is returned, the channel is disconnected and any subsequent receive operation
654     /// using this receiver will return an error.
655     ///
656     /// # Panics
657     ///
658     /// Panics if called after this receiver has been polled asynchronously.
659     #[cfg(feature = "std")]
recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError>660     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
661         /// # Safety
662         ///
663         /// If the sender is unparking us after a message send, the message must already have been
664         /// written to the channel and an acquire memory barrier issued before calling this function
665         #[cold]
666         unsafe fn wait_for_unpark<T>(channel: &Channel<T>) -> Result<T, RecvTimeoutError> {
667             loop {
668                 thread::park();
669 
670                 // ORDERING: The callee has already synchronized with any message write
671                 match channel.state.load(Relaxed) {
672                     MESSAGE => {
673                         // ORDERING: the sender has been dropped, so this update only
674                         // needs to be visible to us
675                         channel.state.store(DISCONNECTED, Relaxed);
676                         break Ok(channel.take_message());
677                     }
678                     DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
679                     // The sender is still unparking us. We continue on the empty state here since
680                     // the current implementation eagerly sets the state to EMPTY upon timeout.
681                     EMPTY => (),
682                     _ => unreachable!(),
683                 }
684             }
685         }
686 
687         self.start_recv_ref(RecvTimeoutError::Disconnected, |channel| {
688             loop {
689                 match deadline.checked_duration_since(Instant::now()) {
690                     Some(timeout) => {
691                         thread::park_timeout(timeout);
692 
693                         // ORDERING: synchronize with the write of the message
694                         match channel.state.load(Acquire) {
695                             // The sender sent the message while we were parked.
696                             MESSAGE => {
697                                 // ORDERING: the sender has been `mem::forget`-ed so this update
698                                 // only needs to be visible to us.
699                                 channel.state.store(DISCONNECTED, Relaxed);
700 
701                                 // SAFETY: we either are in the message state or were just in the
702                                 // message state
703                                 break Ok(unsafe { channel.take_message() });
704                             }
705                             // The sender was dropped while we were parked.
706                             DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
707                             // State did not change, spurious wakeup, park again.
708                             RECEIVING | UNPARKING => (),
709                             _ => unreachable!(),
710                         }
711                     }
712                     None => {
713                         // ORDERING: synchronize with the write of the message
714                         match channel.state.swap(EMPTY, Acquire) {
715                             // We reached the end of the timeout without receiving a message
716                             RECEIVING => {
717                                 // SAFETY: we were in the receiving state and are now in the empty
718                                 // state, so the sender has not and will not try to read the waker,
719                                 // so we have exclusive access to drop it.
720                                 unsafe { channel.drop_waker() };
721 
722                                 break Err(RecvTimeoutError::Timeout);
723                             }
724                             // The sender sent the message while we were parked.
725                             MESSAGE => {
726                                 // Same safety and ordering as the Some branch
727 
728                                 channel.state.store(DISCONNECTED, Relaxed);
729                                 break Ok(unsafe { channel.take_message() });
730                             }
731                             // The sender was dropped while we were parked.
732                             DISCONNECTED => {
733                                 // ORDERING: we were originally in the disconnected state meaning
734                                 // that the sender is inactive and no longer observing the state,
735                                 // so we only need to change it back to DISCONNECTED for if the
736                                 // receiver is dropped or a recv* method is called again
737                                 channel.state.store(DISCONNECTED, Relaxed);
738 
739                                 break Err(RecvTimeoutError::Disconnected);
740                             }
741                             // The sender sent the message and started unparking us
742                             UNPARKING => {
743                                 // We were in the UNPARKING state and are now in the EMPTY state.
744                                 // We wait to be properly unparked and to observe if the sender
745                                 // sets MESSAGE or DISCONNECTED state.
746                                 // SAFETY: The load above has synchronized with any message write.
747                                 break unsafe { wait_for_unpark(channel) };
748                             }
749                             _ => unreachable!(),
750                         }
751                     }
752                 }
753             }
754         })
755     }
756 
757     /// Begins the process of receiving on the channel by reference. If the message is already
758     /// ready, or the sender has disconnected, then this function will return the appropriate
759     /// Result immediately. Otherwise, it will write the waker to memory, check to see if the
760     /// sender has finished or disconnected again, and then will call `finish`. `finish` is
761     /// thus responsible for cleaning up the channel's resources appropriately before it returns,
762     /// such as destroying the waker, for instance.
763     #[cfg(feature = "std")]
764     #[inline]
start_recv_ref<E>( &self, disconnected_error: E, finish: impl FnOnce(&Channel<T>) -> Result<T, E>, ) -> Result<T, E>765     fn start_recv_ref<E>(
766         &self,
767         disconnected_error: E,
768         finish: impl FnOnce(&Channel<T>) -> Result<T, E>,
769     ) -> Result<T, E> {
770         // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
771         // is still alive, meaning that even if the sender was dropped then it would have observed
772         // the fact that we're still alive and left the responsibility of deallocating the
773         // channel to us, so `self.channel` is valid
774         let channel = unsafe { self.channel_ptr.as_ref() };
775 
776         // ORDERING: synchronize with the write of the message
777         match channel.state.load(Acquire) {
778             // The sender is alive but has not sent anything yet. We prepare to park.
779             EMPTY => {
780                 // Conditionally add a delay here to help the tests trigger the edge cases where
781                 // the sender manages to be dropped or send something before we are able to store
782                 // our waker object in the channel.
783                 #[cfg(oneshot_test_delay)]
784                 std::thread::sleep(std::time::Duration::from_millis(10));
785 
786                 // Write our waker instance to the channel.
787                 // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
788                 // try to access the waker until it sees the state set to RECEIVING below
789                 unsafe { channel.write_waker(ReceiverWaker::current_thread()) };
790 
791                 // ORDERING: we use release ordering on success so the sender can synchronize with
792                 // our write of the waker. We use relaxed ordering on failure since the sender does
793                 // not need to synchronize with our write and the individual match arms handle any
794                 // additional synchronization
795                 match channel
796                     .state
797                     .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
798                 {
799                     // We stored our waker, now we delegate to the callback to finish the receive
800                     // operation
801                     Ok(_) => finish(channel),
802                     // The sender sent the message while we prepared to finish
803                     Err(MESSAGE) => {
804                         // See comments in `recv` for ordering and safety
805 
806                         fence(Acquire);
807 
808                         unsafe { channel.drop_waker() };
809 
810                         // ORDERING: the sender has been `mem::forget`-ed so this update only
811                         // needs to be visible to us
812                         channel.state.store(DISCONNECTED, Relaxed);
813 
814                         // SAFETY: The MESSAGE state tells us there is a correctly initialized
815                         // message
816                         Ok(unsafe { channel.take_message() })
817                     }
818                     // The sender was dropped before sending anything while we prepared to park.
819                     Err(DISCONNECTED) => {
820                         // See comments in `recv` for safety
821                         unsafe { channel.drop_waker() };
822                         Err(disconnected_error)
823                     }
824                     _ => unreachable!(),
825                 }
826             }
827             // The sender sent the message. We take the message and mark the channel disconnected.
828             MESSAGE => {
829                 // ORDERING: the sender has been `mem::forget`-ed so this update only needs to be
830                 // visible to us
831                 channel.state.store(DISCONNECTED, Relaxed);
832 
833                 // SAFETY: we are in the message state so the message is valid
834                 Ok(unsafe { channel.take_message() })
835             }
836             // The sender was dropped before sending anything, or we already received the message.
837             DISCONNECTED => Err(disconnected_error),
838             // The receiver must have been `Future::poll`ed prior to this call.
839             #[cfg(feature = "async")]
840             RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
841             _ => unreachable!(),
842         }
843     }
844 
845     /// Consumes the Receiver, returning a raw pointer to the channel on the heap.
846     ///
847     /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing
848     /// to do with the returned pointer is to later reconstruct the Receiver with
849     /// [Receiver::from_raw]. Memory will leak if the Receiver is never reconstructed.
into_raw(self) -> *mut ()850     pub fn into_raw(self) -> *mut () {
851         let raw = self.channel_ptr.as_ptr() as *mut ();
852         mem::forget(self);
853         raw
854     }
855 
856     /// Consumes a raw pointer from [Receiver::into_raw], recreating the Receiver.
857     ///
858     /// # Safety
859     ///
860     /// This pointer must have come from [`Receiver<T>::into_raw`] with the same message type, `T`.
861     /// At most one Receiver must exist for a channel at any point in time.
862     /// Constructing multiple Receivers from the same raw pointer leads to undefined behavior.
from_raw(raw: *mut ()) -> Self863     pub unsafe fn from_raw(raw: *mut ()) -> Self {
864         Self {
865             channel_ptr: NonNull::new_unchecked(raw as *mut Channel<T>),
866         }
867     }
868 }
869 
870 #[cfg(feature = "async")]
871 impl<T> core::future::Future for Receiver<T> {
872     type Output = Result<T, RecvError>;
873 
poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>874     fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
875         // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
876         // is still alive, meaning that even if the sender was dropped then it would have observed
877         // the fact that we're still alive and left the responsibility of deallocating the
878         // channel to us, so `self.channel` is valid
879         let channel = unsafe { self.channel_ptr.as_ref() };
880 
881         // ORDERING: we use acquire ordering to synchronize with the store of the message.
882         match channel.state.load(Acquire) {
883             // The sender is alive but has not sent anything yet.
884             EMPTY => {
885                 // SAFETY: We can't be in the forbidden states, and no waker in the channel.
886                 unsafe { channel.write_async_waker(cx) }
887             }
888             // We were polled again while waiting for the sender. Replace the waker with the new one.
889             RECEIVING => {
890                 // ORDERING: We use relaxed ordering on both success and failure since we have not
891                 // written anything above that must be released, and the individual match arms
892                 // handle any additional synchronization.
893                 match channel
894                     .state
895                     .compare_exchange(RECEIVING, EMPTY, Relaxed, Relaxed)
896                 {
897                     // We successfully changed the state back to EMPTY. Replace the waker.
898                     // This is the most likely branch to be taken, which is why we don't use any
899                     // memory barriers in the compare_exchange above.
900                     Ok(_) => {
901                         // SAFETY: We wrote the waker in a previous call to poll. We do not need
902                         // a memory barrier since the previous write here was by ourselves.
903                         unsafe { channel.drop_waker() };
904                         // SAFETY: We can't be in the forbidden states, and no waker in the channel.
905                         unsafe { channel.write_async_waker(cx) }
906                     }
907                     // The sender sent the message while we prepared to replace the waker.
908                     // We take the message and mark the channel disconnected.
909                     // The sender has already taken the waker.
910                     Err(MESSAGE) => {
911                         // ORDERING: Synchronize with the write of the message. This branch is
912                         // unlikely to be taken.
913                         channel.state.swap(DISCONNECTED, Acquire);
914                         // SAFETY: The state tells us the sender has initialized the message.
915                         Poll::Ready(Ok(unsafe { channel.take_message() }))
916                     }
917                     // The sender was dropped before sending anything while we prepared to park.
918                     // The sender has taken the waker already.
919                     Err(DISCONNECTED) => Poll::Ready(Err(RecvError)),
920                     // The sender is currently waking us up.
921                     Err(UNPARKING) => {
922                         // We can't trust that the old waker that the sender has access to
923                         // is honored by the async runtime at this point. So we wake ourselves
924                         // up to get polled instantly again.
925                         cx.waker().wake_by_ref();
926                         Poll::Pending
927                     }
928                     _ => unreachable!(),
929                 }
930             }
931             // The sender sent the message.
932             MESSAGE => {
933                 // ORDERING: the sender has been dropped so this update only needs to be
934                 // visible to us
935                 channel.state.store(DISCONNECTED, Relaxed);
936                 Poll::Ready(Ok(unsafe { channel.take_message() }))
937             }
938             // The sender was dropped before sending anything, or we already received the message.
939             DISCONNECTED => Poll::Ready(Err(RecvError)),
940             // The sender has observed the RECEIVING state and is currently reading the waker from
941             // a previous poll. We need to loop here until we observe the MESSAGE or DISCONNECTED
942             // state. We busy loop here since we know the sender is done very soon.
943             UNPARKING => loop {
944                 hint::spin_loop();
945                 // ORDERING: The load above has already synchronized with the write of the message.
946                 match channel.state.load(Relaxed) {
947                     MESSAGE => {
948                         // ORDERING: the sender has been dropped, so this update only
949                         // needs to be visible to us
950                         channel.state.store(DISCONNECTED, Relaxed);
951                         // SAFETY: We observed the MESSAGE state
952                         break Poll::Ready(Ok(unsafe { channel.take_message() }));
953                     }
954                     DISCONNECTED => break Poll::Ready(Err(RecvError)),
955                     UNPARKING => (),
956                     _ => unreachable!(),
957                 }
958             },
959             _ => unreachable!(),
960         }
961     }
962 }
963 
964 impl<T> Drop for Receiver<T> {
drop(&mut self)965     fn drop(&mut self) {
966         // SAFETY: since the receiving side is still alive the sender would have observed that and
967         // left deallocating the channel allocation to us.
968         let channel = unsafe { self.channel_ptr.as_ref() };
969 
970         // Set the channel state to disconnected and read what state the receiver was in
971         match channel.state.swap(DISCONNECTED, Acquire) {
972             // The sender has not sent anything, nor is it dropped.
973             EMPTY => (),
974             // The sender already sent something. We must drop it, and free the channel.
975             MESSAGE => {
976                 // SAFETY: we are in the message state so the message is initialized
977                 unsafe { channel.drop_message() };
978 
979                 // SAFETY: see safety comment at top of function
980                 unsafe { dealloc(self.channel_ptr) };
981             }
982             // The receiver has been polled.
983             #[cfg(feature = "async")]
984             RECEIVING => {
985                 // TODO: figure this out when async is fixed
986                 unsafe { channel.drop_waker() };
987             }
988             // The sender was already dropped. We are responsible for freeing the channel.
989             DISCONNECTED => {
990                 // SAFETY: see safety comment at top of function
991                 unsafe { dealloc(self.channel_ptr) };
992             }
993             _ => unreachable!(),
994         }
995     }
996 }
997 
998 /// All the values that the `Channel::state` field can have during the lifetime of a channel.
999 mod states {
1000     // These values are very explicitly chosen so that we can replace some cmpxchg calls with
1001     // fetch_* calls.
1002 
1003     /// The initial channel state. Active while both endpoints are still alive, no message has been
1004     /// sent, and the receiver is not receiving.
1005     pub const EMPTY: u8 = 0b011;
1006     /// A message has been sent to the channel, but the receiver has not yet read it.
1007     pub const MESSAGE: u8 = 0b100;
1008     /// No message has yet been sent on the channel, but the receiver is currently receiving.
1009     pub const RECEIVING: u8 = 0b000;
1010     #[cfg(any(feature = "std", feature = "async"))]
1011     pub const UNPARKING: u8 = 0b001;
1012     /// The channel has been closed. This means that either the sender or receiver has been dropped,
1013     /// or the message sent to the channel has already been received. Since this is a oneshot
1014     /// channel, it is disconnected after the one message it is supposed to hold has been
1015     /// transmitted.
1016     pub const DISCONNECTED: u8 = 0b010;
1017 }
1018 use states::*;
1019 
1020 /// Internal channel data structure structure. the `channel` method allocates and puts one instance
1021 /// of this struct on the heap for each oneshot channel instance. The struct holds:
1022 /// * The current state of the channel.
1023 /// * The message in the channel. This memory is uninitialized until the message is sent.
1024 /// * The waker instance for the thread or task that is currently receiving on this channel.
1025 ///   This memory is uninitialized until the receiver starts receiving.
1026 struct Channel<T> {
1027     state: AtomicU8,
1028     message: UnsafeCell<MaybeUninit<T>>,
1029     waker: UnsafeCell<MaybeUninit<ReceiverWaker>>,
1030 }
1031 
1032 impl<T> Channel<T> {
new() -> Self1033     pub fn new() -> Self {
1034         Self {
1035             state: AtomicU8::new(EMPTY),
1036             message: UnsafeCell::new(MaybeUninit::uninit()),
1037             waker: UnsafeCell::new(MaybeUninit::uninit()),
1038         }
1039     }
1040 
1041     #[inline(always)]
message(&self) -> &MaybeUninit<T>1042     unsafe fn message(&self) -> &MaybeUninit<T> {
1043         #[cfg(loom)]
1044         {
1045             self.message.with(|ptr| &*ptr)
1046         }
1047 
1048         #[cfg(not(loom))]
1049         {
1050             &*self.message.get()
1051         }
1052     }
1053 
1054     #[inline(always)]
with_message_mut<F>(&self, op: F) where F: FnOnce(&mut MaybeUninit<T>),1055     unsafe fn with_message_mut<F>(&self, op: F)
1056     where
1057         F: FnOnce(&mut MaybeUninit<T>),
1058     {
1059         #[cfg(loom)]
1060         {
1061             self.message.with_mut(|ptr| op(&mut *ptr))
1062         }
1063 
1064         #[cfg(not(loom))]
1065         {
1066             op(&mut *self.message.get())
1067         }
1068     }
1069 
1070     #[inline(always)]
1071     #[cfg(any(feature = "std", feature = "async"))]
with_waker_mut<F>(&self, op: F) where F: FnOnce(&mut MaybeUninit<ReceiverWaker>),1072     unsafe fn with_waker_mut<F>(&self, op: F)
1073     where
1074         F: FnOnce(&mut MaybeUninit<ReceiverWaker>),
1075     {
1076         #[cfg(loom)]
1077         {
1078             self.waker.with_mut(|ptr| op(&mut *ptr))
1079         }
1080 
1081         #[cfg(not(loom))]
1082         {
1083             op(&mut *self.waker.get())
1084         }
1085     }
1086 
1087     #[inline(always)]
write_message(&self, message: T)1088     unsafe fn write_message(&self, message: T) {
1089         self.with_message_mut(|slot| slot.as_mut_ptr().write(message));
1090     }
1091 
1092     #[inline(always)]
take_message(&self) -> T1093     unsafe fn take_message(&self) -> T {
1094         #[cfg(loom)]
1095         {
1096             self.message.with(|ptr| ptr::read(ptr)).assume_init()
1097         }
1098 
1099         #[cfg(not(loom))]
1100         {
1101             ptr::read(self.message.get()).assume_init()
1102         }
1103     }
1104 
1105     #[inline(always)]
drop_message(&self)1106     unsafe fn drop_message(&self) {
1107         self.with_message_mut(|slot| slot.assume_init_drop());
1108     }
1109 
1110     #[cfg(any(feature = "std", feature = "async"))]
1111     #[inline(always)]
write_waker(&self, waker: ReceiverWaker)1112     unsafe fn write_waker(&self, waker: ReceiverWaker) {
1113         self.with_waker_mut(|slot| slot.as_mut_ptr().write(waker));
1114     }
1115 
1116     #[inline(always)]
take_waker(&self) -> ReceiverWaker1117     unsafe fn take_waker(&self) -> ReceiverWaker {
1118         #[cfg(loom)]
1119         {
1120             self.waker.with(|ptr| ptr::read(ptr)).assume_init()
1121         }
1122 
1123         #[cfg(not(loom))]
1124         {
1125             ptr::read(self.waker.get()).assume_init()
1126         }
1127     }
1128 
1129     #[cfg(any(feature = "std", feature = "async"))]
1130     #[inline(always)]
drop_waker(&self)1131     unsafe fn drop_waker(&self) {
1132         self.with_waker_mut(|slot| slot.assume_init_drop());
1133     }
1134 
1135     /// # Safety
1136     ///
1137     /// * `Channel::waker` must not have a waker stored in it when calling this method.
1138     /// * Channel state must not be RECEIVING or UNPARKING when calling this method.
1139     #[cfg(feature = "async")]
write_async_waker(&self, cx: &mut task::Context<'_>) -> Poll<Result<T, RecvError>>1140     unsafe fn write_async_waker(&self, cx: &mut task::Context<'_>) -> Poll<Result<T, RecvError>> {
1141         // Write our thread instance to the channel.
1142         // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
1143         // try to access the waker until it sees the state set to RECEIVING below
1144         self.write_waker(ReceiverWaker::task_waker(cx));
1145 
1146         // ORDERING: we use release ordering on success so the sender can synchronize with
1147         // our write of the waker. We use relaxed ordering on failure since the sender does
1148         // not need to synchronize with our write and the individual match arms handle any
1149         // additional synchronization
1150         match self
1151             .state
1152             .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
1153         {
1154             // We stored our waker, now we return and let the sender wake us up
1155             Ok(_) => Poll::Pending,
1156             // The sender sent the message while we prepared to park.
1157             // We take the message and mark the channel disconnected.
1158             Err(MESSAGE) => {
1159                 // ORDERING: Synchronize with the write of the message. This branch is
1160                 // unlikely to be taken, so it's likely more efficient to use a fence here
1161                 // instead of AcqRel ordering on the compare_exchange operation
1162                 fence(Acquire);
1163 
1164                 // SAFETY: we started in the EMPTY state and the sender switched us to the
1165                 // MESSAGE state. This means that it did not take the waker, so we're
1166                 // responsible for dropping it.
1167                 self.drop_waker();
1168 
1169                 // ORDERING: sender does not exist, so this update only needs to be visible to us
1170                 self.state.store(DISCONNECTED, Relaxed);
1171 
1172                 // SAFETY: The MESSAGE state tells us there is a correctly initialized message
1173                 Poll::Ready(Ok(self.take_message()))
1174             }
1175             // The sender was dropped before sending anything while we prepared to park.
1176             Err(DISCONNECTED) => {
1177                 // SAFETY: we started in the EMPTY state and the sender switched us to the
1178                 // DISCONNECTED state. This means that it did not take the waker, so we're
1179                 // responsible for dropping it.
1180                 self.drop_waker();
1181                 Poll::Ready(Err(RecvError))
1182             }
1183             _ => unreachable!(),
1184         }
1185     }
1186 }
1187 
1188 enum ReceiverWaker {
1189     /// The receiver is waiting synchronously. Its thread is parked.
1190     #[cfg(feature = "std")]
1191     Thread(thread::Thread),
1192     /// The receiver is waiting asynchronously. Its task can be woken up with this `Waker`.
1193     #[cfg(feature = "async")]
1194     Task(task::Waker),
1195     /// A little hack to not make this enum an uninhibitable type when no features are enabled.
1196     #[cfg(not(any(feature = "async", feature = "std")))]
1197     _Uninhabited,
1198 }
1199 
1200 impl ReceiverWaker {
1201     #[cfg(feature = "std")]
current_thread() -> Self1202     pub fn current_thread() -> Self {
1203         Self::Thread(thread::current())
1204     }
1205 
1206     #[cfg(feature = "async")]
task_waker(cx: &task::Context<'_>) -> Self1207     pub fn task_waker(cx: &task::Context<'_>) -> Self {
1208         Self::Task(cx.waker().clone())
1209     }
1210 
unpark(self)1211     pub fn unpark(self) {
1212         match self {
1213             #[cfg(feature = "std")]
1214             ReceiverWaker::Thread(thread) => thread.unpark(),
1215             #[cfg(feature = "async")]
1216             ReceiverWaker::Task(waker) => waker.wake(),
1217             #[cfg(not(any(feature = "async", feature = "std")))]
1218             ReceiverWaker::_Uninhabited => unreachable!(),
1219         }
1220     }
1221 }
1222 
1223 #[cfg(not(loom))]
1224 #[test]
receiver_waker_size()1225 fn receiver_waker_size() {
1226     let expected: usize = match (cfg!(feature = "std"), cfg!(feature = "async")) {
1227         (false, false) => 0,
1228         (false, true) => 16,
1229         (true, false) => 8,
1230         (true, true) => 16,
1231     };
1232     assert_eq!(mem::size_of::<ReceiverWaker>(), expected);
1233 }
1234 
1235 #[cfg(all(feature = "std", feature = "async"))]
1236 const RECEIVER_USED_SYNC_AND_ASYNC_ERROR: &str =
1237     "Invalid to call a blocking receive method on oneshot::Receiver after it has been polled";
1238 
1239 #[inline]
dealloc<T>(channel: NonNull<Channel<T>>)1240 pub(crate) unsafe fn dealloc<T>(channel: NonNull<Channel<T>>) {
1241     drop(Box::from_raw(channel.as_ptr()))
1242 }
1243