• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 mod builder;
15 mod operator;
16 
17 use std::future::Future;
18 use std::pin::Pin;
19 use std::task::{Context, Poll};
20 
21 pub use builder::{UploaderBuilder, WantsReader};
22 pub use operator::{Console, UploadOperator};
23 use ylong_http::body::async_impl::ReusableReader;
24 use ylong_http::body::{MultiPart, MultiPartBase};
25 
26 use crate::runtime::{AsyncRead, ReadBuf};
27 
28 /// An uploader that can help you upload the request body.
29 ///
30 /// An `Uploader` provides a template method for uploading a file or a slice and
31 /// needs to use a structure that implements [`UploadOperator`] trait to read
32 /// the file or the slice and convert it into request body.
33 ///
34 /// The `UploadOperator` trait provides a [`progress`] method which is
35 /// responsible for progress display.
36 ///
37 /// You only need to provide a structure that implements the `UploadOperator`
38 /// trait to complete the upload process.
39 ///
40 /// A default structure `Console` which implements `UploadOperator` is
41 /// provided to show download message on console. You can use
42 /// `Uploader::console` to build a `Uploader` which based on it.
43 ///
44 /// [`UploadOperator`]: UploadOperator
45 /// [`progress`]: UploadOperator::progress
46 ///
47 /// # Examples
48 ///
49 /// `Console`:
50 /// ```no_run
51 /// # use ylong_http_client::async_impl::Uploader;
52 ///
53 /// // Creates a default `Uploader` that show progress on console.
54 /// let mut uploader = Uploader::console("HelloWorld".as_bytes());
55 /// ```
56 ///
57 /// `Custom`:
58 /// ```no_run
59 /// # use std::pin::Pin;
60 /// # use std::task::{Context, Poll};
61 /// # use ylong_http_client::async_impl::{Uploader, UploadOperator, Response};
62 /// # use ylong_http_client::{SpeedLimit, Timeout};
63 /// # use ylong_http_client::HttpClientError;
64 ///
65 /// # async fn upload_and_show_progress() {
66 /// // Customizes your own `UploadOperator`.
67 /// struct MyUploadOperator;
68 ///
69 /// impl UploadOperator for MyUploadOperator {
70 ///     fn poll_progress(
71 ///         self: Pin<&mut Self>,
72 ///         cx: &mut Context<'_>,
73 ///         uploaded: u64,
74 ///         total: Option<u64>,
75 ///     ) -> Poll<Result<(), HttpClientError>> {
76 ///         todo!()
77 ///     }
78 /// }
79 ///
80 /// // Creates a default `Uploader` based on `MyUploadOperator`.
81 /// // Configures your uploader by using `UploaderBuilder`.
82 /// let uploader = Uploader::builder()
83 ///     .reader("HelloWorld".as_bytes())
84 ///     .operator(MyUploadOperator)
85 ///     .build();
86 /// # }
87 /// ```
88 pub struct Uploader<R, T> {
89     reader: R,
90     operator: T,
91     config: UploadConfig,
92     info: Option<UploadInfo>,
93 }
94 
95 impl<R: ReusableReader + Unpin> Uploader<R, Console> {
96     /// Creates an `Uploader` with a `Console` operator which displays process
97     /// on console.
98     ///
99     /// # Examples
100     ///
101     /// ```
102     /// # use ylong_http_client::async_impl::Uploader;
103     ///
104     /// let uploader = Uploader::console("HelloWorld".as_bytes());
105     /// ```
console(reader: R) -> Uploader<R, Console>106     pub fn console(reader: R) -> Uploader<R, Console> {
107         UploaderBuilder::new().reader(reader).console().build()
108     }
109 }
110 
111 impl Uploader<(), ()> {
112     /// Creates an `UploaderBuilder` and configures uploader step by step.
113     ///
114     /// # Examples
115     ///
116     /// ```
117     /// # use ylong_http_client::async_impl::Uploader;
118     ///
119     /// let builder = Uploader::builder();
120     /// ```
builder() -> UploaderBuilder<WantsReader>121     pub fn builder() -> UploaderBuilder<WantsReader> {
122         UploaderBuilder::new()
123     }
124 }
125 
126 impl<R, T> AsyncRead for Uploader<R, T>
127 where
128     R: ReusableReader + Unpin,
129     T: UploadOperator + Unpin,
130 {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>131     fn poll_read(
132         self: Pin<&mut Self>,
133         cx: &mut Context<'_>,
134         buf: &mut ReadBuf<'_>,
135     ) -> Poll<std::io::Result<()>> {
136         let this = self.get_mut();
137 
138         if this.info.is_none() {
139             this.info = Some(UploadInfo::new());
140         }
141 
142         let info = this.info.as_mut().unwrap();
143 
144         match Pin::new(&mut this.operator).poll_progress(
145             cx,
146             info.uploaded_bytes,
147             this.config.total_bytes,
148         ) {
149             Poll::Ready(Ok(())) => {}
150             // TODO: Consider another way to handle error.
151             Poll::Ready(Err(e)) => {
152                 return Poll::Ready(Err(std::io::Error::new(
153                     std::io::ErrorKind::Other,
154                     Box::new(e),
155                 )))
156             }
157             Poll::Pending => return Poll::Pending,
158         }
159         match Pin::new(&mut this.reader).poll_read(cx, buf) {
160             Poll::Ready(Ok(_)) => {
161                 let filled = buf.filled().len();
162                 info.uploaded_bytes += filled as u64;
163                 Poll::Ready(Ok(()))
164             }
165             Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
166             Poll::Pending => Poll::Pending,
167         }
168     }
169 }
170 
171 impl<R, T> ReusableReader for Uploader<R, T>
172 where
173     R: ReusableReader + Unpin,
174     T: UploadOperator + Unpin + Sync,
175 {
reuse<'a>( &'a mut self, ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + Sync + 'a>> where Self: 'a,176     fn reuse<'a>(
177         &'a mut self,
178     ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + Sync + 'a>>
179     where
180         Self: 'a,
181     {
182         self.info = None;
183         self.reader.reuse()
184     }
185 }
186 
187 impl<T: UploadOperator + Unpin + Sync> MultiPartBase for Uploader<MultiPart, T> {
multipart(&self) -> &MultiPart188     fn multipart(&self) -> &MultiPart {
189         &self.reader
190     }
191 }
192 
193 #[derive(Default)]
194 struct UploadConfig {
195     total_bytes: Option<u64>,
196 }
197 
198 struct UploadInfo {
199     uploaded_bytes: u64,
200 }
201 
202 impl UploadInfo {
new() -> Self203     fn new() -> Self {
204         Self { uploaded_bytes: 0 }
205     }
206 }
207 
208 #[cfg(all(test, feature = "ylong_base"))]
209 mod ut_uploader {
210     use ylong_http::body::{MultiPart, Part};
211     use ylong_runtime::io::AsyncRead;
212 
213     use crate::async_impl::uploader::{Context, Pin, Poll};
214     use crate::async_impl::{UploadOperator, Uploader, UploaderBuilder};
215     use crate::HttpClientError;
216 
217     /// UT test cases for `UploadOperator::data`.
218     ///
219     /// # Brief
220     /// 1. Creates a `Uploader`.
221     /// 2. Calls `data` method.
222     /// 3. Checks if the result is correct.
223 
224     #[test]
ut_upload()225     fn ut_upload() {
226         let handle = ylong_runtime::spawn(async move {
227             upload().await;
228         });
229         ylong_runtime::block_on(handle).unwrap();
230     }
231 
upload()232     async fn upload() {
233         let mut uploader = Uploader::console("HelloWorld".as_bytes());
234         let mut user_slice = [0_u8; 10];
235         let mut output_vec = vec![];
236 
237         let mut size = user_slice.len();
238         while size == user_slice.len() {
239             let mut buf = ylong_runtime::io::ReadBuf::new(user_slice.as_mut_slice());
240             ylong_runtime::futures::poll_fn(|cx| Pin::new(&mut uploader).poll_read(cx, &mut buf))
241                 .await
242                 .unwrap();
243             size = buf.filled_len();
244             output_vec.extend_from_slice(&user_slice[..size]);
245         }
246         assert_eq!(&output_vec, b"HelloWorld");
247 
248         let mut user_slice = [0_u8; 12];
249         let multipart = MultiPart::new().part(Part::new().name("name").body("xiaoming"));
250         let mut multi_uploader = UploaderBuilder::default()
251             .multipart(multipart)
252             .console()
253             .build();
254         let mut buf = ylong_runtime::io::ReadBuf::new(user_slice.as_mut_slice());
255         ylong_runtime::futures::poll_fn(|cx| Pin::new(&mut multi_uploader).poll_read(cx, &mut buf))
256             .await
257             .unwrap();
258         let size = buf.filled_len();
259         assert_eq!(size, 12);
260     }
261 
262     /// UT test cases for `UploadOperator::progress`.
263     ///
264     /// # Brief
265     /// 1. Creates a `MyUploadOperator`.
266     /// 2. Calls `progress` method.
267     /// 3. Checks if the result is correct.
268     #[test]
ut_upload_op_cov()269     fn ut_upload_op_cov() {
270         let handle = ylong_runtime::spawn(async move {
271             upload_op_cov().await;
272         });
273         ylong_runtime::block_on(handle).unwrap();
274     }
275 
upload_op_cov()276     async fn upload_op_cov() {
277         struct MyUploadOperator;
278         impl UploadOperator for MyUploadOperator {
279             fn poll_progress(
280                 self: Pin<&mut Self>,
281                 _cx: &mut Context<'_>,
282                 uploaded: u64,
283                 total: Option<u64>,
284             ) -> Poll<Result<(), HttpClientError>> {
285                 if uploaded > total.unwrap() {
286                     return Poll::Ready(err_from_msg!(BodyTransfer, "UploadOperator failed"));
287                 }
288                 Poll::Ready(Ok(()))
289             }
290         }
291         let res = MyUploadOperator.progress(10, Some(20)).await;
292         assert!(res.is_ok());
293     }
294 
295     /// UT test cases for `Uploader::builder`.
296     ///
297     /// # Brief
298     /// 1. Creates a `UploaderBuilder` by `Uploader::builder`.
299     /// 2. Checks if the result is correct.
300 
301     #[test]
ut_uploader_builder()302     fn ut_uploader_builder() {
303         let handle = ylong_runtime::spawn(async { upload_and_show_progress().await });
304         ylong_runtime::block_on(handle).unwrap();
305     }
306 
upload_and_show_progress()307     async fn upload_and_show_progress() {
308         // Customizes your own `UploadOperator`.
309         struct MyUploadOperator;
310 
311         impl UploadOperator for MyUploadOperator {
312             fn poll_progress(
313                 self: Pin<&mut Self>,
314                 _cx: &mut Context<'_>,
315                 _uploaded: u64,
316                 _total: Option<u64>,
317             ) -> Poll<Result<(), HttpClientError>> {
318                 Poll::Ready(Err(HttpClientError::user_aborted()))
319             }
320         }
321 
322         // Creates a default `Uploader` based on `MyUploadOperator`.
323         // Configures your uploader by using `UploaderBuilder`.
324         let mut uploader = Uploader::builder()
325             .reader("HelloWorld".as_bytes())
326             .operator(MyUploadOperator)
327             .build();
328 
329         let mut user_slice = [0_u8; 12];
330         let mut buf = ylong_runtime::io::ReadBuf::new(user_slice.as_mut_slice());
331         let res =
332             ylong_runtime::futures::poll_fn(|cx| Pin::new(&mut uploader).poll_read(cx, &mut buf))
333                 .await;
334         assert_eq!(
335             format!("{:?}", res.err()),
336             format!(
337                 "{:?}",
338                 Some(std::io::Error::new(
339                     std::io::ErrorKind::Other,
340                     Box::new(HttpClientError::user_aborted())
341                 ))
342             ),
343         );
344     }
345 }
346