• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::future::Future;
15 use std::mem::MaybeUninit;
16 use std::pin::Pin;
17 use std::slice::from_raw_parts_mut;
18 use std::string::FromUtf8Error;
19 use std::task::{Context, Poll};
20 use std::{io, mem};
21 
22 use crate::futures::poll_fn;
23 use crate::io::async_buf_read::AsyncBufRead;
24 use crate::io::async_read::AsyncRead;
25 use crate::io::poll_ready;
26 use crate::io::read_buf::ReadBuf;
27 
28 macro_rules! take_reader {
29     ($self: expr) => {
30         match $self.reader.take() {
31             Some(reader) => reader,
32             None => panic!("read: poll after finished"),
33         }
34     };
35 }
36 
37 /// A future for reading available data from the source into a buffer.
38 ///
39 /// Returned by [`crate::io::AsyncReadExt::read`]
40 pub struct ReadTask<'a, R: ?Sized> {
41     reader: Option<&'a mut R>,
42     buf: &'a mut [u8],
43 }
44 
45 impl<'a, R: ?Sized> ReadTask<'a, R> {
46     #[inline(always)]
new(reader: &'a mut R, buf: &'a mut [u8]) -> ReadTask<'a, R>47     pub(crate) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> ReadTask<'a, R> {
48         ReadTask {
49             reader: Some(reader),
50             buf,
51         }
52     }
53 }
54 
55 impl<'a, R> Future for ReadTask<'a, R>
56 where
57     R: AsyncRead + Unpin,
58 {
59     type Output = io::Result<usize>;
60 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>61     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62         let mut reader = take_reader!(self);
63 
64         let mut buf = ReadBuf::new(self.buf);
65         match Pin::new(&mut reader).poll_read(cx, &mut buf) {
66             Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
67             Poll::Ready(_) => Poll::Ready(Ok(buf.filled_len())),
68             Poll::Pending => {
69                 self.reader = Some(reader);
70                 Poll::Pending
71             }
72         }
73     }
74 }
75 
76 /// A future for reading every data from the source into a vector.
77 ///
78 /// Returned by [`crate::io::AsyncReadExt::read_to_end`]
79 pub struct ReadToEndTask<'a, R: ?Sized> {
80     reader: &'a mut R,
81     buf: &'a mut Vec<u8>,
82     r_len: usize,
83 }
84 
85 impl<'a, R: ?Sized> ReadToEndTask<'a, R> {
86     #[inline(always)]
new(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEndTask<'a, R>87     pub(crate) fn new(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEndTask<'a, R> {
88         ReadToEndTask {
89             reader,
90             buf,
91             r_len: 0,
92         }
93     }
94 }
95 
poll_read_to_end<R: AsyncRead + Unpin>( buf: &mut Vec<u8>, mut reader: &mut R, read_len: &mut usize, cx: &mut Context<'_>, ) -> Poll<io::Result<usize>>96 fn poll_read_to_end<R: AsyncRead + Unpin>(
97     buf: &mut Vec<u8>,
98     mut reader: &mut R,
99     read_len: &mut usize,
100     cx: &mut Context<'_>,
101 ) -> Poll<io::Result<usize>> {
102     loop {
103         // Allocate 32 bytes every time, if the remaining capacity is larger than 32
104         // bytes, this will do nothing.
105         buf.reserve(32);
106         let len = buf.len();
107         let mut read_buf = ReadBuf::uninit(unsafe {
108             from_raw_parts_mut(buf.as_mut_ptr() as *mut MaybeUninit<u8>, buf.capacity())
109         });
110         read_buf.assume_init(len);
111         read_buf.set_filled(len);
112 
113         let poll = Pin::new(&mut reader).poll_read(cx, &mut read_buf);
114         let new_len = read_buf.filled_len();
115         match poll {
116             Poll::Pending => {
117                 return Poll::Pending;
118             }
119             Poll::Ready(Ok(())) if (new_len - len) == 0 => {
120                 return Poll::Ready(Ok(mem::replace(read_len, 0)))
121             }
122             Poll::Ready(Ok(())) => {
123                 *read_len += new_len - len;
124                 unsafe {
125                     buf.set_len(new_len);
126                 }
127             }
128             Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
129         }
130     }
131 }
132 
133 impl<'a, R> Future for ReadToEndTask<'a, R>
134 where
135     R: AsyncRead + Unpin,
136 {
137     type Output = io::Result<usize>;
138 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>139     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
140         let me = self.get_mut();
141         let (buf, reader, read_len) = (&mut me.buf, &mut me.reader, &mut me.r_len);
142         poll_read_to_end(buf, *reader, read_len, cx)
143     }
144 }
145 
146 /// A future for reading every data from the source into a String.
147 ///
148 /// Returned by [`crate::io::AsyncReadExt::read_to_string`]
149 pub struct ReadToStringTask<'a, R: ?Sized> {
150     reader: &'a mut R,
151     buf: Vec<u8>,
152     output: &'a mut String,
153     r_len: usize,
154 }
155 
156 impl<'a, R: ?Sized> ReadToStringTask<'a, R> {
157     #[inline(always)]
new(reader: &'a mut R, dst: &'a mut String) -> ReadToStringTask<'a, R>158     pub(crate) fn new(reader: &'a mut R, dst: &'a mut String) -> ReadToStringTask<'a, R> {
159         ReadToStringTask {
160             reader,
161             buf: mem::take(dst).into_bytes(),
162             output: dst,
163             r_len: 0,
164         }
165     }
166 }
167 
io_string_result( io_res: io::Result<usize>, str_res: Result<String, FromUtf8Error>, read_len: usize, output: &mut String, ) -> Poll<io::Result<usize>>168 fn io_string_result(
169     io_res: io::Result<usize>,
170     str_res: Result<String, FromUtf8Error>,
171     read_len: usize,
172     output: &mut String,
173 ) -> Poll<io::Result<usize>> {
174     match (io_res, str_res) {
175         (Ok(bytes), Ok(string)) => {
176             *output = string;
177             Poll::Ready(Ok(bytes))
178         }
179         (Ok(bytes), Err(trans_err)) => {
180             let mut vector = trans_err.into_bytes();
181             let len = vector.len() - bytes;
182             vector.truncate(len);
183             *output = String::from_utf8(vector).expect("Invalid utf-8 data");
184             Poll::Ready(Err(io::Error::new(
185                 io::ErrorKind::InvalidData,
186                 "Invalid utf-8 data",
187             )))
188         }
189         (Err(io_err), Ok(string)) => {
190             *output = string;
191             Poll::Ready(Err(io_err))
192         }
193         (Err(io_err), Err(trans_err)) => {
194             let mut vector = trans_err.into_bytes();
195             let len = vector.len() - read_len;
196             vector.truncate(len);
197             *output = String::from_utf8(vector).expect("Invalid utf-8 data");
198             Poll::Ready(Err(io_err))
199         }
200     }
201 }
202 
203 impl<'a, R> Future for ReadToStringTask<'a, R>
204 where
205     R: AsyncRead + Unpin,
206 {
207     type Output = io::Result<usize>;
208 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>209     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
210         let me = self.get_mut();
211         let (buf, output, reader, read_len) =
212             (&mut me.buf, &mut me.output, &mut me.reader, &mut me.r_len);
213         let res = poll_ready!(poll_read_to_end(buf, *reader, read_len, cx));
214         let trans = String::from_utf8(mem::take(buf));
215 
216         io_string_result(res, trans, *read_len, output)
217     }
218 }
219 
220 /// A future for reading exact amount of bytes from the source into a vector.
221 ///
222 /// Returned by [`crate::io::AsyncReadExt::read_exact`]
223 pub struct ReadExactTask<'a, R: ?Sized> {
224     reader: Option<&'a mut R>,
225     buf: ReadBuf<'a>,
226 }
227 
228 impl<'a, R: ?Sized> ReadExactTask<'a, R> {
229     #[inline(always)]
new(reader: &'a mut R, buf: &'a mut [u8]) -> ReadExactTask<'a, R>230     pub(crate) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> ReadExactTask<'a, R> {
231         ReadExactTask {
232             reader: Some(reader),
233             buf: ReadBuf::new(buf),
234         }
235     }
236 }
237 
238 impl<'a, R> Future for ReadExactTask<'a, R>
239 where
240     R: AsyncRead + Unpin,
241 {
242     type Output = io::Result<()>;
243 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>244     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
245         let mut reader = take_reader!(self);
246         let this = self.get_mut();
247 
248         loop {
249             let remain = this.buf.remaining();
250             if remain == 0 {
251                 return Poll::Ready(Ok(()));
252             }
253             let _ = match Pin::new(&mut reader).poll_read(cx, &mut this.buf) {
254                 Poll::Pending => {
255                     this.reader = Some(reader);
256                     return Poll::Pending;
257                 }
258                 x => x?,
259             };
260             if this.buf.remaining() == remain {
261                 return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
262             }
263         }
264     }
265 }
266 
267 /// A future for reading every data from the source into a vector until the
268 /// desired delimiter appears.
269 ///
270 /// Returned by [`crate::io::AsyncBufReadExt::read_until`]
271 pub struct ReadUtilTask<'a, R: ?Sized> {
272     reader: &'a mut R,
273     r_len: usize,
274     delim: u8,
275     buf: &'a mut Vec<u8>,
276 }
277 
278 impl<'a, R: ?Sized> ReadUtilTask<'a, R> {
279     #[inline(always)]
new(reader: &'a mut R, delim: u8, buf: &'a mut Vec<u8>) -> ReadUtilTask<'a, R>280     pub(crate) fn new(reader: &'a mut R, delim: u8, buf: &'a mut Vec<u8>) -> ReadUtilTask<'a, R> {
281         ReadUtilTask {
282             reader,
283             r_len: 0,
284             delim,
285             buf,
286         }
287     }
288 }
289 
poll_read_until<R: AsyncBufRead + Unpin>( buf: &mut Vec<u8>, mut reader: &mut R, delim: u8, read_len: &mut usize, cx: &mut Context<'_>, ) -> Poll<io::Result<usize>>290 fn poll_read_until<R: AsyncBufRead + Unpin>(
291     buf: &mut Vec<u8>,
292     mut reader: &mut R,
293     delim: u8,
294     read_len: &mut usize,
295     cx: &mut Context<'_>,
296 ) -> Poll<io::Result<usize>> {
297     loop {
298         let (done, used) = {
299             let available = poll_ready!(Pin::new(&mut reader).poll_fill_buf(cx))?;
300 
301             let ret = available.iter().position(|&val| val == delim);
302 
303             match ret {
304                 None => {
305                     buf.extend_from_slice(available);
306                     (false, available.len())
307                 }
308                 Some(i) => {
309                     buf.extend_from_slice(&available[..=i]);
310                     (true, i + 1)
311                 }
312             }
313         };
314         Pin::new(&mut reader).consume(used);
315         *read_len += used;
316         if done || used == 0 {
317             return Poll::Ready(Ok(mem::replace(read_len, 0)));
318         }
319     }
320 }
321 
322 impl<'a, R> Future for ReadUtilTask<'a, R>
323 where
324     R: AsyncBufRead + Unpin,
325 {
326     type Output = io::Result<usize>;
327 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>328     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
329         let me = self.get_mut();
330         let (buf, reader, delim, read_len) = (&mut me.buf, &mut me.reader, me.delim, &mut me.r_len);
331         poll_read_until(buf, *reader, delim, read_len, cx)
332     }
333 }
334 
335 /// A future for reading every data from the source into a vector until the
336 /// desired delimiter appears.
337 ///
338 /// Returned by [`crate::io::AsyncBufReadExt::read_until`]
339 pub struct ReadLineTask<'a, R: ?Sized> {
340     reader: &'a mut R,
341     r_len: usize,
342     buf: Vec<u8>,
343     output: &'a mut String,
344 }
345 
346 impl<'a, R: ?Sized> ReadLineTask<'a, R> {
347     #[inline(always)]
new(reader: &'a mut R, buf: &'a mut String) -> ReadLineTask<'a, R>348     pub(crate) fn new(reader: &'a mut R, buf: &'a mut String) -> ReadLineTask<'a, R> {
349         ReadLineTask {
350             reader,
351             r_len: 0,
352             buf: mem::take(buf).into_bytes(),
353             output: buf,
354         }
355     }
356 }
357 
358 impl<'a, R> Future for ReadLineTask<'a, R>
359 where
360     R: AsyncBufRead + Unpin,
361 {
362     type Output = io::Result<usize>;
363 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>364     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
365         let me = self.get_mut();
366         let (buf, output, reader, read_len) =
367             (&mut me.buf, &mut me.output, &mut me.reader, &mut me.r_len);
368         let res = poll_ready!(poll_read_until(buf, *reader, b'\n', read_len, cx));
369         let trans = String::from_utf8(mem::take(buf));
370 
371         io_string_result(res, trans, *read_len, output)
372     }
373 }
374 
375 /// A future for reading every data from the source into a vector and splitting
376 /// it into segments by a delimiter.
377 ///
378 /// Returned by [`crate::io::AsyncBufReadExt::split`]
379 pub struct SplitTask<R> {
380     reader: R,
381     delim: u8,
382     buf: Vec<u8>,
383     r_len: usize,
384 }
385 
386 impl<R> SplitTask<R>
387 where
388     R: AsyncBufRead + Unpin,
389 {
new(reader: R, delim: u8) -> SplitTask<R>390     pub(crate) fn new(reader: R, delim: u8) -> SplitTask<R> {
391         SplitTask {
392             reader,
393             delim,
394             buf: Vec::new(),
395             r_len: 0,
396         }
397     }
398 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<Option<Vec<u8>>>>399     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<Option<Vec<u8>>>> {
400         let me = self.get_mut();
401         let (buf, reader, read_len, delim) = (&mut me.buf, &mut me.reader, &mut me.r_len, me.delim);
402         let res = poll_ready!(poll_read_until(buf, reader, delim, read_len, cx))?;
403 
404         if buf.is_empty() && res == 0 {
405             return Poll::Ready(Ok(None));
406         }
407 
408         if buf.last() == Some(&delim) {
409             buf.pop();
410         }
411         Poll::Ready(Ok(Some(mem::take(buf))))
412     }
413 
next(&mut self) -> io::Result<Option<Vec<u8>>>414     pub async fn next(&mut self) -> io::Result<Option<Vec<u8>>> {
415         poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
416     }
417 }
418 
419 /// A future for reading every data from the source into a vector and splitting
420 /// it into segments by row.
421 ///
422 /// Returned by [`crate::io::AsyncBufReadExt::split`]
423 pub struct LinesTask<R> {
424     reader: R,
425     buf: Vec<u8>,
426     output: String,
427     r_len: usize,
428 }
429 
430 impl<R> LinesTask<R>
431 where
432     R: AsyncBufRead,
433 {
new(reader: R) -> LinesTask<R>434     pub(crate) fn new(reader: R) -> LinesTask<R> {
435         LinesTask {
436             reader,
437             buf: Vec::new(),
438             output: String::new(),
439             r_len: 0,
440         }
441     }
442 }
443 
444 impl<R> LinesTask<R>
445 where
446     R: AsyncBufRead + Unpin,
447 {
poll_next_line( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<io::Result<Option<String>>>448     fn poll_next_line(
449         self: Pin<&mut Self>,
450         cx: &mut Context<'_>,
451     ) -> Poll<io::Result<Option<String>>> {
452         let me = self.get_mut();
453         let (buf, output, reader, read_len) =
454             (&mut me.buf, &mut me.output, &mut me.reader, &mut me.r_len);
455         let io_res = poll_ready!(poll_read_until(buf, reader, b'\n', read_len, cx));
456         let str_res = String::from_utf8(mem::take(buf));
457 
458         let res = poll_ready!(io_string_result(io_res, str_res, *read_len, output))?;
459 
460         if output.is_empty() && res == 0 {
461             return Poll::Ready(Ok(None));
462         }
463 
464         if output.ends_with('\n') {
465             output.pop();
466             if output.ends_with('\r') {
467                 output.pop();
468             }
469         }
470         Poll::Ready(Ok(Some(mem::take(output))))
471     }
472 
next_line(&mut self) -> io::Result<Option<String>>473     pub async fn next_line(&mut self) -> io::Result<Option<String>> {
474         poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
475     }
476 }
477 
478 #[cfg(all(test, feature = "fs"))]
479 mod test {
480     use crate::fs::{remove_file, File};
481     use crate::io::async_read::AsyncReadExt;
482     use crate::io::async_write::AsyncWriteExt;
483     use crate::io::AsyncBufReader;
484 
485     /// UT test cases for `io_string_result()`.
486     ///
487     /// # Brief
488     /// 1. Create a file and write non-utf8 chars to it.
489     /// 2. Create a AsyncBufReader.
490     /// 3. Call io_string_result() to translate the content of the file to
491     ///    String.
492     /// 4. Check if the test results are expected errors.
493     #[test]
ut_io_string_result()494     fn ut_io_string_result() {
495         let handle = crate::spawn(async move {
496             let file_path = "foo.txt";
497 
498             let mut f = File::create(file_path).await.unwrap();
499             let buf = [0, 159, 146, 150];
500             let n = f.write(&buf).await.unwrap();
501             assert_eq!(n, 4);
502 
503             let f = File::open(file_path).await.unwrap();
504             let mut reader = AsyncBufReader::new(f);
505             let mut buf = String::new();
506             let res = reader.read_to_string(&mut buf).await;
507             assert!(res.is_err());
508             assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::InvalidData);
509 
510             let res = remove_file(file_path).await;
511             assert!(res.is_ok());
512         });
513         crate::block_on(handle).expect("failed to block on");
514     }
515 }
516