• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! One-shot channel is used to send a single message from a single sender to a
15 //! single receiver. The [`channel`] function returns a [`Sender`] and
16 //! [`Receiver`] handle pair that controls channel.
17 //!
18 //! The `Sender` handle is used by the producer to send a message.
19 //! The `Receiver` handle is used by the consumer to receive the message. It has
20 //! implemented the `Future` trait
21 //!
22 //! The `send` method is not async. It can be called from non-async context.
23 //!
24 //! # Examples
25 //!
26 //! ```
27 //! use ylong_runtime::sync::oneshot;
28 //! async fn io_func() {
29 //!     let (tx, rx) = oneshot::channel();
30 //!     ylong_runtime::spawn(async move {
31 //!         if let Err(_) = tx.send(6) {
32 //!             println!("Receiver dropped");
33 //!         }
34 //!     });
35 //!
36 //!     match rx.await {
37 //!         Ok(v) => println!("received : {:?}", v),
38 //!         Err(_) => println!("Sender dropped"),
39 //!     }
40 //! }
41 //! ```
42 use std::cell::RefCell;
43 use std::fmt::{Debug, Formatter};
44 use std::future::Future;
45 use std::pin::Pin;
46 use std::sync::atomic::AtomicUsize;
47 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release, SeqCst};
48 use std::sync::Arc;
49 use std::task::Poll::{Pending, Ready};
50 use std::task::{Context, Poll};
51 
52 use super::atomic_waker::AtomicWaker;
53 use super::error::{RecvError, TryRecvError};
54 
55 /// Initial state.
56 const INIT: usize = 0b00;
57 /// Sender has sent the value.
58 const SENT: usize = 0b01;
59 /// Channel is closed.
60 const CLOSED: usize = 0b10;
61 
62 /// Creates a new one-shot channel with a `Sender` and `Receiver` handle pair.
63 ///
64 /// The `Sender` can send a single value to the `Receiver`.
65 ///
66 /// # Examples
67 ///
68 /// ```
69 /// use ylong_runtime::sync::oneshot;
70 /// async fn io_func() {
71 ///     let (tx, rx) = oneshot::channel();
72 ///     ylong_runtime::spawn(async move {
73 ///         if let Err(_) = tx.send(6) {
74 ///             println!("Receiver dropped");
75 ///         }
76 ///     });
77 ///
78 ///     match rx.await {
79 ///         Ok(v) => println!("received : {:?}", v),
80 ///         Err(_) => println!("Sender dropped"),
81 ///     }
82 /// }
83 /// ```
channel<T>() -> (Sender<T>, Receiver<T>)84 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
85     let channel = Arc::new(Channel::new());
86     let tx = Sender {
87         channel: channel.clone(),
88     };
89     let rx = Receiver { channel };
90     (tx, rx)
91 }
92 
93 /// Sends a single value to the associated [`Receiver`].
94 /// A [`Sender`] and [`Receiver`] handle pair is created by the [`channel`]
95 /// function.
96 ///
97 /// # Examples
98 ///
99 /// ```
100 /// use ylong_runtime::sync::oneshot;
101 /// async fn io_func() {
102 ///     let (tx, rx) = oneshot::channel();
103 ///     ylong_runtime::spawn(async move {
104 ///         if let Err(_) = tx.send(6) {
105 ///             println!("Receiver dropped");
106 ///         }
107 ///     });
108 ///
109 ///     match rx.await {
110 ///         Ok(v) => println!("received : {:?}", v),
111 ///         Err(_) => println!("Sender dropped"),
112 ///     }
113 /// }
114 /// ```
115 ///
116 /// The receiver will fail with a [`RecvError`] if the sender is dropped without
117 /// sending a value.
118 ///
119 /// # Examples
120 ///
121 /// ```
122 /// use ylong_runtime::sync::oneshot;
123 /// async fn io_func() {
124 ///     let (tx, rx) = oneshot::channel::<()>();
125 ///     ylong_runtime::spawn(async move {
126 ///         drop(tx);
127 ///     });
128 ///
129 ///     match rx.await {
130 ///         Ok(v) => panic!("This won't happen"),
131 ///         Err(_) => println!("Sender dropped"),
132 ///     }
133 /// }
134 /// ```
135 #[derive(Debug)]
136 pub struct Sender<T> {
137     channel: Arc<Channel<T>>,
138 }
139 
140 impl<T> Sender<T> {
141     /// Sends a single value to the associated [`Receiver`], returns the value
142     /// back if it fails to send.
143     ///
144     /// The sender will consume itself when calling this method. It can send a
145     /// single value in synchronous code as it doesn't need waiting.
146     ///
147     /// # Examples
148     ///
149     /// ```
150     /// use ylong_runtime::sync::oneshot;
151     /// async fn io_func() {
152     ///     let (tx, rx) = oneshot::channel();
153     ///     ylong_runtime::spawn(async move {
154     ///         if let Err(_) = tx.send(6) {
155     ///             println!("Receiver dropped");
156     ///         }
157     ///     });
158     ///
159     ///     match rx.await {
160     ///         Ok(v) => println!("received : {:?}", v),
161     ///         Err(_) => println!("Sender dropped"),
162     ///     }
163     /// }
164     /// ```
send(self, value: T) -> Result<(), T>165     pub fn send(self, value: T) -> Result<(), T> {
166         self.channel.value.borrow_mut().replace(value);
167 
168         loop {
169             match self.channel.state.load(Acquire) {
170                 INIT => {
171                     if self
172                         .channel
173                         .state
174                         .compare_exchange(INIT, SENT, AcqRel, Acquire)
175                         .is_ok()
176                     {
177                         self.channel.waker.wake();
178                         return Ok(());
179                     }
180                 }
181                 CLOSED => {
182                     // value is stored in this function before.
183                     return Err(self.channel.take_value().unwrap());
184                 }
185                 _ => unreachable!(),
186             }
187         }
188     }
189 
190     /// Checks whether channel is closed. if so, the sender could not
191     /// send any value anymore. It returns true if the [`Receiver`] is dropped
192     /// or calls the [`close`] method.
193     ///
194     /// [`close`]: Receiver::close
195     ///
196     /// # Examples
197     ///
198     /// ```
199     /// use ylong_runtime::sync::oneshot;
200     /// async fn io_func() {
201     ///     let (tx, rx) = oneshot::channel();
202     ///     assert!(!tx.is_closed());
203     ///
204     ///     drop(rx);
205     ///
206     ///     assert!(tx.is_closed());
207     ///     assert!(tx.send("no receive").is_err());
208     /// }
209     /// ```
is_closed(&self) -> bool210     pub fn is_closed(&self) -> bool {
211         self.channel.state.load(Acquire) == CLOSED
212     }
213 }
214 
215 impl<T> Drop for Sender<T> {
drop(&mut self)216     fn drop(&mut self) {
217         if self.channel.state.swap(SENT, SeqCst) == INIT {
218             self.channel.waker.wake();
219         }
220     }
221 }
222 
223 /// Receives a single value from the associated [`Sender`].
224 /// A [`Sender`] and [`Receiver`] handle pair is created by the [`channel`]
225 /// function.
226 ///
227 /// There is no `recv` method to receive the message because the receiver itself
228 /// implements the [`Future`] trait. To receive a value, `.await` the `Receiver`
229 /// object directly.
230 ///
231 /// # Examples
232 ///
233 /// ```
234 /// use ylong_runtime::sync::oneshot;
235 /// async fn io_func() {
236 ///     let (tx, rx) = oneshot::channel();
237 ///     ylong_runtime::spawn(async move {
238 ///         if let Err(_) = tx.send(6) {
239 ///             println!("Receiver dropped");
240 ///         }
241 ///     });
242 ///
243 ///     match rx.await {
244 ///         Ok(v) => println!("received : {:?}", v),
245 ///         Err(_) => println!("Sender dropped"),
246 ///     }
247 /// }
248 /// ```
249 ///
250 /// The receiver will fail with [`RecvError`], if the sender is dropped without
251 /// sending a value.
252 ///
253 /// # Examples
254 ///
255 /// ```
256 /// use ylong_runtime::sync::oneshot;
257 /// async fn io_func() {
258 ///     let (tx, rx) = oneshot::channel::<u32>();
259 ///     ylong_runtime::spawn(async move {
260 ///         drop(tx);
261 ///     });
262 ///
263 ///     match rx.await {
264 ///         Ok(v) => panic!("This won't happen"),
265 ///         Err(_) => println!("Sender dropped"),
266 ///     }
267 /// }
268 /// ```
269 #[derive(Debug)]
270 pub struct Receiver<T> {
271     channel: Arc<Channel<T>>,
272 }
273 
274 impl<T> Receiver<T> {
275     /// Attempts to receive a value from the associated [`Sender`].
276     ///
277     /// The method will still receive the result if the `Sender` gets dropped
278     /// after sending the message.
279     ///
280     /// # Return value
281     /// The function returns:
282     ///  * `Ok(T)` if receiving a value successfully.
283     ///  * `Err(TryRecvError::Empty)` if no value has been sent yet.
284     ///  * `Err(TryRecvError::Closed)` if the sender has dropped without sending
285     ///   a value, or if the message has already been received.
286     ///
287     /// # Examples
288     ///
289     /// `try_recv` before a value is sent, then after.
290     ///
291     /// ```
292     /// use ylong_runtime::sync::error::TryRecvError;
293     /// use ylong_runtime::sync::oneshot;
294     /// async fn io_func() {
295     ///     let (tx, mut rx) = oneshot::channel();
296     ///     match rx.try_recv() {
297     ///         Err(TryRecvError::Empty) => {}
298     ///         _ => panic!("This won't happen"),
299     ///     }
300     ///
301     ///     // Send a value
302     ///     tx.send("Hello").unwrap();
303     ///
304     ///     match rx.try_recv() {
305     ///         Ok(value) => assert_eq!(value, "Hello"),
306     ///         _ => panic!("This won't happen"),
307     ///     }
308     /// }
309     /// ```
310     ///
311     /// `try_recv` when the sender dropped before sending a value
312     ///
313     /// ```
314     /// use ylong_runtime::sync::error::TryRecvError;
315     /// use ylong_runtime::sync::oneshot;
316     /// async fn io_func() {
317     ///     let (tx, mut rx) = oneshot::channel::<()>();
318     ///     drop(tx);
319     ///
320     ///     match rx.try_recv() {
321     ///         Err(TryRecvError::Closed) => {}
322     ///         _ => panic!("This won't happen"),
323     ///     }
324     /// }
325     /// ```
try_recv(&mut self) -> Result<T, TryRecvError>326     pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
327         match self.channel.state.load(Acquire) {
328             INIT => Err(TryRecvError::Empty),
329             SENT => self
330                 .channel
331                 .take_value_sent()
332                 .map_err(|_| TryRecvError::Closed),
333             CLOSED => Err(TryRecvError::Closed),
334             _ => unreachable!(),
335         }
336     }
337 
338     /// Closes the channel, prevents the `Sender` from sending a value.
339     ///
340     /// The `Sender` will fail to call [`send`] after the `Receiver` called
341     /// `close`. It will do nothing if the channel is already closed or the
342     /// message has been already received.
343     ///
344     /// [`send`]: Sender::send
345     /// [`try_recv`]: Receiver::try_recv
346     ///
347     /// # Examples
348     /// ```
349     /// use ylong_runtime::sync::oneshot;
350     /// async fn io_func() {
351     ///     let (tx, mut rx) = oneshot::channel();
352     ///     assert!(!tx.is_closed());
353     ///
354     ///     rx.close();
355     ///
356     ///     assert!(tx.is_closed());
357     ///     assert!(tx.send("no receive").is_err());
358     /// }
359     /// ```
360     ///
361     /// Receive a value sent **before** calling `close`
362     ///
363     /// ```
364     /// use ylong_runtime::sync::oneshot;
365     /// async fn io_func() {
366     ///     let (tx, mut rx) = oneshot::channel();
367     ///     assert!(tx.send("Hello").is_ok());
368     ///
369     ///     rx.close();
370     ///
371     ///     let msg = rx.try_recv().unwrap();
372     ///     assert_eq!(msg, "Hello");
373     /// }
374     /// ```
close(&mut self)375     pub fn close(&mut self) {
376         let _ = self
377             .channel
378             .state
379             .compare_exchange(INIT, CLOSED, AcqRel, Acquire);
380     }
381 }
382 
383 impl<T> Drop for Receiver<T> {
drop(&mut self)384     fn drop(&mut self) {
385         self.close();
386     }
387 }
388 
389 impl<T> Future for Receiver<T> {
390     type Output = Result<T, RecvError>;
391 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>392     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
393         match self.channel.state.load(Acquire) {
394             INIT => {
395                 self.channel.waker.register_by_ref(cx.waker());
396                 if self.channel.state.load(Acquire) == SENT {
397                     Ready(self.channel.take_value_sent())
398                 } else {
399                     Pending
400                 }
401             }
402             SENT => Ready(self.channel.take_value_sent()),
403             CLOSED => Ready(Err(RecvError)),
404             _ => unreachable!(),
405         }
406     }
407 }
408 
409 struct Channel<T> {
410     /// The state of the channel.
411     state: AtomicUsize,
412 
413     /// The value passed by channel, it is set by `Sender` and read by
414     /// `Receiver`.
415     value: RefCell<Option<T>>,
416 
417     /// The waker to notify the sender task or the receiver task.
418     waker: AtomicWaker,
419 }
420 
421 impl<T> Channel<T> {
new() -> Channel<T>422     fn new() -> Channel<T> {
423         Channel {
424             state: AtomicUsize::new(INIT),
425             value: RefCell::new(None),
426             waker: AtomicWaker::new(),
427         }
428     }
429 
take_value_sent(&self) -> Result<T, RecvError>430     fn take_value_sent(&self) -> Result<T, RecvError> {
431         match self.take_value() {
432             Some(val) => {
433                 self.state.store(CLOSED, Release);
434                 Ok(val)
435             }
436             None => Err(RecvError),
437         }
438     }
439 
take_value(&self) -> Option<T>440     fn take_value(&self) -> Option<T> {
441         self.value.borrow_mut().take()
442     }
443 }
444 
445 unsafe impl<T: Send> Send for Channel<T> {}
446 unsafe impl<T: Send> Sync for Channel<T> {}
447 
448 impl<T> Drop for Channel<T> {
drop(&mut self)449     fn drop(&mut self) {
450         self.waker.take_waker();
451     }
452 }
453 
454 impl<T: Debug> Debug for Channel<T> {
fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result455     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
456         f.debug_struct("Channel")
457             .field("state", &self.state.load(Acquire))
458             .finish()
459     }
460 }
461 
462 #[cfg(test)]
463 mod tests {
464     use crate::spawn;
465     use crate::sync::error::TryRecvError;
466     use crate::sync::oneshot;
467 
468     /// UT test cases for `send()` and `try_recv()`.
469     ///
470     /// # Brief
471     /// 1. Call channel to create a sender and a receiver handle pair.
472     /// 2. Receiver tries receiving a message before the sender sends one.
473     /// 3. Receiver tries receiving a message after the sender sends one.
474     /// 4. Check if the test results are correct.
475     #[test]
send_try_recv()476     fn send_try_recv() {
477         let (tx, mut rx) = oneshot::channel();
478         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
479         tx.send("hello").unwrap();
480 
481         assert_eq!(rx.try_recv().unwrap(), "hello");
482         assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
483     }
484 
485     /// UT test cases for `send()` and async receive.
486     ///
487     /// # Brief
488     /// 1. Call channel to create a sender and a receiver handle pair.
489     /// 2. Sender sends message in one thread.
490     /// 3. Receiver receives message in another thread.
491     /// 4. Check if the test results are correct.
492     #[test]
send_recv_await()493     fn send_recv_await() {
494         let (tx, rx) = oneshot::channel();
495         if tx.send(6).is_err() {
496             panic!("Receiver dropped");
497         }
498         spawn(async move {
499             match rx.await {
500                 Ok(v) => assert_eq!(v, 6),
501                 Err(_) => panic!("Sender dropped"),
502             }
503         });
504     }
505 
506     /// UT test cases for `is_closed()` and `close`.
507     ///
508     /// # Brief
509     /// 1. Call channel to create a sender and a receiver handle pair.
510     /// 2. Check whether the sender is closed.
511     /// 3. Close the receiver.
512     /// 4. Check whether the receiver will receive the message sent before it
513     ///    closed.
514     /// 5. Check if the test results are correct.
515     #[test]
close_rx()516     fn close_rx() {
517         let (tx, mut rx) = oneshot::channel();
518         assert!(!tx.is_closed());
519         rx.close();
520 
521         assert!(tx.is_closed());
522         assert!(tx.send("never received").is_err());
523 
524         let (tx, mut rx) = oneshot::channel();
525         assert!(tx.send("will receive").is_ok());
526 
527         rx.close();
528 
529         let msg = rx.try_recv().unwrap();
530         assert_eq!(msg, "will receive");
531     }
532 }
533