• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use core::pin::Pin;
15 use core::task::{Context, Poll};
16 use std::future::Future;
17 use std::io::{Cursor, Read};
18 use std::sync::Arc;
19 
20 use ylong_http::body::async_impl::Body;
21 use ylong_http::body::TextBodyDecoder;
22 #[cfg(feature = "http1_1")]
23 use ylong_http::body::{ChunkBodyDecoder, ChunkState};
24 use ylong_http::headers::Headers;
25 
26 use super::conn::StreamData;
27 use crate::error::{ErrorKind, HttpClientError};
28 use crate::runtime::{AsyncRead, ReadBuf, Sleep};
29 use crate::util::config::HttpVersion;
30 use crate::util::interceptor::Interceptors;
31 use crate::util::normalizer::BodyLength;
32 
/// Size, in bytes, of the scratch buffer that `HttpBody::poll_trailer` hands
/// to the chunk reader while it looks for the trailer section.
const TRAILER_SIZE: usize = 1024;
34 
35 /// `HttpBody` is the body part of the `Response` returned by `Client::request`.
36 /// `HttpBody` implements `Body` trait, so users can call related methods to get
37 /// body data.
38 ///
39 /// # Examples
40 ///
41 /// ```no_run
42 /// use ylong_http_client::async_impl::{Body, Client, HttpBody, Request};
43 /// use ylong_http_client::HttpClientError;
44 ///
45 /// async fn read_body() -> Result<(), HttpClientError> {
46 ///     let client = Client::new();
47 ///
48 ///     // `HttpBody` is the body part of `response`.
49 ///     let mut response = client
50 ///         .request(Request::builder().body(Body::empty())?)
51 ///         .await?;
52 ///
53 ///     // Users can use `Body::data` to get body data.
54 ///     let mut buf = [0u8; 1024];
55 ///     loop {
56 ///         let size = response.data(&mut buf).await.unwrap();
57 ///         if size == 0 {
58 ///             break;
59 ///         }
60 ///         let _data = &buf[..size];
61 ///         // Deals with the data.
62 ///     }
63 ///     Ok(())
64 /// }
65 /// ```
66 pub struct HttpBody {
67     kind: Kind,
68     request_timeout: Option<Pin<Box<Sleep>>>,
69     total_timeout: Option<Pin<Box<Sleep>>>,
70 }
71 
72 type BoxStreamData = Box<dyn StreamData + Sync + Send + Unpin>;
73 
74 impl HttpBody {
new( interceptors: Arc<Interceptors>, body_length: BodyLength, io: BoxStreamData, pre: &[u8], ) -> Result<Self, HttpClientError>75     pub(crate) fn new(
76         interceptors: Arc<Interceptors>,
77         body_length: BodyLength,
78         io: BoxStreamData,
79         pre: &[u8],
80     ) -> Result<Self, HttpClientError> {
81         let kind = match body_length {
82             BodyLength::Empty => {
83                 if !pre.is_empty() {
84                     // TODO: Consider the case where BodyLength is empty but pre is not empty.
85                     io.shutdown();
86                     return err_from_msg!(Request, "Body length is 0 but read extra data");
87                 }
88                 Kind::Empty
89             }
90             BodyLength::Length(len) => Kind::Text(Text::new(len, pre, io, interceptors)),
91             BodyLength::UntilClose => Kind::UntilClose(UntilClose::new(pre, io, interceptors)),
92 
93             #[cfg(feature = "http1_1")]
94             BodyLength::Chunk => Kind::Chunk(Chunk::new(pre, io, interceptors)),
95         };
96         Ok(Self {
97             kind,
98             request_timeout: None,
99             total_timeout: None,
100         })
101     }
102 
set_request_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>)103     pub(crate) fn set_request_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>) {
104         self.request_timeout = sleep;
105     }
106 
set_total_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>)107     pub(crate) fn set_total_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>) {
108         self.total_timeout = sleep;
109     }
110 }
111 
112 impl Body for HttpBody {
113     type Error = HttpClientError;
114 
poll_data( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, Self::Error>>115     fn poll_data(
116         mut self: Pin<&mut Self>,
117         cx: &mut Context<'_>,
118         buf: &mut [u8],
119     ) -> Poll<Result<usize, Self::Error>> {
120         if buf.is_empty() {
121             return Poll::Ready(Ok(0));
122         }
123 
124         if let Some(delay) = self.request_timeout.as_mut() {
125             if let Poll::Ready(()) = Pin::new(delay).poll(cx) {
126                 return Poll::Ready(err_from_io!(Timeout, std::io::ErrorKind::TimedOut.into()));
127             }
128         }
129 
130         if let Some(delay) = self.total_timeout.as_mut() {
131             if let Poll::Ready(()) = Pin::new(delay).poll(cx) {
132                 return Poll::Ready(err_from_io!(Timeout, std::io::ErrorKind::TimedOut.into()));
133             }
134         }
135 
136         match self.kind {
137             Kind::Empty => Poll::Ready(Ok(0)),
138             Kind::Text(ref mut text) => text.data(cx, buf),
139             Kind::UntilClose(ref mut until_close) => until_close.data(cx, buf),
140             #[cfg(feature = "http1_1")]
141             Kind::Chunk(ref mut chunk) => chunk.data(cx, buf),
142         }
143     }
144 
poll_trailer( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<Option<Headers>, Self::Error>>145     fn poll_trailer(
146         mut self: Pin<&mut Self>,
147         cx: &mut Context<'_>,
148     ) -> Poll<Result<Option<Headers>, Self::Error>> {
149         // Get trailer data from io
150         if let Some(delay) = self.request_timeout.as_mut() {
151             if let Poll::Ready(()) = Pin::new(delay).poll(cx) {
152                 return Poll::Ready(err_from_msg!(Timeout, "Request timeout"));
153             }
154         }
155 
156         if let Some(delay) = self.total_timeout.as_mut() {
157             if let Poll::Ready(()) = Pin::new(delay).poll(cx) {
158                 return Poll::Ready(err_from_msg!(Timeout, "Request timeout"));
159             }
160         }
161 
162         let mut read_buf = [0_u8; TRAILER_SIZE];
163 
164         match self.kind {
165             #[cfg(feature = "http1_1")]
166             Kind::Chunk(ref mut chunk) => {
167                 match chunk.data(cx, &mut read_buf) {
168                     Poll::Ready(Ok(_)) => {}
169                     Poll::Pending => {
170                         return Poll::Pending;
171                     }
172                     Poll::Ready(Err(e)) => {
173                         return Poll::Ready(Err(e));
174                     }
175                 }
176                 Poll::Ready(Ok(chunk.decoder.get_trailer().map_err(|e| {
177                     HttpClientError::from_error(ErrorKind::BodyDecode, e)
178                 })?))
179             }
180             _ => Poll::Ready(Ok(None)),
181         }
182     }
183 }
184 
185 impl Drop for HttpBody {
drop(&mut self)186     fn drop(&mut self) {
187         let io = match self.kind {
188             Kind::Text(ref mut text) => text.io.as_mut(),
189             #[cfg(feature = "http1_1")]
190             Kind::Chunk(ref mut chunk) => chunk.io.as_mut(),
191             Kind::UntilClose(ref mut until_close) => until_close.io.as_mut(),
192             _ => None,
193         };
194         // If response body is not totally read, shutdown io.
195         if let Some(io) = io {
196             if io.http_version() == HttpVersion::Http1 {
197                 io.shutdown()
198             }
199         }
200     }
201 }
202 
203 // TODO: `TextBodyDecoder` implementation and `ChunkBodyDecoder` implementation.
204 enum Kind {
205     Empty,
206     Text(Text),
207     #[cfg(feature = "http1_1")]
208     Chunk(Chunk),
209     UntilClose(UntilClose),
210 }
211 
212 struct UntilClose {
213     interceptors: Arc<Interceptors>,
214     pre: Option<Cursor<Vec<u8>>>,
215     io: Option<BoxStreamData>,
216 }
217 
218 impl UntilClose {
new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self219     pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self {
220         Self {
221             interceptors,
222             pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())),
223             io: Some(io),
224         }
225     }
226 
data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>227     fn data(
228         &mut self,
229         cx: &mut Context<'_>,
230         buf: &mut [u8],
231     ) -> Poll<Result<usize, HttpClientError>> {
232         if buf.is_empty() {
233             return Poll::Ready(Ok(0));
234         }
235         let mut read = 0;
236         if let Some(pre) = self.pre.as_mut() {
237             // Here cursor read never failed.
238             let this_read = Read::read(pre, buf).unwrap();
239             if this_read == 0 {
240                 self.pre = None;
241             } else {
242                 read += this_read;
243             }
244         }
245 
246         if !buf[read..].is_empty() {
247             if let Some(io) = self.io.take() {
248                 return self.poll_read_io(cx, io, read, buf);
249             }
250         }
251         Poll::Ready(Ok(read))
252     }
253 
poll_read_io( &mut self, cx: &mut Context<'_>, mut io: BoxStreamData, read: usize, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>254     fn poll_read_io(
255         &mut self,
256         cx: &mut Context<'_>,
257         mut io: BoxStreamData,
258         read: usize,
259         buf: &mut [u8],
260     ) -> Poll<Result<usize, HttpClientError>> {
261         let mut read = read;
262         let mut read_buf = ReadBuf::new(&mut buf[read..]);
263         match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
264             Poll::Ready(Ok(())) => {
265                 let filled = read_buf.filled().len();
266                 if filled == 0 {
267                     // Stream closed, and get the fin.
268                     if io.is_stream_closable() {
269                         return Poll::Ready(Ok(0));
270                     }
271                     // Disconnected for http1.
272                     io.shutdown();
273                 } else {
274                     self.interceptors
275                         .intercept_output(&buf[read..(read + filled)])?;
276                     self.io = Some(io);
277                 }
278                 read += filled;
279                 Poll::Ready(Ok(read))
280             }
281             Poll::Pending => {
282                 self.io = Some(io);
283                 if read != 0 {
284                     return Poll::Ready(Ok(read));
285                 }
286                 Poll::Pending
287             }
288             Poll::Ready(Err(e)) => {
289                 // If IO error occurs, shutdowns `io` before return.
290                 io.shutdown();
291                 Poll::Ready(err_from_io!(BodyTransfer, e))
292             }
293         }
294     }
295 }
296 
297 struct Text {
298     interceptors: Arc<Interceptors>,
299     decoder: TextBodyDecoder,
300     pre: Option<Cursor<Vec<u8>>>,
301     io: Option<BoxStreamData>,
302 }
303 
304 impl Text {
new( len: u64, pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>, ) -> Self305     pub(crate) fn new(
306         len: u64,
307         pre: &[u8],
308         io: BoxStreamData,
309         interceptors: Arc<Interceptors>,
310     ) -> Self {
311         Self {
312             interceptors,
313             decoder: TextBodyDecoder::new(len),
314             pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())),
315             io: Some(io),
316         }
317     }
318 }
319 
320 impl Text {
data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>321     fn data(
322         &mut self,
323         cx: &mut Context<'_>,
324         buf: &mut [u8],
325     ) -> Poll<Result<usize, HttpClientError>> {
326         if buf.is_empty() {
327             return Poll::Ready(Ok(0));
328         }
329 
330         let mut read = 0;
331 
332         if let Some(pre) = self.pre.as_mut() {
333             // Here cursor read never failed.
334             let this_read = Read::read(pre, buf).unwrap();
335             if this_read == 0 {
336                 self.pre = None;
337             } else {
338                 read += this_read;
339                 if let Some(result) = self.read_remaining(buf, read) {
340                     return result;
341                 }
342             }
343         }
344 
345         if !buf[read..].is_empty() {
346             if let Some(io) = self.io.take() {
347                 return self.poll_read_io(cx, buf, io, read);
348             }
349         }
350         Poll::Ready(Ok(read))
351     }
352 
read_remaining( &mut self, buf: &mut [u8], read: usize, ) -> Option<Poll<Result<usize, HttpClientError>>>353     fn read_remaining(
354         &mut self,
355         buf: &mut [u8],
356         read: usize,
357     ) -> Option<Poll<Result<usize, HttpClientError>>> {
358         let (text, rem) = self.decoder.decode(&buf[..read]);
359 
360         // Contains redundant `rem`, return error.
361         match (text.is_complete(), rem.is_empty()) {
362             (true, false) => {
363                 if let Some(io) = self.io.take() {
364                     io.shutdown();
365                 };
366                 Some(Poll::Ready(err_from_msg!(BodyDecode, "Not eof")))
367             }
368             (true, true) => {
369                 if let Some(io) = self.io.take() {
370                     // stream not closed, waiting for the fin
371                     if !io.is_stream_closable() {
372                         self.io = Some(io);
373                     }
374                 }
375                 Some(Poll::Ready(Ok(read)))
376             }
377             // TextBodyDecoder decodes as much as possible here.
378             _ => None,
379         }
380     }
381 
poll_read_io( &mut self, cx: &mut Context<'_>, buf: &mut [u8], mut io: BoxStreamData, read: usize, ) -> Poll<Result<usize, HttpClientError>>382     fn poll_read_io(
383         &mut self,
384         cx: &mut Context<'_>,
385         buf: &mut [u8],
386         mut io: BoxStreamData,
387         read: usize,
388     ) -> Poll<Result<usize, HttpClientError>> {
389         let mut read = read;
390         let mut read_buf = ReadBuf::new(&mut buf[read..]);
391         match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
392             // Disconnected.
393             Poll::Ready(Ok(())) => {
394                 let filled = read_buf.filled().len();
395                 if filled == 0 {
396                     // stream closed, and get the fin
397                     if io.is_stream_closable() && self.decoder.decode(&buf[..0]).0.is_complete() {
398                         return Poll::Ready(Ok(0));
399                     }
400                     io.shutdown();
401                     return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete"));
402                 }
403                 let (text, rem) = self.decoder.decode(read_buf.filled());
404                 self.interceptors.intercept_output(read_buf.filled())?;
405                 read += filled;
406                 // Contains redundant `rem`, return error.
407                 match (text.is_complete(), rem.is_empty()) {
408                     (true, false) => {
409                         io.shutdown();
410                         Poll::Ready(err_from_msg!(BodyDecode, "Not eof"))
411                     }
412                     (true, true) => {
413                         if !io.is_stream_closable() {
414                             // stream not closed, waiting for the fin
415                             self.io = Some(io);
416                         }
417                         Poll::Ready(Ok(read))
418                     }
419                     _ => {
420                         self.io = Some(io);
421                         Poll::Ready(Ok(read))
422                     }
423                 }
424             }
425             Poll::Pending => {
426                 self.io = Some(io);
427                 if read != 0 {
428                     return Poll::Ready(Ok(read));
429                 }
430                 Poll::Pending
431             }
432             Poll::Ready(Err(e)) => {
433                 // If IO error occurs, shutdowns `io` before return.
434                 io.shutdown();
435                 Poll::Ready(err_from_io!(BodyDecode, e))
436             }
437         }
438     }
439 }
440 
/// Reader for a chunk encoded body.
#[cfg(feature = "http1_1")]
struct Chunk {
    /// Interceptors that are shown the data read from `io`.
    interceptors: Arc<Interceptors>,
    /// Chunk decoder; it also collects the trailer section.
    decoder: ChunkBodyDecoder,
    /// Bytes handed over at construction; they are decoded before `io`.
    pre: Option<Cursor<Vec<u8>>>,
    /// Underlying stream; `None` once it is no longer owned by the reader.
    io: Option<BoxStreamData>,
}
448 
#[cfg(feature = "http1_1")]
impl Chunk {
    /// Creates a chunk reader (with trailer parsing switched on) that first
    /// decodes a copy of `pre` and then reads from `io`.
    pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self {
        let pre = if pre.is_empty() {
            None
        } else {
            Some(Cursor::new(pre.to_vec()))
        };
        Self {
            interceptors,
            decoder: ChunkBodyDecoder::new().contains_trailer(true),
            pre,
            io: Some(io),
        }
    }
}
460 
#[cfg(feature = "http1_1")]
impl Chunk {
    /// Decodes the next piece of the chunked body into `buf` and resolves to
    /// the number of payload bytes written.
    ///
    /// Encoded bytes are taken from `pre` first and from `io` afterwards. The
    /// call returns as soon as some payload is available or the end of the
    /// body has been recognised. An empty `buf` resolves to `0` immediately.
    ///
    /// # Errors
    ///
    /// Reports `BodyDecode` when the encoded data is invalid, when the stream
    /// ends before the body is finished, or when the stream itself fails (the
    /// stream is shut down in the latter two cases). An error raised by the
    /// interceptors is passed on.
    fn data(
        &mut self,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, HttpClientError>> {
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }

        let mut produced = 0;

        while let Some(cursor) = self.pre.as_mut() {
            // Reading from an in-memory cursor cannot fail.
            let copied = Read::read(cursor, &mut buf[produced..]).unwrap();
            if copied == 0 {
                self.pre = None;
            }

            let (merged, finished) = self.merge_chunks(&mut buf[produced..produced + copied])?;
            produced += merged;

            if finished {
                // A 0-sized chunk ends the body, the stream is given up.
                self.io = None;
            }
            if finished || produced != 0 {
                // Return at the end of the body or as soon as there is data.
                return Poll::Ready(Ok(produced));
            }
        }

        // Here `produced` must be 0.
        while let Some(mut io) = self.io.take() {
            let mut window = ReadBuf::new(&mut buf[produced..]);
            match Pin::new(&mut io).poll_read(cx, &mut window) {
                Poll::Pending => {
                    self.io = Some(io);
                    return Poll::Pending;
                }
                Poll::Ready(Err(e)) => {
                    // If IO error occurs, shutdowns `io` before return.
                    io.shutdown();
                    return Poll::Ready(err_from_io!(BodyDecode, e));
                }
                Poll::Ready(Ok(())) => {
                    if window.filled().is_empty() {
                        io.shutdown();
                        return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete"));
                    }
                    let (merged, finished) = self.merge_chunks(window.filled_mut())?;
                    // NOTE(review): the interceptors receive the whole filled
                    // region after it has been merged in place, not just the
                    // first `merged` payload bytes - confirm this is intended.
                    self.interceptors.intercept_output(window.filled_mut())?;
                    produced += merged;

                    if !finished {
                        // More chunks may follow, keep the stream.
                        self.io = Some(io);
                    }
                    if finished || produced != 0 {
                        // Return at the end of the body or as soon as there
                        // is data.
                        return Poll::Ready(Ok(produced));
                    }
                }
            }
        }

        Poll::Ready(Ok(produced))
    }

    /// Decodes the encoded bytes in `buf` and moves the payload of all chunks
    /// found to the front of `buf`.
    ///
    /// Returns the number of payload bytes now at the start of `buf` and a
    /// flag that tells whether the end of the body was reached.
    ///
    /// # Errors
    ///
    /// Reports `BodyDecode` when the decoder rejects the data or when bytes
    /// are left over behind the end of the body.
    fn merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError> {
        // The data arrangement in buf is as follows:
        //
        // data in buf:
        // +------+------+------+------+------+------+------+
        // | data | len  | data | len  |  ... | data |  len |
        // +------+------+------+------+------+------+------+
        //
        // These data blocks are merged into one block:
        //
        // after merge:
        // +---------------------------+
        // |            data           |
        // +---------------------------+

        let (chunks, junk) = self
            .decoder
            .decode(buf)
            .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?;

        let mut finished = false;
        // Only address and length of every payload are remembered, so that
        // `buf` is free to be modified once the decoded view is dropped.
        let mut spans = Vec::new();

        for chunk in chunks.into_iter() {
            if chunk.trailer().is_some() {
                if chunk.state() == &ChunkState::Finish {
                    finished = true;
                }
                continue;
            }
            if chunk.size() == 0 && chunk.state() != &ChunkState::MetaSize {
                finished = true;
                break;
            }
            let payload = chunk.data();
            spans.push((payload.as_ptr(), payload.len()));
        }

        if finished && !junk.is_empty() {
            return err_from_msg!(BodyDecode, "Invalid chunk body");
        }

        let base = buf.as_ptr() as usize;
        let mut write_at = 0;
        for (ptr, len) in spans {
            // Offset of the payload inside `buf`.
            let from = ptr as usize - base;
            buf.copy_within(from..from + len, write_at);
            write_at += len;
        }
        Ok((write_at, finished))
    }
}
587 
#[cfg(feature = "ylong_base")]
#[cfg(test)]
mod ut_async_http_body {
    use std::sync::Arc;

    use ylong_http::body::async_impl;

    use crate::async_impl::HttpBody;
    use crate::util::interceptor::IdleInterceptor;
    use crate::util::normalizer::BodyLength;
    use crate::ErrorKind;

    /// UT test cases for `HttpBody::trailer`.
    ///
    /// # Brief
    /// 1. Creates a `HttpBody` by calling `HttpBody::new`.
    /// 2. Calls `trailer` to get headers.
    /// 3. Checks if the test result is correct.
    #[cfg(feature = "http1_1")]
    #[test]
    fn ut_asnyc_chunk_trailer_1() {
        let handle = ylong_runtime::spawn(async move {
            async_chunk_trailer_1().await;
            async_chunk_trailer_2().await;
        });
        ylong_runtime::block_on(handle).unwrap();
    }

    // Reads the trailer of a chunked body that is completely contained in the
    // pre-read bytes, then checks a chunked body without a trailer.
    #[cfg(feature = "http1_1")]
    async fn async_chunk_trailer_1() {
        let box_stream = Box::new("".as_bytes());
        let chunk_body_bytes = "\
            5\r\n\
            hello\r\n\
            C ; type = text ;end = !\r\n\
            hello world!\r\n\
            000; message = last\r\n\
            accept:text/html\r\n\r\n\
            ";
        let mut chunk = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Chunk,
            box_stream,
            chunk_body_bytes.as_bytes(),
        )
        .unwrap();
        let res = async_impl::Body::trailer(&mut chunk)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            res.get("accept").unwrap().to_string().unwrap(),
            "text/html".to_string()
        );
        let box_stream = Box::new("".as_bytes());
        let chunk_body_no_trailer_bytes = "\
            5\r\n\
            hello\r\n\
            C ; type = text ;end = !\r\n\
            hello world!\r\n\
            0\r\n\r\n\
            ";

        let mut chunk = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Chunk,
            box_stream,
            chunk_body_no_trailer_bytes.as_bytes(),
        )
        .unwrap();

        let mut buf = [0u8; 32];
        // Read body part
        let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
        assert_eq!(read, 5);
        assert_eq!(&buf[..read], b"hello");
        let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
        assert_eq!(read, 12);
        assert_eq!(&buf[..read], b"hello world!");
        let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
        assert_eq!(read, 0);
        assert_eq!(&buf[..read], b"");
        // try read trailer part
        let res = async_impl::Body::trailer(&mut chunk).await.unwrap();
        assert!(res.is_none());
    }

    // Reads a trailer whose value contains spaces and a trailing blank.
    #[cfg(feature = "http1_1")]
    async fn async_chunk_trailer_2() {
        let box_stream = Box::new("".as_bytes());
        let chunk_body_bytes = "\
            5\r\n\
            hello\r\n\
            C ; type = text ;end = !\r\n\
            hello world!\r\n\
            000; message = last\r\n\
            Expires: Wed, 21 Oct 2015 07:27:00 GMT \r\n\r\n\
            ";
        let mut chunk = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Chunk,
            box_stream,
            chunk_body_bytes.as_bytes(),
        )
        .unwrap();
        let res = async_impl::Body::trailer(&mut chunk)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            res.get("expires").unwrap().to_string().unwrap(),
            "Wed, 21 Oct 2015 07:27:00 GMT".to_string()
        );
    }

    /// UT test cases for `Body::data`.
    ///
    /// # Brief
    /// 1. Creates a chunk `HttpBody`.
    /// 2. Calls `data` method get boxstream.
    /// 3. Checks if data size is correct.
    #[cfg(feature = "http1_1")]
    #[test]
    fn ut_asnyc_http_body_chunk2() {
        let handle = ylong_runtime::spawn(async move {
            http_body_chunk2().await;
        });
        ylong_runtime::block_on(handle).unwrap();
    }

    // Reads a chunked body from the stream first, then one from the pre-read
    // bytes.
    #[cfg(feature = "http1_1")]
    async fn http_body_chunk2() {
        let box_stream = Box::new(
            "\
            5\r\n\
            hello\r\n\
            C ; type = text ;end = !\r\n\
            hello world!\r\n\
            000; message = last\r\n\
            accept:text/html\r\n\r\n\
        "
            .as_bytes(),
        );
        let chunk_body_bytes = "";
        let mut chunk = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Chunk,
            box_stream,
            chunk_body_bytes.as_bytes(),
        )
        .unwrap();

        let mut buf = [0u8; 32];
        // Read body part
        let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
        assert_eq!(read, 5);

        let box_stream = Box::new("".as_bytes());
        let chunk_body_no_trailer_bytes = "\
            5\r\n\
            hello\r\n\
            C ; type = text ;end = !\r\n\
            hello world!\r\n\
            0\r\n\r\n\
            ";

        let mut chunk = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Chunk,
            box_stream,
            chunk_body_no_trailer_bytes.as_bytes(),
        )
        .unwrap();

        let mut buf = [0u8; 32];
        // Read body part
        let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
        assert_eq!(read, 5);
        assert_eq!(&buf[..read], b"hello");
        let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
        assert_eq!(read, 12);
        assert_eq!(&buf[..read], b"hello world!");
        let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
        assert_eq!(read, 0);
        assert_eq!(&buf[..read], b"");
        let res = async_impl::Body::trailer(&mut chunk).await.unwrap();
        assert!(res.is_none());
    }

    /// UT test cases for `HttpBody::new` with an empty body length.
    ///
    /// # Brief
    /// 1. Calls `HttpBody::new` with `BodyLength::Empty` and non-empty
    ///    pre-read bytes.
    /// 2. Checks that the call fails with `ErrorKind::Request`.
    #[test]
    fn http_body_empty_err() {
        let box_stream = Box::new("".as_bytes());
        let content_bytes = "hello";

        let result = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Empty,
            box_stream,
            content_bytes.as_bytes(),
        );

        // An `Ok` here must fail the test instead of passing silently.
        match result {
            Ok(_) => panic!("an empty body with extra data must be rejected"),
            Err(e) => assert_eq!(e.error_kind(), ErrorKind::Request),
        }
    }

    /// UT test cases for text `HttpBody::new`.
    ///
    /// # Brief
    /// 1. Creates a text `HttpBody`.
    /// 2. Calls `HttpBody::new` to create text http body.
    /// 3. Checks if result is correct.
    #[test]
    fn ut_http_body_text() {
        let handle = ylong_runtime::spawn(async move {
            http_body_text().await;
        });
        ylong_runtime::block_on(handle).unwrap();
    }

    // Reads a fixed-length body from the stream first, then one from the
    // pre-read bytes.
    async fn http_body_text() {
        let box_stream = Box::new("hello world".as_bytes());
        let content_bytes = "";

        let mut text = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Length(11),
            box_stream,
            content_bytes.as_bytes(),
        )
        .unwrap();

        let mut buf = [0u8; 5];
        // Read body part
        let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
        assert_eq!(read, 5);
        let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
        assert_eq!(read, 5);
        let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
        assert_eq!(read, 1);
        let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
        assert_eq!(read, 0);

        let box_stream = Box::new("".as_bytes());
        let content_bytes = "hello";

        let mut text = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Length(5),
            box_stream,
            content_bytes.as_bytes(),
        )
        .unwrap();

        let mut buf = [0u8; 32];
        // Read body part
        let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
        assert_eq!(read, 5);
        let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
        assert_eq!(read, 0);
    }

    /// UT test cases for until_close `HttpBody::new`.
    ///
    /// # Brief
    /// 1. Creates a until_close `HttpBody`.
    /// 2. Calls `HttpBody::new` to create until_close http body.
    /// 3. Checks if result is correct.
    #[test]
    fn ut_http_body_until_close() {
        let handle = ylong_runtime::spawn(async move {
            http_body_until_close().await;
        });
        ylong_runtime::block_on(handle).unwrap();
    }

    // Reads an until-close body from the stream first, then one from the
    // pre-read bytes.
    async fn http_body_until_close() {
        let box_stream = Box::new("hello world".as_bytes());
        let content_bytes = "";

        let mut until_close = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::UntilClose,
            box_stream,
            content_bytes.as_bytes(),
        )
        .unwrap();

        let mut buf = [0u8; 5];
        // Read body part
        let read = async_impl::Body::data(&mut until_close, &mut buf)
            .await
            .unwrap();
        assert_eq!(read, 5);
        let read = async_impl::Body::data(&mut until_close, &mut buf)
            .await
            .unwrap();
        assert_eq!(read, 5);
        let read = async_impl::Body::data(&mut until_close, &mut buf)
            .await
            .unwrap();
        assert_eq!(read, 1);

        let box_stream = Box::new("".as_bytes());
        let content_bytes = "hello";

        let mut until_close = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::UntilClose,
            box_stream,
            content_bytes.as_bytes(),
        )
        .unwrap();

        let mut buf = [0u8; 5];
        // Read body part
        let read = async_impl::Body::data(&mut until_close, &mut buf)
            .await
            .unwrap();
        assert_eq!(read, 5);
        let read = async_impl::Body::data(&mut until_close, &mut buf)
            .await
            .unwrap();
        assert_eq!(read, 0);
    }
}
914