• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use core::pin::Pin;
15 use core::task::{Context, Poll};
16 use std::future::Future;
17 use std::io::{Cursor, Read};
18 use std::sync::Arc;
19 
20 use ylong_http::body::async_impl::Body;
21 use ylong_http::body::TextBodyDecoder;
22 #[cfg(feature = "http1_1")]
23 use ylong_http::body::{ChunkBodyDecoder, ChunkState};
24 use ylong_http::headers::Headers;
25 
26 use super::conn::StreamData;
27 use crate::error::{ErrorKind, HttpClientError};
28 use crate::runtime::{AsyncRead, ReadBuf, Sleep};
29 use crate::util::interceptor::Interceptors;
30 use crate::util::normalizer::BodyLength;
31 
32 const TRAILER_SIZE: usize = 1024;
33 
/// `HttpBody` is the body part of the `Response` returned by `Client::request`.
/// `HttpBody` implements `Body` trait, so users can call related methods to get
/// body data.
///
/// # Examples
///
/// ```no_run
/// use ylong_http_client::async_impl::{Body, Client, HttpBody, Request};
/// use ylong_http_client::HttpClientError;
///
/// async fn read_body() -> Result<(), HttpClientError> {
///     let client = Client::new();
///
///     // `HttpBody` is the body part of `response`.
///     let mut response = client
///         .request(Request::builder().body(Body::empty())?)
///         .await?;
///
///     // Users can use `Body::data` to get body data.
///     let mut buf = [0u8; 1024];
///     loop {
///         let size = response.data(&mut buf).await.unwrap();
///         if size == 0 {
///             break;
///         }
///         let _data = &buf[..size];
///         // Deals with the data.
///     }
///     Ok(())
/// }
/// ```
pub struct HttpBody {
    // Framing-specific reader state, selected in `HttpBody::new` from the
    // `BodyLength` of the response.
    kind: Kind,
    // Optional timer installed through `set_sleep`. Once it completes,
    // `poll_data` and `poll_trailer` return a `Timeout` error.
    sleep: Option<Pin<Box<Sleep>>>,
}
69 
// Boxed, type-erased stream that every body variant reads from; besides
// `poll_read`, this file relies on its `shutdown` and `is_stream_closable`.
type BoxStreamData = Box<dyn StreamData + Sync + Send + Unpin>;
71 
72 impl HttpBody {
new( interceptors: Arc<Interceptors>, body_length: BodyLength, io: BoxStreamData, pre: &[u8], ) -> Result<Self, HttpClientError>73     pub(crate) fn new(
74         interceptors: Arc<Interceptors>,
75         body_length: BodyLength,
76         io: BoxStreamData,
77         pre: &[u8],
78     ) -> Result<Self, HttpClientError> {
79         let kind = match body_length {
80             BodyLength::Empty => {
81                 if !pre.is_empty() {
82                     // TODO: Consider the case where BodyLength is empty but pre is not empty.
83                     io.shutdown();
84                     return err_from_msg!(Request, "Body length is 0 but read extra data");
85                 }
86                 Kind::Empty
87             }
88             BodyLength::Length(len) => Kind::Text(Text::new(len, pre, io, interceptors)),
89             BodyLength::UntilClose => Kind::UntilClose(UntilClose::new(pre, io, interceptors)),
90 
91             #[cfg(feature = "http1_1")]
92             BodyLength::Chunk => Kind::Chunk(Chunk::new(pre, io, interceptors)),
93         };
94         Ok(Self { kind, sleep: None })
95     }
96 
set_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>)97     pub(crate) fn set_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>) {
98         self.sleep = sleep;
99     }
100 }
101 
102 impl Body for HttpBody {
103     type Error = HttpClientError;
104 
poll_data( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, Self::Error>>105     fn poll_data(
106         mut self: Pin<&mut Self>,
107         cx: &mut Context<'_>,
108         buf: &mut [u8],
109     ) -> Poll<Result<usize, Self::Error>> {
110         if buf.is_empty() {
111             return Poll::Ready(Ok(0));
112         }
113 
114         if let Some(delay) = self.sleep.as_mut() {
115             if let Poll::Ready(()) = Pin::new(delay).poll(cx) {
116                 return Poll::Ready(err_from_io!(Timeout, std::io::ErrorKind::TimedOut.into()));
117             }
118         }
119 
120         match self.kind {
121             Kind::Empty => Poll::Ready(Ok(0)),
122             Kind::Text(ref mut text) => text.data(cx, buf),
123             Kind::UntilClose(ref mut until_close) => until_close.data(cx, buf),
124             #[cfg(feature = "http1_1")]
125             Kind::Chunk(ref mut chunk) => chunk.data(cx, buf),
126         }
127     }
128 
poll_trailer( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<Option<Headers>, Self::Error>>129     fn poll_trailer(
130         mut self: Pin<&mut Self>,
131         cx: &mut Context<'_>,
132     ) -> Poll<Result<Option<Headers>, Self::Error>> {
133         // Get trailer data from io
134         if let Some(delay) = self.sleep.as_mut() {
135             if let Poll::Ready(()) = Pin::new(delay).poll(cx) {
136                 return Poll::Ready(err_from_msg!(Timeout, "Request timeout"));
137             }
138         }
139 
140         let mut read_buf = [0_u8; TRAILER_SIZE];
141 
142         match self.kind {
143             #[cfg(feature = "http1_1")]
144             Kind::Chunk(ref mut chunk) => {
145                 match chunk.data(cx, &mut read_buf) {
146                     Poll::Ready(Ok(_)) => {}
147                     Poll::Pending => {
148                         return Poll::Pending;
149                     }
150                     Poll::Ready(Err(e)) => {
151                         return Poll::Ready(Err(e));
152                     }
153                 }
154                 Poll::Ready(Ok(chunk.decoder.get_trailer().map_err(|e| {
155                     HttpClientError::from_error(ErrorKind::BodyDecode, e)
156                 })?))
157             }
158             _ => Poll::Ready(Ok(None)),
159         }
160     }
161 }
162 
163 impl Drop for HttpBody {
drop(&mut self)164     fn drop(&mut self) {
165         let io = match self.kind {
166             Kind::Text(ref mut text) => text.io.as_mut(),
167             #[cfg(feature = "http1_1")]
168             Kind::Chunk(ref mut chunk) => chunk.io.as_mut(),
169             Kind::UntilClose(ref mut until_close) => until_close.io.as_mut(),
170             _ => None,
171         };
172         // If response body is not totally read, shutdown io.
173         if let Some(io) = io {
174             io.shutdown()
175         }
176     }
177 }
178 
// TODO: `TextBodyDecoder` implementation and `ChunkBodyDecoder` implementation.
/// Framing-specific state behind an `HttpBody`; `HttpBody::new` picks the
/// variant from the `BodyLength` it is given.
enum Kind {
    /// No body: `poll_data` reports 0 bytes and no stream is held.
    Empty,
    /// Body whose length is known up front (`BodyLength::Length`).
    Text(Text),
    /// Chunk-encoded body (`BodyLength::Chunk`).
    #[cfg(feature = "http1_1")]
    Chunk(Chunk),
    /// Body without a declared length (`BodyLength::UntilClose`).
    UntilClose(UntilClose),
}
187 
/// Reader for a body without a declared length: data is served until the
/// underlying stream reports that nothing more can be read.
struct UntilClose {
    // Receives every slice read from `io` through `intercept_output`.
    interceptors: Arc<Interceptors>,
    // Copy of the `pre` bytes given to `new`; served before `io` and dropped
    // once the cursor is exhausted.
    pre: Option<Cursor<Vec<u8>>>,
    // Underlying stream; becomes `None` once it has ended or failed.
    io: Option<BoxStreamData>,
}
193 
194 impl UntilClose {
new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self195     pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self {
196         Self {
197             interceptors,
198             pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())),
199             io: Some(io),
200         }
201     }
202 
data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>203     fn data(
204         &mut self,
205         cx: &mut Context<'_>,
206         buf: &mut [u8],
207     ) -> Poll<Result<usize, HttpClientError>> {
208         if buf.is_empty() {
209             return Poll::Ready(Ok(0));
210         }
211         let mut read = 0;
212         if let Some(pre) = self.pre.as_mut() {
213             // Here cursor read never failed.
214             let this_read = Read::read(pre, buf).unwrap();
215             if this_read == 0 {
216                 self.pre = None;
217             } else {
218                 read += this_read;
219             }
220         }
221 
222         if !buf[read..].is_empty() {
223             if let Some(io) = self.io.take() {
224                 return self.poll_read_io(cx, io, read, buf);
225             }
226         }
227         Poll::Ready(Ok(read))
228     }
229 
poll_read_io( &mut self, cx: &mut Context<'_>, mut io: BoxStreamData, read: usize, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>230     fn poll_read_io(
231         &mut self,
232         cx: &mut Context<'_>,
233         mut io: BoxStreamData,
234         read: usize,
235         buf: &mut [u8],
236     ) -> Poll<Result<usize, HttpClientError>> {
237         let mut read = read;
238         let mut read_buf = ReadBuf::new(&mut buf[read..]);
239         match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
240             Poll::Ready(Ok(())) => {
241                 let filled = read_buf.filled().len();
242                 if filled == 0 {
243                     // Stream closed, and get the fin.
244                     if io.is_stream_closable() {
245                         return Poll::Ready(Ok(0));
246                     }
247                     // Disconnected for http1.
248                     io.shutdown();
249                 } else {
250                     self.interceptors
251                         .intercept_output(&buf[read..(read + filled)])?;
252                     self.io = Some(io);
253                 }
254                 read += filled;
255                 Poll::Ready(Ok(read))
256             }
257             Poll::Pending => {
258                 self.io = Some(io);
259                 if read != 0 {
260                     return Poll::Ready(Ok(read));
261                 }
262                 Poll::Pending
263             }
264             Poll::Ready(Err(e)) => {
265                 // If IO error occurs, shutdowns `io` before return.
266                 io.shutdown();
267                 Poll::Ready(err_from_io!(BodyTransfer, e))
268             }
269         }
270     }
271 }
272 
/// Reader for a body whose length is known up front.
struct Text {
    // Receives every slice read from `io` through `intercept_output`.
    interceptors: Arc<Interceptors>,
    // Decoder created with the declared length; tells when the body is
    // complete and whether bytes beyond it were received.
    decoder: TextBodyDecoder,
    // Copy of the `pre` bytes given to `new`; served before `io` and dropped
    // once the cursor is exhausted.
    pre: Option<Cursor<Vec<u8>>>,
    // Underlying stream; becomes `None` once it is no longer needed or failed.
    io: Option<BoxStreamData>,
}
279 
280 impl Text {
new( len: u64, pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>, ) -> Self281     pub(crate) fn new(
282         len: u64,
283         pre: &[u8],
284         io: BoxStreamData,
285         interceptors: Arc<Interceptors>,
286     ) -> Self {
287         Self {
288             interceptors,
289             decoder: TextBodyDecoder::new(len),
290             pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())),
291             io: Some(io),
292         }
293     }
294 }
295 
296 impl Text {
data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>297     fn data(
298         &mut self,
299         cx: &mut Context<'_>,
300         buf: &mut [u8],
301     ) -> Poll<Result<usize, HttpClientError>> {
302         if buf.is_empty() {
303             return Poll::Ready(Ok(0));
304         }
305 
306         let mut read = 0;
307 
308         if let Some(pre) = self.pre.as_mut() {
309             // Here cursor read never failed.
310             let this_read = Read::read(pre, buf).unwrap();
311             if this_read == 0 {
312                 self.pre = None;
313             } else {
314                 read += this_read;
315                 if let Some(result) = self.read_remaining(buf, read) {
316                     return result;
317                 }
318             }
319         }
320 
321         if !buf[read..].is_empty() {
322             if let Some(io) = self.io.take() {
323                 return self.poll_read_io(cx, buf, io, read);
324             }
325         }
326         Poll::Ready(Ok(read))
327     }
328 
read_remaining( &mut self, buf: &mut [u8], read: usize, ) -> Option<Poll<Result<usize, HttpClientError>>>329     fn read_remaining(
330         &mut self,
331         buf: &mut [u8],
332         read: usize,
333     ) -> Option<Poll<Result<usize, HttpClientError>>> {
334         let (text, rem) = self.decoder.decode(&buf[..read]);
335 
336         // Contains redundant `rem`, return error.
337         match (text.is_complete(), rem.is_empty()) {
338             (true, false) => {
339                 if let Some(io) = self.io.take() {
340                     io.shutdown();
341                 };
342                 Some(Poll::Ready(err_from_msg!(BodyDecode, "Not eof")))
343             }
344             (true, true) => {
345                 if let Some(io) = self.io.take() {
346                     // stream not closed, waiting for the fin
347                     if !io.is_stream_closable() {
348                         self.io = Some(io);
349                     }
350                 }
351                 Some(Poll::Ready(Ok(read)))
352             }
353             // TextBodyDecoder decodes as much as possible here.
354             _ => None,
355         }
356     }
357 
poll_read_io( &mut self, cx: &mut Context<'_>, buf: &mut [u8], mut io: BoxStreamData, read: usize, ) -> Poll<Result<usize, HttpClientError>>358     fn poll_read_io(
359         &mut self,
360         cx: &mut Context<'_>,
361         buf: &mut [u8],
362         mut io: BoxStreamData,
363         read: usize,
364     ) -> Poll<Result<usize, HttpClientError>> {
365         let mut read = read;
366         let mut read_buf = ReadBuf::new(&mut buf[read..]);
367         match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
368             // Disconnected.
369             Poll::Ready(Ok(())) => {
370                 let filled = read_buf.filled().len();
371                 if filled == 0 {
372                     // stream closed, and get the fin
373                     if io.is_stream_closable() && self.decoder.decode(&buf[..0]).0.is_complete() {
374                         return Poll::Ready(Ok(0));
375                     }
376                     io.shutdown();
377                     return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete"));
378                 }
379                 let (text, rem) = self.decoder.decode(read_buf.filled());
380                 self.interceptors.intercept_output(read_buf.filled())?;
381                 read += filled;
382                 // Contains redundant `rem`, return error.
383                 match (text.is_complete(), rem.is_empty()) {
384                     (true, false) => {
385                         io.shutdown();
386                         Poll::Ready(err_from_msg!(BodyDecode, "Not eof"))
387                     }
388                     (true, true) => {
389                         if !io.is_stream_closable() {
390                             // stream not closed, waiting for the fin
391                             self.io = Some(io);
392                         }
393                         Poll::Ready(Ok(read))
394                     }
395                     _ => {
396                         self.io = Some(io);
397                         Poll::Ready(Ok(read))
398                     }
399                 }
400             }
401             Poll::Pending => {
402                 self.io = Some(io);
403                 if read != 0 {
404                     return Poll::Ready(Ok(read));
405                 }
406                 Poll::Pending
407             }
408             Poll::Ready(Err(e)) => {
409                 // If IO error occurs, shutdowns `io` before return.
410                 io.shutdown();
411                 Poll::Ready(err_from_io!(BodyDecode, e))
412             }
413         }
414     }
415 }
416 
/// Reader for a chunk-encoded body.
#[cfg(feature = "http1_1")]
struct Chunk {
    // Receives the bytes read from `io` through `intercept_output`.
    interceptors: Arc<Interceptors>,
    // Chunk decoder, created with `contains_trailer(true)`; also the source of
    // the trailer returned by `HttpBody::poll_trailer`.
    decoder: ChunkBodyDecoder,
    // Copy of the `pre` bytes given to `new`; served before `io` and dropped
    // once the cursor is exhausted.
    pre: Option<Cursor<Vec<u8>>>,
    // Underlying stream; becomes `None` once the body has finished or failed.
    io: Option<BoxStreamData>,
}
424 
#[cfg(feature = "http1_1")]
impl Chunk {
    /// Creates a reader for a chunk-encoded body that serves a copy of `pre`
    /// first and then reads from `io`.
    pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self {
        // An empty `pre` is represented as `None` rather than an empty cursor.
        let buffered = if pre.is_empty() {
            None
        } else {
            Some(Cursor::new(pre.to_vec()))
        };

        Self {
            interceptors,
            decoder: ChunkBodyDecoder::new().contains_trailer(true),
            pre: buffered,
            io: Some(io),
        }
    }
}
436 
#[cfg(feature = "http1_1")]
impl Chunk {
    /// Copies the next decoded body bytes into `buf` and returns how many were
    /// written.
    ///
    /// Raw bytes are taken from `pre` first and from `io` afterwards, decoded
    /// in place inside `buf`, and the call returns as soon as some payload is
    /// available or the final chunk has been seen. `Ok(0)` for a non-empty
    /// `buf` means the body has ended.
    fn data(
        &mut self,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, HttpClientError>> {
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }

        let mut read = 0;

        while let Some(pre) = self.pre.as_mut() {
            // Reading from an in-memory cursor never fails.
            let size = Read::read(pre, &mut buf[read..]).unwrap();
            if size == 0 {
                self.pre = None;
            }

            // On a decode error `io` is still stored in `self`, so
            // `HttpBody::drop` shuts it down.
            let (size, flag) = self.merge_chunks(&mut buf[read..read + size])?;
            read += size;

            if flag {
                // Return if we find a 0-sized chunk.
                self.io = None;
                return Poll::Ready(Ok(read));
            } else if read != 0 {
                // Return if we get some data.
                return Poll::Ready(Ok(read));
            }
        }

        // Here `read` must be 0.
        while let Some(mut io) = self.io.take() {
            let mut read_buf = ReadBuf::new(&mut buf[read..]);
            match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
                Poll::Ready(Ok(())) => {
                    let filled = read_buf.filled().len();
                    if filled == 0 {
                        // The stream ended before the final chunk arrived.
                        io.shutdown();
                        return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete"));
                    }
                    // `io` has been taken out of `self`, so `HttpBody::drop`
                    // cannot reach it any more: every failure from here on
                    // must shut the stream down before it is dropped.
                    let (size, flag) = match self.merge_chunks(read_buf.filled_mut()) {
                        Ok(merged) => merged,
                        Err(e) => {
                            io.shutdown();
                            return Poll::Ready(Err(e));
                        }
                    };
                    // NOTE(review): the interceptor is handed the buffer after
                    // `merge_chunks` rearranged it in place, i.e. neither the
                    // raw wire bytes nor exactly the decoded payload - confirm
                    // which of the two is intended.
                    if let Err(e) = self.interceptors.intercept_output(read_buf.filled_mut()) {
                        io.shutdown();
                        return Poll::Ready(Err(e));
                    }
                    read += size;
                    if flag {
                        // The final chunk was found: the body is complete and
                        // the stream is not stored back.
                        return Poll::Ready(Ok(read));
                    }
                    self.io = Some(io);
                    if read != 0 {
                        // Return if we get some data.
                        return Poll::Ready(Ok(read));
                    }
                    // Only chunk metadata was decoded so far; poll again.
                }
                Poll::Pending => {
                    self.io = Some(io);
                    return Poll::Pending;
                }
                Poll::Ready(Err(e)) => {
                    // If IO error occurs, shutdowns `io` before return.
                    io.shutdown();
                    return Poll::Ready(err_from_io!(BodyDecode, e));
                }
            }
        }

        Poll::Ready(Ok(read))
    }

    /// Decodes the raw chunk bytes in `buf` and compacts the payload to the
    /// front of `buf`.
    ///
    /// Returns the number of payload bytes now at the start of `buf` and a
    /// flag telling whether the end of the chunk body was reached.
    ///
    /// # Errors
    ///
    /// Returns a `BodyDecode` error when the decoder rejects the input or when
    /// bytes are left over after the end of the body.
    fn merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError> {
        // Here we need to merge the chunks into one data block and return.
        // The data arrangement in buf is as follows:
        //
        // data in buf:
        // +------+------+------+------+------+------+------+
        // | data | len  | data | len  |  ... | data |  len |
        // +------+------+------+------+------+------+------+
        //
        // We need to merge these data blocks into one block:
        //
        // after merge:
        // +---------------------------+
        // |            data           |
        // +---------------------------+

        let (chunks, junk) = self
            .decoder
            .decode(buf)
            .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?;

        let mut finished = false;
        // The decoded chunks borrow `buf`, so only the position and length of
        // each payload are recorded here; the bytes are moved once that borrow
        // has ended.
        let mut ptrs = Vec::new();

        for chunk in chunks.into_iter() {
            if chunk.trailer().is_some() {
                if chunk.state() == &ChunkState::Finish {
                    finished = true;
                }
            } else {
                if chunk.size() == 0 && chunk.state() != &ChunkState::MetaSize {
                    finished = true;
                    break;
                }
                let data = chunk.data();
                ptrs.push((data.as_ptr(), data.len()))
            }
        }

        // Nothing may follow the end of the chunk body.
        if finished && !junk.is_empty() {
            return err_from_msg!(BodyDecode, "Invalid chunk body");
        }

        let start = buf.as_ptr();

        let mut idx = 0;
        for (ptr, len) in ptrs.into_iter() {
            // Turn the recorded address back into an offset inside `buf`.
            let st = ptr as usize - start as usize;
            let ed = st + len;
            buf.copy_within(st..ed, idx);
            idx += len;
        }
        Ok((idx, finished))
    }
}
563 
#[cfg(feature = "ylong_base")]
#[cfg(test)]
mod ut_async_http_body {
    use std::sync::Arc;

    use ylong_http::body::async_impl;

    use crate::async_impl::HttpBody;
    use crate::util::interceptor::IdleInterceptor;
    use crate::util::normalizer::BodyLength;
    use crate::ErrorKind;

    /// UT test cases for `HttpBody::trailer`.
    ///
    /// # Brief
    /// 1. Creates a `HttpBody` by calling `HttpBody::new`.
    /// 2. Calls `trailer` to get headers.
    /// 3. Checks if the test result is correct.
    #[test]
    fn ut_asnyc_chunk_trailer_1() {
        let handle = ylong_runtime::spawn(async move {
            async_chunk_trailer_1().await;
            async_chunk_trailer_2().await;
        });
        ylong_runtime::block_on(handle).unwrap();
    }

    // Reads the trailer of a chunk body that carries one, then drains a chunk
    // body without a trailer and checks that `trailer` reports `None`.
    async fn async_chunk_trailer_1() {
        let box_stream = Box::new("".as_bytes());
        let chunk_body_bytes = "\
            5\r\n\
            hello\r\n\
            C ; type = text ;end = !\r\n\
            hello world!\r\n\
            000; message = last\r\n\
            accept:text/html\r\n\r\n\
            ";
        let mut chunk = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Chunk,
            box_stream,
            chunk_body_bytes.as_bytes(),
        )
        .unwrap();
        let res = async_impl::Body::trailer(&mut chunk)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            res.get("accept").unwrap().to_string().unwrap(),
            "text/html".to_string()
        );
        let box_stream = Box::new("".as_bytes());
        let chunk_body_no_trailer_bytes = "\
            5\r\n\
            hello\r\n\
            C ; type = text ;end = !\r\n\
            hello world!\r\n\
            0\r\n\r\n\
            ";

        let mut chunk = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Chunk,
            box_stream,
            chunk_body_no_trailer_bytes.as_bytes(),
        )
        .unwrap();

        let mut buf = [0u8; 32];
        // Read body part
        let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
        assert_eq!(read, 5);
        assert_eq!(&buf[..read], b"hello");
        let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
        assert_eq!(read, 12);
        assert_eq!(&buf[..read], b"hello world!");
        let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
        assert_eq!(read, 0);
        assert_eq!(&buf[..read], b"");
        // try read trailer part
        let res = async_impl::Body::trailer(&mut chunk).await.unwrap();
        assert!(res.is_none());
    }

    // Reads a trailer whose value contains spaces, commas and colons.
    async fn async_chunk_trailer_2() {
        let box_stream = Box::new("".as_bytes());
        let chunk_body_bytes = "\
            5\r\n\
            hello\r\n\
            C ; type = text ;end = !\r\n\
            hello world!\r\n\
            000; message = last\r\n\
            Expires: Wed, 21 Oct 2015 07:27:00 GMT \r\n\r\n\
            ";
        let mut chunk = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Chunk,
            box_stream,
            chunk_body_bytes.as_bytes(),
        )
        .unwrap();
        let res = async_impl::Body::trailer(&mut chunk)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            res.get("expires").unwrap().to_string().unwrap(),
            "Wed, 21 Oct 2015 07:27:00 GMT".to_string()
        );
    }

    /// UT test cases for `Body::data`.
    ///
    /// # Brief
    /// 1. Creates a chunk `HttpBody`.
    /// 2. Calls `data` method get boxstream.
    /// 3. Checks if data size is correct.
    #[test]
    fn ut_asnyc_http_body_chunk2() {
        let handle = ylong_runtime::spawn(async move {
            http_body_chunk2().await;
        });
        ylong_runtime::block_on(handle).unwrap();
    }

    // Reads a chunk body once when all bytes come from the stream, then fully
    // drains a chunk body whose bytes are all supplied as `pre`.
    async fn http_body_chunk2() {
        let box_stream = Box::new(
            "\
            5\r\n\
            hello\r\n\
            C ; type = text ;end = !\r\n\
            hello world!\r\n\
            000; message = last\r\n\
            accept:text/html\r\n\r\n\
        "
            .as_bytes(),
        );
        let chunk_body_bytes = "";
        let mut chunk = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Chunk,
            box_stream,
            chunk_body_bytes.as_bytes(),
        )
        .unwrap();

        let mut buf = [0u8; 32];
        // Read body part
        let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
        assert_eq!(read, 5);

        let box_stream = Box::new("".as_bytes());
        let chunk_body_no_trailer_bytes = "\
            5\r\n\
            hello\r\n\
            C ; type = text ;end = !\r\n\
            hello world!\r\n\
            0\r\n\r\n\
            ";

        let mut chunk = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Chunk,
            box_stream,
            chunk_body_no_trailer_bytes.as_bytes(),
        )
        .unwrap();

        let mut buf = [0u8; 32];
        // Read body part
        let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
        assert_eq!(read, 5);
        assert_eq!(&buf[..read], b"hello");
        let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
        assert_eq!(read, 12);
        assert_eq!(&buf[..read], b"hello world!");
        let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
        assert_eq!(read, 0);
        assert_eq!(&buf[..read], b"");
        let res = async_impl::Body::trailer(&mut chunk).await.unwrap();
        assert!(res.is_none());
    }

    /// UT test cases for `HttpBody::new` with an empty body length.
    ///
    /// # Brief
    /// 1. Calls `HttpBody::new` with `BodyLength::Empty` and non-empty
    ///    pre-read data.
    /// 2. Checks that creation fails with `ErrorKind::Request`.
    #[test]
    fn http_body_empty_err() {
        let box_stream = Box::new("".as_bytes());
        let content_bytes = "hello";

        match HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Empty,
            box_stream,
            content_bytes.as_bytes(),
        ) {
            // A successful creation must fail the test instead of passing
            // without checking anything.
            Ok(_) => panic!("an empty body with extra pre-read data must be rejected"),
            Err(e) => assert_eq!(e.error_kind(), ErrorKind::Request),
        }
    }

    /// UT test cases for text `HttpBody::new`.
    ///
    /// # Brief
    /// 1. Creates a text `HttpBody`.
    /// 2. Calls `HttpBody::new` to create text http body.
    /// 3. Checks if result is correct.
    #[test]
    fn ut_http_body_text() {
        let handle = ylong_runtime::spawn(async move {
            http_body_text().await;
        });
        ylong_runtime::block_on(handle).unwrap();
    }

    // Reads a fixed-length body through a buffer smaller than the body, first
    // with all bytes coming from the stream, then with all bytes supplied as
    // `pre`.
    async fn http_body_text() {
        let box_stream = Box::new("hello world".as_bytes());
        let content_bytes = "";

        let mut text = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Length(11),
            box_stream,
            content_bytes.as_bytes(),
        )
        .unwrap();

        let mut buf = [0u8; 5];
        // Read body part
        let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
        assert_eq!(read, 5);
        let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
        assert_eq!(read, 5);
        let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
        assert_eq!(read, 1);
        let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
        assert_eq!(read, 0);

        let box_stream = Box::new("".as_bytes());
        let content_bytes = "hello";

        let mut text = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Length(5),
            box_stream,
            content_bytes.as_bytes(),
        )
        .unwrap();

        let mut buf = [0u8; 32];
        // Read body part
        let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
        assert_eq!(read, 5);
        let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
        assert_eq!(read, 0);
    }

    /// UT test cases for until_close `HttpBody::new`.
    ///
    /// # Brief
    /// 1. Creates a until_close `HttpBody`.
    /// 2. Calls `HttpBody::new` to create until_close http body.
    /// 3. Checks if result is correct.
    #[test]
    fn ut_http_body_until_close() {
        let handle = ylong_runtime::spawn(async move {
            http_body_until_close().await;
        });
        ylong_runtime::block_on(handle).unwrap();
    }

    // Reads an until-close body through a small buffer, first with all bytes
    // coming from the stream, then with all bytes supplied as `pre`.
    async fn http_body_until_close() {
        let box_stream = Box::new("hello world".as_bytes());
        let content_bytes = "";

        let mut until_close = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::UntilClose,
            box_stream,
            content_bytes.as_bytes(),
        )
        .unwrap();

        let mut buf = [0u8; 5];
        // Read body part
        let read = async_impl::Body::data(&mut until_close, &mut buf)
            .await
            .unwrap();
        assert_eq!(read, 5);
        let read = async_impl::Body::data(&mut until_close, &mut buf)
            .await
            .unwrap();
        assert_eq!(read, 5);
        let read = async_impl::Body::data(&mut until_close, &mut buf)
            .await
            .unwrap();
        assert_eq!(read, 1);

        let box_stream = Box::new("".as_bytes());
        let content_bytes = "hello";

        let mut until_close = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::UntilClose,
            box_stream,
            content_bytes.as_bytes(),
        )
        .unwrap();

        let mut buf = [0u8; 5];
        // Read body part
        let read = async_impl::Body::data(&mut until_close, &mut buf)
            .await
            .unwrap();
        assert_eq!(read, 5);
        let read = async_impl::Body::data(&mut until_close, &mut buf)
            .await
            .unwrap();
        assert_eq!(read, 0);
    }
}
890