1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 //! Unbounded channel
15
16 use crate::sync::error::{RecvError, SendError, TryRecvError};
17 use crate::sync::mpsc::channel::{channel, Rx, Tx};
18 use crate::sync::mpsc::queue::Queue;
19 use crate::sync::mpsc::Container;
20
21 cfg_time!(
22 use crate::time::timeout;
23 use std::time::Duration;
24 use crate::sync::error::RecvTimeoutError;
25 );
26 /// The sender of unbounded channel.
27 /// A [`UnboundedSender`] and [`UnboundedReceiver`] handle pair are created by
28 /// the [`unbounded_channel`] function.
29 ///
30 /// # Examples
31 ///
32 /// ```
33 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
34 /// async fn io_func() {
35 /// let (tx, mut rx) = unbounded_channel();
36 /// let tx2 = tx.clone();
37 /// assert!(tx.send(1).is_ok());
38 /// assert!(!tx.is_closed());
39 /// assert!(tx.is_same(&tx2));
40 /// let handle = ylong_runtime::spawn(async move {
41 /// assert_eq!(rx.recv().await, Ok(1));
42 /// });
43 /// }
44 /// ```
45 pub struct UnboundedSender<T> {
46 channel: Tx<Queue<T>>,
47 }
48
49 impl<T> Clone for UnboundedSender<T> {
clone(&self) -> Self50 fn clone(&self) -> Self {
51 UnboundedSender {
52 channel: self.channel.clone(),
53 }
54 }
55 }
56
57 /// The receiver of unbounded channel.
58 /// A [`UnboundedSender`] and [`UnboundedReceiver`] handle pair are created by
59 /// the [`unbounded_channel`] function.
60 ///
61 /// # Examples
62 ///
63 /// ```
64 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
65 /// async fn io_func() {
66 /// let (tx, mut rx) = unbounded_channel();
67 /// assert!(rx.try_recv().is_err());
68 /// assert!(tx.send(1).is_ok());
69 /// let handle = ylong_runtime::spawn(async move {
70 /// assert_eq!(rx.len(), 1);
71 /// assert_eq!(rx.recv().await, Ok(1));
72 /// });
73 /// }
74 /// ```
75 pub struct UnboundedReceiver<T> {
76 channel: Rx<Queue<T>>,
77 }
78
79 /// Creates a new mpsc channel and returns a `Sender` and `Receiver` handle
80 /// pair.
81 ///
82 /// # Examples
83 ///
84 /// ```
85 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
86 /// async fn io_func() {
87 /// let (tx, mut rx) = unbounded_channel();
88 /// let handle = ylong_runtime::spawn(async move {
89 /// assert_eq!(rx.recv().await, Ok(1));
90 /// });
91 /// assert!(tx.send(1).is_ok());
92 /// }
93 /// ```
unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)94 pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
95 let queue = Queue::new();
96 let (tx, rx) = channel(queue);
97 (UnboundedSender::new(tx), UnboundedReceiver::new(rx))
98 }
99
100 impl<T> UnboundedSender<T> {
new(channel: Tx<Queue<T>>) -> UnboundedSender<T>101 fn new(channel: Tx<Queue<T>>) -> UnboundedSender<T> {
102 UnboundedSender { channel }
103 }
104
105 /// Sends values to the associated receiver.
106 ///
107 /// An error containing the sent value would be returned if the receiver is
108 /// closed or dropped.
109 ///
110 /// # Examples
111 ///
112 /// ```
113 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
114 /// let (tx, mut rx) = unbounded_channel();
115 /// assert!(tx.send(1).is_ok());
116 /// assert_eq!(rx.try_recv().unwrap(), 1);
117 /// ```
send(&self, value: T) -> Result<(), SendError<T>>118 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
119 self.channel.send(value)
120 }
121
122 /// Checks whether the channel is closed. If so, the sender could not
123 /// send values anymore. It returns true if the [`UnboundedReceiver`] is
124 /// dropped or calls the [`close`] method.
125 ///
126 /// [`close`]: UnboundedReceiver::close
127 ///
128 /// # Examples
129 ///
130 /// ```
131 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
132 /// let (tx, rx) = unbounded_channel::<isize>();
133 /// assert!(!tx.is_closed());
134 /// drop(rx);
135 /// assert!(tx.is_closed());
136 /// ```
is_closed(&self) -> bool137 pub fn is_closed(&self) -> bool {
138 self.channel.is_close()
139 }
140
141 /// Checks whether the sender and another sender belong to the same channel
142 ///
143 /// # Examples
144 ///
145 /// ```
146 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
147 /// let (tx, rx) = unbounded_channel::<isize>();
148 /// let tx2 = tx.clone();
149 /// assert!(tx.is_same(&tx2));
150 /// ```
is_same(&self, other: &Self) -> bool151 pub fn is_same(&self, other: &Self) -> bool {
152 self.channel.is_same(&other.channel)
153 }
154
155 /// Gets the number of values in the channel.
156 ///
157 /// # Examples
158 ///
159 /// ```
160 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
161 /// let (tx, rx) = unbounded_channel();
162 /// assert_eq!(tx.len(), 0);
163 /// tx.send(1).unwrap();
164 /// assert_eq!(tx.len(), 1);
165 /// ```
len(&self) -> usize166 pub fn len(&self) -> usize {
167 self.channel.len()
168 }
169
170 /// Returns `true` if the channel contains no elements.
171 ///
172 /// # Examples
173 ///
174 /// ```
175 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
176 /// let (tx, rx) = unbounded_channel();
177 /// assert!(tx.is_empty());
178 /// tx.send(1).unwrap();
179 /// assert!(!tx.is_empty());
180 /// ```
is_empty(&self) -> bool181 pub fn is_empty(&self) -> bool {
182 self.len() == 0
183 }
184 }
185
186 impl<T> Drop for UnboundedSender<T> {
drop(&mut self)187 fn drop(&mut self) {
188 self.channel.close();
189 }
190 }
191
192 impl<T> UnboundedReceiver<T> {
new(channel: Rx<Queue<T>>) -> UnboundedReceiver<T>193 fn new(channel: Rx<Queue<T>>) -> UnboundedReceiver<T> {
194 UnboundedReceiver { channel }
195 }
196
197 /// Gets the number of values in the channel.
198 ///
199 /// # Examples
200 ///
201 /// ```
202 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
203 /// let (tx, rx) = unbounded_channel();
204 /// tx.send(1).unwrap();
205 /// tx.send(2).unwrap();
206 /// assert_eq!(rx.len(), 2);
207 /// ```
len(&self) -> usize208 pub fn len(&self) -> usize {
209 self.channel.len()
210 }
211
212 /// Returns `true` if the channel contains no elements.
213 ///
214 /// # Examples
215 ///
216 /// ```
217 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
218 /// let (tx, rx) = unbounded_channel();
219 /// assert!(rx.is_empty());
220 /// tx.send(1).unwrap();
221 /// assert!(!rx.is_empty());
222 /// ```
is_empty(&self) -> bool223 pub fn is_empty(&self) -> bool {
224 self.len() == 0
225 }
226
227 /// Attempts to receive a value from the associated [`UnboundedSender`].
228 ///
229 /// # Return value
230 /// * `Ok(T)` if receiving a value successfully.
231 /// * `Err(TryRecvError::Empty)` if no value has been sent yet.
232 /// * `Err(TryRecvError::Closed)` if all senders have been dropped.
233 ///
234 /// # Examples
235 ///
236 /// ```
237 /// use ylong_runtime::sync::error::TryRecvError;
238 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
239 /// let (tx, mut rx) = unbounded_channel();
240 /// match rx.try_recv() {
241 /// Err(TryRecvError::Empty) => {}
242 /// _ => panic!("This won't happen"),
243 /// }
244 /// tx.send(1).unwrap();
245 /// match rx.try_recv() {
246 /// Ok(_) => {}
247 /// _ => panic!("This won't happen"),
248 /// }
249 /// drop(tx);
250 /// match rx.try_recv() {
251 /// Err(TryRecvError::Closed) => {}
252 /// _ => panic!("This won't happen"),
253 /// }
254 /// ```
try_recv(&mut self) -> Result<T, TryRecvError>255 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
256 self.channel.try_recv()
257 }
258
259 /// Receives a value from the associated [`UnboundedSender`].
260 ///
261 /// The `receiver` can still receive all sent messages in the channel after
262 /// the channel is closed.
263 ///
264 /// # Return value
265 /// * `Ok(T)` if receiving a value successfully.
266 /// * `Err(RecvError::Closed)` if all senders have been dropped.
267 ///
268 /// # Examples
269 ///
270 /// ```
271 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
272 /// async fn io_func() {
273 /// let (tx, mut rx) = unbounded_channel();
274 /// let handle = ylong_runtime::spawn(async move {
275 /// assert_eq!(rx.recv().await, Ok(1));
276 /// });
277 /// assert!(tx.send(1).is_ok());
278 /// }
279 /// ```
recv(&mut self) -> Result<T, RecvError>280 pub async fn recv(&mut self) -> Result<T, RecvError> {
281 self.channel.recv().await
282 }
283
284 /// Attempts to receive a value from the associated [`UnboundedSender`] in a
285 /// limited amount of time.
286 ///
287 /// The `receiver` can still receive all sent messages in the channel after
288 /// the channel is closed.
289 ///
290 /// # Return value
291 /// * `Ok(T)` if receiving a value successfully.
292 /// * `Err(RecvError::Closed)` if all senders have been dropped.
293 /// * `Err(RecvError::TimeOut)` if receiving timeout has elapsed.
294 ///
295 /// # Examples
296 ///
297 /// ```
298 /// use std::time::Duration;
299 ///
300 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
301 /// async fn io_func() {
302 /// let (tx, mut rx) = unbounded_channel();
303 /// let handle = ylong_runtime::spawn(async move {
304 /// tx.send(1).unwrap();
305 /// assert_eq!(rx.recv_timeout(Duration::from_millis(10)).await, Ok(1));
306 /// });
307 /// }
308 /// ```
309 #[cfg(feature = "time")]
recv_timeout(&mut self, time: Duration) -> Result<T, RecvTimeoutError>310 pub async fn recv_timeout(&mut self, time: Duration) -> Result<T, RecvTimeoutError> {
311 match timeout(time, self.channel.recv()).await {
312 Ok(res) => res.map_err(|_| RecvTimeoutError::Closed),
313 Err(_) => Err(RecvTimeoutError::Timeout),
314 }
315 }
316
317 /// Closes the channel, prevents the `Sender` from sending more values.
318 ///
319 /// The `Sender` will fail to call [`send`] after the `Receiver` called
320 /// `close`. It will do nothing if the channel is already closed.
321 ///
322 /// [`send`]: UnboundedSender::send
323 ///
324 /// # Examples
325 /// ```
326 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
327 /// async fn io_func() {
328 /// let (tx, mut rx) = unbounded_channel();
329 /// assert!(!tx.is_closed());
330 ///
331 /// rx.close();
332 ///
333 /// assert!(tx.is_closed());
334 /// assert!(tx.send("no receive").is_err());
335 /// }
336 /// ```
337 ///
338 /// Receive a value sent **before** calling `close`
339 ///
340 /// ```
341 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
342 /// async fn io_func() {
343 /// let (tx, mut rx) = unbounded_channel();
344 /// assert!(tx.send("Hello").is_ok());
345 ///
346 /// rx.close();
347 ///
348 /// let msg = rx.try_recv().unwrap();
349 /// assert_eq!(msg, "Hello");
350 /// }
351 /// ```
close(&mut self)352 pub fn close(&mut self) {
353 self.channel.close();
354 }
355 }
356
357 impl<T> Drop for UnboundedReceiver<T> {
drop(&mut self)358 fn drop(&mut self) {
359 self.channel.close();
360 }
361 }
362