1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::future::Future;
15 use std::mem::MaybeUninit;
16 use std::pin::Pin;
17 use std::slice::from_raw_parts_mut;
18 use std::string::FromUtf8Error;
19 use std::task::{Context, Poll};
20 use std::{io, mem};
21
22 use crate::futures::poll_fn;
23 use crate::io::async_buf_read::AsyncBufRead;
24 use crate::io::async_read::AsyncRead;
25 use crate::io::poll_ready;
26 use crate::io::read_buf::ReadBuf;
27
// Moves the reader out of `$self.reader`, leaving `None` behind.
//
// Panics with "read: poll after finished" when the slot is already empty,
// i.e. when the owning future is polled again after it has completed.
macro_rules! take_reader {
    ($self: expr) => {
        if let Some(reader) = $self.reader.take() {
            reader
        } else {
            panic!("read: poll after finished")
        }
    };
}
36
/// A future for reading available data from the source into a buffer.
///
/// The future resolves to the number of bytes placed into the buffer, or to
/// the I/O error reported by the reader.
///
/// Returned by [`crate::io::AsyncReadExt::read`]
pub struct ReadTask<'a, R: ?Sized> {
    // Taken out on every poll and only put back when the read is pending;
    // polling again after completion finds `None` and panics.
    reader: Option<&'a mut R>,
    // Caller-provided destination for the bytes that are read.
    buf: &'a mut [u8],
}
44
45 impl<'a, R: ?Sized> ReadTask<'a, R> {
46 #[inline(always)]
new(reader: &'a mut R, buf: &'a mut [u8]) -> ReadTask<'a, R>47 pub(crate) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> ReadTask<'a, R> {
48 ReadTask {
49 reader: Some(reader),
50 buf,
51 }
52 }
53 }
54
55 impl<'a, R> Future for ReadTask<'a, R>
56 where
57 R: AsyncRead + Unpin,
58 {
59 type Output = io::Result<usize>;
60
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>61 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62 let mut reader = take_reader!(self);
63
64 let mut buf = ReadBuf::new(self.buf);
65 match Pin::new(&mut reader).poll_read(cx, &mut buf) {
66 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
67 Poll::Ready(_) => Poll::Ready(Ok(buf.filled_len())),
68 Poll::Pending => {
69 self.reader = Some(reader);
70 Poll::Pending
71 }
72 }
73 }
74 }
75
/// A future for reading all data from the source into a vector.
///
/// Data is appended to the vector; the future resolves to the number of bytes
/// that were appended.
///
/// Returned by [`crate::io::AsyncReadExt::read_to_end`]
pub struct ReadToEndTask<'a, R: ?Sized> {
    // Source the data is read from.
    reader: &'a mut R,
    // Destination vector that the read data is appended to.
    buf: &'a mut Vec<u8>,
    // Running count of bytes appended so far; kept here so that it survives
    // polls that end in `Poll::Pending`.
    r_len: usize,
}
84
85 impl<'a, R: ?Sized> ReadToEndTask<'a, R> {
86 #[inline(always)]
new(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEndTask<'a, R>87 pub(crate) fn new(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEndTask<'a, R> {
88 ReadToEndTask {
89 reader,
90 buf,
91 r_len: 0,
92 }
93 }
94 }
95
/// Reads from `reader` in a loop, appending everything it produces to `buf`,
/// until a read adds no bytes.
///
/// `read_len` accumulates the number of bytes appended so far, which lets the
/// count survive polls that return `Poll::Pending`. When a read adds no bytes,
/// the accumulated total is returned and `read_len` is reset to 0. When the
/// reader reports an error, the error is returned and `read_len` is left as
/// it is.
fn poll_read_to_end<R: AsyncRead + Unpin>(
    buf: &mut Vec<u8>,
    mut reader: &mut R,
    read_len: &mut usize,
    cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
    loop {
        // Allocate 32 bytes every time, if the remaining capacity is larger than 32
        // bytes, this will do nothing.
        buf.reserve(32);
        let len = buf.len();
        // SAFETY: the pointer and `capacity()` describe the vector's own
        // allocation, and viewing it as `MaybeUninit<u8>` makes no claim that
        // the spare part is initialized.
        let mut read_buf = ReadBuf::uninit(unsafe {
            from_raw_parts_mut(buf.as_mut_ptr() as *mut MaybeUninit<u8>, buf.capacity())
        });
        // The first `len` bytes are the vector's existing content. Presumably
        // these two calls mark them as initialized and already filled, so that
        // the reader appends after them — TODO confirm against `ReadBuf`.
        read_buf.assume_init(len);
        read_buf.set_filled(len);

        let poll = Pin::new(&mut reader).poll_read(cx, &mut read_buf);
        let new_len = read_buf.filled_len();
        match poll {
            Poll::Pending => {
                return Poll::Pending;
            }
            // A successful read that added nothing ends the loop: hand out the
            // total and reset the counter.
            Poll::Ready(Ok(())) if (new_len - len) == 0 => {
                return Poll::Ready(Ok(mem::replace(read_len, 0)))
            }
            Poll::Ready(Ok(())) => {
                *read_len += new_len - len;
                // SAFETY: NOTE(review) — relies on `ReadBuf` keeping
                // `filled_len()` within the slice it was given (so
                // `new_len <= capacity`) and on the reader having initialized
                // every byte it reports as filled; confirm against the
                // `ReadBuf` / `AsyncRead` contracts.
                unsafe {
                    buf.set_len(new_len);
                }
            }
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
        }
    }
}
132
133 impl<'a, R> Future for ReadToEndTask<'a, R>
134 where
135 R: AsyncRead + Unpin,
136 {
137 type Output = io::Result<usize>;
138
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>139 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
140 let me = self.get_mut();
141 let (buf, reader, read_len) = (&mut me.buf, &mut me.reader, &mut me.r_len);
142 poll_read_to_end(buf, *reader, read_len, cx)
143 }
144 }
145
/// A future for reading all data from the source into a String.
///
/// The read bytes must form valid UTF-8 together with the string's previous
/// content; otherwise the future resolves to an `InvalidData` error.
///
/// Returned by [`crate::io::AsyncReadExt::read_to_string`]
pub struct ReadToStringTask<'a, R: ?Sized> {
    // Source the data is read from.
    reader: &'a mut R,
    // Working byte buffer: starts as the bytes taken out of the destination
    // string and grows as data is read.
    buf: Vec<u8>,
    // Destination string; left empty while the read is in progress and
    // assigned when the future completes.
    output: &'a mut String,
    // Running count of bytes read so far.
    r_len: usize,
}
155
156 impl<'a, R: ?Sized> ReadToStringTask<'a, R> {
157 #[inline(always)]
new(reader: &'a mut R, dst: &'a mut String) -> ReadToStringTask<'a, R>158 pub(crate) fn new(reader: &'a mut R, dst: &'a mut String) -> ReadToStringTask<'a, R> {
159 ReadToStringTask {
160 reader,
161 buf: mem::take(dst).into_bytes(),
162 output: dst,
163 r_len: 0,
164 }
165 }
166 }
167
/// Combines the outcome of the raw read (`io_res`) with the outcome of the
/// UTF-8 conversion (`str_res`) and stores the resulting text in `output`.
///
/// * Conversion succeeded: `output` receives the converted string and the I/O
///   result is passed through unchanged.
/// * Conversion failed: the newly read bytes are cut off again so that
///   `output` gets back the text it held before the read. The number of bytes
///   to cut is the `Ok` value of `io_res`, or `read_len` when the read itself
///   failed. The result is the I/O error if there was one, otherwise an
///   `InvalidData` error.
///
/// The returned `Poll` is always `Poll::Ready`.
fn io_string_result(
    io_res: io::Result<usize>,
    str_res: Result<String, FromUtf8Error>,
    read_len: usize,
    output: &mut String,
) -> Poll<io::Result<usize>> {
    // Drops the trailing `appended` bytes and rebuilds the text that was in
    // the buffer before the read started.
    fn text_before_read(err: FromUtf8Error, appended: usize) -> String {
        let mut bytes = err.into_bytes();
        let keep = bytes.len() - appended;
        bytes.truncate(keep);
        String::from_utf8(bytes).expect("Invalid utf-8 data")
    }

    let outcome = match str_res {
        Ok(text) => {
            *output = text;
            io_res
        }
        Err(utf8_err) => match io_res {
            Ok(appended) => {
                *output = text_before_read(utf8_err, appended);
                Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid utf-8 data",
                ))
            }
            Err(io_err) => {
                *output = text_before_read(utf8_err, read_len);
                Err(io_err)
            }
        },
    };
    Poll::Ready(outcome)
}
202
203 impl<'a, R> Future for ReadToStringTask<'a, R>
204 where
205 R: AsyncRead + Unpin,
206 {
207 type Output = io::Result<usize>;
208
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>209 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
210 let me = self.get_mut();
211 let (buf, output, reader, read_len) =
212 (&mut me.buf, &mut me.output, &mut me.reader, &mut me.r_len);
213 let res = poll_ready!(poll_read_to_end(buf, *reader, read_len, cx));
214 let trans = String::from_utf8(mem::take(buf));
215
216 io_string_result(res, trans, *read_len, output)
217 }
218 }
219
/// A future for reading the exact amount of bytes needed to fill a buffer.
///
/// The future resolves successfully once the buffer has no remaining space.
/// If a read makes no progress before that, it resolves to an `UnexpectedEof`
/// error.
///
/// Returned by [`crate::io::AsyncReadExt::read_exact`]
pub struct ReadExactTask<'a, R: ?Sized> {
    // Taken out on every poll and only put back when the read is pending;
    // polling again after completion finds `None` and panics.
    reader: Option<&'a mut R>,
    // Wraps the caller-provided slice and tracks how much of it is filled.
    buf: ReadBuf<'a>,
}
227
228 impl<'a, R: ?Sized> ReadExactTask<'a, R> {
229 #[inline(always)]
new(reader: &'a mut R, buf: &'a mut [u8]) -> ReadExactTask<'a, R>230 pub(crate) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> ReadExactTask<'a, R> {
231 ReadExactTask {
232 reader: Some(reader),
233 buf: ReadBuf::new(buf),
234 }
235 }
236 }
237
238 impl<'a, R> Future for ReadExactTask<'a, R>
239 where
240 R: AsyncRead + Unpin,
241 {
242 type Output = io::Result<()>;
243
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>244 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
245 let mut reader = take_reader!(self);
246 let this = self.get_mut();
247
248 loop {
249 let remain = this.buf.remaining();
250 if remain == 0 {
251 return Poll::Ready(Ok(()));
252 }
253 let _ = match Pin::new(&mut reader).poll_read(cx, &mut this.buf) {
254 Poll::Pending => {
255 this.reader = Some(reader);
256 return Poll::Pending;
257 }
258 x => x?,
259 };
260 if this.buf.remaining() == remain {
261 return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
262 }
263 }
264 }
265 }
266
/// A future for reading data from the source into a vector until the
/// desired delimiter appears.
///
/// The delimiter, when found, is included in the vector. The future resolves
/// to the number of bytes that were appended.
///
/// Returned by [`crate::io::AsyncBufReadExt::read_until`]
pub struct ReadUtilTask<'a, R: ?Sized> {
    // Buffered source the data is read from.
    reader: &'a mut R,
    // Running count of bytes appended so far; kept here so that it survives
    // polls that end in `Poll::Pending`.
    r_len: usize,
    // Byte that terminates the read.
    delim: u8,
    // Destination vector that the read data is appended to.
    buf: &'a mut Vec<u8>,
}
277
278 impl<'a, R: ?Sized> ReadUtilTask<'a, R> {
279 #[inline(always)]
new(reader: &'a mut R, delim: u8, buf: &'a mut Vec<u8>) -> ReadUtilTask<'a, R>280 pub(crate) fn new(reader: &'a mut R, delim: u8, buf: &'a mut Vec<u8>) -> ReadUtilTask<'a, R> {
281 ReadUtilTask {
282 reader,
283 r_len: 0,
284 delim,
285 buf,
286 }
287 }
288 }
289
poll_read_until<R: AsyncBufRead + Unpin>( buf: &mut Vec<u8>, mut reader: &mut R, delim: u8, read_len: &mut usize, cx: &mut Context<'_>, ) -> Poll<io::Result<usize>>290 fn poll_read_until<R: AsyncBufRead + Unpin>(
291 buf: &mut Vec<u8>,
292 mut reader: &mut R,
293 delim: u8,
294 read_len: &mut usize,
295 cx: &mut Context<'_>,
296 ) -> Poll<io::Result<usize>> {
297 loop {
298 let (done, used) = {
299 let available = poll_ready!(Pin::new(&mut reader).poll_fill_buf(cx))?;
300
301 let ret = available.iter().position(|&val| val == delim);
302
303 match ret {
304 None => {
305 buf.extend_from_slice(available);
306 (false, available.len())
307 }
308 Some(i) => {
309 buf.extend_from_slice(&available[..=i]);
310 (true, i + 1)
311 }
312 }
313 };
314 Pin::new(&mut reader).consume(used);
315 *read_len += used;
316 if done || used == 0 {
317 return Poll::Ready(Ok(mem::replace(read_len, 0)));
318 }
319 }
320 }
321
322 impl<'a, R> Future for ReadUtilTask<'a, R>
323 where
324 R: AsyncBufRead + Unpin,
325 {
326 type Output = io::Result<usize>;
327
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>328 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
329 let me = self.get_mut();
330 let (buf, reader, delim, read_len) = (&mut me.buf, &mut me.reader, me.delim, &mut me.r_len);
331 poll_read_until(buf, *reader, delim, read_len, cx)
332 }
333 }
334
/// A future for reading data from the source into a String until a newline
/// (`\n`) byte appears.
///
/// The read bytes must form valid UTF-8 together with the string's previous
/// content; otherwise the future resolves to an `InvalidData` error.
///
/// Returned by [`crate::io::AsyncBufReadExt::read_line`]
pub struct ReadLineTask<'a, R: ?Sized> {
    // Buffered source the data is read from.
    reader: &'a mut R,
    // Running count of bytes read so far.
    r_len: usize,
    // Working byte buffer: starts as the bytes taken out of the destination
    // string and grows as data is read.
    buf: Vec<u8>,
    // Destination string; left empty while the read is in progress and
    // assigned when the future completes.
    output: &'a mut String,
}
345
346 impl<'a, R: ?Sized> ReadLineTask<'a, R> {
347 #[inline(always)]
new(reader: &'a mut R, buf: &'a mut String) -> ReadLineTask<'a, R>348 pub(crate) fn new(reader: &'a mut R, buf: &'a mut String) -> ReadLineTask<'a, R> {
349 ReadLineTask {
350 reader,
351 r_len: 0,
352 buf: mem::take(buf).into_bytes(),
353 output: buf,
354 }
355 }
356 }
357
358 impl<'a, R> Future for ReadLineTask<'a, R>
359 where
360 R: AsyncBufRead + Unpin,
361 {
362 type Output = io::Result<usize>;
363
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>364 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
365 let me = self.get_mut();
366 let (buf, output, reader, read_len) =
367 (&mut me.buf, &mut me.output, &mut me.reader, &mut me.r_len);
368 let res = poll_ready!(poll_read_until(buf, *reader, b'\n', read_len, cx));
369 let trans = String::from_utf8(mem::take(buf));
370
371 io_string_result(res, trans, *read_len, output)
372 }
373 }
374
/// A reader adapter that yields the source's data as byte segments separated
/// by a delimiter.
///
/// Each call to `next` produces one segment without its trailing delimiter,
/// or `None` once nothing more can be read.
///
/// Returned by [`crate::io::AsyncBufReadExt::split`]
pub struct SplitTask<R> {
    // Buffered source, owned by the task.
    reader: R,
    // Byte that separates two segments.
    delim: u8,
    // Bytes collected for the segment currently being read.
    buf: Vec<u8>,
    // Running count of bytes read for the current segment.
    r_len: usize,
}
385
386 impl<R> SplitTask<R>
387 where
388 R: AsyncBufRead + Unpin,
389 {
new(reader: R, delim: u8) -> SplitTask<R>390 pub(crate) fn new(reader: R, delim: u8) -> SplitTask<R> {
391 SplitTask {
392 reader,
393 delim,
394 buf: Vec::new(),
395 r_len: 0,
396 }
397 }
398
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<Option<Vec<u8>>>>399 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<Option<Vec<u8>>>> {
400 let me = self.get_mut();
401 let (buf, reader, read_len, delim) = (&mut me.buf, &mut me.reader, &mut me.r_len, me.delim);
402 let res = poll_ready!(poll_read_until(buf, reader, delim, read_len, cx))?;
403
404 if buf.is_empty() && res == 0 {
405 return Poll::Ready(Ok(None));
406 }
407
408 if buf.last() == Some(&delim) {
409 buf.pop();
410 }
411 Poll::Ready(Ok(Some(mem::take(buf))))
412 }
413
next(&mut self) -> io::Result<Option<Vec<u8>>>414 pub async fn next(&mut self) -> io::Result<Option<Vec<u8>>> {
415 poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
416 }
417 }
418
/// A reader adapter that yields the source's data line by line.
///
/// Each call to `next_line` produces one line as a String without its
/// trailing `\n` or `\r\n`, or `None` once nothing more can be read.
///
/// Returned by [`crate::io::AsyncBufReadExt::lines`]
pub struct LinesTask<R> {
    // Buffered source, owned by the task.
    reader: R,
    // Bytes collected for the line currently being read.
    buf: Vec<u8>,
    // The current line after UTF-8 conversion, before it is handed out.
    output: String,
    // Running count of bytes read for the current line.
    r_len: usize,
}
429
430 impl<R> LinesTask<R>
431 where
432 R: AsyncBufRead,
433 {
new(reader: R) -> LinesTask<R>434 pub(crate) fn new(reader: R) -> LinesTask<R> {
435 LinesTask {
436 reader,
437 buf: Vec::new(),
438 output: String::new(),
439 r_len: 0,
440 }
441 }
442 }
443
444 impl<R> LinesTask<R>
445 where
446 R: AsyncBufRead + Unpin,
447 {
poll_next_line( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<io::Result<Option<String>>>448 fn poll_next_line(
449 self: Pin<&mut Self>,
450 cx: &mut Context<'_>,
451 ) -> Poll<io::Result<Option<String>>> {
452 let me = self.get_mut();
453 let (buf, output, reader, read_len) =
454 (&mut me.buf, &mut me.output, &mut me.reader, &mut me.r_len);
455 let io_res = poll_ready!(poll_read_until(buf, reader, b'\n', read_len, cx));
456 let str_res = String::from_utf8(mem::take(buf));
457
458 let res = poll_ready!(io_string_result(io_res, str_res, *read_len, output))?;
459
460 if output.is_empty() && res == 0 {
461 return Poll::Ready(Ok(None));
462 }
463
464 if output.ends_with('\n') {
465 output.pop();
466 if output.ends_with('\r') {
467 output.pop();
468 }
469 }
470 Poll::Ready(Ok(Some(mem::take(output))))
471 }
472
next_line(&mut self) -> io::Result<Option<String>>473 pub async fn next_line(&mut self) -> io::Result<Option<String>> {
474 poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
475 }
476 }
477
#[cfg(all(test, feature = "fs"))]
mod test {
    use crate::fs::{remove_file, File};
    use crate::io::async_read::AsyncReadExt;
    use crate::io::async_write::AsyncWriteExt;
    use crate::io::AsyncBufReader;

    /// UT test cases for `io_string_result()`.
    ///
    /// # Brief
    /// 1. Create a file and write non-utf8 bytes to it.
    /// 2. Create an AsyncBufReader over the file.
    /// 3. Call read_to_string(), which goes through io_string_result() to
    ///    translate the content of the file to String.
    /// 4. Remove the file, then check if the test results are expected
    ///    errors.
    #[test]
    fn ut_io_string_result() {
        let handle = crate::spawn(async move {
            // A name specific to this test, so that it cannot clash with
            // other tests that create files in the same directory.
            let file_path = "ut_io_string_result.txt";

            let mut f = File::create(file_path).await.unwrap();
            let buf = [0, 159, 146, 150];
            let n = f.write(&buf).await.unwrap();
            assert_eq!(n, 4);

            let f = File::open(file_path).await.unwrap();
            let mut reader = AsyncBufReader::new(f);
            let mut buf = String::new();
            let read_res = reader.read_to_string(&mut buf).await;

            // Clean up before asserting on the read result, so that a failed
            // assertion does not leave the file behind.
            let remove_res = remove_file(file_path).await;

            assert!(read_res.is_err());
            assert_eq!(
                read_res.unwrap_err().kind(),
                std::io::ErrorKind::InvalidData
            );
            assert!(remove_res.is_ok());
        });
        crate::block_on(handle).expect("failed to block on");
    }
}
516