• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 mod builder;
15 mod operator;
16 
17 use std::pin::Pin;
18 use std::task::{Context, Poll};
19 
20 pub use builder::{UploaderBuilder, WantsReader};
21 pub use operator::{Console, UploadOperator};
22 use ylong_http::body::async_impl::Body;
23 
24 use crate::{AsyncRead, ErrorKind, HttpClientError, ReadBuf};
25 
/// An uploader that can help you upload the request body.
///
/// An `Uploader` provides a template method for uploading a file or a slice:
/// it reads the data from a reader, hands it out as request body, and reports
/// the progress to a structure that implements the [`UploadOperator`] trait.
///
/// The `UploadOperator` trait provides a [`progress`] method which is
/// responsible for progress display.
///
/// You only need to provide a structure that implements the `UploadOperator`
/// trait to complete the upload process.
///
/// A default structure `Console` which implements `UploadOperator` is
/// provided to show upload progress on console. You can use
/// `Uploader::console` to build an `Uploader` which is based on it.
///
/// [`UploadOperator`]: UploadOperator
/// [`progress`]: UploadOperator::progress
///
/// # Examples
///
/// `Console`:
/// ```no_run
/// # use ylong_http_client::async_impl::Uploader;
/// # use ylong_http_client::Response;
///
/// // Creates a default `Uploader` that show progress on console.
/// let mut uploader = Uploader::console("HelloWorld".as_bytes());
/// ```
///
/// `Custom`:
/// ```no_run
/// # use std::pin::Pin;
/// # use std::task::{Context, Poll};
/// # use ylong_http_client::async_impl::{Uploader, UploadOperator};
/// # use ylong_http_client::{Response, SpeedLimit, Timeout};
/// # use ylong_http_client::HttpClientError;
///
/// # async fn upload_and_show_progress() {
/// // Customizes your own `UploadOperator`.
/// struct MyUploadOperator;
///
/// impl UploadOperator for MyUploadOperator {
///     fn poll_progress(
///         self: Pin<&mut Self>,
///         cx: &mut Context<'_>,
///         uploaded: u64,
///         total: Option<u64>,
///     ) -> Poll<Result<(), HttpClientError>> {
///         todo!()
///     }
/// }
///
/// // Creates a default `Uploader` based on `MyUploadOperator`.
/// // Configures your uploader by using `UploaderBuilder`.
/// let uploader = Uploader::builder()
///     .reader("HelloWorld".as_bytes())
///     .operator(MyUploadOperator)
///     .build();
/// # }
/// ```
pub struct Uploader<R, T> {
    // Source of the body bytes; `poll_data` reads from it via `poll_read`.
    reader: R,
    // Progress callback; polled with the current byte count before every read.
    operator: T,
    // Static settings; currently only the optional total size of the body.
    config: UploadConfig,
    // Running counters; stays `None` until the first `poll_data` call.
    info: Option<UploadInfo>,
}
93 
94 impl<R: AsyncRead + Unpin> Uploader<R, Console> {
95     /// Creates an `Uploader` with a `Console` operator which displays process
96     /// on console.
97     ///
98     /// # Examples
99     ///
100     /// ```
101     /// # use ylong_http_client::async_impl::Uploader;
102     ///
103     /// let uploader = Uploader::console("HelloWorld".as_bytes());
104     /// ```
console(reader: R) -> Uploader<R, Console>105     pub fn console(reader: R) -> Uploader<R, Console> {
106         UploaderBuilder::new().reader(reader).console().build()
107     }
108 }
109 
110 impl Uploader<(), ()> {
111     /// Creates an `UploaderBuilder` and configures uploader step by step.
112     ///
113     /// # Examples
114     ///
115     /// ```
116     /// # use ylong_http_client::async_impl::Uploader;
117     ///
118     /// let builder = Uploader::builder();
119     /// ```
builder() -> UploaderBuilder<WantsReader>120     pub fn builder() -> UploaderBuilder<WantsReader> {
121         UploaderBuilder::new()
122     }
123 }
124 
125 impl<R, T> Body for Uploader<R, T>
126 where
127     R: AsyncRead + Unpin,
128     T: UploadOperator + Unpin,
129 {
130     type Error = HttpClientError;
131 
poll_data( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, Self::Error>>132     fn poll_data(
133         self: Pin<&mut Self>,
134         cx: &mut Context<'_>,
135         buf: &mut [u8],
136     ) -> Poll<Result<usize, Self::Error>> {
137         let this = self.get_mut();
138 
139         if this.info.is_none() {
140             this.info = Some(UploadInfo::new());
141         }
142 
143         let info = this.info.as_mut().unwrap();
144 
145         match Pin::new(&mut this.operator).poll_progress(
146             cx,
147             info.uploaded_bytes,
148             this.config.total_bytes,
149         ) {
150             Poll::Ready(Ok(())) => {}
151             Poll::Ready(Err(e)) if e.error_kind() == ErrorKind::UserAborted => {
152                 return Poll::Ready(Ok(0));
153             }
154             Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
155             Poll::Pending => return Poll::Pending,
156         }
157 
158         let mut read_buf = ReadBuf::new(buf);
159         let filled = read_buf.filled().len();
160         match Pin::new(&mut this.reader).poll_read(cx, &mut read_buf) {
161             Poll::Ready(Ok(_)) => {}
162             Poll::Ready(Err(e)) => {
163                 return Poll::Ready(Err(HttpClientError::new_with_cause(
164                     ErrorKind::BodyTransfer,
165                     Some(e),
166                 )))
167             }
168             Poll::Pending => return Poll::Pending,
169         }
170 
171         let new_filled = read_buf.filled().len();
172         let read_bytes = new_filled - filled;
173         info.uploaded_bytes += read_bytes as u64;
174         Poll::Ready(Ok(read_bytes))
175     }
176 }
177 
178 impl<R, T> AsRef<R> for Uploader<R, T> {
as_ref(&self) -> &R179     fn as_ref(&self) -> &R {
180         &self.reader
181     }
182 }
183 
/// Static settings of an upload.
#[derive(Default)]
struct UploadConfig {
    /// Total size of the body in bytes, if known; `None` by default.
    total_bytes: Option<u64>,
}
188 
/// Running counters of an upload.
struct UploadInfo {
    /// Number of bytes accounted for so far.
    uploaded_bytes: u64,
}

impl UploadInfo {
    /// Creates counters with nothing uploaded yet.
    fn new() -> Self {
        UploadInfo { uploaded_bytes: 0 }
    }
}
198 
#[cfg(all(test, feature = "ylong_base"))]
mod ut_uploader {
    use ylong_http::body::async_impl::Body;
    use ylong_http::body::{MultiPart, Part};

    use crate::async_impl::uploader::{Context, Pin, Poll};
    use crate::async_impl::{UploadOperator, Uploader, UploaderBuilder};
    use crate::{ErrorKind, HttpClientError};

    /// UT test cases for reading an `Uploader` through `data`.
    ///
    /// # Brief
    /// 1. Creates an `Uploader`.
    /// 2. Calls `data` method.
    /// 3. Checks if the result is correct.
    #[test]
    fn ut_upload() {
        let task = ylong_runtime::spawn(async move { upload().await });
        ylong_runtime::block_on(task).unwrap();
    }

    async fn upload() {
        // Slice-backed uploader: drain it chunk by chunk until a short read.
        let mut uploader = Uploader::console("HelloWorld".as_bytes());
        let mut chunk = [0_u8; 10];
        let mut collected = Vec::new();

        loop {
            let read = uploader.data(chunk.as_mut_slice()).await.unwrap();
            collected.extend_from_slice(&chunk[..read]);
            if read != chunk.len() {
                break;
            }
        }
        assert_eq!(&collected, b"HelloWorld");

        // Multipart-backed uploader: the first read must fill the whole slice.
        let mut chunk = [0_u8; 12];
        let form = MultiPart::new().part(Part::new().name("name").body("xiaoming"));
        let mut multi_uploader = UploaderBuilder::default()
            .multipart(form)
            .console()
            .build();
        let read = multi_uploader.data(chunk.as_mut_slice()).await.unwrap();
        assert_eq!(read, 12);
    }

    /// UT test cases for `UploadOperator::progress`.
    ///
    /// # Brief
    /// 1. Creates a `MyUploadOperator`.
    /// 2. Calls `progress` method.
    /// 3. Checks if the result is correct.
    #[test]
    fn ut_upload_op_cov() {
        let task = ylong_runtime::spawn(async move { upload_op_cov().await });
        ylong_runtime::block_on(task).unwrap();
    }

    async fn upload_op_cov() {
        struct MyUploadOperator;

        impl UploadOperator for MyUploadOperator {
            fn poll_progress(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                uploaded: u64,
                total: Option<u64>,
            ) -> Poll<Result<(), HttpClientError>> {
                // Succeeds as long as the uploaded count stays within the total.
                let limit = total.unwrap();
                if uploaded <= limit {
                    return Poll::Ready(Ok(()));
                }
                Poll::Ready(Err(HttpClientError::new_with_message(
                    ErrorKind::BodyTransfer,
                    "UploadOperator failed",
                )))
            }
        }

        let outcome = MyUploadOperator.progress(10, Some(20)).await;
        assert!(outcome.is_ok());
    }
}
284