1 //! Oneshot spsc (single producer, single consumer) channel. Meaning each channel instance
2 //! can only transport a single message. This has a few nice outcomes. One thing is that
3 //! the implementation can be very efficient, utilizing the knowledge that there will
4 //! only be one message. But more importantly, it allows the API to be expressed in such
5 //! a way that certain edge cases that you don't want to care about when only sending a
//! single message on a channel do not exist. For example: The sender can't be copied
7 //! or cloned, and the send method takes ownership and consumes the sender.
8 //! So you are guaranteed, at the type level, that there can only be one message sent.
9 //!
10 //! The sender's send method is non-blocking, and potentially lock- and wait-free.
11 //! See documentation on [Sender::send] for situations where it might not be fully wait-free.
12 //! The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time
13 //! limited thread blocking receive operations. The receiver also implements `Future` and
14 //! supports asynchronously awaiting the message.
15 //!
16 //!
17 //! # Examples
18 //!
19 //! This example sets up a background worker that processes requests coming in on a standard
20 //! mpsc channel and replies on a oneshot channel provided with each request. The worker can
21 //! be interacted with both from sync and async contexts since the oneshot receiver
22 //! can receive both blocking and async.
23 //!
24 //! ```rust
25 //! use std::sync::mpsc;
26 //! use std::thread;
27 //! use std::time::Duration;
28 //!
29 //! type Request = String;
30 //!
31 //! // Starts a background thread performing some computation on requests sent to it.
32 //! // Delivers the response back over a oneshot channel.
33 //! fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender<usize>)> {
34 //! let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender<usize>)>();
35 //! thread::spawn(move || {
36 //! for (request_data, response_sender) in request_receiver.iter() {
37 //! let compute_operation = || request_data.len();
38 //! let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel
39 //! }
40 //! });
41 //! request_sender
42 //! }
43 //!
44 //! let processor = spawn_processing_thread();
45 //!
46 //! // If compiled with `std` the library can receive messages with timeout on regular threads
47 //! #[cfg(feature = "std")] {
48 //! let (response_sender, response_receiver) = oneshot::channel();
49 //! let request = Request::from("data from sync thread");
50 //!
51 //! processor.send((request, response_sender)).expect("Processor down");
52 //! match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel
53 //! Ok(result) => println!("Processor returned {}", result),
54 //! Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"),
55 //! Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"),
56 //! }
57 //! }
58 //!
59 //! // If compiled with the `async` feature, the `Receiver` can be awaited in an async context
60 //! #[cfg(feature = "async")] {
61 //! tokio::runtime::Runtime::new()
62 //! .unwrap()
63 //! .block_on(async move {
64 //! let (response_sender, response_receiver) = oneshot::channel();
65 //! let request = Request::from("data from sync thread");
66 //!
67 //! processor.send((request, response_sender)).expect("Processor down");
68 //! match response_receiver.await { // <- Receive on the oneshot channel asynchronously
69 //! Ok(result) => println!("Processor returned {}", result),
70 //! Err(_e) => panic!("Processor exited"),
71 //! }
72 //! });
73 //! }
74 //! ```
75 //!
76 //! # Sync vs async
77 //!
78 //! The main motivation for writing this library was that there were no (known to me) channel
79 //! implementations allowing you to seamlessly send messages between a normal thread and an async
80 //! task, or the other way around. If message passing is the way you are communicating, of course
81 //! that should work smoothly between the sync and async parts of the program!
82 //!
83 //! This library achieves that by having a fast and cheap send operation that can
84 //! be used in both sync threads and async tasks. The receiver has both thread blocking
85 //! receive methods for synchronous usage, and implements `Future` for asynchronous usage.
86 //!
87 //! The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on
88 //! in an asynchronous task. This implementation is completely executor/runtime agnostic. It should
89 //! be possible to use this library with any executor.
90 //!
91
92 // # Implementation description
93 //
94 // When a channel is created via the channel function, it creates a single heap allocation
95 // containing:
96 // * A one byte atomic integer that represents the current channel state,
97 // * Uninitialized memory to fit the message,
98 // * Uninitialized memory to fit the waker that can wake the receiving task or thread up.
99 //
100 // The size of the waker depends on which features are activated, it ranges from 0 to 24 bytes[1].
101 // So with all features enabled (the default) each channel allocates 25 bytes plus the size of the
102 // message, plus any padding needed to get correct memory alignment.
103 //
// The Sender and Receiver only hold a raw pointer to the heap channel object. The last endpoint
// to be consumed or dropped is responsible for freeing the heap memory. The first endpoint to
// be consumed or dropped signals via the state that it is gone. And the second one sees this and
// frees the memory.
108 //
109 // ## Footnotes
110 //
111 // [1]: Mind that the waker only takes zero bytes when all features are disabled, making it
// impossible to *wait* for the message. `try_recv` is the only available method in this scenario.
113
114 #![deny(rust_2018_idioms)]
115 #![cfg_attr(not(feature = "std"), no_std)]
116
117 #[cfg(not(loom))]
118 extern crate alloc;
119
120 use core::{
121 marker::PhantomData,
122 mem::{self, MaybeUninit},
123 ptr::{self, NonNull},
124 };
125
126 #[cfg(not(loom))]
127 use core::{
128 cell::UnsafeCell,
129 sync::atomic::{fence, AtomicU8, Ordering::*},
130 };
131 #[cfg(loom)]
132 use loom::{
133 cell::UnsafeCell,
134 sync::atomic::{fence, AtomicU8, Ordering::*},
135 };
136
137 #[cfg(all(feature = "async", not(loom)))]
138 use core::hint;
139 #[cfg(all(feature = "async", loom))]
140 use loom::hint;
141
142 #[cfg(feature = "async")]
143 use core::{
144 pin::Pin,
145 task::{self, Poll},
146 };
147 #[cfg(feature = "std")]
148 use std::time::{Duration, Instant};
149
// Thin facade over the thread primitives needed by the blocking receive operations.
// It resolves to `std::thread` in normal builds and to `loom::thread` when the crate is
// compiled for loom model checking, so the rest of the crate can just use `thread::*`.
#[cfg(feature = "std")]
mod thread {
    #[cfg(loom)]
    pub use loom::thread::{current, park, yield_now, Thread};

    #[cfg(not(loom))]
    pub use std::thread::{current, park, park_timeout, yield_now, Thread};

    // loom has no timed park, so this stand-in simply yields. From the point of view of the
    // calling code that looks like a "spurious" wakeup arriving far too early, which the
    // receive loops are expected to handle anyway. Keep timeouts very short when running
    // under loom: long ones make the caller loop many times and overflow loom.
    #[cfg(loom)]
    pub fn park_timeout(_timeout: std::time::Duration) {
        yield_now();
    }
}
169
170 #[cfg(loom)]
171 mod loombox;
172 #[cfg(not(loom))]
173 use alloc::boxed::Box;
174 #[cfg(loom)]
175 use loombox::Box;
176
177 mod errors;
178 pub use errors::{RecvError, RecvTimeoutError, SendError, TryRecvError};
179
180 /// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`].
channel<T>() -> (Sender<T>, Receiver<T>)181 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
182 // Allocate the channel on the heap and get the pointer.
183 // The last endpoint of the channel to be alive is responsible for freeing the channel
184 // and dropping any object that might have been written to it.
185
186 let channel_ptr = Box::into_raw(Box::new(Channel::new()));
187
188 // SAFETY: `channel_ptr` came from a Box and thus is not null
189 let channel_ptr = unsafe { NonNull::new_unchecked(channel_ptr) };
190
191 (
192 Sender {
193 channel_ptr,
194 _invariant: PhantomData,
195 },
196 Receiver { channel_ptr },
197 )
198 }
199
/// The sending endpoint of a oneshot channel, created together with its [`Receiver`] by
/// [`channel`].
///
/// It only stores a pointer to the heap allocated channel shared with the receiver.
#[derive(Debug)]
pub struct Sender<T> {
    // Pointer to the heap allocated channel shared with the `Receiver`.
    channel_ptr: NonNull<Channel<T>>,
    // In reality we want contravariance, however we can't obtain that.
    //
    // Consider the following scenario:
    // ```
    // let (mut tx, rx) = channel::<&'short u8>();
    // let (tx2, rx2) = channel::<&'long u8>();
    //
    // tx = tx2;
    //
    // // Pretend short_ref is some &'short u8
    // tx.send(short_ref).unwrap();
    // let long_ref = rx2.recv().unwrap();
    // ```
    //
    // If this type were covariant then the assignment above would compile and safe code could
    // extend a lifetime (`long_ref` would really be a `&'short u8`), which is not okay.
    // Hence, we enforce invariance. `fn(T) -> T` is invariant in `T` and, being a
    // `PhantomData`, stores nothing.
    _invariant: PhantomData<fn(T) -> T>,
}
221
/// The receiving endpoint of a oneshot channel, created together with its [`Sender`] by
/// [`channel`].
///
/// It only stores a pointer to the heap allocated channel shared with the sender.
#[derive(Debug)]
pub struct Receiver<T> {
    // Covariance is the right choice here. Consider the example presented in Sender, and you'll
    // see that if we replaced `rx` instead then we would get the expected behavior
    //
    // Pointer to the heap allocated channel shared with the `Sender`.
    channel_ptr: NonNull<Channel<T>>,
}
228
// SAFETY: an endpoint only holds a pointer to the shared channel, and the only `T` that can be
// reached through it is the single message being transported. Moving an endpoint to another
// thread can therefore move a `T` across threads, which is exactly what `T: Send` permits.
unsafe impl<T: Send> Send for Sender<T> {}
// SAFETY: same reasoning as for `Sender` above.
unsafe impl<T: Send> Send for Receiver<T> {}
// The receiver only stores a pointer to the heap allocated channel, so moving the receiver
// never moves the channel (or the message) itself.
impl<T> Unpin for Receiver<T> {}
232
impl<T> Sender<T> {
    /// Sends `message` over the channel to the corresponding [`Receiver`].
    ///
    /// Returns an error if the receiver has already been dropped. The message can
    /// be extracted from the error.
    ///
    /// This method is lock-free and wait-free when sending on a channel that the
    /// receiver is currently not receiving on. If the receiver is receiving during the send
    /// operation this method includes waking up the thread/task. Unparking a thread involves
    /// a mutex in Rust's standard library at the time of writing this.
    /// How lock-free waking up an async task is
    /// depends on your executor. If this method returns a `SendError`, please mind that dropping
    /// the error involves running any drop implementation on the message type, and freeing the
    /// channel's heap allocation, which might or might not be lock-free.
    ///
    /// # Errors
    ///
    /// Returns a [`SendError`] owning the message if the channel was already in the
    /// disconnected state, i.e. the receiver is gone.
    pub fn send(self, message: T) -> Result<(), SendError<T>> {
        let channel_ptr = self.channel_ptr;

        // Don't run our Drop implementation if send was called, any cleanup now happens here
        mem::forget(self);

        // SAFETY: The channel exists on the heap for the entire duration of this method and we
        // only ever acquire shared references to it. Note that if the receiver disconnects it
        // does not free the channel.
        let channel = unsafe { channel_ptr.as_ref() };

        // Write the message into the channel on the heap.
        // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE
        // state, and since we're responsible for setting that state, we can guarantee that we have
        // exclusive access to this memory location to perform this write.
        unsafe { channel.write_message(message) };

        // Set the state to signal there is a message on the channel.
        // ORDERING: we use release ordering to ensure the write of the message is visible to the
        // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state,
        // and thus we do not need acquire ordering. The RECEIVING branch manages synchronization
        // independent of this operation.
        //
        // EMPTY + 1 = MESSAGE
        // RECEIVING + 1 = UNPARKING
        // DISCONNECTED + 1 = invalid, however this state is never observed
        match channel.state.fetch_add(1, Release) {
            // The receiver is alive and has not started waiting. Send done.
            EMPTY => Ok(()),
            // The receiver is waiting. Wake it up so it can return the message.
            RECEIVING => {
                // ORDERING: Synchronizes with the write of the waker to memory, and prevents the
                // taking of the waker from being ordered before this operation.
                fence(Acquire);

                // Take the waker, but critically do not unpark it. If we unparked now, then the
                // receiving thread could still observe the UNPARKING state and re-park, meaning
                // that after we change to the MESSAGE state, it would remain parked indefinitely
                // or until a spurious wakeup.
                // SAFETY: at this point we are in the UNPARKING state, and the receiving thread
                // does not access the waker while in this state, nor does it free the channel
                // allocation in this state.
                let waker = unsafe { channel.take_waker() };

                // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load
                // in the receiving thread, ensuring that both our read of the waker and write of
                // the message happen-before the taking of the message and freeing of the channel.
                // Furthermore, we need acquire ordering to ensure the unparking of the receiver
                // happens after the channel state is updated.
                channel.state.swap(MESSAGE, AcqRel);

                // Note: it is possible that between the swap above and this statement that
                // the receiving thread is spuriously unparked, takes the message, and frees
                // the channel allocation. However, we took ownership of the waker out of
                // that allocation, and freeing the channel does not drop the waker since the
                // waker is wrapped in MaybeUninit. Therefore this data is valid regardless of
                // whether or not the receive has completed by this point.
                waker.unpark();

                Ok(())
            }
            // The receiver was already dropped. The error is responsible for freeing the channel.
            // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so
            // we can transfer exclusive ownership of the channel's resources to the error.
            // Moreover, since we just placed the message in the channel, the channel contains a
            // valid message.
            DISCONNECTED => Err(unsafe { SendError::new(channel_ptr) }),
            _ => unreachable!(),
        }
    }

    /// Consumes the Sender, returning a raw pointer to the channel on the heap.
    ///
    /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing
    /// to do with the returned pointer is to later reconstruct the Sender with [Sender::from_raw].
    /// Memory will leak if the Sender is never reconstructed.
    pub fn into_raw(self) -> *mut () {
        let raw = self.channel_ptr.as_ptr() as *mut ();
        // Skip our Drop implementation: the channel must stay connected and allocated until the
        // Sender is rebuilt from the raw pointer.
        mem::forget(self);
        raw
    }

    /// Consumes a raw pointer from [Sender::into_raw], recreating the Sender.
    ///
    /// # Safety
    ///
    /// This pointer must have come from [`Sender<T>::into_raw`] with the same message type, `T`.
    /// At most one Sender must exist for a channel at any point in time.
    /// Constructing multiple Senders from the same raw pointer leads to undefined behavior.
    pub unsafe fn from_raw(raw: *mut ()) -> Self {
        Self {
            // SAFETY (upheld by the caller): `raw` was produced by `into_raw`, which derives it
            // from a `NonNull<Channel<T>>`, so it is non-null and points to a `Channel<T>`.
            channel_ptr: NonNull::new_unchecked(raw as *mut Channel<T>),
            _invariant: PhantomData,
        }
    }
}
343
// Dropping a Sender that was never consumed by `send` disconnects the channel: the state is
// flipped so the receiver can tell no message will ever arrive, a waiting receiver is woken up,
// and if the receiver is already gone the channel allocation is freed here.
impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or
        // DISCONNECTED states. If we are in the MESSAGE state, then we called
        // mem::forget(self), so we should not be in this function call. If we are in the
        // DISCONNECTED state, then the receiver either received a MESSAGE so this statement is
        // unreachable, or was dropped and observed that our side was still alive, and thus didn't
        // free the channel.
        let channel = unsafe { self.channel_ptr.as_ref() };

        // Set the channel state to disconnected and read what state the receiver was in
        // ORDERING: we don't need release ordering here since there are no modifications we
        // need to make visible to other threads, and the RECEIVING branch handles
        // synchronization independent of this fetch_xor
        //
        // EMPTY ^ 001 = DISCONNECTED
        // RECEIVING ^ 001 = UNPARKING
        // DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed
        match channel.state.fetch_xor(0b001, Relaxed) {
            // The receiver has not started waiting, nor is it dropped. Freeing the channel is
            // left to the receiver.
            EMPTY => (),
            // The receiver is waiting. Wake it up so it can detect that the channel disconnected.
            RECEIVING => {
                // See comments in Sender::send

                // ORDERING: synchronizes with the receiver's write of the waker to memory.
                fence(Acquire);

                // SAFETY: the fetch_xor above moved us from RECEIVING to UNPARKING. As described
                // in Sender::send, the receiver neither touches the waker nor frees the channel
                // while in that state.
                let waker = unsafe { channel.take_waker() };

                // We still need release ordering here to make sure our read of the waker happens
                // before this, and acquire ordering to ensure the unparking of the receiver
                // happens after this.
                channel.state.swap(DISCONNECTED, AcqRel);

                // The Acquire ordering above ensures that the write of the DISCONNECTED state
                // happens-before unparking the receiver.
                waker.unpark();
            }
            // The receiver was already dropped. We are responsible for freeing the channel.
            DISCONNECTED => {
                // SAFETY: when the receiver switches the state to DISCONNECTED they have received
                // the message or will no longer be trying to receive the message, and have
                // observed that the sender is still alive, meaning that we're responsible for
                // freeing the channel allocation.
                unsafe { dealloc(self.channel_ptr) };
            }
            _ => unreachable!(),
        }
    }
}
394
395 impl<T> Receiver<T> {
396 /// Checks if there is a message in the channel without blocking. Returns:
397 /// * `Ok(message)` if there was a message in the channel.
398 /// * `Err(Empty)` if the [`Sender`] is alive, but has not yet sent a message.
399 /// * `Err(Disconnected)` if the [`Sender`] was dropped before sending anything or if the
400 /// message has already been extracted by a previous receive call.
401 ///
402 /// If a message is returned, the channel is disconnected and any subsequent receive operation
403 /// using this receiver will return an error.
404 ///
405 /// This method is completely lock-free and wait-free. The only thing it does is an atomic
406 /// integer load of the channel state. And if there is a message in the channel it additionally
407 /// performs one atomic integer store and copies the message from the heap to the stack for
408 /// returning it.
try_recv(&self) -> Result<T, TryRecvError>409 pub fn try_recv(&self) -> Result<T, TryRecvError> {
410 // SAFETY: The channel will not be freed while this method is still running.
411 let channel = unsafe { self.channel_ptr.as_ref() };
412
413 // ORDERING: we use acquire ordering to synchronize with the store of the message.
414 match channel.state.load(Acquire) {
415 MESSAGE => {
416 // It's okay to break up the load and store since once we're in the message state
417 // the sender no longer modifies the state
418 // ORDERING: at this point the sender has done its job and is no longer active, so
419 // we don't need to make any side effects visible to it
420 channel.state.store(DISCONNECTED, Relaxed);
421
422 // SAFETY: we are in the MESSAGE state so the message is present
423 Ok(unsafe { channel.take_message() })
424 }
425 EMPTY => Err(TryRecvError::Empty),
426 DISCONNECTED => Err(TryRecvError::Disconnected),
427 #[cfg(feature = "async")]
428 RECEIVING | UNPARKING => Err(TryRecvError::Empty),
429 _ => unreachable!(),
430 }
431 }
432
/// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
/// disconnected.
///
/// This method will always block the current thread if there is no data available and it is
/// still possible for the message to be sent. Once the message is sent by the corresponding
/// [`Sender`], then this receiver will wake up and return that message.
///
/// If the corresponding [`Sender`] has disconnected (been dropped), or it disconnects while
/// this call is blocking, this call will wake up and return `Err` to indicate that the message
/// can never be received on this channel.
///
/// If a sent message has already been extracted from this channel this method will return an
/// error.
///
/// # Panics
///
/// Panics if called after this receiver has been polled asynchronously.
#[cfg(feature = "std")]
pub fn recv(self) -> Result<T, RecvError> {
    // Note that we don't need to worry about changing the state to disconnected or setting the
    // state to an invalid value at any point in this function because we take ownership of
    // self, and this function does not exit until the message has been received or both sides
    // of the channel are inactive and cleaned up.

    let channel_ptr = self.channel_ptr;

    // Don't run our Drop implementation since this method consumes the receiver and performs
    // all cleanup itself.
    mem::forget(self);

    // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
    // is still alive, meaning that even if the sender was dropped then it would have observed
    // the fact that we're still alive and left the responsibility of deallocating the
    // channel to us, so channel_ptr is valid
    let channel = unsafe { channel_ptr.as_ref() };

    // ORDERING: we use acquire ordering to synchronize with the write of the message in the
    // case that it's available
    match channel.state.load(Acquire) {
        // The sender is alive but has not sent anything yet. We prepare to park.
        EMPTY => {
            // Conditionally add a delay here to help the tests trigger the edge cases where
            // the sender manages to be dropped or send something before we are able to store
            // our waker object in the channel.
            #[cfg(oneshot_test_delay)]
            std::thread::sleep(std::time::Duration::from_millis(10));

            // Write our waker instance to the channel.
            // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
            // try to access the waker until it sees the state set to RECEIVING below
            unsafe { channel.write_waker(ReceiverWaker::current_thread()) };

            // Switch the state to RECEIVING. We need to do this in one atomic step in case the
            // sender disconnected or sent the message while we wrote the waker to memory. We
            // don't need to do a compare exchange here however because if the original state
            // was not EMPTY, then the sender has either finished sending the message or is
            // being dropped, so the RECEIVING state will never be observed after we return.
            // ORDERING: we use release ordering so the sender can synchronize with our writing
            // of the waker to memory. The individual branches handle any additional
            // synchronization
            match channel.state.swap(RECEIVING, Release) {
                // We stored our waker, now we park until the sender has changed the state
                EMPTY => loop {
                    thread::park();

                    // ORDERING: synchronize with the write of the message
                    match channel.state.load(Acquire) {
                        // The sender sent the message while we were parked.
                        MESSAGE => {
                            // SAFETY: we are in the message state so the message is valid
                            let message = unsafe { channel.take_message() };

                            // SAFETY: the Sender delegates the responsibility of deallocating
                            // the channel to us upon sending the message
                            unsafe { dealloc(channel_ptr) };

                            break Ok(message);
                        }
                        // The sender was dropped while we were parked.
                        DISCONNECTED => {
                            // SAFETY: the Sender doesn't deallocate the channel allocation in
                            // its drop implementation if we're receiving
                            unsafe { dealloc(channel_ptr) };

                            break Err(RecvError);
                        }
                        // State did not change (or the sender is in the middle of waking us
                        // up): treat it as a spurious wakeup and park again.
                        RECEIVING | UNPARKING => (),
                        _ => unreachable!(),
                    }
                },
                // The sender sent the message while we prepared to park.
                MESSAGE => {
                    // ORDERING: Synchronize with the write of the message. This branch is
                    // unlikely to be taken, so it's likely more efficient to use a fence here
                    // instead of AcqRel ordering on the RMW operation
                    fence(Acquire);

                    // SAFETY: we started in the empty state and the sender switched us to the
                    // message state. This means that it did not take the waker, so we're
                    // responsible for dropping it.
                    unsafe { channel.drop_waker() };

                    // SAFETY: we are in the message state so the message is valid
                    let message = unsafe { channel.take_message() };

                    // SAFETY: the Sender delegates the responsibility of deallocating the
                    // channel to us upon sending the message
                    unsafe { dealloc(channel_ptr) };

                    Ok(message)
                }
                // The sender was dropped before sending anything while we prepared to park.
                DISCONNECTED => {
                    // SAFETY: we started in the empty state and the sender switched us to the
                    // disconnected state. It does not take the waker when it does this so we
                    // need to drop it.
                    unsafe { channel.drop_waker() };

                    // SAFETY: the sender does not deallocate the channel if it switches from
                    // empty to disconnected so we need to free the allocation
                    unsafe { dealloc(channel_ptr) };

                    Err(RecvError)
                }
                _ => unreachable!(),
            }
        }
        // The sender already sent the message.
        MESSAGE => {
            // SAFETY: we are in the message state so the message is valid
            let message = unsafe { channel.take_message() };

            // SAFETY: we are already in the message state so the sender has been forgotten
            // and it's our job to clean up resources
            unsafe { dealloc(channel_ptr) };

            Ok(message)
        }
        // The sender was dropped before sending anything, or we already received the message.
        DISCONNECTED => {
            // SAFETY: the sender does not deallocate the channel if it switches from empty to
            // disconnected so we need to free the allocation
            unsafe { dealloc(channel_ptr) };

            Err(RecvError)
        }
        // The receiver must have been `Future::poll`ed prior to this call.
        #[cfg(feature = "async")]
        RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
        _ => unreachable!(),
    }
}
585
/// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
/// disconnected. This is the non-consuming counterpart of [`Receiver::recv`] and performs
/// slightly worse. Prefer [`Receiver::recv`] if your code allows consuming the receiver.
///
/// Once a message has been returned the channel is disconnected, so every later receive
/// operation on this receiver returns an error.
///
/// # Panics
///
/// Panics if called after this receiver has been polled asynchronously.
#[cfg(feature = "std")]
pub fn recv_ref(&self) -> Result<T, RecvError> {
    self.start_recv_ref(RecvError, |channel| loop {
        thread::park();

        // ORDERING: acquire, to synchronize with the sender's write of the message
        let observed = channel.state.load(Acquire);

        match observed {
            // Nothing changed yet: a spurious wakeup, so go back to parking.
            RECEIVING | UNPARKING => continue,
            // The sender was dropped while we were parked.
            DISCONNECTED => break Err(RecvError),
            // The message arrived while we were parked. Mark the channel as disconnected and
            // hand the message out.
            MESSAGE => {
                // ORDERING: the sender is inactive by now, so nothing has to be made visible
                // to the sending thread
                channel.state.store(DISCONNECTED, Relaxed);

                // SAFETY: we were just in the message state so the message is valid
                let message = unsafe { channel.take_message() };
                break Ok(message);
            }
            _ => unreachable!(),
        }
    })
}
623
/// Like [`Receiver::recv`], but gives up after blocking for at most `timeout`. Returns:
/// * `Ok(message)` if there was a message in the channel before the timeout was reached.
/// * `Err(Timeout)` if no message arrived on the channel before the timeout was reached.
/// * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
///   has already been extracted by a previous receive call.
///
/// Once a message has been returned the channel is disconnected, so every later receive
/// operation on this receiver returns an error.
///
/// When `timeout` is so large that the resulting point in time cannot be represented by
/// Rust's `Instant` type, this falls back to a receive operation that blocks indefinitely.
///
/// # Panics
///
/// Panics if called after this receiver has been polled asynchronously.
#[cfg(feature = "std")]
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
    // A representable deadline means we can do a proper time limited receive.
    if let Some(deadline) = Instant::now().checked_add(timeout) {
        return self.recv_deadline(deadline);
    }

    // The deadline overflowed `Instant`: block without a time limit. The only way that can
    // fail is by the channel being disconnected.
    self.recv_ref().map_err(|_| RecvTimeoutError::Disconnected)
}
646
/// Like [`Receiver::recv`], but will not block longer than until `deadline`. Returns:
/// * `Ok(message)` if there was a message in the channel before the deadline was reached.
/// * `Err(Timeout)` if no message arrived on the channel before the deadline was reached.
/// * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
///   has already been extracted by a previous receive call.
///
/// If a message is returned, the channel is disconnected and any subsequent receive operation
/// using this receiver will return an error.
///
/// # Panics
///
/// Panics if called after this receiver has been polled asynchronously.
#[cfg(feature = "std")]
pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
    /// Parks the current thread until the state is no longer EMPTY, then returns the message
    /// or a disconnect error. Used when the deadline expired while the sender was already in
    /// the middle of unparking us.
    ///
    /// # Safety
    ///
    /// If the sender is unparking us after a message send, the message must already have been
    /// written to the channel and an acquire memory barrier issued before calling this function
    #[cold]
    unsafe fn wait_for_unpark<T>(channel: &Channel<T>) -> Result<T, RecvTimeoutError> {
        loop {
            thread::park();

            // ORDERING: The caller has already synchronized with any message write
            match channel.state.load(Relaxed) {
                MESSAGE => {
                    // ORDERING: the sender has been dropped, so this update only
                    // needs to be visible to us
                    channel.state.store(DISCONNECTED, Relaxed);
                    // Reading the message relies on this function's safety contract: the
                    // message was written and made visible before we were called.
                    break Ok(channel.take_message());
                }
                DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
                // The sender is still unparking us. We continue on the empty state here since
                // the current implementation eagerly sets the state to EMPTY upon timeout.
                EMPTY => (),
                _ => unreachable!(),
            }
        }
    }

    self.start_recv_ref(RecvTimeoutError::Disconnected, |channel| {
        loop {
            // `None` here means the deadline is already in the past.
            match deadline.checked_duration_since(Instant::now()) {
                Some(timeout) => {
                    thread::park_timeout(timeout);

                    // ORDERING: synchronize with the write of the message
                    match channel.state.load(Acquire) {
                        // The sender sent the message while we were parked.
                        MESSAGE => {
                            // ORDERING: the sender has been `mem::forget`-ed so this update
                            // only needs to be visible to us.
                            channel.state.store(DISCONNECTED, Relaxed);

                            // SAFETY: we either are in the message state or were just in the
                            // message state
                            break Ok(unsafe { channel.take_message() });
                        }
                        // The sender was dropped while we were parked.
                        DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
                        // State did not change, spurious wakeup, park again.
                        RECEIVING | UNPARKING => (),
                        _ => unreachable!(),
                    }
                }
                None => {
                    // The deadline has passed. Eagerly reset the state to EMPTY and act on
                    // whatever state the channel was in at that moment.
                    // ORDERING: synchronize with the write of the message
                    match channel.state.swap(EMPTY, Acquire) {
                        // We reached the end of the timeout without receiving a message
                        RECEIVING => {
                            // SAFETY: we were in the receiving state and are now in the empty
                            // state, so the sender has not and will not try to read the waker,
                            // so we have exclusive access to drop it.
                            unsafe { channel.drop_waker() };

                            break Err(RecvTimeoutError::Timeout);
                        }
                        // The sender sent the message while we were parked.
                        MESSAGE => {
                            // Same safety and ordering as the Some branch

                            channel.state.store(DISCONNECTED, Relaxed);
                            break Ok(unsafe { channel.take_message() });
                        }
                        // The sender was dropped while we were parked.
                        DISCONNECTED => {
                            // ORDERING: we were originally in the disconnected state meaning
                            // that the sender is inactive and no longer observing the state,
                            // so we only need to change it back to DISCONNECTED for if the
                            // receiver is dropped or a recv* method is called again
                            channel.state.store(DISCONNECTED, Relaxed);

                            break Err(RecvTimeoutError::Disconnected);
                        }
                        // The sender sent the message and started unparking us
                        UNPARKING => {
                            // We were in the UNPARKING state and are now in the EMPTY state.
                            // We wait to be properly unparked and to observe if the sender
                            // sets MESSAGE or DISCONNECTED state.
                            // SAFETY: The swap above has synchronized with any message write.
                            break unsafe { wait_for_unpark(channel) };
                        }
                        _ => unreachable!(),
                    }
                }
            }
        }
    })
}
756
/// Begins the process of receiving on the channel by reference. If the message is already
/// ready, or the sender has disconnected, then this function will return the appropriate
/// Result immediately. Otherwise, it will write the waker to memory, check to see if the
/// sender has finished or disconnected again, and then will call `finish`. `finish` is
/// thus responsible for cleaning up the channel's resources appropriately before it returns,
/// such as destroying the waker, for instance.
///
/// * `disconnected_error` - the error value returned when the channel is found to be
///   DISCONNECTED, either on entry or while the waker was being registered.
/// * `finish` - invoked with the channel only after the state was successfully moved from
///   EMPTY to RECEIVING with the current thread stored as the waker.
///
/// # Panics
///
/// With the `async` feature enabled, panics if the channel is found in the RECEIVING or
/// UNPARKING state on entry, i.e. when the receiver has been polled as a future before.
/// Without that feature those states hit the `unreachable!()` arm instead.
#[cfg(feature = "std")]
#[inline]
fn start_recv_ref<E>(
    &self,
    disconnected_error: E,
    finish: impl FnOnce(&Channel<T>) -> Result<T, E>,
) -> Result<T, E> {
    // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
    // is still alive, meaning that even if the sender was dropped then it would have observed
    // the fact that we're still alive and left the responsibility of deallocating the
    // channel to us, so `self.channel` is valid
    let channel = unsafe { self.channel_ptr.as_ref() };

    // ORDERING: synchronize with the write of the message
    match channel.state.load(Acquire) {
        // The sender is alive but has not sent anything yet. We prepare to park.
        EMPTY => {
            // Conditionally add a delay here to help the tests trigger the edge cases where
            // the sender manages to be dropped or send something before we are able to store
            // our waker object in the channel.
            #[cfg(oneshot_test_delay)]
            std::thread::sleep(std::time::Duration::from_millis(10));

            // Write our waker instance to the channel.
            // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
            // try to access the waker until it sees the state set to RECEIVING below
            unsafe { channel.write_waker(ReceiverWaker::current_thread()) };

            // ORDERING: we use release ordering on success so the sender can synchronize with
            // our write of the waker. We use relaxed ordering on failure since the sender does
            // not need to synchronize with our write and the individual match arms handle any
            // additional synchronization
            match channel
                .state
                .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
            {
                // We stored our waker, now we delegate to the callback to finish the receive
                // operation
                Ok(_) => finish(channel),
                // The sender sent the message while we prepared to finish
                Err(MESSAGE) => {
                    // See comments in `recv` for ordering and safety

                    // ORDERING: the failed compare_exchange above was relaxed, so this fence
                    // is what synchronizes with the sender's write of the message.
                    fence(Acquire);

                    // SAFETY: the waker was written by us just above and the state never
                    // became RECEIVING, so the waker is still in the channel and ours to drop.
                    unsafe { channel.drop_waker() };

                    // ORDERING: the sender has been `mem::forget`-ed so this update only
                    // needs to be visible to us
                    channel.state.store(DISCONNECTED, Relaxed);

                    // SAFETY: The MESSAGE state tells us there is a correctly initialized
                    // message
                    Ok(unsafe { channel.take_message() })
                }
                // The sender was dropped before sending anything while we prepared to park.
                Err(DISCONNECTED) => {
                    // See comments in `recv` for safety
                    // SAFETY: same reasoning as in the MESSAGE arm above: the waker we wrote
                    // was never published through the RECEIVING state.
                    unsafe { channel.drop_waker() };
                    Err(disconnected_error)
                }
                _ => unreachable!(),
            }
        }
        // The sender sent the message. We take the message and mark the channel disconnected.
        MESSAGE => {
            // ORDERING: the sender has been `mem::forget`-ed so this update only needs to be
            // visible to us
            channel.state.store(DISCONNECTED, Relaxed);

            // SAFETY: we are in the message state so the message is valid
            Ok(unsafe { channel.take_message() })
        }
        // The sender was dropped before sending anything, or we already received the message.
        DISCONNECTED => Err(disconnected_error),
        // The receiver must have been `Future::poll`ed prior to this call.
        #[cfg(feature = "async")]
        RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
        _ => unreachable!(),
    }
}
844
845 /// Consumes the Receiver, returning a raw pointer to the channel on the heap.
846 ///
847 /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing
848 /// to do with the returned pointer is to later reconstruct the Receiver with
849 /// [Receiver::from_raw]. Memory will leak if the Receiver is never reconstructed.
into_raw(self) -> *mut ()850 pub fn into_raw(self) -> *mut () {
851 let raw = self.channel_ptr.as_ptr() as *mut ();
852 mem::forget(self);
853 raw
854 }
855
856 /// Consumes a raw pointer from [Receiver::into_raw], recreating the Receiver.
857 ///
858 /// # Safety
859 ///
860 /// This pointer must have come from [`Receiver<T>::into_raw`] with the same message type, `T`.
861 /// At most one Receiver must exist for a channel at any point in time.
862 /// Constructing multiple Receivers from the same raw pointer leads to undefined behavior.
from_raw(raw: *mut ()) -> Self863 pub unsafe fn from_raw(raw: *mut ()) -> Self {
864 Self {
865 channel_ptr: NonNull::new_unchecked(raw as *mut Channel<T>),
866 }
867 }
868 }
869
/// Asynchronous receive: awaiting the `Receiver` resolves to the message, or to `RecvError`
/// when the channel is found disconnected.
#[cfg(feature = "async")]
impl<T> core::future::Future for Receiver<T> {
    type Output = Result<T, RecvError>;

    /// Polls the channel once. Depending on the observed state this either registers (or
    /// replaces) the task waker and returns `Poll::Pending`, or returns the message / a
    /// `RecvError` as `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
        // is still alive, meaning that even if the sender was dropped then it would have observed
        // the fact that we're still alive and left the responsibility of deallocating the
        // channel to us, so `self.channel` is valid
        let channel = unsafe { self.channel_ptr.as_ref() };

        // ORDERING: we use acquire ordering to synchronize with the store of the message.
        match channel.state.load(Acquire) {
            // The sender is alive but has not sent anything yet.
            EMPTY => {
                // SAFETY: We can't be in the forbidden states, and no waker in the channel.
                unsafe { channel.write_async_waker(cx) }
            }
            // We were polled again while waiting for the sender. Replace the waker with the new one.
            RECEIVING => {
                // ORDERING: We use relaxed ordering on both success and failure since we have not
                // written anything above that must be released, and the individual match arms
                // handle any additional synchronization.
                match channel
                    .state
                    .compare_exchange(RECEIVING, EMPTY, Relaxed, Relaxed)
                {
                    // We successfully changed the state back to EMPTY. Replace the waker.
                    // This is the most likely branch to be taken, which is why we don't use any
                    // memory barriers in the compare_exchange above.
                    Ok(_) => {
                        // SAFETY: We wrote the waker in a previous call to poll. We do not need
                        // a memory barrier since the previous write here was by ourselves.
                        unsafe { channel.drop_waker() };
                        // SAFETY: We can't be in the forbidden states, and no waker in the channel.
                        unsafe { channel.write_async_waker(cx) }
                    }
                    // The sender sent the message while we prepared to replace the waker.
                    // We take the message and mark the channel disconnected.
                    // The sender has already taken the waker.
                    Err(MESSAGE) => {
                        // ORDERING: Synchronize with the write of the message. This branch is
                        // unlikely to be taken.
                        channel.state.swap(DISCONNECTED, Acquire);
                        // SAFETY: The state tells us the sender has initialized the message.
                        Poll::Ready(Ok(unsafe { channel.take_message() }))
                    }
                    // The sender was dropped before sending anything while we prepared to park.
                    // The sender has taken the waker already.
                    Err(DISCONNECTED) => Poll::Ready(Err(RecvError)),
                    // The sender is currently waking us up.
                    Err(UNPARKING) => {
                        // We can't trust that the old waker that the sender has access to
                        // is honored by the async runtime at this point. So we wake ourselves
                        // up to get polled instantly again.
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                    _ => unreachable!(),
                }
            }
            // The sender sent the message.
            MESSAGE => {
                // ORDERING: the sender has been dropped so this update only needs to be
                // visible to us
                channel.state.store(DISCONNECTED, Relaxed);
                // SAFETY: the acquire load above observed the MESSAGE state, so the message
                // is initialized and its write is visible to us.
                Poll::Ready(Ok(unsafe { channel.take_message() }))
            }
            // The sender was dropped before sending anything, or we already received the message.
            DISCONNECTED => Poll::Ready(Err(RecvError)),
            // The sender has observed the RECEIVING state and is currently reading the waker from
            // a previous poll. We need to loop here until we observe the MESSAGE or DISCONNECTED
            // state. We busy loop here since we know the sender is done very soon.
            UNPARKING => loop {
                hint::spin_loop();
                // ORDERING: The load above has already synchronized with the write of the message.
                match channel.state.load(Relaxed) {
                    MESSAGE => {
                        // ORDERING: the sender has been dropped, so this update only
                        // needs to be visible to us
                        channel.state.store(DISCONNECTED, Relaxed);
                        // SAFETY: We observed the MESSAGE state
                        break Poll::Ready(Ok(unsafe { channel.take_message() }));
                    }
                    DISCONNECTED => break Poll::Ready(Err(RecvError)),
                    // Still being woken up: keep spinning.
                    UNPARKING => (),
                    _ => unreachable!(),
                }
            },
            _ => unreachable!(),
        }
    }
}
963
964 impl<T> Drop for Receiver<T> {
drop(&mut self)965 fn drop(&mut self) {
966 // SAFETY: since the receiving side is still alive the sender would have observed that and
967 // left deallocating the channel allocation to us.
968 let channel = unsafe { self.channel_ptr.as_ref() };
969
970 // Set the channel state to disconnected and read what state the receiver was in
971 match channel.state.swap(DISCONNECTED, Acquire) {
972 // The sender has not sent anything, nor is it dropped.
973 EMPTY => (),
974 // The sender already sent something. We must drop it, and free the channel.
975 MESSAGE => {
976 // SAFETY: we are in the message state so the message is initialized
977 unsafe { channel.drop_message() };
978
979 // SAFETY: see safety comment at top of function
980 unsafe { dealloc(self.channel_ptr) };
981 }
982 // The receiver has been polled.
983 #[cfg(feature = "async")]
984 RECEIVING => {
985 // TODO: figure this out when async is fixed
986 unsafe { channel.drop_waker() };
987 }
988 // The sender was already dropped. We are responsible for freeing the channel.
989 DISCONNECTED => {
990 // SAFETY: see safety comment at top of function
991 unsafe { dealloc(self.channel_ptr) };
992 }
993 _ => unreachable!(),
994 }
995 }
996 }
997
/// All the values that the `Channel::state` field can have during the lifetime of a channel.
mod states {
    // These values are very explicitly chosen so that we can replace some cmpxchg calls with
    // fetch_* calls.
    //
    // Relations between the encodings below:
    //   EMPTY + 1         == MESSAGE
    //   RECEIVING + 1     == UNPARKING
    //   EMPTY ^ 0b001     == DISCONNECTED
    //   RECEIVING ^ 0b001 == UNPARKING
    // NOTE(review): presumably these are the relations the fetch_* calls rely on; confirm
    // against the sender implementation before changing any value.

    /// The initial channel state. Active while both endpoints are still alive, no message has been
    /// sent, and the receiver is not receiving.
    pub const EMPTY: u8 = 0b011;
    /// A message has been sent to the channel, but the receiver has not yet read it.
    pub const MESSAGE: u8 = 0b100;
    /// No message has yet been sent on the channel, but the receiver is currently receiving.
    pub const RECEIVING: u8 = 0b000;
    /// The sender has observed the RECEIVING state and is currently reading the waker in order
    /// to wake up the receiver. Receivers treat this as a transient state and wait for it to be
    /// followed by MESSAGE or DISCONNECTED.
    #[cfg(any(feature = "std", feature = "async"))]
    pub const UNPARKING: u8 = 0b001;
    /// The channel has been closed. This means that either the sender or receiver has been dropped,
    /// or the message sent to the channel has already been received. Since this is a oneshot
    /// channel, it is disconnected after the one message it is supposed to hold has been
    /// transmitted.
    pub const DISCONNECTED: u8 = 0b010;
}
1018 use states::*;
1019
/// Internal channel data structure. The `channel` method allocates and puts one instance
/// of this struct on the heap for each oneshot channel instance. The struct holds:
/// * The current state of the channel.
/// * The message in the channel. This memory is uninitialized until the message is sent.
/// * The waker instance for the thread or task that is currently receiving on this channel.
///   This memory is uninitialized until the receiver starts receiving.
struct Channel<T> {
    // One of the constants in the `states` module.
    state: AtomicU8,
    // `MaybeUninit`, so dropping the `Channel` never drops a message by itself.
    message: UnsafeCell<MaybeUninit<T>>,
    // `MaybeUninit`, so dropping the `Channel` never drops a waker by itself.
    waker: UnsafeCell<MaybeUninit<ReceiverWaker>>,
}
1031
1032 impl<T> Channel<T> {
new() -> Self1033 pub fn new() -> Self {
1034 Self {
1035 state: AtomicU8::new(EMPTY),
1036 message: UnsafeCell::new(MaybeUninit::uninit()),
1037 waker: UnsafeCell::new(MaybeUninit::uninit()),
1038 }
1039 }
1040
1041 #[inline(always)]
message(&self) -> &MaybeUninit<T>1042 unsafe fn message(&self) -> &MaybeUninit<T> {
1043 #[cfg(loom)]
1044 {
1045 self.message.with(|ptr| &*ptr)
1046 }
1047
1048 #[cfg(not(loom))]
1049 {
1050 &*self.message.get()
1051 }
1052 }
1053
1054 #[inline(always)]
with_message_mut<F>(&self, op: F) where F: FnOnce(&mut MaybeUninit<T>),1055 unsafe fn with_message_mut<F>(&self, op: F)
1056 where
1057 F: FnOnce(&mut MaybeUninit<T>),
1058 {
1059 #[cfg(loom)]
1060 {
1061 self.message.with_mut(|ptr| op(&mut *ptr))
1062 }
1063
1064 #[cfg(not(loom))]
1065 {
1066 op(&mut *self.message.get())
1067 }
1068 }
1069
1070 #[inline(always)]
1071 #[cfg(any(feature = "std", feature = "async"))]
with_waker_mut<F>(&self, op: F) where F: FnOnce(&mut MaybeUninit<ReceiverWaker>),1072 unsafe fn with_waker_mut<F>(&self, op: F)
1073 where
1074 F: FnOnce(&mut MaybeUninit<ReceiverWaker>),
1075 {
1076 #[cfg(loom)]
1077 {
1078 self.waker.with_mut(|ptr| op(&mut *ptr))
1079 }
1080
1081 #[cfg(not(loom))]
1082 {
1083 op(&mut *self.waker.get())
1084 }
1085 }
1086
1087 #[inline(always)]
write_message(&self, message: T)1088 unsafe fn write_message(&self, message: T) {
1089 self.with_message_mut(|slot| slot.as_mut_ptr().write(message));
1090 }
1091
1092 #[inline(always)]
take_message(&self) -> T1093 unsafe fn take_message(&self) -> T {
1094 #[cfg(loom)]
1095 {
1096 self.message.with(|ptr| ptr::read(ptr)).assume_init()
1097 }
1098
1099 #[cfg(not(loom))]
1100 {
1101 ptr::read(self.message.get()).assume_init()
1102 }
1103 }
1104
1105 #[inline(always)]
drop_message(&self)1106 unsafe fn drop_message(&self) {
1107 self.with_message_mut(|slot| slot.assume_init_drop());
1108 }
1109
1110 #[cfg(any(feature = "std", feature = "async"))]
1111 #[inline(always)]
write_waker(&self, waker: ReceiverWaker)1112 unsafe fn write_waker(&self, waker: ReceiverWaker) {
1113 self.with_waker_mut(|slot| slot.as_mut_ptr().write(waker));
1114 }
1115
1116 #[inline(always)]
take_waker(&self) -> ReceiverWaker1117 unsafe fn take_waker(&self) -> ReceiverWaker {
1118 #[cfg(loom)]
1119 {
1120 self.waker.with(|ptr| ptr::read(ptr)).assume_init()
1121 }
1122
1123 #[cfg(not(loom))]
1124 {
1125 ptr::read(self.waker.get()).assume_init()
1126 }
1127 }
1128
1129 #[cfg(any(feature = "std", feature = "async"))]
1130 #[inline(always)]
drop_waker(&self)1131 unsafe fn drop_waker(&self) {
1132 self.with_waker_mut(|slot| slot.assume_init_drop());
1133 }
1134
1135 /// # Safety
1136 ///
1137 /// * `Channel::waker` must not have a waker stored in it when calling this method.
1138 /// * Channel state must not be RECEIVING or UNPARKING when calling this method.
1139 #[cfg(feature = "async")]
write_async_waker(&self, cx: &mut task::Context<'_>) -> Poll<Result<T, RecvError>>1140 unsafe fn write_async_waker(&self, cx: &mut task::Context<'_>) -> Poll<Result<T, RecvError>> {
1141 // Write our thread instance to the channel.
1142 // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
1143 // try to access the waker until it sees the state set to RECEIVING below
1144 self.write_waker(ReceiverWaker::task_waker(cx));
1145
1146 // ORDERING: we use release ordering on success so the sender can synchronize with
1147 // our write of the waker. We use relaxed ordering on failure since the sender does
1148 // not need to synchronize with our write and the individual match arms handle any
1149 // additional synchronization
1150 match self
1151 .state
1152 .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
1153 {
1154 // We stored our waker, now we return and let the sender wake us up
1155 Ok(_) => Poll::Pending,
1156 // The sender sent the message while we prepared to park.
1157 // We take the message and mark the channel disconnected.
1158 Err(MESSAGE) => {
1159 // ORDERING: Synchronize with the write of the message. This branch is
1160 // unlikely to be taken, so it's likely more efficient to use a fence here
1161 // instead of AcqRel ordering on the compare_exchange operation
1162 fence(Acquire);
1163
1164 // SAFETY: we started in the EMPTY state and the sender switched us to the
1165 // MESSAGE state. This means that it did not take the waker, so we're
1166 // responsible for dropping it.
1167 self.drop_waker();
1168
1169 // ORDERING: sender does not exist, so this update only needs to be visible to us
1170 self.state.store(DISCONNECTED, Relaxed);
1171
1172 // SAFETY: The MESSAGE state tells us there is a correctly initialized message
1173 Poll::Ready(Ok(self.take_message()))
1174 }
1175 // The sender was dropped before sending anything while we prepared to park.
1176 Err(DISCONNECTED) => {
1177 // SAFETY: we started in the EMPTY state and the sender switched us to the
1178 // DISCONNECTED state. This means that it did not take the waker, so we're
1179 // responsible for dropping it.
1180 self.drop_waker();
1181 Poll::Ready(Err(RecvError))
1182 }
1183 _ => unreachable!(),
1184 }
1185 }
1186 }
1187
/// Handle stored in the channel that is used to wake up whoever is waiting on the receiving
/// side. Which variants exist depends on the enabled features.
enum ReceiverWaker {
    /// The receiver is waiting synchronously. Its thread is parked.
    #[cfg(feature = "std")]
    Thread(thread::Thread),
    /// The receiver is waiting asynchronously. Its task can be woken up with this `Waker`.
    #[cfg(feature = "async")]
    Task(task::Waker),
    /// A little hack to not make this enum an uninhabited type when no features are enabled.
    #[cfg(not(any(feature = "async", feature = "std")))]
    _Uninhabited,
}

impl ReceiverWaker {
    /// Creates a waker that unparks the calling thread.
    #[cfg(feature = "std")]
    pub fn current_thread() -> Self {
        let handle = thread::current();
        Self::Thread(handle)
    }

    /// Creates a waker from a clone of the task waker held by `cx`.
    #[cfg(feature = "async")]
    pub fn task_waker(cx: &task::Context<'_>) -> Self {
        let waker = cx.waker().clone();
        Self::Task(waker)
    }

    /// Consumes the waker and wakes up the receiver: unparks the thread or wakes the task.
    pub fn unpark(self) {
        match self {
            #[cfg(feature = "std")]
            Self::Thread(handle) => handle.unpark(),
            #[cfg(feature = "async")]
            Self::Task(task_waker) => task_waker.wake(),
            #[cfg(not(any(feature = "async", feature = "std")))]
            Self::_Uninhabited => unreachable!(),
        }
    }
}
1222
/// Guards against `ReceiverWaker` growing unexpectedly, since it is stored inline in every
/// channel.
///
/// The expected sizes are expressed in pointer widths rather than hard-coded byte counts,
/// so the test is not tied to 64-bit targets: one pointer for the thread handle alone and
/// two pointers as soon as a task waker can be stored.
#[cfg(not(loom))]
#[test]
fn receiver_waker_size() {
    let pointer_size = mem::size_of::<usize>();
    let expected: usize = match (cfg!(feature = "std"), cfg!(feature = "async")) {
        (false, false) => 0,
        (false, true) => 2 * pointer_size,
        (true, false) => pointer_size,
        (true, true) => 2 * pointer_size,
    };
    assert_eq!(mem::size_of::<ReceiverWaker>(), expected);
}
1234
/// Panic message used when a blocking receive is started while the channel is in the
/// RECEIVING or UNPARKING state, which means the receiver was polled as a future before.
/// Only needed when both the `std` and `async` features are enabled.
#[cfg(all(feature = "std", feature = "async"))]
const RECEIVER_USED_SYNC_AND_ASYNC_ERROR: &str =
    "Invalid to call a blocking receive method on oneshot::Receiver after it has been polled";
1238
1239 #[inline]
dealloc<T>(channel: NonNull<Channel<T>>)1240 pub(crate) unsafe fn dealloc<T>(channel: NonNull<Channel<T>>) {
1241 drop(Box::from_raw(channel.as_ptr()))
1242 }
1243