• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::io::{Read, Write};
15 use std::{cmp, io};
16 
17 use crate::io::ReadBuf;
18 use crate::task::JoinHandle;
19 
20 const MAX_BUF: usize = 2 * 1024 * 1024;
21 
22 pub(crate) enum State<T> {
23     Idle(Option<BufInner>),
24     Poll(JoinHandle<(io::Result<usize>, BufInner, T)>),
25 }
26 
27 impl<T> State<T> {
init() -> Self28     pub(crate) fn init() -> Self {
29         State::Idle(Some(BufInner::new()))
30     }
31 }
32 
33 pub(crate) struct BufInner {
34     inner: Vec<u8>,
35     pos: usize,
36 }
37 
38 impl BufInner {
new() -> Self39     fn new() -> Self {
40         BufInner {
41             inner: Vec::with_capacity(0),
42             pos: 0,
43         }
44     }
45 
is_empty(&self) -> bool46     pub(crate) fn is_empty(&self) -> bool {
47         self.len() == 0
48     }
49 
len(&self) -> usize50     pub(crate) fn len(&self) -> usize {
51         self.inner.len() - self.pos
52     }
53 
bytes(&self) -> &[u8]54     fn bytes(&self) -> &[u8] {
55         &self.inner[self.pos..]
56     }
57 
set_len(&mut self, buf: &mut ReadBuf<'_>)58     pub(crate) fn set_len(&mut self, buf: &mut ReadBuf<'_>) {
59         let len = cmp::min(buf.remaining(), MAX_BUF);
60         if self.inner.len() < len {
61             self.inner.reserve(len - self.len());
62         }
63         unsafe {
64             self.inner.set_len(len);
65         }
66     }
67 
clone_from(&mut self, buf: &[u8]) -> usize68     pub(crate) fn clone_from(&mut self, buf: &[u8]) -> usize {
69         let n = cmp::min(buf.len(), MAX_BUF);
70         self.inner.extend_from_slice(&buf[..n]);
71         n
72     }
73 
clone_into(&mut self, buf: &mut ReadBuf<'_>) -> usize74     pub(crate) fn clone_into(&mut self, buf: &mut ReadBuf<'_>) -> usize {
75         let n = cmp::min(self.len(), buf.remaining());
76         buf.append(&self.bytes()[..n]);
77         self.pos += n;
78 
79         if self.pos == self.inner.len() {
80             self.inner.truncate(0);
81             self.pos = 0;
82         }
83         n
84     }
85 
read_from<T: Read>(&mut self, std: &mut T) -> io::Result<usize>86     pub(crate) fn read_from<T: Read>(&mut self, std: &mut T) -> io::Result<usize> {
87         let res = loop {
88             match std.read(&mut self.inner) {
89                 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
90                 res => break res,
91             }
92         };
93 
94         match res {
95             Ok(n) => self.inner.truncate(n),
96             Err(_) => self.inner.clear(),
97         }
98 
99         res
100     }
101 
write_into<T: Write>(&mut self, std: &mut T) -> io::Result<()>102     pub(crate) fn write_into<T: Write>(&mut self, std: &mut T) -> io::Result<()> {
103         let res = std.write_all(&self.inner);
104         self.inner.clear();
105         res
106     }
107 }
108 
/// Expands to `poll_write`, `poll_flush` and `poll_shutdown` method bodies for
/// a stdio wrapper that performs its I/O on the blocking pool.
///
/// The expansion reads and writes the fields `state` (a `State<_>`), `std`
/// (an `Option` holding the blocking writer) and `has_written` (a `bool`) on
/// `self`, and expects `Pin`, `Context`, `Poll`, `io`, `State` and
/// `spawn_blocking` to be in scope at the expansion site.
macro_rules! std_async_write {
    () => {
        // Copies up to `MAX_BUF` bytes of `buf` into the internal buffer,
        // writes them on a blocking task, and reports the number of bytes
        // copied once that task completes.
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            loop {
                match self.state {
                    State::Idle(ref mut buf_op) => {
                        let mut buf_inner = buf_op.take().unwrap();

                        if !buf_inner.is_empty() {
                            // Hand the buffer back before bailing out;
                            // otherwise the state is left as `Idle(None)` and
                            // the next poll panics on the `unwrap` above.
                            *buf_op = Some(buf_inner);
                            return Poll::Ready(Err(io::Error::new(
                                io::ErrorKind::AlreadyExists,
                                "inner Buf must be empty before poll!",
                            )));
                        }

                        let n = buf_inner.clone_from(buf);

                        let mut std = self.std.take().unwrap();

                        // The buffer and the writer are moved into the
                        // blocking task and handed back through its result.
                        let handle = spawn_blocking(move || {
                            let res = buf_inner.write_into(&mut std).map(|_| n);

                            (res, buf_inner, std)
                        });

                        self.state = State::Poll(handle);
                        self.has_written = true;
                    }
                    State::Poll(ref mut join_handle) => {
                        // NOTE(review): if the join handle itself resolves to
                        // an error, `?` returns while `state` is still `Poll`
                        // and `std` is still `None` - confirm callers do not
                        // poll again after that.
                        let (res, buf_inner, std) = match Pin::new(join_handle).poll(cx)? {
                            Poll::Ready(t) => t,
                            Poll::Pending => return Poll::Pending,
                        };
                        // Restore buffer and writer before surfacing the
                        // write result so the handle stays usable.
                        self.state = State::Idle(Some(buf_inner));
                        self.std = Some(std);

                        let n = res?;
                        return Poll::Ready(Ok(n));
                    }
                }
            }
        }

        // Waits for any in-flight blocking task and, if something was written
        // since the last flush, flushes the writer on a blocking task.
        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            loop {
                let has_written = self.has_written;
                match self.state {
                    State::Idle(ref mut buf_cell) => {
                        // Nothing written since the last flush: done.
                        if !has_written {
                            return Poll::Ready(Ok(()));
                        }
                        let buf = buf_cell.take().unwrap();
                        let mut inner = self.std.take().unwrap();

                        // `0` is a placeholder so the task result matches the
                        // `io::Result<usize>` carried by `State::Poll`.
                        self.state = State::Poll(spawn_blocking(move || {
                            let res = inner.flush().map(|_| 0);
                            (res, buf, inner)
                        }));

                        self.has_written = false;
                    }
                    State::Poll(ref mut join_handle) => {
                        let (res, buf, std) = match Pin::new(join_handle).poll(cx)? {
                            Poll::Ready(t) => t,
                            Poll::Pending => return Poll::Pending,
                        };
                        self.state = State::Idle(Some(buf));
                        self.std = Some(std);

                        // On success, loop back to the `Idle` arm, which
                        // decides whether a flush is still required.
                        res?;
                    }
                }
            }
        }

        // Shutdown performs no work and always completes immediately.
        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    };
}
193 pub(crate) use std_async_write;
194 
#[cfg(test)]
mod test {
    use crate::io::stdio::BufInner;
    use crate::io::ReadBuf;

    /// UT test cases for `BufInner` sizing and draining.
    ///
    /// # Brief
    /// 1. create an empty `BufInner`.
    /// 2. size it from a 10-byte `ReadBuf` with `set_len`.
    /// 3. drain it into a 20-byte `ReadBuf` with `clone_into`.
    /// 4. check the copied length and that the buffer is reset afterwards.
    #[test]
    fn ut_test_stdio_basic() {
        let mut buf_inner = BufInner::new();
        assert_eq!(buf_inner.pos, 0);
        assert!(buf_inner.inner.is_empty());
        assert!(buf_inner.is_empty());

        let mut buf = [1; 10];
        let mut read_buf = ReadBuf::new(&mut buf);
        buf_inner.set_len(&mut read_buf);
        assert_eq!(buf_inner.len(), 10);

        let mut buf = [0; 20];
        let mut read_buf = ReadBuf::new(&mut buf);
        let n = buf_inner.clone_into(&mut read_buf);
        assert_eq!(n, 10);

        // A fully drained buffer is emptied and its read offset rewound.
        assert!(buf_inner.is_empty());
        assert!(buf_inner.inner.is_empty());
        assert_eq!(buf_inner.pos, 0);
    }

    /// UT test cases for the `BufInner` write path.
    ///
    /// # Brief
    /// 1. copy bytes into a `BufInner` with `clone_from`.
    /// 2. write them into a `Vec<u8>` with `write_into`.
    /// 3. check the written bytes and that the buffer is empty afterwards.
    #[test]
    fn ut_test_stdio_write_into() {
        let mut buf_inner = BufInner::new();
        let n = buf_inner.clone_from(b"hello");
        assert_eq!(n, 5);
        assert_eq!(buf_inner.len(), 5);

        let mut sink: Vec<u8> = Vec::new();
        assert!(buf_inner.write_into(&mut sink).is_ok());
        assert_eq!(sink, b"hello");
        assert!(buf_inner.is_empty());
    }

    /// UT test cases for the `BufInner` read path.
    ///
    /// # Brief
    /// 1. size a `BufInner` from an 8-byte `ReadBuf` with `set_len`.
    /// 2. read a 3-byte source into it with `read_from`.
    /// 3. check the reported length and the number of buffered bytes.
    #[test]
    fn ut_test_stdio_read_from() {
        let mut buf_inner = BufInner::new();
        let mut buf = [0; 8];
        let mut read_buf = ReadBuf::new(&mut buf);
        buf_inner.set_len(&mut read_buf);

        let mut src: &[u8] = b"abc";
        let n = buf_inner.read_from(&mut src).unwrap();
        assert_eq!(n, 3);
        assert_eq!(buf_inner.len(), 3);
    }
}
224