1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 //! One-shot channel is used to send a single message from a single sender to a
15 //! single receiver. The [`channel`] function returns a [`Sender`] and
16 //! [`Receiver`] handle pair that controls channel.
17 //!
18 //! The `Sender` handle is used by the producer to send a message.
19 //! The `Receiver` handle is used by the consumer to receive the message. It has
20 //! implemented the `Future` trait
21 //!
22 //! The `send` method is not async. It can be called from non-async context.
23 //!
24 //! # Examples
25 //!
26 //! ```
27 //! use ylong_runtime::sync::oneshot;
28 //! async fn io_func() {
29 //! let (tx, rx) = oneshot::channel();
30 //! ylong_runtime::spawn(async move {
31 //! if let Err(_) = tx.send(6) {
32 //! println!("Receiver dropped");
33 //! }
34 //! });
35 //!
36 //! match rx.await {
37 //! Ok(v) => println!("received : {:?}", v),
38 //! Err(_) => println!("Sender dropped"),
39 //! }
40 //! }
41 //! ```
42 use std::cell::RefCell;
43 use std::fmt::{Debug, Formatter};
44 use std::future::Future;
45 use std::pin::Pin;
46 use std::sync::atomic::AtomicUsize;
47 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release, SeqCst};
48 use std::sync::Arc;
49 use std::task::Poll::{Pending, Ready};
50 use std::task::{Context, Poll};
51
52 use super::atomic_waker::AtomicWaker;
53 use super::error::{RecvError, TryRecvError};
54
55 /// Initial state.
56 const INIT: usize = 0b00;
57 /// Sender has sent the value.
58 const SENT: usize = 0b01;
59 /// Channel is closed.
60 const CLOSED: usize = 0b10;
61
62 /// Creates a new one-shot channel with a `Sender` and `Receiver` handle pair.
63 ///
64 /// The `Sender` can send a single value to the `Receiver`.
65 ///
66 /// # Examples
67 ///
68 /// ```
69 /// use ylong_runtime::sync::oneshot;
70 /// async fn io_func() {
71 /// let (tx, rx) = oneshot::channel();
72 /// ylong_runtime::spawn(async move {
73 /// if let Err(_) = tx.send(6) {
74 /// println!("Receiver dropped");
75 /// }
76 /// });
77 ///
78 /// match rx.await {
79 /// Ok(v) => println!("received : {:?}", v),
80 /// Err(_) => println!("Sender dropped"),
81 /// }
82 /// }
83 /// ```
channel<T>() -> (Sender<T>, Receiver<T>)84 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
85 let channel = Arc::new(Channel::new());
86 let tx = Sender {
87 channel: channel.clone(),
88 };
89 let rx = Receiver { channel };
90 (tx, rx)
91 }
92
93 /// Sends a single value to the associated [`Receiver`].
94 /// A [`Sender`] and [`Receiver`] handle pair is created by the [`channel`]
95 /// function.
96 ///
97 /// # Examples
98 ///
99 /// ```
100 /// use ylong_runtime::sync::oneshot;
101 /// async fn io_func() {
102 /// let (tx, rx) = oneshot::channel();
103 /// ylong_runtime::spawn(async move {
104 /// if let Err(_) = tx.send(6) {
105 /// println!("Receiver dropped");
106 /// }
107 /// });
108 ///
109 /// match rx.await {
110 /// Ok(v) => println!("received : {:?}", v),
111 /// Err(_) => println!("Sender dropped"),
112 /// }
113 /// }
114 /// ```
115 ///
116 /// The receiver will fail with a [`RecvError`] if the sender is dropped without
117 /// sending a value.
118 ///
119 /// # Examples
120 ///
121 /// ```
122 /// use ylong_runtime::sync::oneshot;
123 /// async fn io_func() {
124 /// let (tx, rx) = oneshot::channel::<()>();
125 /// ylong_runtime::spawn(async move {
126 /// drop(tx);
127 /// });
128 ///
129 /// match rx.await {
130 /// Ok(v) => panic!("This won't happen"),
131 /// Err(_) => println!("Sender dropped"),
132 /// }
133 /// }
134 /// ```
135 #[derive(Debug)]
136 pub struct Sender<T> {
137 channel: Arc<Channel<T>>,
138 }
139
140 impl<T> Sender<T> {
141 /// Sends a single value to the associated [`Receiver`], returns the value
142 /// back if it fails to send.
143 ///
144 /// The sender will consume itself when calling this method. It can send a
145 /// single value in synchronous code as it doesn't need waiting.
146 ///
147 /// # Examples
148 ///
149 /// ```
150 /// use ylong_runtime::sync::oneshot;
151 /// async fn io_func() {
152 /// let (tx, rx) = oneshot::channel();
153 /// ylong_runtime::spawn(async move {
154 /// if let Err(_) = tx.send(6) {
155 /// println!("Receiver dropped");
156 /// }
157 /// });
158 ///
159 /// match rx.await {
160 /// Ok(v) => println!("received : {:?}", v),
161 /// Err(_) => println!("Sender dropped"),
162 /// }
163 /// }
164 /// ```
send(self, value: T) -> Result<(), T>165 pub fn send(self, value: T) -> Result<(), T> {
166 self.channel.value.borrow_mut().replace(value);
167
168 loop {
169 match self.channel.state.load(Acquire) {
170 INIT => {
171 if self
172 .channel
173 .state
174 .compare_exchange(INIT, SENT, AcqRel, Acquire)
175 .is_ok()
176 {
177 self.channel.waker.wake();
178 return Ok(());
179 }
180 }
181 CLOSED => {
182 // value is stored in this function before.
183 return Err(self.channel.take_value().unwrap());
184 }
185 _ => unreachable!(),
186 }
187 }
188 }
189
190 /// Checks whether channel is closed. if so, the sender could not
191 /// send any value anymore. It returns true if the [`Receiver`] is dropped
192 /// or calls the [`close`] method.
193 ///
194 /// [`close`]: Receiver::close
195 ///
196 /// # Examples
197 ///
198 /// ```
199 /// use ylong_runtime::sync::oneshot;
200 /// async fn io_func() {
201 /// let (tx, rx) = oneshot::channel();
202 /// assert!(!tx.is_closed());
203 ///
204 /// drop(rx);
205 ///
206 /// assert!(tx.is_closed());
207 /// assert!(tx.send("no receive").is_err());
208 /// }
209 /// ```
is_closed(&self) -> bool210 pub fn is_closed(&self) -> bool {
211 self.channel.state.load(Acquire) == CLOSED
212 }
213 }
214
215 impl<T> Drop for Sender<T> {
drop(&mut self)216 fn drop(&mut self) {
217 if self.channel.state.swap(SENT, SeqCst) == INIT {
218 self.channel.waker.wake();
219 }
220 }
221 }
222
223 /// Receives a single value from the associated [`Sender`].
224 /// A [`Sender`] and [`Receiver`] handle pair is created by the [`channel`]
225 /// function.
226 ///
227 /// There is no `recv` method to receive the message because the receiver itself
228 /// implements the [`Future`] trait. To receive a value, `.await` the `Receiver`
229 /// object directly.
230 ///
231 /// # Examples
232 ///
233 /// ```
234 /// use ylong_runtime::sync::oneshot;
235 /// async fn io_func() {
236 /// let (tx, rx) = oneshot::channel();
237 /// ylong_runtime::spawn(async move {
238 /// if let Err(_) = tx.send(6) {
239 /// println!("Receiver dropped");
240 /// }
241 /// });
242 ///
243 /// match rx.await {
244 /// Ok(v) => println!("received : {:?}", v),
245 /// Err(_) => println!("Sender dropped"),
246 /// }
247 /// }
248 /// ```
249 ///
250 /// The receiver will fail with [`RecvError`], if the sender is dropped without
251 /// sending a value.
252 ///
253 /// # Examples
254 ///
255 /// ```
256 /// use ylong_runtime::sync::oneshot;
257 /// async fn io_func() {
258 /// let (tx, rx) = oneshot::channel::<u32>();
259 /// ylong_runtime::spawn(async move {
260 /// drop(tx);
261 /// });
262 ///
263 /// match rx.await {
264 /// Ok(v) => panic!("This won't happen"),
265 /// Err(_) => println!("Sender dropped"),
266 /// }
267 /// }
268 /// ```
269 #[derive(Debug)]
270 pub struct Receiver<T> {
271 channel: Arc<Channel<T>>,
272 }
273
274 impl<T> Receiver<T> {
275 /// Attempts to receive a value from the associated [`Sender`].
276 ///
277 /// The method will still receive the result if the `Sender` gets dropped
278 /// after sending the message.
279 ///
280 /// # Return value
281 /// The function returns:
282 /// * `Ok(T)` if receiving a value successfully.
283 /// * `Err(TryRecvError::Empty)` if no value has been sent yet.
284 /// * `Err(TryRecvError::Closed)` if the sender has dropped without sending
285 /// a value, or if the message has already been received.
286 ///
287 /// # Examples
288 ///
289 /// `try_recv` before a value is sent, then after.
290 ///
291 /// ```
292 /// use ylong_runtime::sync::error::TryRecvError;
293 /// use ylong_runtime::sync::oneshot;
294 /// async fn io_func() {
295 /// let (tx, mut rx) = oneshot::channel();
296 /// match rx.try_recv() {
297 /// Err(TryRecvError::Empty) => {}
298 /// _ => panic!("This won't happen"),
299 /// }
300 ///
301 /// // Send a value
302 /// tx.send("Hello").unwrap();
303 ///
304 /// match rx.try_recv() {
305 /// Ok(value) => assert_eq!(value, "Hello"),
306 /// _ => panic!("This won't happen"),
307 /// }
308 /// }
309 /// ```
310 ///
311 /// `try_recv` when the sender dropped before sending a value
312 ///
313 /// ```
314 /// use ylong_runtime::sync::error::TryRecvError;
315 /// use ylong_runtime::sync::oneshot;
316 /// async fn io_func() {
317 /// let (tx, mut rx) = oneshot::channel::<()>();
318 /// drop(tx);
319 ///
320 /// match rx.try_recv() {
321 /// Err(TryRecvError::Closed) => {}
322 /// _ => panic!("This won't happen"),
323 /// }
324 /// }
325 /// ```
try_recv(&mut self) -> Result<T, TryRecvError>326 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
327 match self.channel.state.load(Acquire) {
328 INIT => Err(TryRecvError::Empty),
329 SENT => self
330 .channel
331 .take_value_sent()
332 .map_err(|_| TryRecvError::Closed),
333 CLOSED => Err(TryRecvError::Closed),
334 _ => unreachable!(),
335 }
336 }
337
338 /// Closes the channel, prevents the `Sender` from sending a value.
339 ///
340 /// The `Sender` will fail to call [`send`] after the `Receiver` called
341 /// `close`. It will do nothing if the channel is already closed or the
342 /// message has been already received.
343 ///
344 /// [`send`]: Sender::send
345 /// [`try_recv`]: Receiver::try_recv
346 ///
347 /// # Examples
348 /// ```
349 /// use ylong_runtime::sync::oneshot;
350 /// async fn io_func() {
351 /// let (tx, mut rx) = oneshot::channel();
352 /// assert!(!tx.is_closed());
353 ///
354 /// rx.close();
355 ///
356 /// assert!(tx.is_closed());
357 /// assert!(tx.send("no receive").is_err());
358 /// }
359 /// ```
360 ///
361 /// Receive a value sent **before** calling `close`
362 ///
363 /// ```
364 /// use ylong_runtime::sync::oneshot;
365 /// async fn io_func() {
366 /// let (tx, mut rx) = oneshot::channel();
367 /// assert!(tx.send("Hello").is_ok());
368 ///
369 /// rx.close();
370 ///
371 /// let msg = rx.try_recv().unwrap();
372 /// assert_eq!(msg, "Hello");
373 /// }
374 /// ```
close(&mut self)375 pub fn close(&mut self) {
376 let _ = self
377 .channel
378 .state
379 .compare_exchange(INIT, CLOSED, AcqRel, Acquire);
380 }
381 }
382
383 impl<T> Drop for Receiver<T> {
drop(&mut self)384 fn drop(&mut self) {
385 self.close();
386 }
387 }
388
389 impl<T> Future for Receiver<T> {
390 type Output = Result<T, RecvError>;
391
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>392 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
393 match self.channel.state.load(Acquire) {
394 INIT => {
395 self.channel.waker.register_by_ref(cx.waker());
396 if self.channel.state.load(Acquire) == SENT {
397 Ready(self.channel.take_value_sent())
398 } else {
399 Pending
400 }
401 }
402 SENT => Ready(self.channel.take_value_sent()),
403 CLOSED => Ready(Err(RecvError)),
404 _ => unreachable!(),
405 }
406 }
407 }
408
409 struct Channel<T> {
410 /// The state of the channel.
411 state: AtomicUsize,
412
413 /// The value passed by channel, it is set by `Sender` and read by
414 /// `Receiver`.
415 value: RefCell<Option<T>>,
416
417 /// The waker to notify the sender task or the receiver task.
418 waker: AtomicWaker,
419 }
420
421 impl<T> Channel<T> {
new() -> Channel<T>422 fn new() -> Channel<T> {
423 Channel {
424 state: AtomicUsize::new(INIT),
425 value: RefCell::new(None),
426 waker: AtomicWaker::new(),
427 }
428 }
429
take_value_sent(&self) -> Result<T, RecvError>430 fn take_value_sent(&self) -> Result<T, RecvError> {
431 match self.take_value() {
432 Some(val) => {
433 self.state.store(CLOSED, Release);
434 Ok(val)
435 }
436 None => Err(RecvError),
437 }
438 }
439
take_value(&self) -> Option<T>440 fn take_value(&self) -> Option<T> {
441 self.value.borrow_mut().take()
442 }
443 }
444
445 unsafe impl<T: Send> Send for Channel<T> {}
446 unsafe impl<T: Send> Sync for Channel<T> {}
447
448 impl<T> Drop for Channel<T> {
drop(&mut self)449 fn drop(&mut self) {
450 self.waker.take_waker();
451 }
452 }
453
454 impl<T: Debug> Debug for Channel<T> {
fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result455 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
456 f.debug_struct("Channel")
457 .field("state", &self.state.load(Acquire))
458 .finish()
459 }
460 }
461
462 #[cfg(test)]
463 mod tests {
464 use crate::spawn;
465 use crate::sync::error::TryRecvError;
466 use crate::sync::oneshot;
467
468 /// UT test cases for `send()` and `try_recv()`.
469 ///
470 /// # Brief
471 /// 1. Call channel to create a sender and a receiver handle pair.
472 /// 2. Receiver tries receiving a message before the sender sends one.
473 /// 3. Receiver tries receiving a message after the sender sends one.
474 /// 4. Check if the test results are correct.
475 #[test]
send_try_recv()476 fn send_try_recv() {
477 let (tx, mut rx) = oneshot::channel();
478 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
479 tx.send("hello").unwrap();
480
481 assert_eq!(rx.try_recv().unwrap(), "hello");
482 assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
483 }
484
485 /// UT test cases for `send()` and async receive.
486 ///
487 /// # Brief
488 /// 1. Call channel to create a sender and a receiver handle pair.
489 /// 2. Sender sends message in one thread.
490 /// 3. Receiver receives message in another thread.
491 /// 4. Check if the test results are correct.
492 #[test]
send_recv_await()493 fn send_recv_await() {
494 let (tx, rx) = oneshot::channel();
495 if tx.send(6).is_err() {
496 panic!("Receiver dropped");
497 }
498 spawn(async move {
499 match rx.await {
500 Ok(v) => assert_eq!(v, 6),
501 Err(_) => panic!("Sender dropped"),
502 }
503 });
504 }
505
506 /// UT test cases for `is_closed()` and `close`.
507 ///
508 /// # Brief
509 /// 1. Call channel to create a sender and a receiver handle pair.
510 /// 2. Check whether the sender is closed.
511 /// 3. Close the receiver.
512 /// 4. Check whether the receiver will receive the message sent before it
513 /// closed.
514 /// 5. Check if the test results are correct.
515 #[test]
close_rx()516 fn close_rx() {
517 let (tx, mut rx) = oneshot::channel();
518 assert!(!tx.is_closed());
519 rx.close();
520
521 assert!(tx.is_closed());
522 assert!(tx.send("never received").is_err());
523
524 let (tx, mut rx) = oneshot::channel();
525 assert!(tx.send("will receive").is_ok());
526
527 rx.close();
528
529 let msg = rx.try_recv().unwrap();
530 assert_eq!(msg, "will receive");
531 }
532 }
533