• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Unbounded channel
15 
16 use crate::sync::error::{RecvError, SendError, TryRecvError};
17 use crate::sync::mpsc::channel::{channel, Rx, Tx};
18 use crate::sync::mpsc::queue::Queue;
19 use crate::sync::mpsc::Container;
20 
21 cfg_time!(
22     use crate::time::timeout;
23     use std::time::Duration;
24     use crate::sync::error::RecvTimeoutError;
25 );
26 /// The sender of unbounded channel.
27 /// A [`UnboundedSender`] and [`UnboundedReceiver`] handle pair are created by
28 /// the [`unbounded_channel`] function.
29 ///
30 /// # Examples
31 ///
32 /// ```
33 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
34 /// async fn io_func() {
35 ///     let (tx, mut rx) = unbounded_channel();
36 ///     let tx2 = tx.clone();
37 ///     assert!(tx.send(1).is_ok());
38 ///     assert!(!tx.is_closed());
39 ///     assert!(tx.is_same(&tx2));
40 ///     let handle = ylong_runtime::spawn(async move {
41 ///         assert_eq!(rx.recv().await, Ok(1));
42 ///     });
43 /// }
44 /// ```
45 pub struct UnboundedSender<T> {
46     channel: Tx<Queue<T>>,
47 }
48 
49 impl<T> Clone for UnboundedSender<T> {
clone(&self) -> Self50     fn clone(&self) -> Self {
51         UnboundedSender {
52             channel: self.channel.clone(),
53         }
54     }
55 }
56 
57 /// The receiver of unbounded channel.
58 /// A [`UnboundedSender`] and [`UnboundedReceiver`] handle pair are created by
59 /// the [`unbounded_channel`] function.
60 ///
61 /// # Examples
62 ///
63 /// ```
64 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
65 /// async fn io_func() {
66 ///     let (tx, mut rx) = unbounded_channel();
67 ///     assert!(rx.try_recv().is_err());
68 ///     assert!(tx.send(1).is_ok());
69 ///     let handle = ylong_runtime::spawn(async move {
70 ///         assert_eq!(rx.len(), 1);
71 ///         assert_eq!(rx.recv().await, Ok(1));
72 ///     });
73 /// }
74 /// ```
75 pub struct UnboundedReceiver<T> {
76     channel: Rx<Queue<T>>,
77 }
78 
79 /// Creates a new mpsc channel and returns a `Sender` and `Receiver` handle
80 /// pair.
81 ///
82 /// # Examples
83 ///
84 /// ```
85 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
86 /// async fn io_func() {
87 ///     let (tx, mut rx) = unbounded_channel();
88 ///     let handle = ylong_runtime::spawn(async move {
89 ///         assert_eq!(rx.recv().await, Ok(1));
90 ///     });
91 ///     assert!(tx.send(1).is_ok());
92 /// }
93 /// ```
unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)94 pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
95     let queue = Queue::new();
96     let (tx, rx) = channel(queue);
97     (UnboundedSender::new(tx), UnboundedReceiver::new(rx))
98 }
99 
100 impl<T> UnboundedSender<T> {
new(channel: Tx<Queue<T>>) -> UnboundedSender<T>101     fn new(channel: Tx<Queue<T>>) -> UnboundedSender<T> {
102         UnboundedSender { channel }
103     }
104 
105     /// Sends values to the associated receiver.
106     ///
107     /// An error containing the sent value would be returned if the receiver is
108     /// closed or dropped.
109     ///
110     /// # Examples
111     ///
112     /// ```
113     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
114     /// let (tx, mut rx) = unbounded_channel();
115     /// assert!(tx.send(1).is_ok());
116     /// assert_eq!(rx.try_recv().unwrap(), 1);
117     /// ```
send(&self, value: T) -> Result<(), SendError<T>>118     pub fn send(&self, value: T) -> Result<(), SendError<T>> {
119         self.channel.send(value)
120     }
121 
122     /// Checks whether the channel is closed. If so, the sender could not
123     /// send values anymore. It returns true if the [`UnboundedReceiver`] is
124     /// dropped or calls the [`close`] method.
125     ///
126     /// [`close`]: UnboundedReceiver::close
127     ///
128     /// # Examples
129     ///
130     /// ```
131     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
132     /// let (tx, rx) = unbounded_channel::<isize>();
133     /// assert!(!tx.is_closed());
134     /// drop(rx);
135     /// assert!(tx.is_closed());
136     /// ```
is_closed(&self) -> bool137     pub fn is_closed(&self) -> bool {
138         self.channel.is_close()
139     }
140 
141     /// Checks whether the sender and another sender belong to the same channel
142     ///
143     /// # Examples
144     ///
145     /// ```
146     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
147     /// let (tx, rx) = unbounded_channel::<isize>();
148     /// let tx2 = tx.clone();
149     /// assert!(tx.is_same(&tx2));
150     /// ```
is_same(&self, other: &Self) -> bool151     pub fn is_same(&self, other: &Self) -> bool {
152         self.channel.is_same(&other.channel)
153     }
154 
155     /// Gets the number of values in the channel.
156     ///
157     /// # Examples
158     ///
159     /// ```
160     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
161     /// let (tx, rx) = unbounded_channel();
162     /// assert_eq!(tx.len(), 0);
163     /// tx.send(1).unwrap();
164     /// assert_eq!(tx.len(), 1);
165     /// ```
len(&self) -> usize166     pub fn len(&self) -> usize {
167         self.channel.len()
168     }
169 
170     /// Returns `true` if the channel contains no elements.
171     ///
172     /// # Examples
173     ///
174     /// ```
175     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
176     /// let (tx, rx) = unbounded_channel();
177     /// assert!(tx.is_empty());
178     /// tx.send(1).unwrap();
179     /// assert!(!tx.is_empty());
180     /// ```
is_empty(&self) -> bool181     pub fn is_empty(&self) -> bool {
182         self.len() == 0
183     }
184 }
185 
186 impl<T> Drop for UnboundedSender<T> {
drop(&mut self)187     fn drop(&mut self) {
188         self.channel.close();
189     }
190 }
191 
192 impl<T> UnboundedReceiver<T> {
new(channel: Rx<Queue<T>>) -> UnboundedReceiver<T>193     fn new(channel: Rx<Queue<T>>) -> UnboundedReceiver<T> {
194         UnboundedReceiver { channel }
195     }
196 
197     /// Gets the number of values in the channel.
198     ///
199     /// # Examples
200     ///
201     /// ```
202     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
203     /// let (tx, rx) = unbounded_channel();
204     /// tx.send(1).unwrap();
205     /// tx.send(2).unwrap();
206     /// assert_eq!(rx.len(), 2);
207     /// ```
len(&self) -> usize208     pub fn len(&self) -> usize {
209         self.channel.len()
210     }
211 
212     /// Returns `true` if the channel contains no elements.
213     ///
214     /// # Examples
215     ///
216     /// ```
217     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
218     /// let (tx, rx) = unbounded_channel();
219     /// assert!(rx.is_empty());
220     /// tx.send(1).unwrap();
221     /// assert!(!rx.is_empty());
222     /// ```
is_empty(&self) -> bool223     pub fn is_empty(&self) -> bool {
224         self.len() == 0
225     }
226 
227     /// Attempts to receive a value from the associated [`UnboundedSender`].
228     ///
229     /// # Return value
230     /// * `Ok(T)` if receiving a value successfully.
231     /// * `Err(TryRecvError::Empty)` if no value has been sent yet.
232     /// * `Err(TryRecvError::Closed)` if all senders have been dropped.
233     ///
234     /// # Examples
235     ///
236     /// ```
237     /// use ylong_runtime::sync::error::TryRecvError;
238     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
239     /// let (tx, mut rx) = unbounded_channel();
240     /// match rx.try_recv() {
241     ///     Err(TryRecvError::Empty) => {}
242     ///     _ => panic!("This won't happen"),
243     /// }
244     /// tx.send(1).unwrap();
245     /// match rx.try_recv() {
246     ///     Ok(_) => {}
247     ///     _ => panic!("This won't happen"),
248     /// }
249     /// drop(tx);
250     /// match rx.try_recv() {
251     ///     Err(TryRecvError::Closed) => {}
252     ///     _ => panic!("This won't happen"),
253     /// }
254     /// ```
try_recv(&mut self) -> Result<T, TryRecvError>255     pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
256         self.channel.try_recv()
257     }
258 
259     /// Receives a value from the associated [`UnboundedSender`].
260     ///
261     /// The `receiver` can still receive all sent messages in the channel after
262     /// the channel is closed.
263     ///
264     /// # Return value
265     /// * `Ok(T)` if receiving a value successfully.
266     /// * `Err(RecvError::Closed)` if all senders have been dropped.
267     ///
268     /// # Examples
269     ///
270     /// ```
271     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
272     /// async fn io_func() {
273     ///     let (tx, mut rx) = unbounded_channel();
274     ///     let handle = ylong_runtime::spawn(async move {
275     ///         assert_eq!(rx.recv().await, Ok(1));
276     ///     });
277     ///     assert!(tx.send(1).is_ok());
278     /// }
279     /// ```
recv(&mut self) -> Result<T, RecvError>280     pub async fn recv(&mut self) -> Result<T, RecvError> {
281         self.channel.recv().await
282     }
283 
284     /// Attempts to receive a value from the associated [`UnboundedSender`] in a
285     /// limited amount of time.
286     ///
287     /// The `receiver` can still receive all sent messages in the channel after
288     /// the channel is closed.
289     ///
290     /// # Return value
291     /// * `Ok(T)` if receiving a value successfully.
292     /// * `Err(RecvError::Closed)` if all senders have been dropped.
293     /// * `Err(RecvError::TimeOut)` if receiving timeout has elapsed.
294     ///
295     /// # Examples
296     ///
297     /// ```
298     /// use std::time::Duration;
299     ///
300     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
301     /// async fn io_func() {
302     ///     let (tx, mut rx) = unbounded_channel();
303     ///     let handle = ylong_runtime::spawn(async move {
304     ///         tx.send(1).unwrap();
305     ///         assert_eq!(rx.recv_timeout(Duration::from_millis(10)).await, Ok(1));
306     ///     });
307     /// }
308     /// ```
309     #[cfg(feature = "time")]
recv_timeout(&mut self, time: Duration) -> Result<T, RecvTimeoutError>310     pub async fn recv_timeout(&mut self, time: Duration) -> Result<T, RecvTimeoutError> {
311         match timeout(time, self.channel.recv()).await {
312             Ok(res) => res.map_err(|_| RecvTimeoutError::Closed),
313             Err(_) => Err(RecvTimeoutError::Timeout),
314         }
315     }
316 
317     /// Closes the channel, prevents the `Sender` from sending more values.
318     ///
319     /// The `Sender` will fail to call [`send`] after the `Receiver` called
320     /// `close`. It will do nothing if the channel is already closed.
321     ///
322     /// [`send`]: UnboundedSender::send
323     ///
324     /// # Examples
325     /// ```
326     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
327     /// async fn io_func() {
328     ///     let (tx, mut rx) = unbounded_channel();
329     ///     assert!(!tx.is_closed());
330     ///
331     ///     rx.close();
332     ///
333     ///     assert!(tx.is_closed());
334     ///     assert!(tx.send("no receive").is_err());
335     /// }
336     /// ```
337     ///
338     /// Receive a value sent **before** calling `close`
339     ///
340     /// ```
341     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
342     /// async fn io_func() {
343     ///     let (tx, mut rx) = unbounded_channel();
344     ///     assert!(tx.send("Hello").is_ok());
345     ///
346     ///     rx.close();
347     ///
348     ///     let msg = rx.try_recv().unwrap();
349     ///     assert_eq!(msg, "Hello");
350     /// }
351     /// ```
close(&mut self)352     pub fn close(&mut self) {
353         self.channel.close();
354     }
355 }
356 
357 impl<T> Drop for UnboundedReceiver<T> {
drop(&mut self)358     fn drop(&mut self) {
359         self.channel.close();
360     }
361 }
362