1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 mod builder; 15 mod operator; 16 17 use std::pin::Pin; 18 use std::task::{Context, Poll}; 19 20 pub use builder::{UploaderBuilder, WantsReader}; 21 pub use operator::{Console, UploadOperator}; 22 use ylong_http::body::async_impl::Body; 23 24 use crate::{AsyncRead, ErrorKind, HttpClientError, ReadBuf}; 25 26 /// An uploader that can help you upload the request body. 27 /// 28 /// An `Uploader` provides a template method for uploading a file or a slice and 29 /// needs to use a structure that implements [`UploadOperator`] trait to read 30 /// the file or the slice and convert it into request body. 31 /// 32 /// The `UploadOperator` trait provides a [`progress`] method which is 33 /// responsible for progress display. 34 /// 35 /// You only need to provide a structure that implements the `UploadOperator` 36 /// trait to complete the upload process. 37 /// 38 /// A default structure `Console` which implements `UploadOperator` is 39 /// provided to show download message on console. You can use 40 /// `Uploader::console` to build a `Uploader` which based on it. 41 /// 42 /// [`UploadOperator`]: UploadOperator 43 /// [`progress`]: UploadOperator::progress 44 /// 45 /// # Examples 46 /// 47 /// `Console`: 48 /// ```no_run 49 /// # use ylong_http_client::async_impl::Uploader; 50 /// # use ylong_http_client::Response; 51 /// 52 /// // Creates a default `Uploader` that show progress on console. 53 /// let mut uploader = Uploader::console("HelloWorld".as_bytes()); 54 /// ``` 55 /// 56 /// `Custom`: 57 /// ```no_run 58 /// # use std::pin::Pin; 59 /// # use std::task::{Context, Poll}; 60 /// # use ylong_http_client::async_impl::{Uploader, UploadOperator}; 61 /// # use ylong_http_client::{Response, SpeedLimit, Timeout}; 62 /// # use ylong_http_client::HttpClientError; 63 /// 64 /// # async fn upload_and_show_progress() { 65 /// // Customizes your own `UploadOperator`. 66 /// struct MyUploadOperator; 67 /// 68 /// impl UploadOperator for MyUploadOperator { 69 /// fn poll_progress( 70 /// self: Pin<&mut Self>, 71 /// cx: &mut Context<'_>, 72 /// uploaded: u64, 73 /// total: Option<u64>, 74 /// ) -> Poll<Result<(), HttpClientError>> { 75 /// todo!() 76 /// } 77 /// } 78 /// 79 /// // Creates a default `Uploader` based on `MyUploadOperator`. 80 /// // Configures your uploader by using `UploaderBuilder`. 81 /// let uploader = Uploader::builder() 82 /// .reader("HelloWorld".as_bytes()) 83 /// .operator(MyUploadOperator) 84 /// .build(); 85 /// # } 86 /// ``` 87 pub struct Uploader<R, T> { 88 reader: R, 89 operator: T, 90 config: UploadConfig, 91 info: Option<UploadInfo>, 92 } 93 94 impl<R: AsyncRead + Unpin> Uploader<R, Console> { 95 /// Creates an `Uploader` with a `Console` operator which displays process 96 /// on console. 97 /// 98 /// # Examples 99 /// 100 /// ``` 101 /// # use ylong_http_client::async_impl::Uploader; 102 /// 103 /// let uploader = Uploader::console("HelloWorld".as_bytes()); 104 /// ``` console(reader: R) -> Uploader<R, Console>105 pub fn console(reader: R) -> Uploader<R, Console> { 106 UploaderBuilder::new().reader(reader).console().build() 107 } 108 } 109 110 impl Uploader<(), ()> { 111 /// Creates an `UploaderBuilder` and configures uploader step by step. 112 /// 113 /// # Examples 114 /// 115 /// ``` 116 /// # use ylong_http_client::async_impl::Uploader; 117 /// 118 /// let builder = Uploader::builder(); 119 /// ``` builder() -> UploaderBuilder<WantsReader>120 pub fn builder() -> UploaderBuilder<WantsReader> { 121 UploaderBuilder::new() 122 } 123 } 124 125 impl<R, T> Body for Uploader<R, T> 126 where 127 R: AsyncRead + Unpin, 128 T: UploadOperator + Unpin, 129 { 130 type Error = HttpClientError; 131 poll_data( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, Self::Error>>132 fn poll_data( 133 self: Pin<&mut Self>, 134 cx: &mut Context<'_>, 135 buf: &mut [u8], 136 ) -> Poll<Result<usize, Self::Error>> { 137 let this = self.get_mut(); 138 139 if this.info.is_none() { 140 this.info = Some(UploadInfo::new()); 141 } 142 143 let info = this.info.as_mut().unwrap(); 144 145 match Pin::new(&mut this.operator).poll_progress( 146 cx, 147 info.uploaded_bytes, 148 this.config.total_bytes, 149 ) { 150 Poll::Ready(Ok(())) => {} 151 Poll::Ready(Err(e)) if e.error_kind() == ErrorKind::UserAborted => { 152 return Poll::Ready(Ok(0)); 153 } 154 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), 155 Poll::Pending => return Poll::Pending, 156 } 157 158 let mut read_buf = ReadBuf::new(buf); 159 let filled = read_buf.filled().len(); 160 match Pin::new(&mut this.reader).poll_read(cx, &mut read_buf) { 161 Poll::Ready(Ok(_)) => {} 162 Poll::Ready(Err(e)) => { 163 return Poll::Ready(Err(HttpClientError::new_with_cause( 164 ErrorKind::BodyTransfer, 165 Some(e), 166 ))) 167 } 168 Poll::Pending => return Poll::Pending, 169 } 170 171 let new_filled = read_buf.filled().len(); 172 let read_bytes = new_filled - filled; 173 info.uploaded_bytes += read_bytes as u64; 174 Poll::Ready(Ok(read_bytes)) 175 } 176 } 177 178 impl<R, T> AsRef<R> for Uploader<R, T> { as_ref(&self) -> &R179 fn as_ref(&self) -> &R { 180 &self.reader 181 } 182 } 183 184 #[derive(Default)] 185 struct UploadConfig { 186 total_bytes: Option<u64>, 187 } 188 189 struct UploadInfo { 190 uploaded_bytes: u64, 191 } 192 193 impl UploadInfo { new() -> Self194 fn new() -> Self { 195 Self { uploaded_bytes: 0 } 196 } 197 } 198 199 #[cfg(all(test, feature = "ylong_base"))] 200 mod ut_uploader { 201 use ylong_http::body::async_impl::Body; 202 use ylong_http::body::{MultiPart, Part}; 203 204 use crate::async_impl::uploader::{Context, Pin, Poll}; 205 use crate::async_impl::{UploadOperator, Uploader, UploaderBuilder}; 206 use crate::{ErrorKind, HttpClientError}; 207 208 /// UT test cases for `UploadOperator::data`. 209 /// 210 /// # Brief 211 /// 1. Creates a `Uploader`. 212 /// 2. Calls `data` method. 213 /// 3. Checks if the result is correct. 214 215 #[test] ut_upload()216 fn ut_upload() { 217 let handle = ylong_runtime::spawn(async move { 218 upload().await; 219 }); 220 ylong_runtime::block_on(handle).unwrap(); 221 } 222 upload()223 async fn upload() { 224 let mut uploader = Uploader::console("HelloWorld".as_bytes()); 225 let mut user_slice = [0_u8; 10]; 226 let mut output_vec = vec![]; 227 228 let mut size = user_slice.len(); 229 while size == user_slice.len() { 230 size = uploader.data(user_slice.as_mut_slice()).await.unwrap(); 231 output_vec.extend_from_slice(&user_slice[..size]); 232 } 233 assert_eq!(&output_vec, b"HelloWorld"); 234 235 let mut user_slice = [0_u8; 12]; 236 let multipart = MultiPart::new().part(Part::new().name("name").body("xiaoming")); 237 let mut multi_uploader = UploaderBuilder::default() 238 .multipart(multipart) 239 .console() 240 .build(); 241 let size = multi_uploader 242 .data(user_slice.as_mut_slice()) 243 .await 244 .unwrap(); 245 assert_eq!(size, 12); 246 } 247 248 /// UT test cases for `UploadOperator::progress`. 249 /// 250 /// # Brief 251 /// 1. Creates a `MyUploadOperator`. 252 /// 2. Calls `progress` method. 253 /// 3. Checks if the result is correct. 254 #[test] ut_upload_op_cov()255 fn ut_upload_op_cov() { 256 let handle = ylong_runtime::spawn(async move { 257 upload_op_cov().await; 258 }); 259 ylong_runtime::block_on(handle).unwrap(); 260 } 261 upload_op_cov()262 async fn upload_op_cov() { 263 struct MyUploadOperator; 264 impl UploadOperator for MyUploadOperator { 265 fn poll_progress( 266 self: Pin<&mut Self>, 267 _cx: &mut Context<'_>, 268 uploaded: u64, 269 total: Option<u64>, 270 ) -> Poll<Result<(), HttpClientError>> { 271 if uploaded > total.unwrap() { 272 return Poll::Ready(Err(HttpClientError::new_with_message( 273 ErrorKind::BodyTransfer, 274 "UploadOperator failed", 275 ))); 276 } 277 Poll::Ready(Ok(())) 278 } 279 } 280 let res = MyUploadOperator.progress(10, Some(20)).await; 281 assert!(res.is_ok()); 282 } 283 } 284