• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::error::Error;
15 use std::fmt::{Debug, Display, Formatter};
16 use std::time::{Duration, Instant};
17 
18 use super::Body;
19 use crate::error::HttpClientError;
20 use crate::util::Timeout;
21 use crate::ErrorKind;
22 
23 /// A reader used to read all the body data to a specified location and provide
24 /// echo function.
25 ///
26 /// # Examples
27 ///
28 /// ```
29 /// use ylong_http_client::sync_impl::{BodyProcessError, BodyProcessor, BodyReader};
30 /// use ylong_http_client::TextBody;
31 ///
32 /// // Defines a processor, which provides read and echo ability.
33 /// struct Processor {
34 ///     vec: Vec<u8>,
35 ///     echo: usize,
36 /// }
37 ///
38 /// // Implements `BodyProcessor` trait for `&mut Processor` instead of `Processor`
39 /// // if users want to get the result in struct after reading.
40 /// impl BodyProcessor for &mut Processor {
41 ///     fn write(&mut self, data: &[u8]) -> Result<(), BodyProcessError> {
42 ///         self.vec.extend_from_slice(data);
43 ///         Ok(())
44 ///     }
45 ///
46 ///     fn progress(&mut self, filled: usize) -> Result<(), BodyProcessError> {
47 ///         self.echo += 1;
48 ///         Ok(())
49 ///     }
50 /// }
51 ///
52 /// let mut body = TextBody::from_bytes(b"HelloWorld");
53 /// let mut processor = Processor {
54 ///     vec: Vec::new(),
55 ///     echo: 0,
56 /// };
57 /// let _ = BodyReader::new(&mut processor).read_all(&mut body);
58 ///
59 /// // All data is read.
60 /// assert_eq!(processor.vec, b"HelloWorld");
61 /// // It will be echoed multiple times during the reading process.
62 /// assert_ne!(processor.echo, 0);
63 /// ```
64 pub struct BodyReader<T: BodyProcessor> {
65     pub(crate) read_timeout: Timeout,
66     pub(crate) processor: T,
67 }
68 
69 impl<T: BodyProcessor> BodyReader<T> {
70     /// Creates a new `BodyReader` with the given `Processor`.
71     ///
72     /// # Examples
73     ///
74     /// ```
75     /// use ylong_http_client::sync_impl::{BodyReader, DefaultBodyProcessor};
76     ///
77     /// let reader = BodyReader::new(DefaultBodyProcessor::new());
78     /// ```
new(processor: T) -> Self79     pub fn new(processor: T) -> Self {
80         Self {
81             read_timeout: Timeout::none(),
82             processor,
83         }
84     }
85 
86     /// Sets body read timeout.
87     ///
88     /// # Examples
89     ///
90     /// ```
91     /// use ylong_http_client::sync_impl::{BodyReader, DefaultBodyProcessor};
92     /// use ylong_http_client::util::Timeout;
93     ///
94     /// let reader = BodyReader::new(DefaultBodyProcessor::new()).read_timeout(Timeout::none());
95     /// ```
read_timeout(mut self, timeout: Timeout) -> Self96     pub fn read_timeout(mut self, timeout: Timeout) -> Self {
97         self.read_timeout = timeout;
98         self
99     }
100 
101     /// Reads all the body data. During the read process,
102     /// [`BodyProcessor::write`] and [`BodyProcessor::progress`] will be
103     /// called multiple times.
104     ///
105     /// [`BodyProcessor::write`]: BodyProcessor::write
106     /// [`BodyProcessor::progress`]: BodyProcessor::progress
107     ///
108     /// # Examples
109     ///
110     /// ```
111     /// use ylong_http_client::sync_impl::{BodyProcessor, BodyReader};
112     /// use ylong_http_client::TextBody;
113     ///
114     /// let mut body = TextBody::from_bytes(b"HelloWorld");
115     /// let _ = BodyReader::default().read_all(&mut body);
116     /// ```
read_all<B: Body>(&mut self, body: &mut B) -> Result<(), HttpClientError>117     pub fn read_all<B: Body>(&mut self, body: &mut B) -> Result<(), HttpClientError> {
118         // Use buffers up to 16K in size to read body.
119         const TEMP_BUF_SIZE: usize = 16 * 1024;
120 
121         let mut last = Instant::now();
122         let mut buf = [0u8; TEMP_BUF_SIZE];
123         let mut written = 0usize;
124 
125         loop {
126             let read_len = body
127                 .data(&mut buf)
128                 .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?;
129 
130             if read_len == 0 {
131                 self.processor
132                     .progress(written)
133                     .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?;
134                 break;
135             }
136 
137             self.processor
138                 .write(&buf[..read_len])
139                 .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?;
140 
141             written += read_len;
142 
143             let now = Instant::now();
144             if now.duration_since(last) >= Duration::from_secs(1) {
145                 self.processor
146                     .progress(written)
147                     .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?;
148             }
149             last = now;
150         }
151         Ok(())
152     }
153 }
154 
155 impl Default for BodyReader<DefaultBodyProcessor> {
default() -> Self156     fn default() -> Self {
157         Self::new(DefaultBodyProcessor::new())
158     }
159 }
160 
161 /// The trait defines methods for processing bodies of HTTP messages. Unlike the
162 /// async version, this is for synchronous usage.
163 pub trait BodyProcessor {
164     /// Writes the body data read each time to the specified location.
165     ///
166     /// This method will be called every time a part of the body data is read.
write(&mut self, data: &[u8]) -> Result<(), BodyProcessError>167     fn write(&mut self, data: &[u8]) -> Result<(), BodyProcessError>;
168 
169     /// Informs users how many bytes have been written to the specified location
170     /// at this time. Users can display the progress according to the number of
171     /// bytes written.
progress(&mut self, filled: usize) -> Result<(), BodyProcessError>172     fn progress(&mut self, filled: usize) -> Result<(), BodyProcessError>;
173 }
174 
/// Error returned by a `BodyProcessor` when it fails to handle body data.
///
/// The type carries no payload; it only signals that processing failed.
#[derive(Debug)]
pub struct BodyProcessError;

impl Display for BodyProcessError {
    /// Renders the same text as the derived `Debug` implementation.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        <Self as Debug>::fmt(self, f)
    }
}

// No underlying source: the default `Error` methods are sufficient.
impl Error for BodyProcessError {}
186 
/// A default body processor that writes data to the console directly.
pub struct DefaultBodyProcessor;

impl DefaultBodyProcessor {
    /// Creates a new `DefaultBodyProcessor`.
    ///
    /// The processor holds no state, so every instance is interchangeable.
    ///
    /// # Examples
    ///
    /// ```
    /// use ylong_http_client::sync_impl::DefaultBodyProcessor;
    ///
    /// let processor = DefaultBodyProcessor::new();
    /// ```
    pub fn new() -> Self {
        DefaultBodyProcessor
    }
}
204 
205 impl BodyProcessor for DefaultBodyProcessor {
write(&mut self, data: &[u8]) -> Result<(), BodyProcessError>206     fn write(&mut self, data: &[u8]) -> Result<(), BodyProcessError> {
207         println!("{data:?}");
208         Ok(())
209     }
210 
progress(&mut self, filled: usize) -> Result<(), BodyProcessError>211     fn progress(&mut self, filled: usize) -> Result<(), BodyProcessError> {
212         println!("filled: {filled}");
213         Ok(())
214     }
215 }
216 
217 impl Default for DefaultBodyProcessor {
default() -> Self218     fn default() -> Self {
219         Self::new()
220     }
221 }
222 
#[cfg(test)]
mod ut_syn_reader {
    use ylong_http::body::TextBody;

    use crate::sync_impl::{BodyReader, DefaultBodyProcessor};
    use crate::util::Timeout;

    /// UT test cases for `BodyReader::read_timeout`.
    ///
    /// # Brief
    /// 1. Creates a `BodyReader` around a `DefaultBodyProcessor` by calling
    ///    `BodyReader::new`.
    /// 2. Calls `read_timeout`.
    /// 3. Checks if the result is correct.
    #[test]
    fn ut_body_reader_read_timeout() {
        let base = BodyReader::new(DefaultBodyProcessor);
        let reader = base.read_timeout(Timeout::none());
        assert_eq!(reader.read_timeout, Timeout::none());
    }

    /// UT test cases for `BodyReader::read_all`.
    ///
    /// # Brief
    /// 1. Creates a `BodyReader` by calling `BodyReader::default`.
    /// 2. Creates a `TextBody`.
    /// 3. Calls `read_all` method.
    /// 4. Checks if the result is correct.
    #[test]
    fn ut_body_reader_read_all() {
        let mut body = TextBody::from_bytes(b"HelloWorld");
        let mut reader = BodyReader::default();
        let outcome = reader.read_all(&mut body);
        assert!(outcome.is_ok());
    }
}
257