• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::collections::HashMap;
15 use std::pin::Pin;
16 
17 use cxx::SharedPtr;
18 
19 use crate::request::RequestCallback;
20 use crate::response::Response;
21 use crate::wrapper::ffi::{HttpClientRequest, HttpClientTask, NewHttpClientTask, OnCallback};
22 use crate::wrapper::CallbackWrapper;
23 
/// Handle to a native HTTP client task.
///
/// The handle wraps a `cxx::SharedPtr<HttpClientTask>`. Cloning the handle
/// clones the shared pointer, so all clones refer to the same native task.
#[derive(Clone)]
pub struct RequestTask {
    // Shared pointer to the native task object created on the C++ side.
    inner: SharedPtr<HttpClientTask>,
}
29 
// NOTE(review): these impls assert, without compiler verification, that the
// wrapped native task may be moved to and shared between threads. Presumably the
// C++ `HttpClientTask` is internally synchronized -- confirm against the native
// implementation before relying on concurrent access.
unsafe impl Send for RequestTask {}
unsafe impl Sync for RequestTask {}
32 
/// Status of a [`RequestTask`], as returned by [`RequestTask::status`].
#[derive(Debug, Default)]
pub enum TaskStatus {
    /// The task is idle.
    Idle,
    /// The task is running. This is also the `Default` variant.
    #[default]
    Running,
}
42 
43 impl RequestTask {
from_http_request(request: &HttpClientRequest) -> Self44     pub(crate) fn from_http_request(request: &HttpClientRequest) -> Self {
45         Self {
46             inner: NewHttpClientTask(request),
47         }
48     }
49 
from_ffi(inner: SharedPtr<HttpClientTask>) -> Self50     pub(crate) fn from_ffi(inner: SharedPtr<HttpClientTask>) -> Self {
51         Self { inner }
52     }
53 
54     /// start the request task
start(&mut self) -> bool55     pub fn start(&mut self) -> bool {
56         self.pin_mut().Start()
57     }
58 
59     /// cancel the request task
cancel(&self)60     pub fn cancel(&self) {
61         self.pin_mut().Cancel();
62     }
63 
64     /// get the request task status
status(&mut self) -> TaskStatus65     pub fn status(&mut self) -> TaskStatus {
66         self.pin_mut().GetStatus().try_into().unwrap_or_default()
67     }
68 
response(&mut self) -> Response69     pub fn response(&mut self) -> Response {
70         Response::from_ffi(self.pin_mut().GetResponse().into_ref().get_ref())
71     }
72 
headers(&mut self) -> HashMap<String, String>73     pub fn headers(&mut self) -> HashMap<String, String> {
74         self.response().headers()
75     }
76 
set_callback(&mut self, callback: impl RequestCallback + 'static)77     pub(crate) fn set_callback(&mut self, callback: impl RequestCallback + 'static) {
78         OnCallback(
79             self.inner.clone(),
80             Box::new(CallbackWrapper::from_callback(callback)),
81         );
82     }
83 
pin_mut(&self) -> Pin<&mut HttpClientTask>84     fn pin_mut(&self) -> Pin<&mut HttpClientTask> {
85         let ptr = self.inner.as_ref().unwrap() as *const HttpClientTask as *mut HttpClientTask;
86         unsafe { Pin::new_unchecked(ptr.as_mut().unwrap()) }
87     }
88 }
89 
#[cfg(test)]
mod test {
    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
    use std::sync::Arc;

    use super::*;
    use crate::error::HttpClientError;
    use crate::wrapper::ffi::NewHttpClientRequest;

    // Remote resource requested by the tests that need a reachable server.
    const TEST_URL: &str = "https://www.w3cschool.cn/statics/demosource/movie.mp4";
    // Loopback address requested by the connection-failure test.
    const LOCAL_URL: &str = "https://127.0.0.1";
    // Value stored in `TestCallback::error` when `on_cancel` fires.
    const CANCEL_MARKER: u32 = 123456;

    /// Callback that records what it observed in shared atomics.
    ///
    /// Clones share the same atomics, so a test can keep one clone for
    /// inspection while the other is handed to the task.
    #[derive(Clone, Default)]
    struct TestCallback {
        finished: Arc<AtomicBool>,
        response_code: Arc<AtomicU32>,
        error: Arc<AtomicU32>,
        result: Arc<AtomicU32>,
    }

    impl RequestCallback for TestCallback {
        fn on_success(&mut self, response: Response) {
            let code = response.status() as u32;
            self.response_code.store(code, Ordering::SeqCst);
            self.finished.store(true, Ordering::SeqCst);
        }

        fn on_fail(&mut self, error: HttpClientError) {
            let code = error.code().clone() as u32;
            self.error.store(code, Ordering::SeqCst);
            self.finished.store(true, Ordering::SeqCst);
        }

        fn on_cancel(&mut self) {
            self.error.store(CANCEL_MARKER, Ordering::SeqCst);
            self.finished.store(true, Ordering::SeqCst);
        }

        fn on_data_receive(&mut self, data: &[u8], _task: RequestTask) {
            // Accumulate the number of body bytes received so far.
            self.result.fetch_add(data.len() as u32, Ordering::SeqCst);
        }
    }

    /// Builds a native request for `target` using the `GET` method.
    fn get_request(target: &str) -> cxx::UniquePtr<crate::wrapper::ffi::HttpClientRequest> {
        let mut request = NewHttpClientRequest();
        cxx::let_cxx_string!(url = target);
        request.pin_mut().SetURL(&url);
        cxx::let_cxx_string!(method = "GET");
        request.pin_mut().SetMethod(&method);
        request
    }

    /// Creates a task for `request`, attaches a fresh `TestCallback` and starts it.
    ///
    /// Returns the task together with a clone of the callback for inspection.
    fn start_task(
        request: &crate::wrapper::ffi::HttpClientRequest,
    ) -> (RequestTask, TestCallback) {
        let mut task = RequestTask::from_http_request(request);
        let observer = TestCallback::default();
        task.set_callback(observer.clone());
        task.start();
        (task, observer)
    }

    /// Polls `flag` every 100 ms until a callback has set it.
    fn wait_until_finished(flag: &AtomicBool) {
        while !flag.load(Ordering::SeqCst) {
            std::thread::sleep(std::time::Duration::from_millis(100));
        }
    }

    #[test]
    fn ut_task_from_http_request() {
        let request = get_request(TEST_URL);
        let mut task = RequestTask::from_http_request(&request);
        assert!(matches!(task.status(), TaskStatus::Idle));
    }

    #[test]
    fn ut_request_task_start_success() {
        let request = get_request(TEST_URL);
        let (mut task, observer) = start_task(&request);
        wait_until_finished(&observer.finished);

        assert_eq!(observer.response_code.load(Ordering::SeqCst), 200);
        assert_eq!(observer.error.load(Ordering::SeqCst), 0);

        // The bytes counted by `on_data_receive` must match the advertised length.
        let received = observer.result.load(Ordering::SeqCst);
        let advertised: u32 = task
            .headers()
            .get("content-length")
            .unwrap()
            .parse()
            .unwrap();
        assert_eq!(received, advertised);
    }

    #[test]
    fn ut_request_task_cancel() {
        let request = get_request(TEST_URL);
        let (task, observer) = start_task(&request);

        std::thread::sleep(std::time::Duration::from_millis(1));
        task.cancel();
        wait_until_finished(&observer.finished);

        assert_eq!(observer.error.load(Ordering::SeqCst), CANCEL_MARKER);
    }

    #[test]
    fn ut_request_task_fail() {
        let request = get_request(LOCAL_URL);
        // Keep the task handle alive until the callback has fired.
        let (_task, observer) = start_task(&request);
        wait_until_finished(&observer.finished);

        assert_eq!(
            observer.error.load(Ordering::SeqCst),
            crate::error::HttpErrorCode::HttpCouldntConnect as u32
        );
    }

    #[test]
    fn ut_request_task_connect_timeout() {
        let mut request = get_request("222.222.222.222");
        request.pin_mut().SetConnectTimeout(1);
        // Keep the task handle alive until the callback has fired.
        let (_task, observer) = start_task(&request);
        wait_until_finished(&observer.finished);

        assert_eq!(
            observer.error.load(Ordering::SeqCst),
            crate::error::HttpErrorCode::HttpOperationTimedout as u32
        );
    }

    #[test]
    fn ut_request_task_timeout() {
        let mut request = get_request(TEST_URL);
        request.pin_mut().SetTimeout(1);
        // Keep the task handle alive until the callback has fired.
        let (_task, observer) = start_task(&request);
        wait_until_finished(&observer.finished);

        assert_eq!(
            observer.error.load(Ordering::SeqCst),
            crate::error::HttpErrorCode::HttpOperationTimedout as u32
        );
    }
}
316