1 // Copyright (C) 2024 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::collections::HashMap; 15 use std::pin::Pin; 16 17 use cxx::SharedPtr; 18 19 use crate::request::RequestCallback; 20 use crate::response::Response; 21 use crate::wrapper::ffi::{HttpClientRequest, HttpClientTask, NewHttpClientTask, OnCallback}; 22 use crate::wrapper::CallbackWrapper; 23 24 /// RequestTask 25 #[derive(Clone)] 26 pub struct RequestTask { 27 inner: SharedPtr<HttpClientTask>, 28 } 29 30 unsafe impl Send for RequestTask {} 31 unsafe impl Sync for RequestTask {} 32 33 /// RequestTask status 34 #[derive(Debug, Default)] 35 pub enum TaskStatus { 36 /// idle 37 Idle, 38 /// running 39 #[default] 40 Running, 41 } 42 43 impl RequestTask { from_http_request(request: &HttpClientRequest) -> Self44 pub(crate) fn from_http_request(request: &HttpClientRequest) -> Self { 45 Self { 46 inner: NewHttpClientTask(request), 47 } 48 } 49 from_ffi(inner: SharedPtr<HttpClientTask>) -> Self50 pub(crate) fn from_ffi(inner: SharedPtr<HttpClientTask>) -> Self { 51 Self { inner } 52 } 53 54 /// start the request task start(&mut self) -> bool55 pub fn start(&mut self) -> bool { 56 self.pin_mut().Start() 57 } 58 59 /// cancel the request task cancel(&self)60 pub fn cancel(&self) { 61 self.pin_mut().Cancel(); 62 } 63 64 /// get the request task status status(&mut self) -> TaskStatus65 pub fn status(&mut self) -> TaskStatus { 66 self.pin_mut().GetStatus().try_into().unwrap_or_default() 67 } 68 response(&mut self) -> Response69 pub fn response(&mut self) -> Response { 70 Response::from_ffi(self.pin_mut().GetResponse().into_ref().get_ref()) 71 } 72 headers(&mut self) -> HashMap<String, String>73 pub fn headers(&mut self) -> HashMap<String, String> { 74 self.response().headers() 75 } 76 set_callback(&mut self, callback: impl RequestCallback + 'static)77 pub(crate) fn set_callback(&mut self, callback: impl RequestCallback + 'static) { 78 OnCallback( 79 self.inner.clone(), 80 Box::new(CallbackWrapper::from_callback(callback)), 81 ); 82 } 83 pin_mut(&self) -> Pin<&mut HttpClientTask>84 fn pin_mut(&self) -> Pin<&mut HttpClientTask> { 85 let ptr = self.inner.as_ref().unwrap() as *const HttpClientTask as *mut HttpClientTask; 86 unsafe { Pin::new_unchecked(ptr.as_mut().unwrap()) } 87 } 88 } 89 90 #[cfg(test)] 91 mod test { 92 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; 93 use std::sync::Arc; 94 95 use super::*; 96 use crate::error::HttpClientError; 97 use crate::wrapper::ffi::NewHttpClientRequest; 98 const TEST_URL: &str = "https://www.w3cschool.cn/statics/demosource/movie.mp4"; 99 const LOCAL_URL: &str = "https://127.0.0.1"; 100 101 #[test] ut_task_from_http_request()102 fn ut_task_from_http_request() { 103 let mut request: cxx::UniquePtr<crate::wrapper::ffi::HttpClientRequest> = 104 NewHttpClientRequest(); 105 cxx::let_cxx_string!(url = TEST_URL); 106 request.pin_mut().SetURL(&url); 107 cxx::let_cxx_string!(method = "GET"); 108 request.pin_mut().SetMethod(&method); 109 let mut task = RequestTask::from_http_request(&request); 110 assert!(matches!(task.status(), TaskStatus::Idle)); 111 } 112 113 struct TestCallback { 114 pub(crate) finished: Arc<AtomicBool>, 115 pub(crate) response_code: Arc<AtomicU32>, 116 pub(crate) error: Arc<AtomicU32>, 117 pub(crate) result: Arc<AtomicU32>, 118 } 119 120 impl TestCallback { new( finished: Arc<AtomicBool>, response_code: Arc<AtomicU32>, error: Arc<AtomicU32>, result: Arc<AtomicU32>, ) -> Self121 fn new( 122 finished: Arc<AtomicBool>, 123 response_code: Arc<AtomicU32>, 124 error: Arc<AtomicU32>, 125 result: Arc<AtomicU32>, 126 ) -> Self { 127 Self { 128 finished, 129 response_code, 130 error, 131 result, 132 } 133 } 134 } 135 136 impl RequestCallback for TestCallback { on_success(&mut self, response: Response)137 fn on_success(&mut self, response: Response) { 138 self.response_code 139 .store(response.status() as u32, Ordering::SeqCst); 140 self.finished.store(true, Ordering::SeqCst); 141 } 142 on_fail(&mut self, error: HttpClientError)143 fn on_fail(&mut self, error: HttpClientError) { 144 self.error 145 .store(error.code().clone() as u32, Ordering::SeqCst); 146 self.finished.store(true, Ordering::SeqCst); 147 } 148 on_cancel(&mut self)149 fn on_cancel(&mut self) { 150 self.error.store(123456, Ordering::SeqCst); 151 self.finished.store(true, Ordering::SeqCst); 152 } 153 on_data_receive(&mut self, data: &[u8], _task: RequestTask)154 fn on_data_receive(&mut self, data: &[u8], _task: RequestTask) { 155 self.result.fetch_add(data.len() as u32, Ordering::SeqCst); 156 } 157 } 158 159 #[test] ut_request_task_start_success()160 fn ut_request_task_start_success() { 161 let mut request: cxx::UniquePtr<crate::wrapper::ffi::HttpClientRequest> = 162 NewHttpClientRequest(); 163 cxx::let_cxx_string!(url = TEST_URL); 164 request.pin_mut().SetURL(&url); 165 cxx::let_cxx_string!(method = "GET"); 166 request.pin_mut().SetMethod(&method); 167 let mut task = RequestTask::from_http_request(&request); 168 let finished = Arc::new(AtomicBool::new(false)); 169 let response_code = Arc::new(AtomicU32::new(0)); 170 let error = Arc::new(AtomicU32::new(0)); 171 let result = Arc::new(AtomicU32::new(0)); 172 let callback = TestCallback::new( 173 finished.clone(), 174 response_code.clone(), 175 error.clone(), 176 result.clone(), 177 ); 178 task.set_callback(callback); 179 task.start(); 180 while !finished.load(Ordering::SeqCst) { 181 std::thread::sleep(std::time::Duration::from_millis(100)); 182 } 183 assert_eq!(response_code.load(Ordering::SeqCst), 200); 184 assert_eq!(error.load(Ordering::SeqCst), 0); 185 assert_eq!( 186 result.load(Ordering::SeqCst), 187 task.headers() 188 .get("content-length") 189 .unwrap() 190 .parse() 191 .unwrap() 192 ); 193 } 194 195 #[test] ut_request_task_cancel()196 fn ut_request_task_cancel() { 197 let mut request: cxx::UniquePtr<crate::wrapper::ffi::HttpClientRequest> = 198 NewHttpClientRequest(); 199 cxx::let_cxx_string!(url = TEST_URL); 200 request.pin_mut().SetURL(&url); 201 cxx::let_cxx_string!(method = "GET"); 202 request.pin_mut().SetMethod(&method); 203 let mut task = RequestTask::from_http_request(&request); 204 let finished = Arc::new(AtomicBool::new(false)); 205 let response_code = Arc::new(AtomicU32::new(0)); 206 let error = Arc::new(AtomicU32::new(0)); 207 let result = Arc::new(AtomicU32::new(0)); 208 let callback = TestCallback::new( 209 finished.clone(), 210 response_code.clone(), 211 error.clone(), 212 result.clone(), 213 ); 214 task.set_callback(callback); 215 task.start(); 216 std::thread::sleep(std::time::Duration::from_millis(1)); 217 task.cancel(); 218 while !finished.load(Ordering::SeqCst) { 219 std::thread::sleep(std::time::Duration::from_millis(100)); 220 } 221 assert_eq!(error.load(Ordering::SeqCst), 123456); 222 } 223 224 #[test] ut_request_task_fail()225 fn ut_request_task_fail() { 226 let mut request: cxx::UniquePtr<crate::wrapper::ffi::HttpClientRequest> = 227 NewHttpClientRequest(); 228 cxx::let_cxx_string!(url = LOCAL_URL); 229 request.pin_mut().SetURL(&url); 230 cxx::let_cxx_string!(method = "GET"); 231 request.pin_mut().SetMethod(&method); 232 let mut task = RequestTask::from_http_request(&request); 233 let finished = Arc::new(AtomicBool::new(false)); 234 let response_code = Arc::new(AtomicU32::new(0)); 235 let error = Arc::new(AtomicU32::new(0)); 236 let result = Arc::new(AtomicU32::new(0)); 237 let callback = TestCallback::new( 238 finished.clone(), 239 response_code.clone(), 240 error.clone(), 241 result.clone(), 242 ); 243 task.set_callback(callback); 244 task.start(); 245 while !finished.load(Ordering::SeqCst) { 246 std::thread::sleep(std::time::Duration::from_millis(100)); 247 } 248 assert_eq!( 249 error.load(Ordering::SeqCst), 250 crate::error::HttpErrorCode::HttpCouldntConnect as u32 251 ); 252 } 253 254 #[test] ut_request_task_connect_timeout()255 fn ut_request_task_connect_timeout() { 256 let mut request: cxx::UniquePtr<crate::wrapper::ffi::HttpClientRequest> = 257 NewHttpClientRequest(); 258 cxx::let_cxx_string!(url = "222.222.222.222"); 259 request.pin_mut().SetURL(&url); 260 cxx::let_cxx_string!(method = "GET"); 261 request.pin_mut().SetMethod(&method); 262 request.pin_mut().SetConnectTimeout(1); 263 let mut task = RequestTask::from_http_request(&request); 264 let finished = Arc::new(AtomicBool::new(false)); 265 let response_code = Arc::new(AtomicU32::new(0)); 266 let error = Arc::new(AtomicU32::new(0)); 267 let result = Arc::new(AtomicU32::new(0)); 268 let callback = TestCallback::new( 269 finished.clone(), 270 response_code.clone(), 271 error.clone(), 272 result.clone(), 273 ); 274 task.set_callback(callback); 275 task.start(); 276 while !finished.load(Ordering::SeqCst) { 277 std::thread::sleep(std::time::Duration::from_millis(100)); 278 } 279 assert_eq!( 280 error.load(Ordering::SeqCst), 281 crate::error::HttpErrorCode::HttpOperationTimedout as u32 282 ); 283 } 284 285 #[test] ut_request_task_timeout()286 fn ut_request_task_timeout() { 287 let mut request: cxx::UniquePtr<crate::wrapper::ffi::HttpClientRequest> = 288 NewHttpClientRequest(); 289 cxx::let_cxx_string!(url = TEST_URL); 290 request.pin_mut().SetURL(&url); 291 cxx::let_cxx_string!(method = "GET"); 292 request.pin_mut().SetMethod(&method); 293 request.pin_mut().SetTimeout(1); 294 let mut task = RequestTask::from_http_request(&request); 295 let finished = Arc::new(AtomicBool::new(false)); 296 let response_code = Arc::new(AtomicU32::new(0)); 297 let error = Arc::new(AtomicU32::new(0)); 298 let result = Arc::new(AtomicU32::new(0)); 299 let callback = TestCallback::new( 300 finished.clone(), 301 response_code.clone(), 302 error.clone(), 303 result.clone(), 304 ); 305 task.set_callback(callback); 306 task.start(); 307 while !finished.load(Ordering::SeqCst) { 308 std::thread::sleep(std::time::Duration::from_millis(100)); 309 } 310 assert_eq!( 311 error.load(Ordering::SeqCst), 312 crate::error::HttpErrorCode::HttpOperationTimedout as u32 313 ); 314 } 315 } 316