1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use core::pin::Pin; 15 use core::task::{Context, Poll}; 16 use std::future::Future; 17 use std::io::{Cursor, Read}; 18 19 use ylong_http::body::TextBodyDecoder; 20 #[cfg(feature = "http1_1")] 21 use ylong_http::body::{ChunkBodyDecoder, ChunkState}; 22 use ylong_http::headers::Headers; 23 #[cfg(feature = "http1_1")] 24 use ylong_http::headers::{HeaderName, HeaderValue}; 25 26 use super::{Body, StreamData}; 27 use crate::error::{ErrorKind, HttpClientError}; 28 use crate::util::normalizer::BodyLength; 29 use crate::{AsyncRead, ReadBuf, Sleep}; 30 31 /// `HttpBody` is the body part of the `Response` returned by `Client::request`. 32 /// `HttpBody` implements `Body` trait, so users can call related methods to get 33 /// body data. 34 /// 35 /// # Examples 36 /// 37 /// ```no_run 38 /// use ylong_http_client::async_impl::{Body, Client, HttpBody}; 39 /// use ylong_http_client::{EmptyBody, Request}; 40 /// 41 /// async fn read_body() { 42 /// let client = Client::new(); 43 /// 44 /// // `HttpBody` is the body part of `response`. 45 /// let mut response = client.request(Request::new(EmptyBody)).await.unwrap(); 46 /// 47 /// // Users can use `Body::data` to get body data. 48 /// let mut buf = [0u8; 1024]; 49 /// loop { 50 /// let size = response.body_mut().data(&mut buf).await.unwrap(); 51 /// if size == 0 { 52 /// break; 53 /// } 54 /// let _data = &buf[..size]; 55 /// // Deals with the data. 56 /// } 57 /// } 58 /// ``` 59 pub struct HttpBody { 60 kind: Kind, 61 sleep: Option<Pin<Box<Sleep>>>, 62 } 63 64 type BoxStreamData = Box<dyn StreamData + Sync + Send + Unpin>; 65 66 impl HttpBody { new( body_length: BodyLength, io: BoxStreamData, pre: &[u8], ) -> Result<Self, HttpClientError>67 pub(crate) fn new( 68 body_length: BodyLength, 69 io: BoxStreamData, 70 pre: &[u8], 71 ) -> Result<Self, HttpClientError> { 72 let kind = match body_length { 73 BodyLength::Empty => { 74 if !pre.is_empty() { 75 // TODO: Consider the case where BodyLength is empty but pre is not empty. 76 io.shutdown(); 77 return Err(HttpClientError::new_with_message( 78 ErrorKind::Request, 79 "Body length is 0 but read extra data", 80 )); 81 } 82 Kind::Empty 83 } 84 BodyLength::Length(len) => Kind::Text(Text::new(len, pre, io)), 85 BodyLength::UntilClose => Kind::UntilClose(UntilClose::new(pre, io)), 86 87 #[cfg(feature = "http1_1")] 88 BodyLength::Chunk => Kind::Chunk(Chunk::new(pre, io)), 89 }; 90 Ok(Self { kind, sleep: None }) 91 } 92 93 #[cfg(feature = "http2")] empty() -> Self94 pub(crate) fn empty() -> Self { 95 Self { 96 kind: Kind::Empty, 97 sleep: None, 98 } 99 } 100 101 #[cfg(feature = "http2")] text(len: usize, pre: &[u8], io: BoxStreamData) -> Self102 pub(crate) fn text(len: usize, pre: &[u8], io: BoxStreamData) -> Self { 103 Self { 104 kind: Kind::Text(Text::new(len, pre, io)), 105 sleep: None, 106 } 107 } 108 set_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>)109 pub(crate) fn set_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>) { 110 self.sleep = sleep; 111 } 112 } 113 114 impl Body for HttpBody { 115 type Error = HttpClientError; 116 poll_data( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, Self::Error>>117 fn poll_data( 118 mut self: Pin<&mut Self>, 119 cx: &mut Context<'_>, 120 buf: &mut [u8], 121 ) -> Poll<Result<usize, Self::Error>> { 122 if buf.is_empty() { 123 return Poll::Ready(Ok(0)); 124 } 125 126 if let Some(delay) = self.sleep.as_mut() { 127 if let Poll::Ready(()) = Pin::new(delay).poll(cx) { 128 return Poll::Ready(Err(HttpClientError::new_with_message( 129 ErrorKind::Timeout, 130 "Request timeout", 131 ))); 132 } 133 } 134 match self.kind { 135 Kind::Empty => Poll::Ready(Ok(0)), 136 Kind::Text(ref mut text) => text.data(cx, buf), 137 Kind::UntilClose(ref mut until_close) => until_close.data(cx, buf), 138 #[cfg(feature = "http1_1")] 139 Kind::Chunk(ref mut chunk) => chunk.data(cx, buf), 140 } 141 } 142 trailer(&mut self) -> Result<Option<Headers>, Self::Error>143 fn trailer(&mut self) -> Result<Option<Headers>, Self::Error> { 144 match self.kind { 145 #[cfg(feature = "http1_1")] 146 Kind::Chunk(ref mut chunk) => chunk.get_trailer(), 147 _ => Ok(None), 148 } 149 } 150 } 151 152 impl Drop for HttpBody { drop(&mut self)153 fn drop(&mut self) { 154 let io = match self.kind { 155 Kind::Text(ref mut text) => text.io.as_mut(), 156 #[cfg(feature = "http1_1")] 157 Kind::Chunk(ref mut chunk) => chunk.io.as_mut(), 158 Kind::UntilClose(ref mut until_close) => until_close.io.as_mut(), 159 _ => None, 160 }; 161 // If response body is not totally read, shutdown io. 162 if let Some(io) = io { 163 io.shutdown() 164 } 165 } 166 } 167 168 // TODO: `TextBodyDecoder` implementation and `ChunkBodyDecoder` implementation. 169 enum Kind { 170 Empty, 171 Text(Text), 172 #[cfg(feature = "http1_1")] 173 Chunk(Chunk), 174 UntilClose(UntilClose), 175 } 176 177 struct UntilClose { 178 pre: Option<Cursor<Vec<u8>>>, 179 io: Option<BoxStreamData>, 180 } 181 182 impl UntilClose { new(pre: &[u8], io: BoxStreamData) -> Self183 pub(crate) fn new(pre: &[u8], io: BoxStreamData) -> Self { 184 Self { 185 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 186 io: Some(io), 187 } 188 } 189 data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>190 fn data( 191 &mut self, 192 cx: &mut Context<'_>, 193 buf: &mut [u8], 194 ) -> Poll<Result<usize, HttpClientError>> { 195 if buf.is_empty() { 196 return Poll::Ready(Ok(0)); 197 } 198 199 let mut read = 0; 200 201 if let Some(pre) = self.pre.as_mut() { 202 // Here cursor read never failed. 203 let this_read = Read::read(pre, buf).unwrap(); 204 if this_read == 0 { 205 self.pre = None; 206 } else { 207 read += this_read; 208 } 209 } 210 211 if !buf[read..].is_empty() { 212 if let Some(mut io) = self.io.take() { 213 let mut read_buf = ReadBuf::new(&mut buf[read..]); 214 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 215 // Disconnected. 216 Poll::Ready(Ok(())) => { 217 let filled = read_buf.filled().len(); 218 if filled == 0 { 219 io.shutdown(); 220 } else { 221 self.io = Some(io); 222 } 223 read += filled; 224 return Poll::Ready(Ok(read)); 225 } 226 Poll::Pending => { 227 self.io = Some(io); 228 if read != 0 { 229 return Poll::Ready(Ok(read)); 230 } 231 return Poll::Pending; 232 } 233 Poll::Ready(Err(e)) => { 234 // If IO error occurs, shutdowns `io` before return. 235 io.shutdown(); 236 return Poll::Ready(Err(HttpClientError::new_with_cause( 237 ErrorKind::BodyTransfer, 238 Some(e), 239 ))); 240 } 241 } 242 } 243 } 244 Poll::Ready(Ok(read)) 245 } 246 } 247 248 struct Text { 249 decoder: TextBodyDecoder, 250 pre: Option<Cursor<Vec<u8>>>, 251 io: Option<BoxStreamData>, 252 } 253 254 impl Text { new(len: usize, pre: &[u8], io: BoxStreamData) -> Self255 pub(crate) fn new(len: usize, pre: &[u8], io: BoxStreamData) -> Self { 256 Self { 257 decoder: TextBodyDecoder::new(len), 258 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 259 io: Some(io), 260 } 261 } 262 } 263 264 impl Text { data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>265 fn data( 266 &mut self, 267 cx: &mut Context<'_>, 268 buf: &mut [u8], 269 ) -> Poll<Result<usize, HttpClientError>> { 270 if buf.is_empty() { 271 return Poll::Ready(Ok(0)); 272 } 273 274 let mut read = 0; 275 276 if let Some(pre) = self.pre.as_mut() { 277 // Here cursor read never failed. 278 let this_read = Read::read(pre, buf).unwrap(); 279 if this_read == 0 { 280 self.pre = None; 281 } else { 282 read += this_read; 283 let (text, rem) = self.decoder.decode(&buf[..read]); 284 285 // Contains redundant `rem`, return error. 286 match (text.is_complete(), rem.is_empty()) { 287 (true, false) => { 288 if let Some(io) = self.io.take() { 289 io.shutdown(); 290 }; 291 return Poll::Ready(Err(HttpClientError::new_with_message( 292 ErrorKind::BodyDecode, 293 "Not Eof", 294 ))); 295 } 296 (true, true) => { 297 self.io = None; 298 return Poll::Ready(Ok(read)); 299 } 300 // TextBodyDecoder decodes as much as possible here. 301 _ => {} 302 } 303 } 304 } 305 306 if !buf[read..].is_empty() { 307 if let Some(mut io) = self.io.take() { 308 let mut read_buf = ReadBuf::new(&mut buf[read..]); 309 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 310 // Disconnected. 311 Poll::Ready(Ok(())) => { 312 let filled = read_buf.filled().len(); 313 if filled == 0 { 314 io.shutdown(); 315 return Poll::Ready(Err(HttpClientError::new_with_message( 316 ErrorKind::BodyDecode, 317 "Response Body Incomplete", 318 ))); 319 } 320 let (text, rem) = self.decoder.decode(read_buf.filled()); 321 read += filled; 322 // Contains redundant `rem`, return error. 323 match (text.is_complete(), rem.is_empty()) { 324 (true, false) => { 325 io.shutdown(); 326 return Poll::Ready(Err(HttpClientError::new_with_message( 327 ErrorKind::BodyDecode, 328 "Not Eof", 329 ))); 330 } 331 (true, true) => return Poll::Ready(Ok(read)), 332 _ => {} 333 } 334 self.io = Some(io); 335 } 336 Poll::Pending => { 337 self.io = Some(io); 338 if read != 0 { 339 return Poll::Ready(Ok(read)); 340 } 341 return Poll::Pending; 342 } 343 Poll::Ready(Err(e)) => { 344 // If IO error occurs, shutdowns `io` before return. 345 io.shutdown(); 346 return Poll::Ready(Err(HttpClientError::new_with_cause( 347 ErrorKind::BodyTransfer, 348 Some(e), 349 ))); 350 } 351 } 352 } 353 } 354 Poll::Ready(Ok(read)) 355 } 356 } 357 358 #[cfg(feature = "http1_1")] 359 struct Chunk { 360 decoder: ChunkBodyDecoder, 361 pre: Option<Cursor<Vec<u8>>>, 362 io: Option<BoxStreamData>, 363 trailer: Vec<u8>, 364 } 365 366 #[cfg(feature = "http1_1")] 367 impl Chunk { new(pre: &[u8], io: BoxStreamData) -> Self368 pub(crate) fn new(pre: &[u8], io: BoxStreamData) -> Self { 369 Self { 370 decoder: ChunkBodyDecoder::new().contains_trailer(false), 371 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 372 io: Some(io), 373 trailer: vec![], 374 } 375 } 376 } 377 378 #[cfg(feature = "http1_1")] 379 impl Chunk { data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>380 fn data( 381 &mut self, 382 cx: &mut Context<'_>, 383 buf: &mut [u8], 384 ) -> Poll<Result<usize, HttpClientError>> { 385 if buf.is_empty() { 386 return Poll::Ready(Ok(0)); 387 } 388 389 let mut read = 0; 390 391 while let Some(pre) = self.pre.as_mut() { 392 // Here cursor read never failed. 393 let size = Read::read(pre, &mut buf[read..]).unwrap(); 394 if size == 0 { 395 self.pre = None; 396 } 397 398 let (size, flag) = self.merge_chunks(&mut buf[read..read + size])?; 399 read += size; 400 401 if flag { 402 // Return if we find a 0-sized chunk. 403 self.io = None; 404 return Poll::Ready(Ok(read)); 405 } else if read != 0 { 406 // Return if we get some data. 407 return Poll::Ready(Ok(read)); 408 } 409 } 410 411 // Here `read` must be 0. 412 while let Some(mut io) = self.io.take() { 413 let mut read_buf = ReadBuf::new(&mut buf[read..]); 414 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 415 Poll::Ready(Ok(())) => { 416 let filled = read_buf.filled().len(); 417 if filled == 0 { 418 io.shutdown(); 419 return Poll::Ready(Err(HttpClientError::new_with_message( 420 ErrorKind::BodyTransfer, 421 "Response Body Incomplete", 422 ))); 423 } 424 let (size, flag) = self.merge_chunks(read_buf.filled_mut())?; 425 read += size; 426 if flag { 427 // Return if we find a 0-sized chunk. 428 // Return if we get some data. 429 return Poll::Ready(Ok(read)); 430 } 431 self.io = Some(io); 432 if read != 0 { 433 return Poll::Ready(Ok(read)); 434 } 435 } 436 Poll::Pending => { 437 self.io = Some(io); 438 return Poll::Pending; 439 } 440 Poll::Ready(Err(e)) => { 441 // If IO error occurs, shutdowns `io` before return. 442 io.shutdown(); 443 return Poll::Ready(Err(HttpClientError::new_with_cause( 444 ErrorKind::BodyTransfer, 445 Some(e), 446 ))); 447 } 448 } 449 } 450 451 Poll::Ready(Ok(read)) 452 } 453 merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError>454 fn merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError> { 455 // Here we need to merge the chunks into one data block and return. 456 // The data arrangement in buf is as follows: 457 // 458 // data in buf: 459 // +------+------+------+------+------+------+------+ 460 // | data | len | data | len | ... | data | len | 461 // +------+------+------+------+------+------+------+ 462 // 463 // We need to merge these data blocks into one block: 464 // 465 // after merge: 466 // +---------------------------+ 467 // | data | 468 // +---------------------------+ 469 470 let (chunks, junk) = self 471 .decoder 472 .decode(buf) 473 .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?; 474 475 let mut finished = false; 476 let mut ptrs = Vec::new(); 477 for chunk in chunks.into_iter() { 478 if chunk.trailer().is_some() { 479 if chunk.state() == &ChunkState::Finish { 480 finished = true; 481 self.trailer.extend_from_slice(chunk.trailer().unwrap()); 482 self.trailer.extend_from_slice(b"\r\n"); 483 break; 484 } else if chunk.state() == &ChunkState::DataCrlf { 485 self.trailer.extend_from_slice(chunk.trailer().unwrap()); 486 self.trailer.extend_from_slice(b"\r\n"); 487 } else { 488 self.trailer.extend_from_slice(chunk.trailer().unwrap()); 489 } 490 } else { 491 if chunk.size() == 0 && chunk.state() != &ChunkState::MetaSize { 492 finished = true; 493 break; 494 } 495 let data = chunk.data(); 496 ptrs.push((data.as_ptr(), data.len())) 497 } 498 } 499 500 if finished && !junk.is_empty() { 501 return Err(HttpClientError::new_with_message( 502 ErrorKind::BodyDecode, 503 "Invalid Chunk Body", 504 )); 505 } 506 507 let start = buf.as_ptr(); 508 509 let mut idx = 0; 510 for (ptr, len) in ptrs.into_iter() { 511 let st = ptr as usize - start as usize; 512 let ed = st + len; 513 buf.copy_within(st..ed, idx); 514 idx += len; 515 } 516 Ok((idx, finished)) 517 } 518 get_trailer(&self) -> Result<Option<Headers>, HttpClientError>519 fn get_trailer(&self) -> Result<Option<Headers>, HttpClientError> { 520 if self.trailer.is_empty() { 521 return Err(HttpClientError::new_with_message( 522 ErrorKind::BodyDecode, 523 "No trailer received", 524 )); 525 } 526 527 let mut colon = 0; 528 let mut lf = 0; 529 let mut trailer_header_name = HeaderName::from_bytes(b"") 530 .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?; 531 let mut trailer_headers = Headers::new(); 532 for (i, b) in self.trailer.iter().enumerate() { 533 if *b == b' ' { 534 continue; 535 } 536 if *b == b':' { 537 colon = i; 538 if lf == 0 { 539 let trailer_name = &self.trailer[..colon]; 540 trailer_header_name = HeaderName::from_bytes(trailer_name).map_err(|e| { 541 HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)) 542 })?; 543 } else { 544 let trailer_name = &self.trailer[lf + 1..colon]; 545 trailer_header_name = HeaderName::from_bytes(trailer_name).map_err(|e| { 546 HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)) 547 })?; 548 } 549 continue; 550 } 551 552 if *b == b'\n' { 553 lf = i; 554 let trailer_value = &self.trailer[colon + 1..lf - 1]; 555 let trailer_header_value = HeaderValue::from_bytes(trailer_value) 556 .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?; 557 let _ = trailer_headers 558 .insert::<HeaderName, HeaderValue>( 559 trailer_header_name.clone(), 560 trailer_header_value.clone(), 561 ) 562 .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?; 563 } 564 } 565 Ok(Some(trailer_headers)) 566 } 567 } 568 569 #[cfg(test)] 570 mod ut_async_http_body { 571 use ylong_http::body::ChunkBodyDecoder; 572 573 use crate::async_impl::http_body::Chunk; 574 575 /// UT test cases for `Chunk::get_trailers`. 576 /// 577 /// # Brief 578 /// 1. Creates a `Chunk` and set `Trailer`. 579 /// 2. Calls `get_trailer` method. 580 /// 3. Checks if the result is correct. 581 #[test] ut_http_body_chunk()582 fn ut_http_body_chunk() { 583 let mut chunk = Chunk { 584 decoder: ChunkBodyDecoder::new().contains_trailer(true), 585 pre: None, 586 io: None, 587 trailer: vec![], 588 }; 589 let trailer_info = "Trailer1:value1\r\nTrailer2:value2\r\n"; 590 chunk.trailer.extend_from_slice(trailer_info.as_bytes()); 591 let data = chunk.get_trailer().unwrap().unwrap(); 592 let value1 = data.get("Trailer1"); 593 assert_eq!(value1.unwrap().to_str().unwrap(), "value1"); 594 let value2 = data.get("Trailer2"); 595 assert_eq!(value2.unwrap().to_str().unwrap(), "value2"); 596 } 597 } 598