• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! defines `BodyDataRef`.
15 
16 use std::pin::Pin;
17 use std::task::{Context, Poll};
18 
19 use crate::runtime::{AsyncRead, ReadBuf};
20 use crate::util::progress::SpeedController;
21 use crate::util::request::RequestArc;
22 use crate::HttpClientError;
23 
24 pub(crate) struct BodyDataRef {
25     pub(crate) speed_controller: SpeedController,
26     body: Option<RequestArc>,
27 }
28 
29 impl BodyDataRef {
new(request: RequestArc, speed_controller: SpeedController) -> Self30     pub(crate) fn new(request: RequestArc, speed_controller: SpeedController) -> Self {
31         Self {
32             speed_controller,
33             body: Some(request),
34         }
35     }
36 
clear(&mut self)37     pub(crate) fn clear(&mut self) {
38         self.body = None;
39     }
40 
poll_read( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>41     pub(crate) fn poll_read(
42         &mut self,
43         cx: &mut Context<'_>,
44         buf: &mut [u8],
45     ) -> Poll<Result<usize, HttpClientError>> {
46         let request = if let Some(ref mut request) = self.body {
47             request
48         } else {
49             return Poll::Ready(Ok(0));
50         };
51         self.speed_controller.init_min_send_if_not_start();
52         if self
53             .speed_controller
54             .poll_max_send_delay_time(cx)
55             .is_pending()
56         {
57             return Poll::Pending;
58         }
59         self.speed_controller.init_max_send_if_not_start();
60         let data = request.ref_mut().body_mut();
61         let mut read_buf = ReadBuf::new(buf);
62         let data = Pin::new(data);
63         match data.poll_read(cx, &mut read_buf) {
64             Poll::Ready(Err(e)) => Poll::Ready(err_from_io!(BodyTransfer, e)),
65             Poll::Ready(Ok(_)) => {
66                 let filled: usize = read_buf.filled().len();
67                 // Limit the write I/O speed by limiting the read file speed.
68                 self.speed_controller.min_send_speed_limit(filled)?;
69                 self.speed_controller.delay_max_send_speed_limit(filled);
70                 Poll::Ready(Ok(filled))
71             }
72             Poll::Pending => Poll::Pending,
73         }
74     }
75 }
76