1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::error::Error; 15 use std::fmt::{Debug, Display, Formatter}; 16 use std::time::{Duration, Instant}; 17 18 use super::Body; 19 use crate::error::HttpClientError; 20 use crate::util::Timeout; 21 use crate::ErrorKind; 22 23 /// A reader used to read all the body data to a specified location and provide 24 /// echo function. 25 /// 26 /// # Examples 27 /// 28 /// ``` 29 /// use ylong_http_client::sync_impl::{BodyProcessError, BodyProcessor, BodyReader}; 30 /// use ylong_http_client::TextBody; 31 /// 32 /// // Defines a processor, which provides read and echo ability. 33 /// struct Processor { 34 /// vec: Vec<u8>, 35 /// echo: usize, 36 /// } 37 /// 38 /// // Implements `BodyProcessor` trait for `&mut Processor` instead of `Processor` 39 /// // if users want to get the result in struct after reading. 40 /// impl BodyProcessor for &mut Processor { 41 /// fn write(&mut self, data: &[u8]) -> Result<(), BodyProcessError> { 42 /// self.vec.extend_from_slice(data); 43 /// Ok(()) 44 /// } 45 /// 46 /// fn progress(&mut self, filled: usize) -> Result<(), BodyProcessError> { 47 /// self.echo += 1; 48 /// Ok(()) 49 /// } 50 /// } 51 /// 52 /// let mut body = TextBody::from_bytes(b"HelloWorld"); 53 /// let mut processor = Processor { 54 /// vec: Vec::new(), 55 /// echo: 0, 56 /// }; 57 /// let _ = BodyReader::new(&mut processor).read_all(&mut body); 58 /// 59 /// // All data is read. 60 /// assert_eq!(processor.vec, b"HelloWorld"); 61 /// // It will be echoed multiple times during the reading process. 62 /// assert_ne!(processor.echo, 0); 63 /// ``` 64 pub struct BodyReader<T: BodyProcessor> { 65 pub(crate) read_timeout: Timeout, 66 pub(crate) processor: T, 67 } 68 69 impl<T: BodyProcessor> BodyReader<T> { 70 /// Creates a new `BodyReader` with the given `Processor`. 71 /// 72 /// # Examples 73 /// 74 /// ``` 75 /// use ylong_http_client::sync_impl::{BodyReader, DefaultBodyProcessor}; 76 /// 77 /// let reader = BodyReader::new(DefaultBodyProcessor::new()); 78 /// ``` new(processor: T) -> Self79 pub fn new(processor: T) -> Self { 80 Self { 81 read_timeout: Timeout::none(), 82 processor, 83 } 84 } 85 86 /// Sets body read timeout. 87 /// 88 /// # Examples 89 /// 90 /// ``` 91 /// use ylong_http_client::sync_impl::{BodyReader, DefaultBodyProcessor}; 92 /// use ylong_http_client::util::Timeout; 93 /// 94 /// let reader = BodyReader::new(DefaultBodyProcessor::new()).read_timeout(Timeout::none()); 95 /// ``` read_timeout(mut self, timeout: Timeout) -> Self96 pub fn read_timeout(mut self, timeout: Timeout) -> Self { 97 self.read_timeout = timeout; 98 self 99 } 100 101 /// Reads all the body data. During the read process, 102 /// [`BodyProcessor::write`] and [`BodyProcessor::progress`] will be 103 /// called multiple times. 104 /// 105 /// [`BodyProcessor::write`]: BodyProcessor::write 106 /// [`BodyProcessor::progress`]: BodyProcessor::progress 107 /// 108 /// # Examples 109 /// 110 /// ``` 111 /// use ylong_http_client::sync_impl::{BodyProcessor, BodyReader}; 112 /// use ylong_http_client::TextBody; 113 /// 114 /// let mut body = TextBody::from_bytes(b"HelloWorld"); 115 /// let _ = BodyReader::default().read_all(&mut body); 116 /// ``` read_all<B: Body>(&mut self, body: &mut B) -> Result<(), HttpClientError>117 pub fn read_all<B: Body>(&mut self, body: &mut B) -> Result<(), HttpClientError> { 118 // Use buffers up to 16K in size to read body. 119 const TEMP_BUF_SIZE: usize = 16 * 1024; 120 121 let mut last = Instant::now(); 122 let mut buf = [0u8; TEMP_BUF_SIZE]; 123 let mut written = 0usize; 124 125 loop { 126 let read_len = body 127 .data(&mut buf) 128 .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?; 129 130 if read_len == 0 { 131 self.processor 132 .progress(written) 133 .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?; 134 break; 135 } 136 137 self.processor 138 .write(&buf[..read_len]) 139 .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?; 140 141 written += read_len; 142 143 let now = Instant::now(); 144 if now.duration_since(last) >= Duration::from_secs(1) { 145 self.processor 146 .progress(written) 147 .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?; 148 } 149 last = now; 150 } 151 Ok(()) 152 } 153 } 154 155 impl Default for BodyReader<DefaultBodyProcessor> { default() -> Self156 fn default() -> Self { 157 Self::new(DefaultBodyProcessor::new()) 158 } 159 } 160 161 /// The trait defines methods for processing bodies of HTTP messages. Unlike the 162 /// async version, this is for synchronous usage. 163 pub trait BodyProcessor { 164 /// Writes the body data read each time to the specified location. 165 /// 166 /// This method will be called every time a part of the body data is read. write(&mut self, data: &[u8]) -> Result<(), BodyProcessError>167 fn write(&mut self, data: &[u8]) -> Result<(), BodyProcessError>; 168 169 /// Informs users how many bytes have been written to the specified location 170 /// at this time. Users can display the progress according to the number of 171 /// bytes written. progress(&mut self, filled: usize) -> Result<(), BodyProcessError>172 fn progress(&mut self, filled: usize) -> Result<(), BodyProcessError>; 173 } 174 175 /// Error occurs when processing body data. 176 #[derive(Debug)] 177 pub struct BodyProcessError; 178 179 impl Display for BodyProcessError { fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result180 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { 181 Debug::fmt(self, f) 182 } 183 } 184 185 impl Error for BodyProcessError {} 186 187 /// A default body processor that write data to console directly. 188 pub struct DefaultBodyProcessor; 189 190 impl DefaultBodyProcessor { 191 /// Creates a new `DefaultBodyProcessor`. 192 /// 193 /// # Examples 194 /// 195 /// ``` 196 /// use ylong_http_client::sync_impl::DefaultBodyProcessor; 197 /// 198 /// let processor = DefaultBodyProcessor::new(); 199 /// ``` new() -> Self200 pub fn new() -> Self { 201 Self 202 } 203 } 204 205 impl BodyProcessor for DefaultBodyProcessor { write(&mut self, data: &[u8]) -> Result<(), BodyProcessError>206 fn write(&mut self, data: &[u8]) -> Result<(), BodyProcessError> { 207 println!("{data:?}"); 208 Ok(()) 209 } 210 progress(&mut self, filled: usize) -> Result<(), BodyProcessError>211 fn progress(&mut self, filled: usize) -> Result<(), BodyProcessError> { 212 println!("filled: {filled}"); 213 Ok(()) 214 } 215 } 216 217 impl Default for DefaultBodyProcessor { default() -> Self218 fn default() -> Self { 219 Self::new() 220 } 221 } 222 223 #[cfg(test)] 224 mod ut_syn_reader { 225 use ylong_http::body::TextBody; 226 227 use crate::sync_impl::{BodyReader, DefaultBodyProcessor}; 228 use crate::util::Timeout; 229 230 /// UT test cases for `BodyReader::read_timeout`. 231 /// 232 /// # Brief 233 /// 1. Creates a `BodyReader` with `DefaultBodyProcessor::default` by 234 /// calling `BodyReader::new`. 235 /// 2. Calls `read_timeout`. 236 /// 3. Checks if the result is correct. 237 #[test] ut_body_reader_read_timeout()238 fn ut_body_reader_read_timeout() { 239 let reader = BodyReader::new(DefaultBodyProcessor).read_timeout(Timeout::none()); 240 assert_eq!(reader.read_timeout, Timeout::none()); 241 } 242 243 /// UT test cases for `BodyReader::read_all`. 244 /// 245 /// # Brief 246 /// 1. Creates a `BodyReader` by calling `BodyReader::default`. 247 /// 2. Creates a `TextBody`. 248 /// 3. Calls `read_all` method. 249 /// 4. Checks if the result is corrent. 250 #[test] ut_body_reader_read_all()251 fn ut_body_reader_read_all() { 252 let mut body = TextBody::from_bytes(b"HelloWorld"); 253 let res = BodyReader::default().read_all(&mut body); 254 assert!(res.is_ok()); 255 } 256 } 257