1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use core::pin::Pin; 15 use core::task::{Context, Poll}; 16 use std::future::Future; 17 use std::io::{Cursor, Read}; 18 use std::sync::Arc; 19 20 use ylong_http::body::async_impl::Body; 21 use ylong_http::body::TextBodyDecoder; 22 #[cfg(feature = "http1_1")] 23 use ylong_http::body::{ChunkBodyDecoder, ChunkState}; 24 use ylong_http::headers::Headers; 25 26 use super::conn::StreamData; 27 use crate::error::{ErrorKind, HttpClientError}; 28 use crate::runtime::{AsyncRead, ReadBuf, Sleep}; 29 use crate::util::config::HttpVersion; 30 use crate::util::interceptor::Interceptors; 31 use crate::util::normalizer::BodyLength; 32 33 const TRAILER_SIZE: usize = 1024; 34 35 /// `HttpBody` is the body part of the `Response` returned by `Client::request`. 36 /// `HttpBody` implements `Body` trait, so users can call related methods to get 37 /// body data. 38 /// 39 /// # Examples 40 /// 41 /// ```no_run 42 /// use ylong_http_client::async_impl::{Body, Client, HttpBody, Request}; 43 /// use ylong_http_client::HttpClientError; 44 /// 45 /// async fn read_body() -> Result<(), HttpClientError> { 46 /// let client = Client::new(); 47 /// 48 /// // `HttpBody` is the body part of `response`. 49 /// let mut response = client 50 /// .request(Request::builder().body(Body::empty())?) 51 /// .await?; 52 /// 53 /// // Users can use `Body::data` to get body data. 54 /// let mut buf = [0u8; 1024]; 55 /// loop { 56 /// let size = response.data(&mut buf).await.unwrap(); 57 /// if size == 0 { 58 /// break; 59 /// } 60 /// let _data = &buf[..size]; 61 /// // Deals with the data. 62 /// } 63 /// Ok(()) 64 /// } 65 /// ``` 66 pub struct HttpBody { 67 kind: Kind, 68 request_timeout: Option<Pin<Box<Sleep>>>, 69 total_timeout: Option<Pin<Box<Sleep>>>, 70 } 71 72 type BoxStreamData = Box<dyn StreamData + Sync + Send + Unpin>; 73 74 impl HttpBody { new( interceptors: Arc<Interceptors>, body_length: BodyLength, io: BoxStreamData, pre: &[u8], ) -> Result<Self, HttpClientError>75 pub(crate) fn new( 76 interceptors: Arc<Interceptors>, 77 body_length: BodyLength, 78 io: BoxStreamData, 79 pre: &[u8], 80 ) -> Result<Self, HttpClientError> { 81 let kind = match body_length { 82 BodyLength::Empty => { 83 if !pre.is_empty() { 84 // TODO: Consider the case where BodyLength is empty but pre is not empty. 85 io.shutdown(); 86 return err_from_msg!(Request, "Body length is 0 but read extra data"); 87 } 88 Kind::Empty 89 } 90 BodyLength::Length(len) => Kind::Text(Text::new(len, pre, io, interceptors)), 91 BodyLength::UntilClose => Kind::UntilClose(UntilClose::new(pre, io, interceptors)), 92 93 #[cfg(feature = "http1_1")] 94 BodyLength::Chunk => Kind::Chunk(Chunk::new(pre, io, interceptors)), 95 }; 96 Ok(Self { 97 kind, 98 request_timeout: None, 99 total_timeout: None, 100 }) 101 } 102 set_request_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>)103 pub(crate) fn set_request_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>) { 104 self.request_timeout = sleep; 105 } 106 set_total_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>)107 pub(crate) fn set_total_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>) { 108 self.total_timeout = sleep; 109 } 110 } 111 112 impl Body for HttpBody { 113 type Error = HttpClientError; 114 poll_data( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, Self::Error>>115 fn poll_data( 116 mut self: Pin<&mut Self>, 117 cx: &mut Context<'_>, 118 buf: &mut [u8], 119 ) -> Poll<Result<usize, Self::Error>> { 120 if buf.is_empty() { 121 return Poll::Ready(Ok(0)); 122 } 123 124 if let Some(delay) = self.request_timeout.as_mut() { 125 if let Poll::Ready(()) = Pin::new(delay).poll(cx) { 126 return Poll::Ready(err_from_io!(Timeout, std::io::ErrorKind::TimedOut.into())); 127 } 128 } 129 130 if let Some(delay) = self.total_timeout.as_mut() { 131 if let Poll::Ready(()) = Pin::new(delay).poll(cx) { 132 return Poll::Ready(err_from_io!(Timeout, std::io::ErrorKind::TimedOut.into())); 133 } 134 } 135 136 match self.kind { 137 Kind::Empty => Poll::Ready(Ok(0)), 138 Kind::Text(ref mut text) => text.data(cx, buf), 139 Kind::UntilClose(ref mut until_close) => until_close.data(cx, buf), 140 #[cfg(feature = "http1_1")] 141 Kind::Chunk(ref mut chunk) => chunk.data(cx, buf), 142 } 143 } 144 poll_trailer( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<Option<Headers>, Self::Error>>145 fn poll_trailer( 146 mut self: Pin<&mut Self>, 147 cx: &mut Context<'_>, 148 ) -> Poll<Result<Option<Headers>, Self::Error>> { 149 // Get trailer data from io 150 if let Some(delay) = self.request_timeout.as_mut() { 151 if let Poll::Ready(()) = Pin::new(delay).poll(cx) { 152 return Poll::Ready(err_from_msg!(Timeout, "Request timeout")); 153 } 154 } 155 156 if let Some(delay) = self.total_timeout.as_mut() { 157 if let Poll::Ready(()) = Pin::new(delay).poll(cx) { 158 return Poll::Ready(err_from_msg!(Timeout, "Request timeout")); 159 } 160 } 161 162 let mut read_buf = [0_u8; TRAILER_SIZE]; 163 164 match self.kind { 165 #[cfg(feature = "http1_1")] 166 Kind::Chunk(ref mut chunk) => { 167 match chunk.data(cx, &mut read_buf) { 168 Poll::Ready(Ok(_)) => {} 169 Poll::Pending => { 170 return Poll::Pending; 171 } 172 Poll::Ready(Err(e)) => { 173 return Poll::Ready(Err(e)); 174 } 175 } 176 Poll::Ready(Ok(chunk.decoder.get_trailer().map_err(|e| { 177 HttpClientError::from_error(ErrorKind::BodyDecode, e) 178 })?)) 179 } 180 _ => Poll::Ready(Ok(None)), 181 } 182 } 183 } 184 185 impl Drop for HttpBody { drop(&mut self)186 fn drop(&mut self) { 187 let io = match self.kind { 188 Kind::Text(ref mut text) => text.io.as_mut(), 189 #[cfg(feature = "http1_1")] 190 Kind::Chunk(ref mut chunk) => chunk.io.as_mut(), 191 Kind::UntilClose(ref mut until_close) => until_close.io.as_mut(), 192 _ => None, 193 }; 194 // If response body is not totally read, shutdown io. 195 if let Some(io) = io { 196 if io.http_version() == HttpVersion::Http1 { 197 io.shutdown() 198 } 199 } 200 } 201 } 202 203 // TODO: `TextBodyDecoder` implementation and `ChunkBodyDecoder` implementation. 204 enum Kind { 205 Empty, 206 Text(Text), 207 #[cfg(feature = "http1_1")] 208 Chunk(Chunk), 209 UntilClose(UntilClose), 210 } 211 212 struct UntilClose { 213 interceptors: Arc<Interceptors>, 214 pre: Option<Cursor<Vec<u8>>>, 215 io: Option<BoxStreamData>, 216 } 217 218 impl UntilClose { new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self219 pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self { 220 Self { 221 interceptors, 222 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 223 io: Some(io), 224 } 225 } 226 data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>227 fn data( 228 &mut self, 229 cx: &mut Context<'_>, 230 buf: &mut [u8], 231 ) -> Poll<Result<usize, HttpClientError>> { 232 if buf.is_empty() { 233 return Poll::Ready(Ok(0)); 234 } 235 let mut read = 0; 236 if let Some(pre) = self.pre.as_mut() { 237 // Here cursor read never failed. 238 let this_read = Read::read(pre, buf).unwrap(); 239 if this_read == 0 { 240 self.pre = None; 241 } else { 242 read += this_read; 243 } 244 } 245 246 if !buf[read..].is_empty() { 247 if let Some(io) = self.io.take() { 248 return self.poll_read_io(cx, io, read, buf); 249 } 250 } 251 Poll::Ready(Ok(read)) 252 } 253 poll_read_io( &mut self, cx: &mut Context<'_>, mut io: BoxStreamData, read: usize, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>254 fn poll_read_io( 255 &mut self, 256 cx: &mut Context<'_>, 257 mut io: BoxStreamData, 258 read: usize, 259 buf: &mut [u8], 260 ) -> Poll<Result<usize, HttpClientError>> { 261 let mut read = read; 262 let mut read_buf = ReadBuf::new(&mut buf[read..]); 263 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 264 Poll::Ready(Ok(())) => { 265 let filled = read_buf.filled().len(); 266 if filled == 0 { 267 // Stream closed, and get the fin. 268 if io.is_stream_closable() { 269 return Poll::Ready(Ok(0)); 270 } 271 // Disconnected for http1. 272 io.shutdown(); 273 } else { 274 self.interceptors 275 .intercept_output(&buf[read..(read + filled)])?; 276 self.io = Some(io); 277 } 278 read += filled; 279 Poll::Ready(Ok(read)) 280 } 281 Poll::Pending => { 282 self.io = Some(io); 283 if read != 0 { 284 return Poll::Ready(Ok(read)); 285 } 286 Poll::Pending 287 } 288 Poll::Ready(Err(e)) => { 289 // If IO error occurs, shutdowns `io` before return. 290 io.shutdown(); 291 Poll::Ready(err_from_io!(BodyTransfer, e)) 292 } 293 } 294 } 295 } 296 297 struct Text { 298 interceptors: Arc<Interceptors>, 299 decoder: TextBodyDecoder, 300 pre: Option<Cursor<Vec<u8>>>, 301 io: Option<BoxStreamData>, 302 } 303 304 impl Text { new( len: u64, pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>, ) -> Self305 pub(crate) fn new( 306 len: u64, 307 pre: &[u8], 308 io: BoxStreamData, 309 interceptors: Arc<Interceptors>, 310 ) -> Self { 311 Self { 312 interceptors, 313 decoder: TextBodyDecoder::new(len), 314 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 315 io: Some(io), 316 } 317 } 318 } 319 320 impl Text { data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>321 fn data( 322 &mut self, 323 cx: &mut Context<'_>, 324 buf: &mut [u8], 325 ) -> Poll<Result<usize, HttpClientError>> { 326 if buf.is_empty() { 327 return Poll::Ready(Ok(0)); 328 } 329 330 let mut read = 0; 331 332 if let Some(pre) = self.pre.as_mut() { 333 // Here cursor read never failed. 334 let this_read = Read::read(pre, buf).unwrap(); 335 if this_read == 0 { 336 self.pre = None; 337 } else { 338 read += this_read; 339 if let Some(result) = self.read_remaining(buf, read) { 340 return result; 341 } 342 } 343 } 344 345 if !buf[read..].is_empty() { 346 if let Some(io) = self.io.take() { 347 return self.poll_read_io(cx, buf, io, read); 348 } 349 } 350 Poll::Ready(Ok(read)) 351 } 352 read_remaining( &mut self, buf: &mut [u8], read: usize, ) -> Option<Poll<Result<usize, HttpClientError>>>353 fn read_remaining( 354 &mut self, 355 buf: &mut [u8], 356 read: usize, 357 ) -> Option<Poll<Result<usize, HttpClientError>>> { 358 let (text, rem) = self.decoder.decode(&buf[..read]); 359 360 // Contains redundant `rem`, return error. 361 match (text.is_complete(), rem.is_empty()) { 362 (true, false) => { 363 if let Some(io) = self.io.take() { 364 io.shutdown(); 365 }; 366 Some(Poll::Ready(err_from_msg!(BodyDecode, "Not eof"))) 367 } 368 (true, true) => { 369 if let Some(io) = self.io.take() { 370 // stream not closed, waiting for the fin 371 if !io.is_stream_closable() { 372 self.io = Some(io); 373 } 374 } 375 Some(Poll::Ready(Ok(read))) 376 } 377 // TextBodyDecoder decodes as much as possible here. 378 _ => None, 379 } 380 } 381 poll_read_io( &mut self, cx: &mut Context<'_>, buf: &mut [u8], mut io: BoxStreamData, read: usize, ) -> Poll<Result<usize, HttpClientError>>382 fn poll_read_io( 383 &mut self, 384 cx: &mut Context<'_>, 385 buf: &mut [u8], 386 mut io: BoxStreamData, 387 read: usize, 388 ) -> Poll<Result<usize, HttpClientError>> { 389 let mut read = read; 390 let mut read_buf = ReadBuf::new(&mut buf[read..]); 391 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 392 // Disconnected. 393 Poll::Ready(Ok(())) => { 394 let filled = read_buf.filled().len(); 395 if filled == 0 { 396 // stream closed, and get the fin 397 if io.is_stream_closable() && self.decoder.decode(&buf[..0]).0.is_complete() { 398 return Poll::Ready(Ok(0)); 399 } 400 io.shutdown(); 401 return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete")); 402 } 403 let (text, rem) = self.decoder.decode(read_buf.filled()); 404 self.interceptors.intercept_output(read_buf.filled())?; 405 read += filled; 406 // Contains redundant `rem`, return error. 407 match (text.is_complete(), rem.is_empty()) { 408 (true, false) => { 409 io.shutdown(); 410 Poll::Ready(err_from_msg!(BodyDecode, "Not eof")) 411 } 412 (true, true) => { 413 if !io.is_stream_closable() { 414 // stream not closed, waiting for the fin 415 self.io = Some(io); 416 } 417 Poll::Ready(Ok(read)) 418 } 419 _ => { 420 self.io = Some(io); 421 Poll::Ready(Ok(read)) 422 } 423 } 424 } 425 Poll::Pending => { 426 self.io = Some(io); 427 if read != 0 { 428 return Poll::Ready(Ok(read)); 429 } 430 Poll::Pending 431 } 432 Poll::Ready(Err(e)) => { 433 // If IO error occurs, shutdowns `io` before return. 434 io.shutdown(); 435 Poll::Ready(err_from_io!(BodyDecode, e)) 436 } 437 } 438 } 439 } 440 441 #[cfg(feature = "http1_1")] 442 struct Chunk { 443 interceptors: Arc<Interceptors>, 444 decoder: ChunkBodyDecoder, 445 pre: Option<Cursor<Vec<u8>>>, 446 io: Option<BoxStreamData>, 447 } 448 449 #[cfg(feature = "http1_1")] 450 impl Chunk { new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self451 pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self { 452 Self { 453 interceptors, 454 decoder: ChunkBodyDecoder::new().contains_trailer(true), 455 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 456 io: Some(io), 457 } 458 } 459 } 460 461 #[cfg(feature = "http1_1")] 462 impl Chunk { data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>463 fn data( 464 &mut self, 465 cx: &mut Context<'_>, 466 buf: &mut [u8], 467 ) -> Poll<Result<usize, HttpClientError>> { 468 if buf.is_empty() { 469 return Poll::Ready(Ok(0)); 470 } 471 472 let mut read = 0; 473 474 while let Some(pre) = self.pre.as_mut() { 475 // Here cursor read never failed. 476 let size = Read::read(pre, &mut buf[read..]).unwrap(); 477 if size == 0 { 478 self.pre = None; 479 } 480 481 let (size, flag) = self.merge_chunks(&mut buf[read..read + size])?; 482 read += size; 483 484 if flag { 485 // Return if we find a 0-sized chunk. 486 self.io = None; 487 return Poll::Ready(Ok(read)); 488 } else if read != 0 { 489 // Return if we get some data. 490 return Poll::Ready(Ok(read)); 491 } 492 } 493 494 // Here `read` must be 0. 495 while let Some(mut io) = self.io.take() { 496 let mut read_buf = ReadBuf::new(&mut buf[read..]); 497 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 498 Poll::Ready(Ok(())) => { 499 let filled = read_buf.filled().len(); 500 if filled == 0 { 501 io.shutdown(); 502 return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete")); 503 } 504 let (size, flag) = self.merge_chunks(read_buf.filled_mut())?; 505 self.interceptors.intercept_output(read_buf.filled_mut())?; 506 read += size; 507 if flag { 508 // Return if we find a 0-sized chunk. 509 // Return if we get some data. 510 return Poll::Ready(Ok(read)); 511 } 512 self.io = Some(io); 513 if read != 0 { 514 return Poll::Ready(Ok(read)); 515 } 516 } 517 Poll::Pending => { 518 self.io = Some(io); 519 return Poll::Pending; 520 } 521 Poll::Ready(Err(e)) => { 522 // If IO error occurs, shutdowns `io` before return. 523 io.shutdown(); 524 return Poll::Ready(err_from_io!(BodyDecode, e)); 525 } 526 } 527 } 528 529 Poll::Ready(Ok(read)) 530 } 531 merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError>532 fn merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError> { 533 // Here we need to merge the chunks into one data block and return. 534 // The data arrangement in buf is as follows: 535 // 536 // data in buf: 537 // +------+------+------+------+------+------+------+ 538 // | data | len | data | len | ... | data | len | 539 // +------+------+------+------+------+------+------+ 540 // 541 // We need to merge these data blocks into one block: 542 // 543 // after merge: 544 // +---------------------------+ 545 // | data | 546 // +---------------------------+ 547 548 let (chunks, junk) = self 549 .decoder 550 .decode(buf) 551 .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?; 552 553 let mut finished = false; 554 let mut ptrs = Vec::new(); 555 556 for chunk in chunks.into_iter() { 557 if chunk.trailer().is_some() { 558 if chunk.state() == &ChunkState::Finish { 559 finished = true; 560 } 561 } else { 562 if chunk.size() == 0 && chunk.state() != &ChunkState::MetaSize { 563 finished = true; 564 break; 565 } 566 let data = chunk.data(); 567 ptrs.push((data.as_ptr(), data.len())) 568 } 569 } 570 571 if finished && !junk.is_empty() { 572 return err_from_msg!(BodyDecode, "Invalid chunk body"); 573 } 574 575 let start = buf.as_ptr(); 576 577 let mut idx = 0; 578 for (ptr, len) in ptrs.into_iter() { 579 let st = ptr as usize - start as usize; 580 let ed = st + len; 581 buf.copy_within(st..ed, idx); 582 idx += len; 583 } 584 Ok((idx, finished)) 585 } 586 } 587 588 #[cfg(feature = "ylong_base")] 589 #[cfg(test)] 590 mod ut_async_http_body { 591 use std::sync::Arc; 592 593 use ylong_http::body::async_impl; 594 595 use crate::async_impl::HttpBody; 596 use crate::util::interceptor::IdleInterceptor; 597 use crate::util::normalizer::BodyLength; 598 use crate::ErrorKind; 599 600 /// UT test cases for `HttpBody::trailer`. 601 /// 602 /// # Brief 603 /// 1. Creates a `HttpBody` by calling `HttpBody::new`. 604 /// 2. Calls `trailer` to get headers. 605 /// 3. Checks if the test result is correct. 606 #[test] ut_asnyc_chunk_trailer_1()607 fn ut_asnyc_chunk_trailer_1() { 608 let handle = ylong_runtime::spawn(async move { 609 async_chunk_trailer_1().await; 610 async_chunk_trailer_2().await; 611 }); 612 ylong_runtime::block_on(handle).unwrap(); 613 } 614 async_chunk_trailer_1()615 async fn async_chunk_trailer_1() { 616 let box_stream = Box::new("".as_bytes()); 617 let chunk_body_bytes = "\ 618 5\r\n\ 619 hello\r\n\ 620 C ; type = text ;end = !\r\n\ 621 hello world!\r\n\ 622 000; message = last\r\n\ 623 accept:text/html\r\n\r\n\ 624 "; 625 let mut chunk = HttpBody::new( 626 Arc::new(IdleInterceptor), 627 BodyLength::Chunk, 628 box_stream, 629 chunk_body_bytes.as_bytes(), 630 ) 631 .unwrap(); 632 let res = async_impl::Body::trailer(&mut chunk) 633 .await 634 .unwrap() 635 .unwrap(); 636 assert_eq!( 637 res.get("accept").unwrap().to_string().unwrap(), 638 "text/html".to_string() 639 ); 640 let box_stream = Box::new("".as_bytes()); 641 let chunk_body_no_trailer_bytes = "\ 642 5\r\n\ 643 hello\r\n\ 644 C ; type = text ;end = !\r\n\ 645 hello world!\r\n\ 646 0\r\n\r\n\ 647 "; 648 649 let mut chunk = HttpBody::new( 650 Arc::new(IdleInterceptor), 651 BodyLength::Chunk, 652 box_stream, 653 chunk_body_no_trailer_bytes.as_bytes(), 654 ) 655 .unwrap(); 656 657 let mut buf = [0u8; 32]; 658 // Read body part 659 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 660 assert_eq!(read, 5); 661 assert_eq!(&buf[..read], b"hello"); 662 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 663 assert_eq!(read, 12); 664 assert_eq!(&buf[..read], b"hello world!"); 665 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 666 assert_eq!(read, 0); 667 assert_eq!(&buf[..read], b""); 668 // try read trailer part 669 let res = async_impl::Body::trailer(&mut chunk).await.unwrap(); 670 assert!(res.is_none()); 671 } 672 async_chunk_trailer_2()673 async fn async_chunk_trailer_2() { 674 let box_stream = Box::new("".as_bytes()); 675 let chunk_body_bytes = "\ 676 5\r\n\ 677 hello\r\n\ 678 C ; type = text ;end = !\r\n\ 679 hello world!\r\n\ 680 000; message = last\r\n\ 681 Expires: Wed, 21 Oct 2015 07:27:00 GMT \r\n\r\n\ 682 "; 683 let mut chunk = HttpBody::new( 684 Arc::new(IdleInterceptor), 685 BodyLength::Chunk, 686 box_stream, 687 chunk_body_bytes.as_bytes(), 688 ) 689 .unwrap(); 690 let res = async_impl::Body::trailer(&mut chunk) 691 .await 692 .unwrap() 693 .unwrap(); 694 assert_eq!( 695 res.get("expires").unwrap().to_string().unwrap(), 696 "Wed, 21 Oct 2015 07:27:00 GMT".to_string() 697 ); 698 } 699 700 /// UT test cases for `Body::data`. 701 /// 702 /// # Brief 703 /// 1. Creates a chunk `HttpBody`. 704 /// 2. Calls `data` method get boxstream. 705 /// 3. Checks if data size is correct. 706 #[test] ut_asnyc_http_body_chunk2()707 fn ut_asnyc_http_body_chunk2() { 708 let handle = ylong_runtime::spawn(async move { 709 http_body_chunk2().await; 710 }); 711 ylong_runtime::block_on(handle).unwrap(); 712 } 713 http_body_chunk2()714 async fn http_body_chunk2() { 715 let box_stream = Box::new( 716 "\ 717 5\r\n\ 718 hello\r\n\ 719 C ; type = text ;end = !\r\n\ 720 hello world!\r\n\ 721 000; message = last\r\n\ 722 accept:text/html\r\n\r\n\ 723 " 724 .as_bytes(), 725 ); 726 let chunk_body_bytes = ""; 727 let mut chunk = HttpBody::new( 728 Arc::new(IdleInterceptor), 729 BodyLength::Chunk, 730 box_stream, 731 chunk_body_bytes.as_bytes(), 732 ) 733 .unwrap(); 734 735 let mut buf = [0u8; 32]; 736 // Read body part 737 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 738 assert_eq!(read, 5); 739 740 let box_stream = Box::new("".as_bytes()); 741 let chunk_body_no_trailer_bytes = "\ 742 5\r\n\ 743 hello\r\n\ 744 C ; type = text ;end = !\r\n\ 745 hello world!\r\n\ 746 0\r\n\r\n\ 747 "; 748 749 let mut chunk = HttpBody::new( 750 Arc::new(IdleInterceptor), 751 BodyLength::Chunk, 752 box_stream, 753 chunk_body_no_trailer_bytes.as_bytes(), 754 ) 755 .unwrap(); 756 757 let mut buf = [0u8; 32]; 758 // Read body part 759 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 760 assert_eq!(read, 5); 761 assert_eq!(&buf[..read], b"hello"); 762 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 763 assert_eq!(read, 12); 764 assert_eq!(&buf[..read], b"hello world!"); 765 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 766 assert_eq!(read, 0); 767 assert_eq!(&buf[..read], b""); 768 let res = async_impl::Body::trailer(&mut chunk).await.unwrap(); 769 assert!(res.is_none()); 770 } 771 772 /// UT test cases for `Body::data`. 773 /// 774 /// # Brief 775 /// 1. Creates a empty `HttpBody`. 776 /// 2. Calls `HttpBody::new` to create empty http body. 777 /// 3. Checks if http body is empty. 778 #[test] http_body_empty_err()779 fn http_body_empty_err() { 780 let box_stream = Box::new("".as_bytes()); 781 let content_bytes = "hello"; 782 783 match HttpBody::new( 784 Arc::new(IdleInterceptor), 785 BodyLength::Empty, 786 box_stream, 787 content_bytes.as_bytes(), 788 ) { 789 Ok(_) => (), 790 Err(e) => assert_eq!(e.error_kind(), ErrorKind::Request), 791 } 792 } 793 794 /// UT test cases for text `HttpBody::new`. 795 /// 796 /// # Brief 797 /// 1. Creates a text `HttpBody`. 798 /// 2. Calls `HttpBody::new` to create text http body. 799 /// 3. Checks if result is correct. 800 #[test] ut_http_body_text()801 fn ut_http_body_text() { 802 let handle = ylong_runtime::spawn(async move { 803 http_body_text().await; 804 }); 805 ylong_runtime::block_on(handle).unwrap(); 806 } 807 http_body_text()808 async fn http_body_text() { 809 let box_stream = Box::new("hello world".as_bytes()); 810 let content_bytes = ""; 811 812 let mut text = HttpBody::new( 813 Arc::new(IdleInterceptor), 814 BodyLength::Length(11), 815 box_stream, 816 content_bytes.as_bytes(), 817 ) 818 .unwrap(); 819 820 let mut buf = [0u8; 5]; 821 // Read body part 822 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 823 assert_eq!(read, 5); 824 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 825 assert_eq!(read, 5); 826 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 827 assert_eq!(read, 1); 828 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 829 assert_eq!(read, 0); 830 831 let box_stream = Box::new("".as_bytes()); 832 let content_bytes = "hello"; 833 834 let mut text = HttpBody::new( 835 Arc::new(IdleInterceptor), 836 BodyLength::Length(5), 837 box_stream, 838 content_bytes.as_bytes(), 839 ) 840 .unwrap(); 841 842 let mut buf = [0u8; 32]; 843 // Read body part 844 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 845 assert_eq!(read, 5); 846 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 847 assert_eq!(read, 0); 848 } 849 850 /// UT test cases for until_close `HttpBody::new`. 851 /// 852 /// # Brief 853 /// 1. Creates a until_close `HttpBody`. 854 /// 2. Calls `HttpBody::new` to create until_close http body. 855 /// 3. Checks if result is correct. 856 #[test] ut_http_body_until_close()857 fn ut_http_body_until_close() { 858 let handle = ylong_runtime::spawn(async move { 859 http_body_until_close().await; 860 }); 861 ylong_runtime::block_on(handle).unwrap(); 862 } 863 http_body_until_close()864 async fn http_body_until_close() { 865 let box_stream = Box::new("hello world".as_bytes()); 866 let content_bytes = ""; 867 868 let mut until_close = HttpBody::new( 869 Arc::new(IdleInterceptor), 870 BodyLength::UntilClose, 871 box_stream, 872 content_bytes.as_bytes(), 873 ) 874 .unwrap(); 875 876 let mut buf = [0u8; 5]; 877 // Read body part 878 let read = async_impl::Body::data(&mut until_close, &mut buf) 879 .await 880 .unwrap(); 881 assert_eq!(read, 5); 882 let read = async_impl::Body::data(&mut until_close, &mut buf) 883 .await 884 .unwrap(); 885 assert_eq!(read, 5); 886 let read = async_impl::Body::data(&mut until_close, &mut buf) 887 .await 888 .unwrap(); 889 assert_eq!(read, 1); 890 891 let box_stream = Box::new("".as_bytes()); 892 let content_bytes = "hello"; 893 894 let mut until_close = HttpBody::new( 895 Arc::new(IdleInterceptor), 896 BodyLength::UntilClose, 897 box_stream, 898 content_bytes.as_bytes(), 899 ) 900 .unwrap(); 901 902 let mut buf = [0u8; 5]; 903 // Read body part 904 let read = async_impl::Body::data(&mut until_close, &mut buf) 905 .await 906 .unwrap(); 907 assert_eq!(read, 5); 908 let read = async_impl::Body::data(&mut until_close, &mut buf) 909 .await 910 .unwrap(); 911 assert_eq!(read, 0); 912 } 913 } 914