• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use core::pin::Pin;
15 use core::task::{Context, Poll};
16 use std::future::Future;
17 use std::io::{Cursor, Read};
18 
19 use ylong_http::body::TextBodyDecoder;
20 #[cfg(feature = "http1_1")]
21 use ylong_http::body::{ChunkBodyDecoder, ChunkState};
22 use ylong_http::headers::Headers;
23 #[cfg(feature = "http1_1")]
24 use ylong_http::headers::{HeaderName, HeaderValue};
25 
26 use super::{Body, StreamData};
27 use crate::error::{ErrorKind, HttpClientError};
28 use crate::util::normalizer::BodyLength;
29 use crate::{AsyncRead, ReadBuf, Sleep};
30 
/// `HttpBody` is the body part of the `Response` returned by `Client::request`.
/// `HttpBody` implements `Body` trait, so users can call related methods to get
/// body data.
///
/// # Examples
///
/// ```no_run
/// use ylong_http_client::async_impl::{Body, Client, HttpBody};
/// use ylong_http_client::{EmptyBody, Request};
///
/// async fn read_body() {
///     let client = Client::new();
///
///     // `HttpBody` is the body part of `response`.
///     let mut response = client.request(Request::new(EmptyBody)).await.unwrap();
///
///     // Users can use `Body::data` to get body data.
///     let mut buf = [0u8; 1024];
///     loop {
///         let size = response.body_mut().data(&mut buf).await.unwrap();
///         if size == 0 {
///             break;
///         }
///         let _data = &buf[..size];
///         // Deals with the data.
///     }
/// }
/// ```
pub struct HttpBody {
    /// Reader state for the body framing selected when the body was
    /// constructed (`new`, `empty` or `text`).
    kind: Kind,
    /// Optional timer installed through `set_sleep`. `poll_data` polls it
    /// before reading and fails with `ErrorKind::Timeout` once it completes.
    sleep: Option<Pin<Box<Sleep>>>,
}
63 
/// Boxed, type-erased stream the body readers pull their remaining bytes from.
type BoxStreamData = Box<dyn StreamData + Sync + Send + Unpin>;
65 
66 impl HttpBody {
new( body_length: BodyLength, io: BoxStreamData, pre: &[u8], ) -> Result<Self, HttpClientError>67     pub(crate) fn new(
68         body_length: BodyLength,
69         io: BoxStreamData,
70         pre: &[u8],
71     ) -> Result<Self, HttpClientError> {
72         let kind = match body_length {
73             BodyLength::Empty => {
74                 if !pre.is_empty() {
75                     // TODO: Consider the case where BodyLength is empty but pre is not empty.
76                     io.shutdown();
77                     return Err(HttpClientError::new_with_message(
78                         ErrorKind::Request,
79                         "Body length is 0 but read extra data",
80                     ));
81                 }
82                 Kind::Empty
83             }
84             BodyLength::Length(len) => Kind::Text(Text::new(len, pre, io)),
85             BodyLength::UntilClose => Kind::UntilClose(UntilClose::new(pre, io)),
86 
87             #[cfg(feature = "http1_1")]
88             BodyLength::Chunk => Kind::Chunk(Chunk::new(pre, io)),
89         };
90         Ok(Self { kind, sleep: None })
91     }
92 
93     #[cfg(feature = "http2")]
empty() -> Self94     pub(crate) fn empty() -> Self {
95         Self {
96             kind: Kind::Empty,
97             sleep: None,
98         }
99     }
100 
101     #[cfg(feature = "http2")]
text(len: usize, pre: &[u8], io: BoxStreamData) -> Self102     pub(crate) fn text(len: usize, pre: &[u8], io: BoxStreamData) -> Self {
103         Self {
104             kind: Kind::Text(Text::new(len, pre, io)),
105             sleep: None,
106         }
107     }
108 
set_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>)109     pub(crate) fn set_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>) {
110         self.sleep = sleep;
111     }
112 }
113 
114 impl Body for HttpBody {
115     type Error = HttpClientError;
116 
poll_data( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, Self::Error>>117     fn poll_data(
118         mut self: Pin<&mut Self>,
119         cx: &mut Context<'_>,
120         buf: &mut [u8],
121     ) -> Poll<Result<usize, Self::Error>> {
122         if buf.is_empty() {
123             return Poll::Ready(Ok(0));
124         }
125 
126         if let Some(delay) = self.sleep.as_mut() {
127             if let Poll::Ready(()) = Pin::new(delay).poll(cx) {
128                 return Poll::Ready(Err(HttpClientError::new_with_message(
129                     ErrorKind::Timeout,
130                     "Request timeout",
131                 )));
132             }
133         }
134         match self.kind {
135             Kind::Empty => Poll::Ready(Ok(0)),
136             Kind::Text(ref mut text) => text.data(cx, buf),
137             Kind::UntilClose(ref mut until_close) => until_close.data(cx, buf),
138             #[cfg(feature = "http1_1")]
139             Kind::Chunk(ref mut chunk) => chunk.data(cx, buf),
140         }
141     }
142 
trailer(&mut self) -> Result<Option<Headers>, Self::Error>143     fn trailer(&mut self) -> Result<Option<Headers>, Self::Error> {
144         match self.kind {
145             #[cfg(feature = "http1_1")]
146             Kind::Chunk(ref mut chunk) => chunk.get_trailer(),
147             _ => Ok(None),
148         }
149     }
150 }
151 
152 impl Drop for HttpBody {
drop(&mut self)153     fn drop(&mut self) {
154         let io = match self.kind {
155             Kind::Text(ref mut text) => text.io.as_mut(),
156             #[cfg(feature = "http1_1")]
157             Kind::Chunk(ref mut chunk) => chunk.io.as_mut(),
158             Kind::UntilClose(ref mut until_close) => until_close.io.as_mut(),
159             _ => None,
160         };
161         // If response body is not totally read, shutdown io.
162         if let Some(io) = io {
163             io.shutdown()
164         }
165     }
166 }
167 
// TODO: `TextBodyDecoder` implementation and `ChunkBodyDecoder` implementation.
/// Reader state for each way a response body can be delimited.
enum Kind {
    /// No body at all; `poll_data` yields `Ok(0)` right away.
    Empty,
    /// Body with a known length (built for `BodyLength::Length`).
    Text(Text),
    /// Chunk-encoded body (built for `BodyLength::Chunk`).
    #[cfg(feature = "http1_1")]
    Chunk(Chunk),
    /// Body that is read until the stream reports a zero-length read (built
    /// for `BodyLength::UntilClose`).
    UntilClose(UntilClose),
}
176 
/// Reader for a body that has no explicit length and ends when the stream
/// stops delivering data.
struct UntilClose {
    /// Bytes handed over at construction, served before `io` is read.
    /// `None` when there were none, or after a read of the cursor returned 0.
    pre: Option<Cursor<Vec<u8>>>,
    /// Underlying stream. `None` after a zero-length read or an I/O error,
    /// both of which shut the stream down.
    io: Option<BoxStreamData>,
}
181 
182 impl UntilClose {
new(pre: &[u8], io: BoxStreamData) -> Self183     pub(crate) fn new(pre: &[u8], io: BoxStreamData) -> Self {
184         Self {
185             pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())),
186             io: Some(io),
187         }
188     }
189 
data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>190     fn data(
191         &mut self,
192         cx: &mut Context<'_>,
193         buf: &mut [u8],
194     ) -> Poll<Result<usize, HttpClientError>> {
195         if buf.is_empty() {
196             return Poll::Ready(Ok(0));
197         }
198 
199         let mut read = 0;
200 
201         if let Some(pre) = self.pre.as_mut() {
202             // Here cursor read never failed.
203             let this_read = Read::read(pre, buf).unwrap();
204             if this_read == 0 {
205                 self.pre = None;
206             } else {
207                 read += this_read;
208             }
209         }
210 
211         if !buf[read..].is_empty() {
212             if let Some(mut io) = self.io.take() {
213                 let mut read_buf = ReadBuf::new(&mut buf[read..]);
214                 match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
215                     // Disconnected.
216                     Poll::Ready(Ok(())) => {
217                         let filled = read_buf.filled().len();
218                         if filled == 0 {
219                             io.shutdown();
220                         } else {
221                             self.io = Some(io);
222                         }
223                         read += filled;
224                         return Poll::Ready(Ok(read));
225                     }
226                     Poll::Pending => {
227                         self.io = Some(io);
228                         if read != 0 {
229                             return Poll::Ready(Ok(read));
230                         }
231                         return Poll::Pending;
232                     }
233                     Poll::Ready(Err(e)) => {
234                         // If IO error occurs, shutdowns `io` before return.
235                         io.shutdown();
236                         return Poll::Ready(Err(HttpClientError::new_with_cause(
237                             ErrorKind::BodyTransfer,
238                             Some(e),
239                         )));
240                     }
241                 }
242             }
243         }
244         Poll::Ready(Ok(read))
245     }
246 }
247 
/// Reader for a body whose length is known in advance.
struct Text {
    /// Decoder created with the expected body length. It reports when the
    /// body is complete and which bytes, if any, were left over.
    decoder: TextBodyDecoder,
    /// Bytes handed over at construction, served before `io` is read.
    /// `None` when there were none, or after a read of the cursor returned 0.
    pre: Option<Cursor<Vec<u8>>>,
    /// Underlying stream. `None` once the body is complete or the stream has
    /// been shut down because of an error.
    io: Option<BoxStreamData>,
}
253 
254 impl Text {
new(len: usize, pre: &[u8], io: BoxStreamData) -> Self255     pub(crate) fn new(len: usize, pre: &[u8], io: BoxStreamData) -> Self {
256         Self {
257             decoder: TextBodyDecoder::new(len),
258             pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())),
259             io: Some(io),
260         }
261     }
262 }
263 
264 impl Text {
data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>265     fn data(
266         &mut self,
267         cx: &mut Context<'_>,
268         buf: &mut [u8],
269     ) -> Poll<Result<usize, HttpClientError>> {
270         if buf.is_empty() {
271             return Poll::Ready(Ok(0));
272         }
273 
274         let mut read = 0;
275 
276         if let Some(pre) = self.pre.as_mut() {
277             // Here cursor read never failed.
278             let this_read = Read::read(pre, buf).unwrap();
279             if this_read == 0 {
280                 self.pre = None;
281             } else {
282                 read += this_read;
283                 let (text, rem) = self.decoder.decode(&buf[..read]);
284 
285                 // Contains redundant `rem`, return error.
286                 match (text.is_complete(), rem.is_empty()) {
287                     (true, false) => {
288                         if let Some(io) = self.io.take() {
289                             io.shutdown();
290                         };
291                         return Poll::Ready(Err(HttpClientError::new_with_message(
292                             ErrorKind::BodyDecode,
293                             "Not Eof",
294                         )));
295                     }
296                     (true, true) => {
297                         self.io = None;
298                         return Poll::Ready(Ok(read));
299                     }
300                     // TextBodyDecoder decodes as much as possible here.
301                     _ => {}
302                 }
303             }
304         }
305 
306         if !buf[read..].is_empty() {
307             if let Some(mut io) = self.io.take() {
308                 let mut read_buf = ReadBuf::new(&mut buf[read..]);
309                 match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
310                     // Disconnected.
311                     Poll::Ready(Ok(())) => {
312                         let filled = read_buf.filled().len();
313                         if filled == 0 {
314                             io.shutdown();
315                             return Poll::Ready(Err(HttpClientError::new_with_message(
316                                 ErrorKind::BodyDecode,
317                                 "Response Body Incomplete",
318                             )));
319                         }
320                         let (text, rem) = self.decoder.decode(read_buf.filled());
321                         read += filled;
322                         // Contains redundant `rem`, return error.
323                         match (text.is_complete(), rem.is_empty()) {
324                             (true, false) => {
325                                 io.shutdown();
326                                 return Poll::Ready(Err(HttpClientError::new_with_message(
327                                     ErrorKind::BodyDecode,
328                                     "Not Eof",
329                                 )));
330                             }
331                             (true, true) => return Poll::Ready(Ok(read)),
332                             _ => {}
333                         }
334                         self.io = Some(io);
335                     }
336                     Poll::Pending => {
337                         self.io = Some(io);
338                         if read != 0 {
339                             return Poll::Ready(Ok(read));
340                         }
341                         return Poll::Pending;
342                     }
343                     Poll::Ready(Err(e)) => {
344                         // If IO error occurs, shutdowns `io` before return.
345                         io.shutdown();
346                         return Poll::Ready(Err(HttpClientError::new_with_cause(
347                             ErrorKind::BodyTransfer,
348                             Some(e),
349                         )));
350                     }
351                 }
352             }
353         }
354         Poll::Ready(Ok(read))
355     }
356 }
357 
/// Reader for a chunk-encoded body.
#[cfg(feature = "http1_1")]
struct Chunk {
    /// Decoder that splits raw bytes into chunk data and trailer pieces.
    decoder: ChunkBodyDecoder,
    /// Bytes handed over at construction, decoded before `io` is read.
    /// `None` when there were none, or after a read of the cursor returned 0.
    pre: Option<Cursor<Vec<u8>>>,
    /// Underlying stream. `None` once the final chunk has been seen or the
    /// stream has been shut down because of an error.
    io: Option<BoxStreamData>,
    /// Raw trailer bytes collected by `merge_chunks` and parsed by
    /// `get_trailer`.
    trailer: Vec<u8>,
}
365 
#[cfg(feature = "http1_1")]
impl Chunk {
    /// Creates a chunked-body reader that decodes a copy of `pre` first and
    /// then reads from `io`. The trailer buffer starts out empty.
    pub(crate) fn new(pre: &[u8], io: BoxStreamData) -> Self {
        let pre = if pre.is_empty() {
            None
        } else {
            Some(Cursor::new(pre.to_vec()))
        };
        Self {
            decoder: ChunkBodyDecoder::new().contains_trailer(false),
            pre,
            io: Some(io),
            trailer: Vec::new(),
        }
    }
}
377 
#[cfg(feature = "http1_1")]
impl Chunk {
    /// Decodes the next piece of chunk data into `buf`.
    ///
    /// Buffered bytes (`pre`) are decoded first; once they are used up the
    /// stream is read. Raw bytes are decoded in place, so the value returned
    /// is the number of *payload* bytes now at the start of `buf`. Reads are
    /// repeated until some payload is available, the final chunk is seen, or
    /// the stream is pending.
    ///
    /// # Errors
    ///
    /// - `ErrorKind::BodyDecode` when the chunk encoding is invalid.
    /// - `ErrorKind::BodyTransfer` when the stream ends before the final
    ///   chunk ("Response Body Incomplete") or reports an I/O error.
    ///
    /// In every error case the stream is shut down before returning, so a
    /// connection whose framing is lost is never left usable.
    fn data(
        &mut self,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, HttpClientError>> {
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }

        let mut read = 0;

        while let Some(pre) = self.pre.as_mut() {
            // Here cursor read never failed.
            let size = Read::read(pre, &mut buf[read..]).unwrap();
            if size == 0 {
                self.pre = None;
            }

            let (size, finished) = match self.merge_chunks(&mut buf[read..read + size]) {
                Ok(merged) => merged,
                Err(e) => {
                    // The framing is broken: make sure the stream cannot be
                    // used any further.
                    if let Some(io) = self.io.take() {
                        io.shutdown();
                    }
                    return Poll::Ready(Err(e));
                }
            };
            read += size;

            if finished {
                // Return if we find a 0-sized chunk. The stream is released
                // without being shut down.
                self.io = None;
                return Poll::Ready(Ok(read));
            } else if read != 0 {
                // Return if we get some data.
                return Poll::Ready(Ok(read));
            }
        }

        // Here `read` must be 0.
        while let Some(mut io) = self.io.take() {
            let mut read_buf = ReadBuf::new(&mut buf[read..]);
            match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
                Poll::Ready(Ok(())) => {
                    let filled = read_buf.filled().len();
                    if filled == 0 {
                        io.shutdown();
                        return Poll::Ready(Err(HttpClientError::new_with_message(
                            ErrorKind::BodyTransfer,
                            "Response Body Incomplete",
                        )));
                    }
                    let (size, finished) = match self.merge_chunks(read_buf.filled_mut()) {
                        Ok(merged) => merged,
                        Err(e) => {
                            // `io` was taken out of `self`, so `HttpBody`'s
                            // `Drop` cannot shut it down later; do it here.
                            io.shutdown();
                            return Poll::Ready(Err(e));
                        }
                    };
                    read += size;
                    if finished {
                        // Return if we find a 0-sized chunk. The stream is
                        // released without being shut down.
                        return Poll::Ready(Ok(read));
                    }
                    self.io = Some(io);
                    if read != 0 {
                        // Return if we get some data.
                        return Poll::Ready(Ok(read));
                    }
                    // Only chunk metadata was received so far: read again.
                }
                Poll::Pending => {
                    self.io = Some(io);
                    return Poll::Pending;
                }
                Poll::Ready(Err(e)) => {
                    // If IO error occurs, shutdowns `io` before return.
                    io.shutdown();
                    return Poll::Ready(Err(HttpClientError::new_with_cause(
                        ErrorKind::BodyTransfer,
                        Some(e),
                    )));
                }
            }
        }

        Poll::Ready(Ok(read))
    }

    /// Decodes the raw chunk-encoded bytes in `buf` and compacts the payload
    /// to the front of `buf`.
    ///
    /// Returns the number of payload bytes now at the start of `buf` and a
    /// flag telling whether the end of the chunked body was reached. Trailer
    /// bytes found on the way are appended to `self.trailer`.
    ///
    /// # Errors
    ///
    /// `ErrorKind::BodyDecode` when the decoder rejects the input or when
    /// bytes follow the end of the body.
    fn merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError> {
        // Here we need to merge the chunks into one data block and return.
        // The data arrangement in buf is as follows:
        //
        // data in buf:
        // +------+------+------+------+------+------+------+
        // | data | len  | data | len  |  ... | data |  len |
        // +------+------+------+------+------+------+------+
        //
        // We need to merge these data blocks into one block:
        //
        // after merge:
        // +---------------------------+
        // |            data           |
        // +---------------------------+

        // Address of the buffer start; the decoded pieces are translated
        // into offsets relative to it, since they cannot stay borrowed while
        // `buf` is rearranged below.
        let base = buf.as_ptr() as usize;

        let (chunks, junk) = self
            .decoder
            .decode(buf)
            .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?;

        let mut finished = false;
        let mut pieces: Vec<(usize, usize)> = Vec::new();
        for chunk in chunks.into_iter() {
            if let Some(trailer) = chunk.trailer() {
                self.trailer.extend_from_slice(trailer);
                let state = chunk.state();
                if state == &ChunkState::Finish {
                    self.trailer.extend_from_slice(b"\r\n");
                    finished = true;
                    break;
                }
                if state == &ChunkState::DataCrlf {
                    // A complete trailer line: restore its line terminator.
                    self.trailer.extend_from_slice(b"\r\n");
                }
                // Otherwise this is a partial line; more bytes will follow.
            } else {
                if chunk.size() == 0 && chunk.state() != &ChunkState::MetaSize {
                    finished = true;
                    break;
                }
                let data = chunk.data();
                // Empty pieces contribute nothing and are skipped so that no
                // offset is computed for them.
                if !data.is_empty() {
                    pieces.push((data.as_ptr() as usize - base, data.len()));
                }
            }
        }

        if finished && !junk.is_empty() {
            return Err(HttpClientError::new_with_message(
                ErrorKind::BodyDecode,
                "Invalid Chunk Body",
            ));
        }

        // Pieces are visited in buffer order and always move towards the
        // front, so earlier moves never overwrite later pieces.
        let mut idx = 0;
        for (start, len) in pieces {
            buf.copy_within(start..start + len, idx);
            idx += len;
        }
        Ok((idx, finished))
    }

    /// Parses the collected trailer bytes into `Headers`.
    ///
    /// The trailer buffer is treated as a sequence of lines terminated by
    /// `\n` (an optional `\r` before it is dropped). For every non-empty
    /// line the first `:` separates the field name from the value; spaces
    /// and tabs around the value are removed. Empty lines and a final
    /// fragment without a line terminator are ignored.
    ///
    /// # Errors
    ///
    /// `ErrorKind::BodyDecode` when no trailer bytes were collected, when a
    /// non-empty line contains no `:`, or when a name or value is rejected
    /// by `HeaderName`, `HeaderValue` or `Headers::insert`.
    fn get_trailer(&self) -> Result<Option<Headers>, HttpClientError> {
        /// Strips leading and trailing spaces and horizontal tabs.
        fn trim_ows(mut bytes: &[u8]) -> &[u8] {
            while let Some((first, tail)) = bytes.split_first() {
                if *first == b' ' || *first == b'\t' {
                    bytes = tail;
                } else {
                    break;
                }
            }
            while let Some((last, head)) = bytes.split_last() {
                if *last == b' ' || *last == b'\t' {
                    bytes = head;
                } else {
                    break;
                }
            }
            bytes
        }

        if self.trailer.is_empty() {
            return Err(HttpClientError::new_with_message(
                ErrorKind::BodyDecode,
                "No trailer received",
            ));
        }

        let mut trailer_headers = Headers::new();
        let mut rest = self.trailer.as_slice();
        while let Some(lf) = rest.iter().position(|b| *b == b'\n') {
            let (line, tail) = rest.split_at(lf);
            // Skip the `\n` itself.
            rest = &tail[1..];

            let line = match line.split_last() {
                Some((&b'\r', head)) => head,
                _ => line,
            };
            if line.is_empty() {
                // Blank line, e.g. the one terminating the trailer section.
                continue;
            }

            // Only the first colon is the separator; later ones belong to
            // the value.
            let colon = match line.iter().position(|b| *b == b':') {
                Some(pos) => pos,
                None => {
                    return Err(HttpClientError::new_with_message(
                        ErrorKind::BodyDecode,
                        "Invalid trailer line",
                    ));
                }
            };

            let trailer_header_name = HeaderName::from_bytes(&line[..colon])
                .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?;
            let trailer_header_value = HeaderValue::from_bytes(trim_ows(&line[colon + 1..]))
                .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?;
            let _ = trailer_headers
                .insert::<HeaderName, HeaderValue>(trailer_header_name, trailer_header_value)
                .map_err(|e| HttpClientError::new_with_cause(ErrorKind::BodyDecode, Some(e)))?;
        }
        Ok(Some(trailer_headers))
    }
}
568 
#[cfg(test)]
mod ut_async_http_body {
    use ylong_http::body::ChunkBodyDecoder;

    use crate::async_impl::http_body::Chunk;

    /// UT test cases for `Chunk::get_trailer`.
    ///
    /// # Brief
    /// 1. Creates a `Chunk` whose trailer buffer holds two trailer lines.
    /// 2. Calls `get_trailer` method.
    /// 3. Checks that both fields carry the expected values.
    #[test]
    fn ut_http_body_chunk() {
        let chunk = Chunk {
            decoder: ChunkBodyDecoder::new().contains_trailer(true),
            pre: None,
            io: None,
            trailer: b"Trailer1:value1\r\nTrailer2:value2\r\n".to_vec(),
        };

        let headers = chunk.get_trailer().unwrap().unwrap();
        for (name, expected) in [("Trailer1", "value1"), ("Trailer2", "value2")] {
            let value = headers.get(name);
            assert_eq!(value.unwrap().to_str().unwrap(), expected);
        }
    }
}
598