1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use core::pin::Pin; 15 use core::task::{Context, Poll}; 16 use std::future::Future; 17 use std::io::{Cursor, Read}; 18 use std::sync::Arc; 19 20 use ylong_http::body::async_impl::Body; 21 use ylong_http::body::TextBodyDecoder; 22 #[cfg(feature = "http1_1")] 23 use ylong_http::body::{ChunkBodyDecoder, ChunkState}; 24 use ylong_http::headers::Headers; 25 26 use super::conn::StreamData; 27 use crate::error::{ErrorKind, HttpClientError}; 28 use crate::runtime::{AsyncRead, ReadBuf, Sleep}; 29 use crate::util::interceptor::Interceptors; 30 use crate::util::normalizer::BodyLength; 31 32 const TRAILER_SIZE: usize = 1024; 33 34 /// `HttpBody` is the body part of the `Response` returned by `Client::request`. 35 /// `HttpBody` implements `Body` trait, so users can call related methods to get 36 /// body data. 37 /// 38 /// # Examples 39 /// 40 /// ```no_run 41 /// use ylong_http_client::async_impl::{Body, Client, HttpBody, Request}; 42 /// use ylong_http_client::HttpClientError; 43 /// 44 /// async fn read_body() -> Result<(), HttpClientError> { 45 /// let client = Client::new(); 46 /// 47 /// // `HttpBody` is the body part of `response`. 48 /// let mut response = client 49 /// .request(Request::builder().body(Body::empty())?) 50 /// .await?; 51 /// 52 /// // Users can use `Body::data` to get body data. 53 /// let mut buf = [0u8; 1024]; 54 /// loop { 55 /// let size = response.data(&mut buf).await.unwrap(); 56 /// if size == 0 { 57 /// break; 58 /// } 59 /// let _data = &buf[..size]; 60 /// // Deals with the data. 61 /// } 62 /// Ok(()) 63 /// } 64 /// ``` 65 pub struct HttpBody { 66 kind: Kind, 67 sleep: Option<Pin<Box<Sleep>>>, 68 } 69 70 type BoxStreamData = Box<dyn StreamData + Sync + Send + Unpin>; 71 72 impl HttpBody { new( interceptors: Arc<Interceptors>, body_length: BodyLength, io: BoxStreamData, pre: &[u8], ) -> Result<Self, HttpClientError>73 pub(crate) fn new( 74 interceptors: Arc<Interceptors>, 75 body_length: BodyLength, 76 io: BoxStreamData, 77 pre: &[u8], 78 ) -> Result<Self, HttpClientError> { 79 let kind = match body_length { 80 BodyLength::Empty => { 81 if !pre.is_empty() { 82 // TODO: Consider the case where BodyLength is empty but pre is not empty. 83 io.shutdown(); 84 return err_from_msg!(Request, "Body length is 0 but read extra data"); 85 } 86 Kind::Empty 87 } 88 BodyLength::Length(len) => Kind::Text(Text::new(len, pre, io, interceptors)), 89 BodyLength::UntilClose => Kind::UntilClose(UntilClose::new(pre, io, interceptors)), 90 91 #[cfg(feature = "http1_1")] 92 BodyLength::Chunk => Kind::Chunk(Chunk::new(pre, io, interceptors)), 93 }; 94 Ok(Self { kind, sleep: None }) 95 } 96 set_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>)97 pub(crate) fn set_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>) { 98 self.sleep = sleep; 99 } 100 } 101 102 impl Body for HttpBody { 103 type Error = HttpClientError; 104 poll_data( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, Self::Error>>105 fn poll_data( 106 mut self: Pin<&mut Self>, 107 cx: &mut Context<'_>, 108 buf: &mut [u8], 109 ) -> Poll<Result<usize, Self::Error>> { 110 if buf.is_empty() { 111 return Poll::Ready(Ok(0)); 112 } 113 114 if let Some(delay) = self.sleep.as_mut() { 115 if let Poll::Ready(()) = Pin::new(delay).poll(cx) { 116 return Poll::Ready(err_from_io!(Timeout, std::io::ErrorKind::TimedOut.into())); 117 } 118 } 119 120 match self.kind { 121 Kind::Empty => Poll::Ready(Ok(0)), 122 Kind::Text(ref mut text) => text.data(cx, buf), 123 Kind::UntilClose(ref mut until_close) => until_close.data(cx, buf), 124 #[cfg(feature = "http1_1")] 125 Kind::Chunk(ref mut chunk) => chunk.data(cx, buf), 126 } 127 } 128 poll_trailer( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<Option<Headers>, Self::Error>>129 fn poll_trailer( 130 mut self: Pin<&mut Self>, 131 cx: &mut Context<'_>, 132 ) -> Poll<Result<Option<Headers>, Self::Error>> { 133 // Get trailer data from io 134 if let Some(delay) = self.sleep.as_mut() { 135 if let Poll::Ready(()) = Pin::new(delay).poll(cx) { 136 return Poll::Ready(err_from_msg!(Timeout, "Request timeout")); 137 } 138 } 139 140 let mut read_buf = [0_u8; TRAILER_SIZE]; 141 142 match self.kind { 143 #[cfg(feature = "http1_1")] 144 Kind::Chunk(ref mut chunk) => { 145 match chunk.data(cx, &mut read_buf) { 146 Poll::Ready(Ok(_)) => {} 147 Poll::Pending => { 148 return Poll::Pending; 149 } 150 Poll::Ready(Err(e)) => { 151 return Poll::Ready(Err(e)); 152 } 153 } 154 Poll::Ready(Ok(chunk.decoder.get_trailer().map_err(|e| { 155 HttpClientError::from_error(ErrorKind::BodyDecode, e) 156 })?)) 157 } 158 _ => Poll::Ready(Ok(None)), 159 } 160 } 161 } 162 163 impl Drop for HttpBody { drop(&mut self)164 fn drop(&mut self) { 165 let io = match self.kind { 166 Kind::Text(ref mut text) => text.io.as_mut(), 167 #[cfg(feature = "http1_1")] 168 Kind::Chunk(ref mut chunk) => chunk.io.as_mut(), 169 Kind::UntilClose(ref mut until_close) => until_close.io.as_mut(), 170 _ => None, 171 }; 172 // If response body is not totally read, shutdown io. 173 if let Some(io) = io { 174 io.shutdown() 175 } 176 } 177 } 178 179 // TODO: `TextBodyDecoder` implementation and `ChunkBodyDecoder` implementation. 180 enum Kind { 181 Empty, 182 Text(Text), 183 #[cfg(feature = "http1_1")] 184 Chunk(Chunk), 185 UntilClose(UntilClose), 186 } 187 188 struct UntilClose { 189 interceptors: Arc<Interceptors>, 190 pre: Option<Cursor<Vec<u8>>>, 191 io: Option<BoxStreamData>, 192 } 193 194 impl UntilClose { new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self195 pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self { 196 Self { 197 interceptors, 198 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 199 io: Some(io), 200 } 201 } 202 data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>203 fn data( 204 &mut self, 205 cx: &mut Context<'_>, 206 buf: &mut [u8], 207 ) -> Poll<Result<usize, HttpClientError>> { 208 if buf.is_empty() { 209 return Poll::Ready(Ok(0)); 210 } 211 let mut read = 0; 212 if let Some(pre) = self.pre.as_mut() { 213 // Here cursor read never failed. 214 let this_read = Read::read(pre, buf).unwrap(); 215 if this_read == 0 { 216 self.pre = None; 217 } else { 218 read += this_read; 219 } 220 } 221 222 if !buf[read..].is_empty() { 223 if let Some(io) = self.io.take() { 224 return self.poll_read_io(cx, io, read, buf); 225 } 226 } 227 Poll::Ready(Ok(read)) 228 } 229 poll_read_io( &mut self, cx: &mut Context<'_>, mut io: BoxStreamData, read: usize, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>230 fn poll_read_io( 231 &mut self, 232 cx: &mut Context<'_>, 233 mut io: BoxStreamData, 234 read: usize, 235 buf: &mut [u8], 236 ) -> Poll<Result<usize, HttpClientError>> { 237 let mut read = read; 238 let mut read_buf = ReadBuf::new(&mut buf[read..]); 239 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 240 Poll::Ready(Ok(())) => { 241 let filled = read_buf.filled().len(); 242 if filled == 0 { 243 // Stream closed, and get the fin. 244 if io.is_stream_closable() { 245 return Poll::Ready(Ok(0)); 246 } 247 // Disconnected for http1. 248 io.shutdown(); 249 } else { 250 self.interceptors 251 .intercept_output(&buf[read..(read + filled)])?; 252 self.io = Some(io); 253 } 254 read += filled; 255 Poll::Ready(Ok(read)) 256 } 257 Poll::Pending => { 258 self.io = Some(io); 259 if read != 0 { 260 return Poll::Ready(Ok(read)); 261 } 262 Poll::Pending 263 } 264 Poll::Ready(Err(e)) => { 265 // If IO error occurs, shutdowns `io` before return. 266 io.shutdown(); 267 Poll::Ready(err_from_io!(BodyTransfer, e)) 268 } 269 } 270 } 271 } 272 273 struct Text { 274 interceptors: Arc<Interceptors>, 275 decoder: TextBodyDecoder, 276 pre: Option<Cursor<Vec<u8>>>, 277 io: Option<BoxStreamData>, 278 } 279 280 impl Text { new( len: u64, pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>, ) -> Self281 pub(crate) fn new( 282 len: u64, 283 pre: &[u8], 284 io: BoxStreamData, 285 interceptors: Arc<Interceptors>, 286 ) -> Self { 287 Self { 288 interceptors, 289 decoder: TextBodyDecoder::new(len), 290 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 291 io: Some(io), 292 } 293 } 294 } 295 296 impl Text { data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>297 fn data( 298 &mut self, 299 cx: &mut Context<'_>, 300 buf: &mut [u8], 301 ) -> Poll<Result<usize, HttpClientError>> { 302 if buf.is_empty() { 303 return Poll::Ready(Ok(0)); 304 } 305 306 let mut read = 0; 307 308 if let Some(pre) = self.pre.as_mut() { 309 // Here cursor read never failed. 310 let this_read = Read::read(pre, buf).unwrap(); 311 if this_read == 0 { 312 self.pre = None; 313 } else { 314 read += this_read; 315 if let Some(result) = self.read_remaining(buf, read) { 316 return result; 317 } 318 } 319 } 320 321 if !buf[read..].is_empty() { 322 if let Some(io) = self.io.take() { 323 return self.poll_read_io(cx, buf, io, read); 324 } 325 } 326 Poll::Ready(Ok(read)) 327 } 328 read_remaining( &mut self, buf: &mut [u8], read: usize, ) -> Option<Poll<Result<usize, HttpClientError>>>329 fn read_remaining( 330 &mut self, 331 buf: &mut [u8], 332 read: usize, 333 ) -> Option<Poll<Result<usize, HttpClientError>>> { 334 let (text, rem) = self.decoder.decode(&buf[..read]); 335 336 // Contains redundant `rem`, return error. 337 match (text.is_complete(), rem.is_empty()) { 338 (true, false) => { 339 if let Some(io) = self.io.take() { 340 io.shutdown(); 341 }; 342 Some(Poll::Ready(err_from_msg!(BodyDecode, "Not eof"))) 343 } 344 (true, true) => { 345 if let Some(io) = self.io.take() { 346 // stream not closed, waiting for the fin 347 if !io.is_stream_closable() { 348 self.io = Some(io); 349 } 350 } 351 Some(Poll::Ready(Ok(read))) 352 } 353 // TextBodyDecoder decodes as much as possible here. 354 _ => None, 355 } 356 } 357 poll_read_io( &mut self, cx: &mut Context<'_>, buf: &mut [u8], mut io: BoxStreamData, read: usize, ) -> Poll<Result<usize, HttpClientError>>358 fn poll_read_io( 359 &mut self, 360 cx: &mut Context<'_>, 361 buf: &mut [u8], 362 mut io: BoxStreamData, 363 read: usize, 364 ) -> Poll<Result<usize, HttpClientError>> { 365 let mut read = read; 366 let mut read_buf = ReadBuf::new(&mut buf[read..]); 367 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 368 // Disconnected. 369 Poll::Ready(Ok(())) => { 370 let filled = read_buf.filled().len(); 371 if filled == 0 { 372 // stream closed, and get the fin 373 if io.is_stream_closable() && self.decoder.decode(&buf[..0]).0.is_complete() { 374 return Poll::Ready(Ok(0)); 375 } 376 io.shutdown(); 377 return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete")); 378 } 379 let (text, rem) = self.decoder.decode(read_buf.filled()); 380 self.interceptors.intercept_output(read_buf.filled())?; 381 read += filled; 382 // Contains redundant `rem`, return error. 383 match (text.is_complete(), rem.is_empty()) { 384 (true, false) => { 385 io.shutdown(); 386 Poll::Ready(err_from_msg!(BodyDecode, "Not eof")) 387 } 388 (true, true) => { 389 if !io.is_stream_closable() { 390 // stream not closed, waiting for the fin 391 self.io = Some(io); 392 } 393 Poll::Ready(Ok(read)) 394 } 395 _ => { 396 self.io = Some(io); 397 Poll::Ready(Ok(read)) 398 } 399 } 400 } 401 Poll::Pending => { 402 self.io = Some(io); 403 if read != 0 { 404 return Poll::Ready(Ok(read)); 405 } 406 Poll::Pending 407 } 408 Poll::Ready(Err(e)) => { 409 // If IO error occurs, shutdowns `io` before return. 410 io.shutdown(); 411 Poll::Ready(err_from_io!(BodyDecode, e)) 412 } 413 } 414 } 415 } 416 417 #[cfg(feature = "http1_1")] 418 struct Chunk { 419 interceptors: Arc<Interceptors>, 420 decoder: ChunkBodyDecoder, 421 pre: Option<Cursor<Vec<u8>>>, 422 io: Option<BoxStreamData>, 423 } 424 425 #[cfg(feature = "http1_1")] 426 impl Chunk { new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self427 pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self { 428 Self { 429 interceptors, 430 decoder: ChunkBodyDecoder::new().contains_trailer(true), 431 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 432 io: Some(io), 433 } 434 } 435 } 436 437 #[cfg(feature = "http1_1")] 438 impl Chunk { data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>439 fn data( 440 &mut self, 441 cx: &mut Context<'_>, 442 buf: &mut [u8], 443 ) -> Poll<Result<usize, HttpClientError>> { 444 if buf.is_empty() { 445 return Poll::Ready(Ok(0)); 446 } 447 448 let mut read = 0; 449 450 while let Some(pre) = self.pre.as_mut() { 451 // Here cursor read never failed. 452 let size = Read::read(pre, &mut buf[read..]).unwrap(); 453 if size == 0 { 454 self.pre = None; 455 } 456 457 let (size, flag) = self.merge_chunks(&mut buf[read..read + size])?; 458 read += size; 459 460 if flag { 461 // Return if we find a 0-sized chunk. 462 self.io = None; 463 return Poll::Ready(Ok(read)); 464 } else if read != 0 { 465 // Return if we get some data. 466 return Poll::Ready(Ok(read)); 467 } 468 } 469 470 // Here `read` must be 0. 471 while let Some(mut io) = self.io.take() { 472 let mut read_buf = ReadBuf::new(&mut buf[read..]); 473 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 474 Poll::Ready(Ok(())) => { 475 let filled = read_buf.filled().len(); 476 if filled == 0 { 477 io.shutdown(); 478 return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete")); 479 } 480 let (size, flag) = self.merge_chunks(read_buf.filled_mut())?; 481 self.interceptors.intercept_output(read_buf.filled_mut())?; 482 read += size; 483 if flag { 484 // Return if we find a 0-sized chunk. 485 // Return if we get some data. 486 return Poll::Ready(Ok(read)); 487 } 488 self.io = Some(io); 489 if read != 0 { 490 return Poll::Ready(Ok(read)); 491 } 492 } 493 Poll::Pending => { 494 self.io = Some(io); 495 return Poll::Pending; 496 } 497 Poll::Ready(Err(e)) => { 498 // If IO error occurs, shutdowns `io` before return. 499 io.shutdown(); 500 return Poll::Ready(err_from_io!(BodyDecode, e)); 501 } 502 } 503 } 504 505 Poll::Ready(Ok(read)) 506 } 507 merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError>508 fn merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError> { 509 // Here we need to merge the chunks into one data block and return. 510 // The data arrangement in buf is as follows: 511 // 512 // data in buf: 513 // +------+------+------+------+------+------+------+ 514 // | data | len | data | len | ... | data | len | 515 // +------+------+------+------+------+------+------+ 516 // 517 // We need to merge these data blocks into one block: 518 // 519 // after merge: 520 // +---------------------------+ 521 // | data | 522 // +---------------------------+ 523 524 let (chunks, junk) = self 525 .decoder 526 .decode(buf) 527 .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?; 528 529 let mut finished = false; 530 let mut ptrs = Vec::new(); 531 532 for chunk in chunks.into_iter() { 533 if chunk.trailer().is_some() { 534 if chunk.state() == &ChunkState::Finish { 535 finished = true; 536 } 537 } else { 538 if chunk.size() == 0 && chunk.state() != &ChunkState::MetaSize { 539 finished = true; 540 break; 541 } 542 let data = chunk.data(); 543 ptrs.push((data.as_ptr(), data.len())) 544 } 545 } 546 547 if finished && !junk.is_empty() { 548 return err_from_msg!(BodyDecode, "Invalid chunk body"); 549 } 550 551 let start = buf.as_ptr(); 552 553 let mut idx = 0; 554 for (ptr, len) in ptrs.into_iter() { 555 let st = ptr as usize - start as usize; 556 let ed = st + len; 557 buf.copy_within(st..ed, idx); 558 idx += len; 559 } 560 Ok((idx, finished)) 561 } 562 } 563 564 #[cfg(feature = "ylong_base")] 565 #[cfg(test)] 566 mod ut_async_http_body { 567 use std::sync::Arc; 568 569 use ylong_http::body::async_impl; 570 571 use crate::async_impl::HttpBody; 572 use crate::util::interceptor::IdleInterceptor; 573 use crate::util::normalizer::BodyLength; 574 use crate::ErrorKind; 575 576 /// UT test cases for `HttpBody::trailer`. 577 /// 578 /// # Brief 579 /// 1. Creates a `HttpBody` by calling `HttpBody::new`. 580 /// 2. Calls `trailer` to get headers. 581 /// 3. Checks if the test result is correct. 582 #[test] ut_asnyc_chunk_trailer_1()583 fn ut_asnyc_chunk_trailer_1() { 584 let handle = ylong_runtime::spawn(async move { 585 async_chunk_trailer_1().await; 586 async_chunk_trailer_2().await; 587 }); 588 ylong_runtime::block_on(handle).unwrap(); 589 } 590 async_chunk_trailer_1()591 async fn async_chunk_trailer_1() { 592 let box_stream = Box::new("".as_bytes()); 593 let chunk_body_bytes = "\ 594 5\r\n\ 595 hello\r\n\ 596 C ; type = text ;end = !\r\n\ 597 hello world!\r\n\ 598 000; message = last\r\n\ 599 accept:text/html\r\n\r\n\ 600 "; 601 let mut chunk = HttpBody::new( 602 Arc::new(IdleInterceptor), 603 BodyLength::Chunk, 604 box_stream, 605 chunk_body_bytes.as_bytes(), 606 ) 607 .unwrap(); 608 let res = async_impl::Body::trailer(&mut chunk) 609 .await 610 .unwrap() 611 .unwrap(); 612 assert_eq!( 613 res.get("accept").unwrap().to_string().unwrap(), 614 "text/html".to_string() 615 ); 616 let box_stream = Box::new("".as_bytes()); 617 let chunk_body_no_trailer_bytes = "\ 618 5\r\n\ 619 hello\r\n\ 620 C ; type = text ;end = !\r\n\ 621 hello world!\r\n\ 622 0\r\n\r\n\ 623 "; 624 625 let mut chunk = HttpBody::new( 626 Arc::new(IdleInterceptor), 627 BodyLength::Chunk, 628 box_stream, 629 chunk_body_no_trailer_bytes.as_bytes(), 630 ) 631 .unwrap(); 632 633 let mut buf = [0u8; 32]; 634 // Read body part 635 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 636 assert_eq!(read, 5); 637 assert_eq!(&buf[..read], b"hello"); 638 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 639 assert_eq!(read, 12); 640 assert_eq!(&buf[..read], b"hello world!"); 641 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 642 assert_eq!(read, 0); 643 assert_eq!(&buf[..read], b""); 644 // try read trailer part 645 let res = async_impl::Body::trailer(&mut chunk).await.unwrap(); 646 assert!(res.is_none()); 647 } 648 async_chunk_trailer_2()649 async fn async_chunk_trailer_2() { 650 let box_stream = Box::new("".as_bytes()); 651 let chunk_body_bytes = "\ 652 5\r\n\ 653 hello\r\n\ 654 C ; type = text ;end = !\r\n\ 655 hello world!\r\n\ 656 000; message = last\r\n\ 657 Expires: Wed, 21 Oct 2015 07:27:00 GMT \r\n\r\n\ 658 "; 659 let mut chunk = HttpBody::new( 660 Arc::new(IdleInterceptor), 661 BodyLength::Chunk, 662 box_stream, 663 chunk_body_bytes.as_bytes(), 664 ) 665 .unwrap(); 666 let res = async_impl::Body::trailer(&mut chunk) 667 .await 668 .unwrap() 669 .unwrap(); 670 assert_eq!( 671 res.get("expires").unwrap().to_string().unwrap(), 672 "Wed, 21 Oct 2015 07:27:00 GMT".to_string() 673 ); 674 } 675 676 /// UT test cases for `Body::data`. 677 /// 678 /// # Brief 679 /// 1. Creates a chunk `HttpBody`. 680 /// 2. Calls `data` method get boxstream. 681 /// 3. Checks if data size is correct. 682 #[test] ut_asnyc_http_body_chunk2()683 fn ut_asnyc_http_body_chunk2() { 684 let handle = ylong_runtime::spawn(async move { 685 http_body_chunk2().await; 686 }); 687 ylong_runtime::block_on(handle).unwrap(); 688 } 689 http_body_chunk2()690 async fn http_body_chunk2() { 691 let box_stream = Box::new( 692 "\ 693 5\r\n\ 694 hello\r\n\ 695 C ; type = text ;end = !\r\n\ 696 hello world!\r\n\ 697 000; message = last\r\n\ 698 accept:text/html\r\n\r\n\ 699 " 700 .as_bytes(), 701 ); 702 let chunk_body_bytes = ""; 703 let mut chunk = HttpBody::new( 704 Arc::new(IdleInterceptor), 705 BodyLength::Chunk, 706 box_stream, 707 chunk_body_bytes.as_bytes(), 708 ) 709 .unwrap(); 710 711 let mut buf = [0u8; 32]; 712 // Read body part 713 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 714 assert_eq!(read, 5); 715 716 let box_stream = Box::new("".as_bytes()); 717 let chunk_body_no_trailer_bytes = "\ 718 5\r\n\ 719 hello\r\n\ 720 C ; type = text ;end = !\r\n\ 721 hello world!\r\n\ 722 0\r\n\r\n\ 723 "; 724 725 let mut chunk = HttpBody::new( 726 Arc::new(IdleInterceptor), 727 BodyLength::Chunk, 728 box_stream, 729 chunk_body_no_trailer_bytes.as_bytes(), 730 ) 731 .unwrap(); 732 733 let mut buf = [0u8; 32]; 734 // Read body part 735 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 736 assert_eq!(read, 5); 737 assert_eq!(&buf[..read], b"hello"); 738 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 739 assert_eq!(read, 12); 740 assert_eq!(&buf[..read], b"hello world!"); 741 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 742 assert_eq!(read, 0); 743 assert_eq!(&buf[..read], b""); 744 let res = async_impl::Body::trailer(&mut chunk).await.unwrap(); 745 assert!(res.is_none()); 746 } 747 748 /// UT test cases for `Body::data`. 749 /// 750 /// # Brief 751 /// 1. Creates a empty `HttpBody`. 752 /// 2. Calls `HttpBody::new` to create empty http body. 753 /// 3. Checks if http body is empty. 754 #[test] http_body_empty_err()755 fn http_body_empty_err() { 756 let box_stream = Box::new("".as_bytes()); 757 let content_bytes = "hello"; 758 759 match HttpBody::new( 760 Arc::new(IdleInterceptor), 761 BodyLength::Empty, 762 box_stream, 763 content_bytes.as_bytes(), 764 ) { 765 Ok(_) => (), 766 Err(e) => assert_eq!(e.error_kind(), ErrorKind::Request), 767 } 768 } 769 770 /// UT test cases for text `HttpBody::new`. 771 /// 772 /// # Brief 773 /// 1. Creates a text `HttpBody`. 774 /// 2. Calls `HttpBody::new` to create text http body. 775 /// 3. Checks if result is correct. 776 #[test] ut_http_body_text()777 fn ut_http_body_text() { 778 let handle = ylong_runtime::spawn(async move { 779 http_body_text().await; 780 }); 781 ylong_runtime::block_on(handle).unwrap(); 782 } 783 http_body_text()784 async fn http_body_text() { 785 let box_stream = Box::new("hello world".as_bytes()); 786 let content_bytes = ""; 787 788 let mut text = HttpBody::new( 789 Arc::new(IdleInterceptor), 790 BodyLength::Length(11), 791 box_stream, 792 content_bytes.as_bytes(), 793 ) 794 .unwrap(); 795 796 let mut buf = [0u8; 5]; 797 // Read body part 798 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 799 assert_eq!(read, 5); 800 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 801 assert_eq!(read, 5); 802 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 803 assert_eq!(read, 1); 804 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 805 assert_eq!(read, 0); 806 807 let box_stream = Box::new("".as_bytes()); 808 let content_bytes = "hello"; 809 810 let mut text = HttpBody::new( 811 Arc::new(IdleInterceptor), 812 BodyLength::Length(5), 813 box_stream, 814 content_bytes.as_bytes(), 815 ) 816 .unwrap(); 817 818 let mut buf = [0u8; 32]; 819 // Read body part 820 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 821 assert_eq!(read, 5); 822 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 823 assert_eq!(read, 0); 824 } 825 826 /// UT test cases for until_close `HttpBody::new`. 827 /// 828 /// # Brief 829 /// 1. Creates a until_close `HttpBody`. 830 /// 2. Calls `HttpBody::new` to create until_close http body. 831 /// 3. Checks if result is correct. 832 #[test] ut_http_body_until_close()833 fn ut_http_body_until_close() { 834 let handle = ylong_runtime::spawn(async move { 835 http_body_until_close().await; 836 }); 837 ylong_runtime::block_on(handle).unwrap(); 838 } 839 http_body_until_close()840 async fn http_body_until_close() { 841 let box_stream = Box::new("hello world".as_bytes()); 842 let content_bytes = ""; 843 844 let mut until_close = HttpBody::new( 845 Arc::new(IdleInterceptor), 846 BodyLength::UntilClose, 847 box_stream, 848 content_bytes.as_bytes(), 849 ) 850 .unwrap(); 851 852 let mut buf = [0u8; 5]; 853 // Read body part 854 let read = async_impl::Body::data(&mut until_close, &mut buf) 855 .await 856 .unwrap(); 857 assert_eq!(read, 5); 858 let read = async_impl::Body::data(&mut until_close, &mut buf) 859 .await 860 .unwrap(); 861 assert_eq!(read, 5); 862 let read = async_impl::Body::data(&mut until_close, &mut buf) 863 .await 864 .unwrap(); 865 assert_eq!(read, 1); 866 867 let box_stream = Box::new("".as_bytes()); 868 let content_bytes = "hello"; 869 870 let mut until_close = HttpBody::new( 871 Arc::new(IdleInterceptor), 872 BodyLength::UntilClose, 873 box_stream, 874 content_bytes.as_bytes(), 875 ) 876 .unwrap(); 877 878 let mut buf = [0u8; 5]; 879 // Read body part 880 let read = async_impl::Body::data(&mut until_close, &mut buf) 881 .await 882 .unwrap(); 883 assert_eq!(read, 5); 884 let read = async_impl::Body::data(&mut until_close, &mut buf) 885 .await 886 .unwrap(); 887 assert_eq!(read, 0); 888 } 889 } 890