1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 //! defines `BodyDataRef`. 15 16 use std::pin::Pin; 17 use std::task::{Context, Poll}; 18 19 use crate::runtime::{AsyncRead, ReadBuf}; 20 use crate::util::progress::SpeedController; 21 use crate::util::request::RequestArc; 22 use crate::HttpClientError; 23 24 pub(crate) struct BodyDataRef { 25 pub(crate) speed_controller: SpeedController, 26 body: Option<RequestArc>, 27 } 28 29 impl BodyDataRef { new(request: RequestArc, speed_controller: SpeedController) -> Self30 pub(crate) fn new(request: RequestArc, speed_controller: SpeedController) -> Self { 31 Self { 32 speed_controller, 33 body: Some(request), 34 } 35 } 36 clear(&mut self)37 pub(crate) fn clear(&mut self) { 38 self.body = None; 39 } 40 poll_read( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>41 pub(crate) fn poll_read( 42 &mut self, 43 cx: &mut Context<'_>, 44 buf: &mut [u8], 45 ) -> Poll<Result<usize, HttpClientError>> { 46 let request = if let Some(ref mut request) = self.body { 47 request 48 } else { 49 return Poll::Ready(Ok(0)); 50 }; 51 self.speed_controller.init_min_send_if_not_start(); 52 if self 53 .speed_controller 54 .poll_max_send_delay_time(cx) 55 .is_pending() 56 { 57 return Poll::Pending; 58 } 59 self.speed_controller.init_max_send_if_not_start(); 60 let data = request.ref_mut().body_mut(); 61 let mut read_buf = ReadBuf::new(buf); 62 let data = Pin::new(data); 63 match data.poll_read(cx, &mut read_buf) { 64 Poll::Ready(Err(e)) => Poll::Ready(err_from_io!(BodyTransfer, e)), 65 Poll::Ready(Ok(_)) => { 66 let filled: usize = read_buf.filled().len(); 67 // Limit the write I/O speed by limiting the read file speed. 68 self.speed_controller.min_send_speed_limit(filled)?; 69 self.speed_controller.delay_max_send_speed_limit(filled); 70 Poll::Ready(Ok(filled)) 71 } 72 Poll::Pending => Poll::Pending, 73 } 74 } 75 } 76