1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::io::{Cursor, Read}; 15 16 use ylong_http::body::{ChunkBodyDecoder, ChunkState, TextBodyDecoder}; 17 use ylong_http::headers::Headers; 18 19 use super::Body; 20 use crate::error::{ErrorKind, HttpClientError}; 21 use crate::sync_impl::conn::StreamData; 22 23 /// `HttpBody` is the body part of the `Response` returned by `Client::request`. 24 /// `HttpBody` implements `Body` trait, so users can call related methods to get 25 /// body data. 26 /// 27 /// # Examples 28 /// 29 /// ```no_run 30 /// use ylong_http_client::sync_impl::{Body, Client, HttpBody}; 31 /// use ylong_http_client::{EmptyBody, Request}; 32 /// 33 /// let mut client = Client::new(); 34 /// 35 /// // `HttpBody` is the body part of `response`. 36 /// let mut response = client.request(Request::new(EmptyBody)).unwrap(); 37 /// 38 /// // Users can use `Body::data` to get body data. 39 /// let mut buf = [0u8; 1024]; 40 /// loop { 41 /// let size = response.body_mut().data(&mut buf).unwrap(); 42 /// if size == 0 { 43 /// break; 44 /// } 45 /// let _data = &buf[..size]; 46 /// // Deals with the data. 47 /// } 48 /// ``` 49 pub struct HttpBody { 50 kind: Kind, 51 } 52 53 type BoxStreamData = Box<dyn StreamData>; 54 55 impl HttpBody { empty() -> Self56 pub(crate) fn empty() -> Self { 57 Self { kind: Kind::Empty } 58 } 59 text(len: usize, pre: &[u8], io: BoxStreamData) -> Self60 pub(crate) fn text(len: usize, pre: &[u8], io: BoxStreamData) -> Self { 61 Self { 62 kind: Kind::Text(Text::new(len, pre, io)), 63 } 64 } 65 chunk(pre: &[u8], io: BoxStreamData, is_trailer: bool) -> Self66 pub(crate) fn chunk(pre: &[u8], io: BoxStreamData, is_trailer: bool) -> Self { 67 Self { 68 kind: Kind::Chunk(Chunk::new(pre, io, is_trailer)), 69 } 70 } 71 } 72 73 // TODO: `TextBodyDecoder` implementation and `ChunkBodyDecoder` implementation. 74 enum Kind { 75 Empty, 76 Text(Text), 77 Chunk(Chunk), 78 } 79 80 struct Text { 81 decoder: TextBodyDecoder, 82 pre: Option<Cursor<Vec<u8>>>, 83 io: Option<BoxStreamData>, 84 } 85 86 impl Text { new(len: usize, pre: &[u8], io: BoxStreamData) -> Self87 pub(crate) fn new(len: usize, pre: &[u8], io: BoxStreamData) -> Self { 88 Self { 89 decoder: TextBodyDecoder::new(len), 90 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 91 io: Some(io), 92 } 93 } 94 } 95 96 impl Body for HttpBody { 97 type Error = HttpClientError; 98 data(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>99 fn data(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { 100 if buf.is_empty() { 101 return Ok(0); 102 } 103 104 match self.kind { 105 Kind::Empty => Ok(0), 106 Kind::Text(ref mut text) => text.data(buf), 107 Kind::Chunk(ref mut chunk) => chunk.data(buf), 108 } 109 } 110 trailer(&mut self) -> Result<Option<Headers>, Self::Error>111 fn trailer(&mut self) -> Result<Option<Headers>, Self::Error> { 112 match self.kind { 113 Kind::Chunk(ref mut chunk) => chunk.decoder.get_trailer().map_err(|_| { 114 HttpClientError::new_with_message(ErrorKind::BodyDecode, "Get trailer failed") 115 }), 116 _ => Ok(None), 117 } 118 } 119 } 120 121 impl Text { data(&mut self, buf: &mut [u8]) -> Result<usize, HttpClientError>122 fn data(&mut self, buf: &mut [u8]) -> Result<usize, HttpClientError> { 123 if buf.is_empty() { 124 return Ok(0); 125 } 126 127 let mut read = 0; 128 129 if let Some(pre) = self.pre.as_mut() { 130 // Here cursor read never failed. 131 let this_read = pre.read(buf).unwrap(); 132 if this_read == 0 { 133 self.pre = None; 134 } else { 135 read += this_read; 136 let (text, rem) = self.decoder.decode(&buf[..read]); 137 138 match (text.is_complete(), rem.is_empty()) { 139 (true, false) => { 140 if let Some(io) = self.io.take() { 141 io.shutdown(); 142 }; 143 return Err(HttpClientError::new_with_message( 144 ErrorKind::BodyDecode, 145 "Not Eof", 146 )); 147 } 148 (true, true) => { 149 self.io = None; 150 return Ok(read); 151 } 152 _ => {} 153 } 154 } 155 } 156 157 if !buf[read..].is_empty() { 158 if let Some(mut io) = self.io.take() { 159 match io.read(&mut buf[read..]) { 160 // Disconnected. 161 Ok(0) => { 162 io.shutdown(); 163 return Err(HttpClientError::new_with_message( 164 ErrorKind::BodyDecode, 165 "Response Body Incomplete", 166 )); 167 } 168 Ok(filled) => { 169 let (text, rem) = self.decoder.decode(&buf[read..read + filled]); 170 read += filled; 171 // Contains redundant `rem`, return error. 172 match (text.is_complete(), rem.is_empty()) { 173 (true, false) => { 174 io.shutdown(); 175 return Err(HttpClientError::new_with_message( 176 ErrorKind::BodyDecode, 177 "Not Eof", 178 )); 179 } 180 (true, true) => return Ok(read), 181 _ => {} 182 } 183 self.io = Some(io); 184 } 185 Err(e) => { 186 return Err(HttpClientError::new_with_cause( 187 ErrorKind::BodyTransfer, 188 Some(e), 189 )) 190 } 191 } 192 } 193 } 194 Ok(read) 195 } 196 } 197 198 struct Chunk { 199 decoder: ChunkBodyDecoder, 200 pre: Option<Cursor<Vec<u8>>>, 201 io: Option<BoxStreamData>, 202 } 203 204 impl Chunk { new(pre: &[u8], io: BoxStreamData, is_trailer: bool) -> Self205 pub(crate) fn new(pre: &[u8], io: BoxStreamData, is_trailer: bool) -> Self { 206 Self { 207 decoder: ChunkBodyDecoder::new().contains_trailer(is_trailer), 208 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 209 io: Some(io), 210 } 211 } 212 } 213 214 impl Chunk { data(&mut self, buf: &mut [u8]) -> Result<usize, HttpClientError>215 fn data(&mut self, buf: &mut [u8]) -> Result<usize, HttpClientError> { 216 if buf.is_empty() { 217 return Ok(0); 218 } 219 220 let mut read = 0; 221 222 while let Some(pre) = self.pre.as_mut() { 223 // Here cursor read never failed. 224 let size = pre.read(&mut buf[read..]).unwrap(); 225 226 if size == 0 { 227 self.pre = None; 228 } 229 230 let (size, flag) = self.merge_chunks(&mut buf[read..read + size])?; 231 read += size; 232 233 if flag { 234 // Return if we find a 0-sized chunk. 235 self.io = None; 236 return Ok(read); 237 } else if read != 0 { 238 // Return if we get some data. 239 return Ok(read); 240 } 241 } 242 243 // Here `read` must be 0. 244 while let Some(mut io) = self.io.take() { 245 match io.read(&mut buf[read..]) { 246 Ok(filled) => { 247 if filled == 0 { 248 io.shutdown(); 249 return Err(HttpClientError::new_with_message( 250 ErrorKind::BodyDecode, 251 "Response Body Incomplete", 252 )); 253 } 254 let (size, flag) = self.merge_chunks(&mut buf[read..read + filled])?; 255 read += size; 256 if flag { 257 // Return if we find a 0-sized chunk. 258 // Return if we get some data. 259 return Ok(read); 260 } 261 self.io = Some(io); 262 if read != 0 { 263 return Ok(read); 264 } 265 } 266 Err(e) => { 267 return Err(HttpClientError::new_with_cause( 268 ErrorKind::BodyTransfer, 269 Some(e), 270 )) 271 } 272 } 273 } 274 Ok(read) 275 } 276 merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError>277 fn merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError> { 278 // Here we need to merge the chunks into one data block and return. 279 // The data arrangement in buf is as follows: 280 // 281 // data in buf: 282 // +------+------+------+------+------+------+------+ 283 // | data | len | data | len | ... | data | len | 284 // +------+------+------+------+------+------+------+ 285 // 286 // We need to merge these data blocks into one block: 287 // 288 // after merge: 289 // +---------------------------+ 290 // | data | 291 // +---------------------------+ 292 293 let (chunks, junk) = self 294 .decoder 295 .decode(buf) 296 .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?; 297 298 let mut finished = false; 299 let mut ptrs = Vec::new(); 300 for chunk in chunks.into_iter() { 301 if chunk.trailer().is_some() { 302 if chunk.state() == &ChunkState::Finish { 303 finished = true; 304 } 305 } else { 306 if chunk.size() == 0 && chunk.state() != &ChunkState::MetaSize { 307 finished = true; 308 break; 309 } 310 let data = chunk.data(); 311 ptrs.push((data.as_ptr(), data.len())) 312 } 313 } 314 315 if finished && !junk.is_empty() { 316 return Err(HttpClientError::new_with_message( 317 ErrorKind::BodyDecode, 318 "Invalid Chunk Body", 319 )); 320 } 321 322 let start = buf.as_ptr(); 323 324 let mut idx = 0; 325 for (ptr, len) in ptrs.into_iter() { 326 let st = ptr as usize - start as usize; 327 let ed = st + len; 328 buf.copy_within(st..ed, idx); 329 idx += len; 330 } 331 Ok((idx, finished)) 332 } 333 } 334 335 #[cfg(test)] 336 mod ut_syn_http_body { 337 use crate::sync_impl::{Body, HttpBody}; 338 339 /// UT test cases for `HttpBody::empty`. 340 /// 341 /// # Brief 342 /// 1. Creates a `HttpBody` by calling `HttpBody::empty`. 343 /// 2. Calls `data` method. 344 /// 3. Checks if the result is correct. 345 #[test] ut_http_body_empty()346 fn ut_http_body_empty() { 347 let mut body = HttpBody::empty(); 348 let mut buf = []; 349 let data = body.data(&mut buf); 350 assert!(data.is_ok()); 351 assert_eq!(data.unwrap(), 0); 352 } 353 } 354